四、Action函数
不同于Transformation操作,Action操作代表一次计算的结束,不再产生新的RDD,将结果返回到Driver程序或者输出到外部。所以Transformation操作只是建立管理,而Action操作才是实际的执行者。每个Action操作都会调用SparkContext的runJob方法向集群正式提交请求,所以每个Action操作对应一个Job
-
五、Transformer算子
一、值类型ValueType
map:map(func)
:将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。源码中的map算子相当于初始化一个RDD,新RDD叫做MappedRDD
[root@node1 /]# /export/server/spark/bin/pyspark \ > --master spark://node1:7077 \ > --executor-memory 1g \ > --total-executor-cores 2
>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3) >>> rdd1.map(lambda x:x+1).collect() [2, 3, 4, 5, 6, 7, 8, 9, 10]
- groupBy
>>> x = sc.parallelize([1,2,3]) >>> y = x.groupBy(lambda x:'A' if (x%2 == 1) else 'B') >>> print(y.mapValues(list).collect()) [('A', [1, 3]), ('B', [2])]
Filter:filter(func)
:选出所有func返回值为true的元素,生成一个新的RDD返回
>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3) >>> rdd2 = rdd1.map(lambda x:x*2) >>> rdd3 = rdd2.filter(lambda x:x>4) >>> rdd3.collect() [6, 8, 10, 12, 14, 16, 18]
flatMap()
:flatMap会先执行map的操作,再将所有对象合并为一个对象
>>> rdd1 = sc.parallelize(["a b c","d e f","h i j"]) >>> rdd2 = rdd1.flatMap(lambda x:x.split(" ")) >>> rdd2.collect() ['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']
二、双值类型DoubleValueType
Union
:对两个RDD求并集
>>> rdd1 = sc.parallelize([("a",1),("b",2)]) >>> rdd2 = sc.parallelize([("c",1),("b",3)]) >>> rdd3 = rdd1.union(rdd2) >>> rdd3.collect() [('a', 1), ('b', 2), ('c', 1), ('b', 3)]
intersection
:对两个RDD求交集
>>> rdd1 = sc.parallelize([("a",1),("b",2)]) >>> rdd2 = sc.parallelize([("c",1),("b",3)]) >>> rdd3 = rdd1.union(rdd2) >>> rdd4 = rdd3.intersection(rdd2) >>> rdd4.collect() [('b', 3), ('c', 1)]
groupByKey
:以元组中的第0个元素作为Key,进行分组,返回一个新的RDD
>>> rdd1 = sc.parallelize([("a",1),("b",2)]) >>> rdd2 = sc.parallelize([("c",1),("b",3)]) >>> rdd3 = rdd1.union(rdd2) >>> rdd4 = rdd3.groupByKey() >>> rdd4.collect() [('b', <pyspark.resultiterable.ResultIterable object at 0x7f4b070145b0>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7f4b07029190>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7f4b070291f0>)] >>> result = rdd4.collect() >>> result[0] ('b', <pyspark.resultiterable.ResultIterable object at 0x7f4b070292b0>) >>> result[0][1] <pyspark.resultiterable.ResultIterable object at 0x7f4b070292b0> >>> list(result[0][1]) [3, 2]
reduceByKey
:将key相同的键值对,按照Function进行计算
>>> rdd = sc.parallelize([("a",1),("b",1),("a",1)]) >>> rdd.reduceByKey(lambda x,y:x+y).collect() [('b', 1), ('a', 2)]
sortByKey
:根据key进行排序
>>> tmp = [('a',1),('b',2),('1',3),('d',4),('2',5)] >>> sc.parallelize(tmp).sortByKey().first() ('1', 3) >>> sc.parallelize(tmp).sortByKey(True,1).collect() [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] >>> sc.parallelize(tmp).sortByKey(True,2).collect() [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] >>> tmp2 = [('Mary',1),('had',2),('a',3),('little',4),('lamb',5)] >>> tmp2.extend([('whose',6),('fleece',7),('was',8),('white',9)]) >>> sc.parallelize(tmp2).sortByKey(True,3,keyfunc=lambda k:k.lower()).collect() [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
countByValue
>>> x = sc.parallelize([1,3,1,2,3]) >>> y = x.countByValue() >>> x.collect() [1, 3, 1, 2, 3] >>> y defaultdict(<class 'int'>, {1: 2, 3: 2, 2: 1})
六、Action算子
collect
:返回一个list,list中包含RDD中的所有元素,只有当数据量较小的时候使用Collect,因为所有的结果都会加载到内存中
>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3) >>> rdd2 = rdd1.map(lambda x:x+1) >>> rdd2 PythonRDD[100] at RDD at PythonRDD.scala:53 >>> rdd2.collect() [2, 3, 4, 5, 6, 7, 8, 9, 10]
reduce
:将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止
>>> rdd1 = sc.parallelize([1,2,3,4,5]) >>> rdd1.reduce(lambda x,y:x+y) 15
first
:返回RDD的第一个元素
>>> sc.parallelize([2,3,4]).first() 2
take
:返回RDD的前N个元素
>>> sc.parallelize([2,3,4,5,6]).take(2) [2, 3] >>> sc.parallelize([2,3,4,5,6]).take(10) [2, 3, 4, 5, 6] >>> sc.parallelize(range(10),100).filter(lambda x:x>90).take(3) [] >>> sc.parallelize(range(100),100).filter(lambda x:x>90).take(3) [91, 92, 93] >>> sc.parallelize(range(90,100),100).filter(lambda x:x>90).take(3) [91, 92, 93] >>> sc.parallelize(range(90,100)).filter(lambda x:x>90).take(3) [91, 92, 93]
top
:排序取前几个,从大到小
>>> x = sc.parallelize([1,3,1,2,3]) >>> y = x.top(num=3) >>> x.collect() [1, 3, 1, 2, 3] >>> y [3, 3, 2]
count
:返回RDD中元素的个数
>>> sc.parallelize([2,3,4]).count() 3
takeSample
:抽样API
>>> rdd = sc.parallelize(range(0,10)) >>> rdd.takeSample(True,20,1) [0, 6, 3, 4, 3, 1, 3, 7, 3, 5, 3, 0, 0, 9, 6, 5, 7, 9, 4, 7] >>> rdd.takeSample(True,5,1) [8, 8, 0, 3, 6] >>> rdd.takeSample(True,5,1) [8, 8, 0, 3, 6] >>> rdd.takeSample(False,5,1) [6, 8, 9, 7, 5] >>> rdd.takeSample(False,5,2) [5, 9, 3, 4, 6] >>> rdd.takeSample(False,5,2) [5, 9, 3, 4, 6]
foreach
:仅返回满足foreach内函数条件元素。
def f(x): print(x) sc.parallelize([1,2,3,4,5]).foreach(f)