0
点赞
收藏
分享

微信扫一扫

Spark中的常见算子

雅典娜的棒槌 2024-11-09 阅读 14

1、触发算子

1)count

2) foreach算子

 3)saveAsTextFile算子

 4)first 算子

 5)take 算子

 6)collect 算子 --收集,类似于吹哨

7) reduce算子 --规约,聚集 

# top N
    list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
    rdd = sc.parallelize(list01)
    # top 是一个触发算子,不返回rdd类型
    # 为什么 有时 用foreach打印,有时用print 打印
    # 对于转换算子的结果,还是rdd,对于rdd 使用foreach  1) rdd 循环打印  2) foreach 是触发算子
    # 对于触发算子的结果,一般不返回rdd,而是一个正常的返回值,使用print 打印即可
    print(rdd.top(3))
    # takeOrdered 也是一个触发算子,返回排序之后的最小的几个值
    print(rdd.takeOrdered(3))

 8)top算子:求排好序之后的最大的几个值

 9)takeOrdered : 求排好序之后的最小的几个值

10)collectAsMap 算子

 11)foreachPartition 算子

 

12)max 算子

13)min 算子

14)mean 算子

15)sum 算子

 2、转换算子

1)map算子

举例说明:

# 需求:计算每个元素的立方
# 原始数据 1 2 3 4 5 6
# 目标结果 1 8 27 64 125 216

list01 = [1,2,3,4,5,6]
	listRdd = sc.parallelize(list01)
	mapRdd = listRdd.map(lambda x: math.pow(x,3))
	mapRdd.foreach(lambda x: print(x))

2) flatMap算子

 3)filter算子

 4)union算子

 5) distinct算子

 6)分组聚合算子:groupByKey、 reduceByKey

分类:xxxByKey算子,只有KV类型的RDD才能调用

7)排序算子:sortBy、sortByKey 

sortBy算子:

 sortByKey算子: 

8) 重分区算子:repartition、coalesce

repartition算子:

coalesce算子:

9)keys算子 : 获取所有的key 

10)values算子 : 获取所有rdd中的value 

 11)mapValues算子:

将所有的value拿到之后进行map转换,转换后还是元组,只是元组中的value,进行了变化

12)join方面的算子 :

join / fullOuterJoin / leftOuterJoin / rightOuterJoin

 举例说明:

import os
from pyspark import SparkContext, SparkConf

"""
------------------------------------------
  Description : 
  SourceFile : Join
  Author  : dcc
  Date  : 2024/10/31
-------------------------------------------
"""

if __name__ == '__main__':
    # 设置 任务的环境变量
    os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取sc 对象
    conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
    sc = SparkContext(conf=conf)
    rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)],
                                    numSlices=2)
    rdd_singer_music = sc.parallelize(
        [("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),
         ("动力火车", "当")], numSlices=2)
    # join 是 转换算子  join 可以理解为内连接
    joinRdd = rdd_singer_age.join(rdd_singer_music)
    joinRdd.foreach(print)

    print("*"*100)
    leftRdd = rdd_singer_age.leftOuterJoin(rdd_singer_music)
    leftRdd.foreach(print)
    print("*"*100)
    rightRdd = rdd_singer_age.rightOuterJoin(rdd_singer_music)
    rightRdd.foreach(print)
    print("*"*100)
    fullRdd = rdd_singer_age.fullOuterJoin(rdd_singer_music)
    fullRdd.foreach(print)
	# join 关联的是两个kv类型的rdd
	# union 关联的是单个元素的rdd
    # 关闭sc
    sc.stop()


 13)mapPartitions算子

 3、哪些算子能触发shuffle过程:

  1)分组聚合算子:groupByKey、 reduceByKey

  2)排序算子:sortBy、sortByKey 

  3)重分区算子:repartition、coalesce(根据情况)

  4)join方面的算子 :join / fullOuterJoin / leftOuterJoin / rightOuterJoin

举报

相关推荐

0 条评论