0
点赞
收藏
分享

微信扫一扫

微信小程序中的两种页面跳转方式

目录

有界数据和无界数据

有界数据

 无界数据

 结构化流

基本介绍

入门案例

结构化流的编程模型

数据结构

数据源(Source)

File Source

Kafka Source(Spark 和 Kafka 整合)

整合Kafka准备工作

从kafka中读取数据

流式处理

批处理

 数据写入Kafka中

流式处理

批处理


有界数据和无界数据

有界数据

 无界数据

 结构化流

基本介绍

  结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL ....

Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次

真正的流处理引擎: Flink、Storm(早期流式处理引擎)、Flume(流式数据采集)

入门案例

代码测试操作步骤

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder \
        .config("spark.sql.shuffle.partitions", 1) \
        .appName('structured_streaming_wordcount') \
        .master('local[*]') \
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream \
        .format("socket") \
        .option("host", "192.168.88.161") \
        .option("port", "55555") \
        .load()

    # 3- 数据处理
    result_df = init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').agg(
        F.count('word').alias('cnt')
    )

    #在结构化流中不能调用show()方法
    # init_df.show()

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()

结构化流的编程模型

数据结构

    在结构化流中,我们可以将DataFrame称为误解的DataFrame或者无界的二维表

数据源(Source)

    结构化流默认提供了多种数据源,从而可以支持不同的数据源的处理工作,目前提供了如下数据源:

File Source

        将目录中写入的文件作为数据流读取,文件的文件格式为:text,csv.json,orc,parquet...

相关参数:

option参数参数说明
maxFilesPerTrigger每次触发时要考虑的最大新文件数(默认 no max)
latestFirst是否先处理最新的新文件,当有大量文件积压时有用
fileNameOnly

是否检查新文件只有文件名而不是完整路径(默认值:False)将此设置为true时,以下文件将被视为同一个文件,文件名"datadset.txt"相同

“file:///dataset.txt” “s3://a/dataset.txt "                              "s3n://a/b/dataset.txt"                                                    "s3a://a/b/c/dataset.txt"

读取代码通用格式

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
    # 结构化流数据来源
    print('结构化流数据来源_file_source')
    # 创建SparkSession顶级对象
    spark = SparkSession \
        .builder \
        .config('spark.sql.shuffle.partitions', 1) \
        .appName("structiured_streaming_file_source") \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # 复杂API
    init_df = spark \
        .readStream \
        .format('csv') \
        .option('path', 'file:///export/data/pyspark_projects/04_Structured_Streaming/data') \
        .option('sep', ',') \
        .option('encoding', 'utf8') \
        .schema("id int,name string") 


    init_df = spark.readStream.csv(
        path='file:///export/data/pyspark_projects/04_Structured_Streaming/data/child',
        schema="id int,name string",
        sep=',',
        encoding='utf8'
    )
    # 数据处理
    # 数据输出
    # 启动结构化流
    init_df.writeStream.format('console').outputMode('append').start().awaitTermination()

Kafka Source(Spark 和 Kafka 整合)

         Spark天然支持集成Kafka, 基于Spark读取Kafka中的数据, 同时可以实施精准一次(仅且只会处理一次)的语义, 作为程序员, 仅需要关心如何处理消息数据即可, 结构化流会将数据读取过来, 转换为一个DataFrame的对象, DataFrame就是一个无界的DataFrame, 是一个无限增大的表

整合Kafka准备工作
从kafka中读取数据

spark和kafka集成官网文档:

https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html

流式处理

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
    # 结构化流数据来源
    print('结构化流数据来源_file_source')
    # 创建SparkSession顶级对象
    spark = SparkSession \
        .builder \
        .config('spark.sql.shuffle.partitions', 1) \
        .appName("structiured_streaming_file_source") \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    #订阅一个topic,并且指定header信息
    init_df = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribe', 'search-log-topic') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()

    # 订阅符合规则的topic,并且指定header信息
    init_df = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribePattern', 'topic.*') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()


    #订阅多个topic,从最新的消息开始消费
    init_df = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribe', 'test01,test02') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()
    init_df.writeStream.format('console').outputMode('append').start().awaitTermination()

批处理

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
    # 结构化流数据来源
    print('结构化流数据来源_file_source')
    # 创建SparkSession顶级对象
    spark = SparkSession \
        .builder \
        .config('spark.sql.shuffle.partitions', 1) \
        .appName("structiured_streaming_file_source") \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # 订阅一个topic,并且指定header信息
    init_df = spark \
        .read \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribe', 'search-log-topic') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.show()
    print('=' * 50)

    # 订阅符合规则的topic,并且指定header信息
    init_df = spark \
        .read \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribePattern', 'topic.*') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.show()

    print('=' * 50)
    # 订阅多个topic,从最新的消息开始消费
    init_df = spark \
        .read \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribe', 'test01,test02') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.show()

    # 释放资源
    spark.stop()

 数据写入Kafka中

流式处理
# 数据写入Kafka中
# 写出到指定Topic
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的python解释器


os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    # 创建sparksession对象
    spark = SparkSession \
        .builder \
        .appName('spark_read_kafka_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # 默认从最新的地方消费
    init_df = spark.readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1.itcast.cn:9092,node2.itcast.cn:9092') \
        .option('topic', 'test01') \
        .option('subscribe', 'test01') \
        .load()
    # 数据处理
    result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))

    # 启动流式任务
    result_df.writeStream.format('kafka') \
        .option('kafka.bootstrap.servers', 'node1.itcast.cn:9092,node2.itcast.cn:9092') \
        .option('topic', 'test01') \
        .option("checkpointLocation", "hdfs://node1:8020/day10/chk") \
        .start() \
        .awaitTermination()

从数据内容中解析得到Topic,然后写入Kafka

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_multi_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribePattern","test.*")\
        .load()

    # 3- 数据处理
    # 错误写法:缺少topic字段
    # result_df = init_df.select(F.expr("topic as new_topic"),F.expr("concat(cast(value as string),'_',topic) as value"))
    result_df = init_df.select("topic",F.expr("concat(cast(value as string),'_',topic) as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format("console").outputMode("append").start()

    result_df.writeStream.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("checkpointLocation", "hdfs://node1:8020/day10/chk")\
        .start()\
        .awaitTermination()
批处理
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.read\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test02")\
        .load()

    # 3- 数据处理
    result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.write.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("topic","test02")\
        .option("checkpointLocation", "hdfs://node1:8020/day10/chk")\
        .save()

 

举报

相关推荐

0 条评论