RDD的算子
1-算子是什么?就是函数、方法、API、行为
2-算子分几类?-transformation和action
3-transformation的特点:转换成新RDD,延迟加载
-transformation有哪些算子?- 见表格比如map filter等
-transformation继续分类
eg:glom-每个分区的元素
1-RDD的元素是单value
map、groupBy、filter、flatMap、distinct
北京订单案例:本地+standalone集群
2-双值类型-算子的入参也是RDD
union、intersection
3-RDD的元素是key_value
groupByKey、reduceByKey、sortByKey、
4-action的特点:立即执行,输出动作,是计算链条的最后一个环节。
-action有哪些算子?-见表格
eg:collecet、 reduce、 first、take、
takeSample、takeOrdered、top、count
算子的不同叫法【函数】、【方法】、【API】
Transformation
-
返回一个新的【RDD】,所有的transformation函数(算子)都是【lazy延迟加载】的,不会立即执行,比如wordcount中的【flatMap】,【map】,【reduceByKey】
转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | 对rdd进行管道操作 |
coalesce(numPartitions) | 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 |
repartition(numPartitions) | 重新给 RDD 分区 |
常见算子的用法:
transformation算子
-
值类型valueType
-
map
-
groupBy
-
filter
-
flatMap
-
distinct
-
import os
from pyspark import SparkConf, SparkContext
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
conf = SparkConf().setAppName("2_rdd_from_external").setMaster("local[*]")
sc = SparkContext(conf=conf)
#练习,介绍glom函数,可以得到每个分区有哪些具体元素
rdd1=sc.parallelize([5,6,4,7,3,8,2,9,1,10])
rdd2=rdd1.glom()
print(rdd2.collect())
>>结果: [[5, 6], [4, 7], [3, 8], [2, 9, 1, 10]]
#默认并行度是4,因为local[*]机器有4个core
rdd1.getNumPartitions()
>>结果: 4
#可以指定分区数3
rdd1=sc.parallelize([1,2,3,4,5,6,7,8,9],3)
rdd1.getNumPartitions()
>>结果: 3
#1、map算子,方式1
rdd2=rdd1.map(lambda x:x+1)
print(rdd2.collect())
>>结果:[2, 3, 4, 5, 6, 7, 8, 9, 10]
#1、map算子,方式2
def add(x):
return x+1
rdd2=rdd1.map(add)
print(rdd2.collect())
>>结果: [2, 3, 4, 5, 6, 7, 8, 9, 10]
#2、groupBy算子,
rdd1=sc.parallelize([1,2,3,4])
rdd2=rdd1.groupBy(lambda x: 'even' if x%2==0 else 'odd')
print(rdd2.collect())
>>结果: [('even', <pyspark.resultiterable.ResultIterable object at 0x7f9e1c0e33d0>), ('odd', <pyspark.resultiterable.ResultIterable object at 0x7f9e0e2ae0d0>)]
rdd3=rdd2.mapValues(lambda x:list(x))
print(rdd3.collect())
>>结果: [('even', [2, 4]), ('odd', [1, 3])]
#3、filter算子
rdd1=sc.parallelize([1,2,3,4,5,6,7,8,9])
rdd2=rdd1.filter(lambda x:True if x>4 else False)
print(rdd2.collect())
>>结果: [5, 6, 7, 8, 9]
#4、flatMap算子
rdd1=sc.parallelize(["a b c","d e f","h i j"])
rdd2=rdd1.flatMap(lambda line:line.split(" "))
print(rdd2.collect())
>>结果: ['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']
#4、distinct算子
rdd1 = sc.parallelize([1,2,3,3,3,5,5,6])
rdd1.distinct().collect()
>>结果: [1, 5, 2, 6, 3]
双值类型DoubleValueType
-
union
-
intersection
#union算子
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
print(rdd1.collect())
>>结果:[('a', 1), ('b', 2)]
rdd2 = sc.parallelize([("c",1),("b",3)])
print(rdd2.collect())
>>结果:[('c', 1), ('b', 3)]
rdd3=rdd1.union(rdd2)
print(rdd3.collect())
>>结果:[('a', 1), ('b', 2), ('c', 1), ('b', 3)]
#intersection算子
rdd2 = sc.parallelize([("a",1),("b",3)])
rdd3=rdd1.intersection(rdd2)
rdd3.collect()
>>结果: [('a', 1)]
Key-Value值类型
-
groupByKey
-
reduceByKey
-
sortByKey
#groupByKey算子1
rdd = sc.parallelize([("a",1),("b",2),("c",3),("d",4)])
rdd.groupByKey().collect()
>>结果:
[('b', <pyspark.resultiterable.ResultIterable at 0x7f9e1c0e33a0>),
('c', <pyspark.resultiterable.ResultIterable at 0x7f9e0e27d430>),
('a', <pyspark.resultiterable.ResultIterable at 0x7f9e0e27df10>),
('d', <pyspark.resultiterable.ResultIterable at 0x7f9e0e27d340>)]
result=rdd.groupByKey().collect()
result[1]
>>结果: ('c', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2ae3d0>)
result[1][1]
>>结果: <pyspark.resultiterable.ResultIterable at 0x7f9e0e2ae3d0>
list(result[1][1])
>>结果: [3]
#groupByKey算子2,额外补充案例
rdd = sc.parallelize([("M",'zs'),("F",'ls'),("M",'ww'),("F",'zl')])
rdd2=rdd.groupByKey()
rdd2.collect()
>>结果:
[('M', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2bc550>),
('F', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2bcfa0>)]
ite=rdd2.collect()
for x in ite : print('性别是',x[0],'人们是:',list(x[1]))
>>结果: 性别是 M 人们是: ['zs', 'ww']
性别是 F 人们是: ['ls', 'zl']
#groupByKey算子3
sc.parallelize([('hadoop', 1), ('hadoop', 5), ('spark', 3), ('spark', 6)])
>>结果: ParallelCollectionRDD[60] at readRDDFromFile at PythonRDD.scala:274
rdd1=sc.parallelize([('hadoop', 1), ('hadoop', 5), ('spark', 3), ('spark', 6)])
rdd2=rdd1.groupByKey()
rdd2.collect()
>>结果:
[('hadoop', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2bc490>),
('spark', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2bc670>)]
rdd2.mapValues(lambda value:sum(list(value)))
>>结果: PythonRDD[67] at RDD at PythonRDD.scala:53
rdd3=rdd2.mapValues(lambda value:sum(list(value)))
rdd3.collect()
>>结果: [('hadoop', 6), ('spark', 9)]
#reduceByKey算子
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(lambda x,y:x+y).collect()
>>结果: [('b', 1), ('a', 2)]
#sortByKey算子
sc.parallelize([('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)])
>>结果: ParallelCollectionRDD[75] at readRDDFromFile at PythonRDD.scala:274
rdd1=sc.parallelize([('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)])
rdd2=rdd1.sortByKey()
rdd2.collect()
>>结果: [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
print(rdd1.sortByKey(False))
PythonRDD[90] at RDD at PythonRDD.scala:53
print(rdd1.sortByKey(False).collect())
[('d', 4), ('b', 2), ('a', 1), ('2', 5), ('1', 3)]
print(rdd1.sortByKey(True,2).glom().collect())
[[('1', 3), ('2', 5), ('a', 1)], [('b', 2), ('d', 4)]]
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5), ('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]
rdd1=sc.parallelize(tmp2)
rdd2=rdd1.sortByKey(True,1,keyfunc=lambda k:k.upper())
rdd2.collect()
>>结果:
[('a', 3),
('fleece', 7),
('had', 2),
('lamb', 5),
('little', 4),
('Mary', 1),
('was', 8),
('white', 9),
('whose', 6)]
Action
-
返回的【不是】RDD,可以将结果保存输出,所有的Action算子【立即】执行,比如wordcount中的【saveAsTextFile】。
动作 | 含义 |
---|---|
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
#countByValue算子
x = sc.parallelize([1, 3, 1, 2, 3])
y = x.countByValue()
print(type(y))
<class 'collections.defaultdict'>
print(y)
>>结果: defaultdict(<class 'int'>, {1: 2, 3: 2, 2: 1})
#collect算子
rdd = sc.parallelize([1,3,5,2,6,7,11,9,10],3)
rdd.map(lambda x: x + 1).collect()
>>结果: [2, 4, 6, 3, 7, 8, 12, 10, 11]
x=rdd.map(lambda x: x + 1).collect()
print(type(x))
>>结果: <class 'list'>
#reduce算子
rdd1 = sc.parallelize([1,2,3,4,5])
rdd1.collect()
>>结果: [1, 2, 3, 4, 5]
x=rdd1.reduce(lambda x,y:x+y)
print(x)
15
#fold算子
rdd1 = sc.parallelize([1,2,3,4,5], 3)
rdd.glom().collect()
>>结果: [[1, 3, 5], [2, 6, 7], [11, 9, 10]]
rdd1 = sc.parallelize([1,2,3,4,5], 3)
rdd1.glom().collect()
>>结果: [[1], [2, 3], [4, 5]]
rdd1.fold(10,lambda x,y:x+y)
>>结果: 55
#first算子
sc.parallelize([2, 3, 4]).first()
>>结果: 2
#take算子
sc.parallelize([2, 3, 4, 5, 6]).take(2)
>>结果: [2, 3]
sc.parallelize([2, 3, 4, 5, 6]).take(10)
>>结果: [2, 3, 4, 5, 6]
sc.parallelize([5,3,1,1,6]).take(2)
>>结果: [5, 3]
sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
>>结果: [91, 92, 93]
#top算子
x = sc.parallelize([1, 3, 1, 2, 3])
x.top(3)
>>结果: [3, 3, 2]
#count算子
sc.parallelize([2, 3, 4]).count()
>>结果: 3
#foreach算子
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark and python"] )
words.foreach(lambda x:print(x))
>>结果:
pyspark
pyspark and spark and python
akka
spark vs hadoop
hadoop
spark
scala
java
#saveAsTextFile算子
data = sc.parallelize([1,2,3], 2)
data.glom().collect()
>>结果: [[1], [2, 3]]
data.saveAsTextFile("hdfs://node1:8020/output/file1")
-
collecet
-
reduce
-
first
-
take
-
takeSample
-
takeOrdered
-
top
-
count 上面Executor都会将执行的结果统一发送回Driver
唯独foreach和saveAsTextFile是不会统一发送回Driver的.