目录
Kafka Source(Spark 和 Kafka 整合)
有界数据和无界数据
有界数据
无界数据
结构化流
基本介绍
结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL ....
Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次
真正的流处理引擎: Flink、Storm(早期流式处理引擎)、Flume(流式数据采集)
入门案例
代码测试操作步骤
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder \
.config("spark.sql.shuffle.partitions", 1) \
.appName('structured_streaming_wordcount') \
.master('local[*]') \
.getOrCreate()
# 2- 数据输入
init_df = spark.readStream \
.format("socket") \
.option("host", "192.168.88.161") \
.option("port", "55555") \
.load()
# 3- 数据处理
result_df = init_df.select(
F.explode(F.split('value', ' ')).alias('word')
).groupBy('word').agg(
F.count('word').alias('cnt')
)
#在结构化流中不能调用show()方法
# init_df.show()
# 4- 数据输出
# 5- 启动流式任务
result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
结构化流的编程模型
数据结构
在结构化流中,我们可以将DataFrame称为误解的DataFrame或者无界的二维表
数据源(Source)
结构化流默认提供了多种数据源,从而可以支持不同的数据源的处理工作,目前提供了如下数据源:
File Source
将目录中写入的文件作为数据流读取,文件的文件格式为:text,csv.json,orc,parquet...
相关参数:
option参数 | 参数说明 |
maxFilesPerTrigger | 每次触发时要考虑的最大新文件数(默认 no max) |
latestFirst | 是否先处理最新的新文件,当有大量文件积压时有用 |
fileNameOnly | 是否检查新文件只有文件名而不是完整路径(默认值:False)将此设置为true时,以下文件将被视为同一个文件,文件名"datadset.txt"相同 “file:///dataset.txt” “s3://a/dataset.txt " "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" |
读取代码通用格式
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 结构化流数据来源
print('结构化流数据来源_file_source')
# 创建SparkSession顶级对象
spark = SparkSession \
.builder \
.config('spark.sql.shuffle.partitions', 1) \
.appName("structiured_streaming_file_source") \
.master('local[*]') \
.getOrCreate()
# 数据输入
# 复杂API
init_df = spark \
.readStream \
.format('csv') \
.option('path', 'file:///export/data/pyspark_projects/04_Structured_Streaming/data') \
.option('sep', ',') \
.option('encoding', 'utf8') \
.schema("id int,name string")
init_df = spark.readStream.csv(
path='file:///export/data/pyspark_projects/04_Structured_Streaming/data/child',
schema="id int,name string",
sep=',',
encoding='utf8'
)
# 数据处理
# 数据输出
# 启动结构化流
init_df.writeStream.format('console').outputMode('append').start().awaitTermination()
Kafka Source(Spark 和 Kafka 整合)
Spark天然支持集成Kafka, 基于Spark读取Kafka中的数据, 同时可以实施精准一次(仅且只会处理一次)的语义, 作为程序员, 仅需要关心如何处理消息数据即可, 结构化流会将数据读取过来, 转换为一个DataFrame的对象, DataFrame就是一个无界的DataFrame, 是一个无限增大的表
整合Kafka准备工作
从kafka中读取数据
spark和kafka集成官网文档:
https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html
流式处理
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 结构化流数据来源
print('结构化流数据来源_file_source')
# 创建SparkSession顶级对象
spark = SparkSession \
.builder \
.config('spark.sql.shuffle.partitions', 1) \
.appName("structiured_streaming_file_source") \
.master('local[*]') \
.getOrCreate()
# 数据输入
#订阅一个topic,并且指定header信息
init_df = spark \
.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribe', 'search-log-topic') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()
# 订阅符合规则的topic,并且指定header信息
init_df = spark \
.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribePattern', 'topic.*') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()
#订阅多个topic,从最新的消息开始消费
init_df = spark \
.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribe', 'test01,test02') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()
init_df.writeStream.format('console').outputMode('append').start().awaitTermination()
批处理
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 结构化流数据来源
print('结构化流数据来源_file_source')
# 创建SparkSession顶级对象
spark = SparkSession \
.builder \
.config('spark.sql.shuffle.partitions', 1) \
.appName("structiured_streaming_file_source") \
.master('local[*]') \
.getOrCreate()
# 数据输入
# 订阅一个topic,并且指定header信息
init_df = spark \
.read \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribe', 'search-log-topic') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.show()
print('=' * 50)
# 订阅符合规则的topic,并且指定header信息
init_df = spark \
.read \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribePattern', 'topic.*') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.show()
print('=' * 50)
# 订阅多个topic,从最新的消息开始消费
init_df = spark \
.read \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribe', 'test01,test02') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.show()
# 释放资源
spark.stop()
数据写入Kafka中
流式处理
# 数据写入Kafka中
# 写出到指定Topic
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
# 创建sparksession对象
spark = SparkSession \
.builder \
.appName('spark_read_kafka_demo') \
.master('local[*]') \
.getOrCreate()
# 数据输入
# 默认从最新的地方消费
init_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1.itcast.cn:9092,node2.itcast.cn:9092') \
.option('topic', 'test01') \
.option('subscribe', 'test01') \
.load()
# 数据处理
result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))
# 启动流式任务
result_df.writeStream.format('kafka') \
.option('kafka.bootstrap.servers', 'node1.itcast.cn:9092,node2.itcast.cn:9092') \
.option('topic', 'test01') \
.option("checkpointLocation", "hdfs://node1:8020/day10/chk") \
.start() \
.awaitTermination()
从数据内容中解析得到Topic,然后写入Kafka
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('ss_read_kafka_multi_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从最新的地方开始消费
init_df = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribePattern","test.*")\
.load()
# 3- 数据处理
# 错误写法:缺少topic字段
# result_df = init_df.select(F.expr("topic as new_topic"),F.expr("concat(cast(value as string),'_',topic) as value"))
result_df = init_df.select("topic",F.expr("concat(cast(value as string),'_',topic) as value"))
# 4- 数据输出
# 5- 启动流式任务
result_df.writeStream.format("console").outputMode("append").start()
result_df.writeStream.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("checkpointLocation", "hdfs://node1:8020/day10/chk")\
.start()\
.awaitTermination()
批处理
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('ss_read_kafka_1_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从最新的地方开始消费
init_df = spark.read\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribe","test02")\
.load()
# 3- 数据处理
result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))
# 4- 数据输出
# 5- 启动流式任务
result_df.write.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("topic","test02")\
.option("checkpointLocation", "hdfs://node1:8020/day10/chk")\
.save()