1、触发算子
1)count
2) foreach算子
3)saveAsTextFile算子
4)first 算子
5)take 算子
6)collect 算子 --收集,类似于吹哨
7) reduce算子 --规约,聚集
# top N
list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
rdd = sc.parallelize(list01)
# top 是一个触发算子,不返回rdd类型
# 为什么 有时 用foreach打印,有时用print 打印
# 对于转换算子的结果,还是rdd,对于rdd 使用foreach 1) rdd 循环打印 2) foreach 是触发算子
# 对于触发算子的结果,一般不返回rdd,而是一个正常的返回值,使用print 打印即可
print(rdd.top(3))
# takeOrdered 也是一个触发算子,返回排序之后的最小的几个值
print(rdd.takeOrdered(3))
8)top算子:求排好序之后的最大的几个值
9)takeOrdered : 求排好序之后的最小的几个值
10)collectAsMap 算子
11)foreachPartition 算子
12)max 算子
13)min 算子
14)mean 算子
15)sum 算子
2、转换算子
1)map算子
举例说明:
# 需求:计算每个元素的立方
# 原始数据 1 2 3 4 5 6
# 目标结果 1 8 27 64 125 216
list01 = [1,2,3,4,5,6]
listRdd = sc.parallelize(list01)
mapRdd = listRdd.map(lambda x: math.pow(x,3))
mapRdd.foreach(lambda x: print(x))
2) flatMap算子
3)filter算子
4)union算子
5) distinct算子
6)分组聚合算子:groupByKey、 reduceByKey
分类:xxxByKey算子,只有KV类型的RDD才能调用
7)排序算子:sortBy、sortByKey
sortBy算子:
sortByKey算子:
8) 重分区算子:repartition、coalesce
repartition算子:
coalesce算子:
9)keys算子 : 获取所有的key
10)values算子 : 获取所有rdd中的value
11)mapValues算子:
将所有的value拿到之后进行map转换,转换后还是元组,只是元组中的value,进行了变化
12)join方面的算子 :
join / fullOuterJoin / leftOuterJoin / rightOuterJoin
举例说明:
import os
from pyspark import SparkContext, SparkConf
"""
------------------------------------------
Description :
SourceFile : Join
Author : dcc
Date : 2024/10/31
-------------------------------------------
"""
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
sc = SparkContext(conf=conf)
rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)],
numSlices=2)
rdd_singer_music = sc.parallelize(
[("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),
("动力火车", "当")], numSlices=2)
# join 是 转换算子 join 可以理解为内连接
joinRdd = rdd_singer_age.join(rdd_singer_music)
joinRdd.foreach(print)
print("*"*100)
leftRdd = rdd_singer_age.leftOuterJoin(rdd_singer_music)
leftRdd.foreach(print)
print("*"*100)
rightRdd = rdd_singer_age.rightOuterJoin(rdd_singer_music)
rightRdd.foreach(print)
print("*"*100)
fullRdd = rdd_singer_age.fullOuterJoin(rdd_singer_music)
fullRdd.foreach(print)
# join 关联的是两个kv类型的rdd
# union 关联的是单个元素的rdd
# 关闭sc
sc.stop()
13)mapPartitions算子:
3、哪些算子能触发shuffle过程:
1)分组聚合算子:groupByKey、 reduceByKey
2)排序算子:sortBy、sortByKey
3)重分区算子:repartition、coalesce(根据情况)
4)join方面的算子 :join / fullOuterJoin / leftOuterJoin / rightOuterJoin