0
点赞
收藏
分享

微信扫一扫

windows安装运行Apache James(基于spring的版本)

彪悍的鼹鼠 2024-01-11 阅读 9

一.Pandas简介

1、基本介绍

  • Pandas是Python的一个第三方包,也是商业和工程领域最流行的结构化数据工具集,用于数据清洗、处理以及分析

  • Pandas和Spark SQL中很多功能都类似,甚至使用方法都是相同的

  • Pandas适用场景

    • Pandas用于处理单机数据

    • 可以在数据ETL、查询分析、报表输出等环节使用

2.数据结构

Python中的Pandas的DataFrame数据结构:

DataFrame:表示一个二维表对象,就是表示整个表

字段,列,索引;Series表示一行或者一列 

 

二.Spark SQL函数定义

1.窗口函数

2.SQL函数分类

SQL函数,主要分为以下三大类:

①UDF函数:用户自定义函数

特点:一对一,输入一个得到一个

例如:split() substr()

②UDAF函数:用户自定义聚合函数

特点:多对一,输入多个得到一个

例如:sum()

③UDTF函数:用户自定义表数据生成函数

特点:一对多,输入一个得到多个

例如:explode()

在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数

思考:有这么多的内置函数,为啥还需要自定义函数呢?

 

在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。

在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。

Spark SQL原生存在的问题:大量的序列化和反序列

3.Spark原生自定义UDF函数

自定义函数流程:

4.Pandas的UDF函数

4.1 Apache Arrow框架基本介绍

Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率

Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数

Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用

4.2 基于Arrow完成Pandas DataFrame和Spark DataFrame互转

使用场景:

1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析

2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame

 

4.3 基于Pandas完成UDF函数

基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数

三.Spark on Hive

1.集成原理

2.在代码中集成Hive

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    """
        spark.sql.warehouse.dir:告诉Spark数据存储在什么地方。默认使用本地磁盘进行存储。推荐使用HDFS
        hive.metastore.uris:告诉Spark元数据信息去什么地方找MetaStore
        enableHiveSupport():开启SparkSQL和Hive的集成
    """
    spark = SparkSession.builder\
        .config("spark.sql.warehouse.dir","hdfs://node1:8020/user/hive/warehouse")\
        .config("hive.metastore.uris","thrift://node1.itcast.cn:9083")\
        .appName('sparksql_hive')\
        .master('local[*]')\
        .enableHiveSupport()\
        .getOrCreate()

    # 2- 数据输入
    # 3- 数据处理
    # 4- 数据输出
    spark.sql("show databases").show()

    spark.sql("""
        select 
            *
        from day07.student
        where id>=2
    """).show()

    # 5- 释放资源
    spark.stop()

 

举报

相关推荐

0 条评论