0
点赞
收藏
分享

微信扫一扫

Spark中的RDD算子是什么

年迈的代码机器 2022-02-13 阅读 60

RDD的算子
1-算子是什么?就是函数、方法、API、行为
2-算子分几类?-transformation和action
3-transformation的特点:转换成新RDD,延迟加载
    -transformation有哪些算子?- 见表格比如map filter等
    -transformation继续分类
       eg:glom-每个分区的元素
        1-RDD的元素是单value
            map、groupBy、filter、flatMap、distinct
            北京订单案例:本地+standalone集群
        2-双值类型-算子的入参也是RDD
            union、intersection
        3-RDD的元素是key_value
            groupByKey、reduceByKey、sortByKey、
4-action的特点:立即执行,输出动作,是计算链条的最后一个环节。
    -action有哪些算子?-见表格
     eg:collecet、 reduce、 first、take、 
                takeSample、takeOrdered、top、count

算子的不同叫法【函数】、【方法】、【API】

Transformation

  • 返回一个新的【RDD】,所有的transformation函数(算子)都是【lazy延迟加载】的,不会立即执行,比如wordcount中的【flatMap】,【map】,【reduceByKey】

转换含义
map(func)返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func)返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func)类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func)类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed)根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset)对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset)对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks]))对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks])在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks])在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks])与sortByKey类似,但是更灵活
join(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
cartesian(otherDataset)笛卡尔积
pipe(command, [envVars])对rdd进行管道操作
coalesce(numPartitions)减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作
repartition(numPartitions)重新给 RDD 分区

常见算子的用法:

transformation算子

  • 值类型valueType

    • map

    • groupBy

    • filter

    • flatMap

    • distinct

import os
from pyspark import SparkConf, SparkContext
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
conf = SparkConf().setAppName("2_rdd_from_external").setMaster("local[*]")
sc = SparkContext(conf=conf)
#练习,介绍glom函数,可以得到每个分区有哪些具体元素
rdd1=sc.parallelize([5,6,4,7,3,8,2,9,1,10])
rdd2=rdd1.glom()
print(rdd2.collect())
>>结果: [[5, 6], [4, 7], [3, 8], [2, 9, 1, 10]]
#默认并行度是4,因为local[*]机器有4个core
rdd1.getNumPartitions()
>>结果:  4
#可以指定分区数3
rdd1=sc.parallelize([1,2,3,4,5,6,7,8,9],3)
rdd1.getNumPartitions()
>>结果:  3
    
#1、map算子,方式1
rdd2=rdd1.map(lambda x:x+1)
print(rdd2.collect())
>>结果:[2, 3, 4, 5, 6, 7, 8, 9, 10]
    
#1、map算子,方式2
def add(x):
    return x+1
rdd2=rdd1.map(add)
print(rdd2.collect())
>>结果: [2, 3, 4, 5, 6, 7, 8, 9, 10]

#2、groupBy算子,
rdd1=sc.parallelize([1,2,3,4])
rdd2=rdd1.groupBy(lambda x: 'even' if x%2==0 else 'odd')
print(rdd2.collect())
>>结果: [('even', <pyspark.resultiterable.ResultIterable object at 0x7f9e1c0e33d0>), ('odd', <pyspark.resultiterable.ResultIterable object at 0x7f9e0e2ae0d0>)]
    
rdd3=rdd2.mapValues(lambda x:list(x))
print(rdd3.collect())
>>结果: [('even', [2, 4]), ('odd', [1, 3])]
  
#3、filter算子
rdd1=sc.parallelize([1,2,3,4,5,6,7,8,9])
rdd2=rdd1.filter(lambda x:True if x>4 else False)
print(rdd2.collect())
>>结果: [5, 6, 7, 8, 9]
    
#4、flatMap算子
rdd1=sc.parallelize(["a b c","d e f","h i j"])
rdd2=rdd1.flatMap(lambda line:line.split(" "))
print(rdd2.collect())
>>结果: ['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']
    
#4、distinct算子
rdd1 = sc.parallelize([1,2,3,3,3,5,5,6])
rdd1.distinct().collect()
>>结果:  [1, 5, 2, 6, 3]

双值类型DoubleValueType

  • union

  • intersection

#union算子
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
print(rdd1.collect())
>>结果:[('a', 1), ('b', 2)]
    
rdd2 = sc.parallelize([("c",1),("b",3)])
print(rdd2.collect())
>>结果:[('c', 1), ('b', 3)]
    
rdd3=rdd1.union(rdd2)
print(rdd3.collect())
>>结果:[('a', 1), ('b', 2), ('c', 1), ('b', 3)]
    
#intersection算子
rdd2 = sc.parallelize([("a",1),("b",3)])
rdd3=rdd1.intersection(rdd2)
rdd3.collect()
>>结果: [('a', 1)]

 

Key-Value值类型

  • groupByKey

  • reduceByKey

  • sortByKey

#groupByKey算子1
rdd = sc.parallelize([("a",1),("b",2),("c",3),("d",4)])
rdd.groupByKey().collect()
>>结果: 
[('b', <pyspark.resultiterable.ResultIterable at 0x7f9e1c0e33a0>),
 ('c', <pyspark.resultiterable.ResultIterable at 0x7f9e0e27d430>),
 ('a', <pyspark.resultiterable.ResultIterable at 0x7f9e0e27df10>),
 ('d', <pyspark.resultiterable.ResultIterable at 0x7f9e0e27d340>)]
result=rdd.groupByKey().collect()
result[1]
>>结果:  ('c', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2ae3d0>)
result[1][1]
>>结果:  <pyspark.resultiterable.ResultIterable at 0x7f9e0e2ae3d0>
list(result[1][1])
>>结果:  [3]
    
#groupByKey算子2,额外补充案例
rdd = sc.parallelize([("M",'zs'),("F",'ls'),("M",'ww'),("F",'zl')])
rdd2=rdd.groupByKey()
rdd2.collect()
>>结果:  
[('M', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2bc550>),
 ('F', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2bcfa0>)]
ite=rdd2.collect()
for x in ite : print('性别是',x[0],'人们是:',list(x[1]))
>>结果: 性别是 M 人们是: ['zs', 'ww']
	    性别是 F 人们是: ['ls', 'zl']
      
#groupByKey算子3
sc.parallelize([('hadoop', 1), ('hadoop', 5), ('spark', 3), ('spark', 6)])
>>结果:  ParallelCollectionRDD[60] at readRDDFromFile at PythonRDD.scala:274
rdd1=sc.parallelize([('hadoop', 1), ('hadoop', 5), ('spark', 3), ('spark', 6)])
rdd2=rdd1.groupByKey()
rdd2.collect()
>>结果:  
[('hadoop', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2bc490>),
 ('spark', <pyspark.resultiterable.ResultIterable at 0x7f9e0e2bc670>)]
rdd2.mapValues(lambda value:sum(list(value)))
>>结果:  PythonRDD[67] at RDD at PythonRDD.scala:53
rdd3=rdd2.mapValues(lambda value:sum(list(value)))
rdd3.collect()
>>结果:  [('hadoop', 6), ('spark', 9)]
    
#reduceByKey算子
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(lambda x,y:x+y).collect()
>>结果:  [('b', 1), ('a', 2)]
    
#sortByKey算子
sc.parallelize([('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)])
>>结果:  ParallelCollectionRDD[75] at readRDDFromFile at PythonRDD.scala:274
rdd1=sc.parallelize([('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)])
rdd2=rdd1.sortByKey()
rdd2.collect()
>>结果:  [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
print(rdd1.sortByKey(False))
PythonRDD[90] at RDD at PythonRDD.scala:53
print(rdd1.sortByKey(False).collect())
[('d', 4), ('b', 2), ('a', 1), ('2', 5), ('1', 3)]
print(rdd1.sortByKey(True,2).glom().collect())
[[('1', 3), ('2', 5), ('a', 1)], [('b', 2), ('d', 4)]]
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5), ('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]

rdd1=sc.parallelize(tmp2)
rdd2=rdd1.sortByKey(True,1,keyfunc=lambda k:k.upper())
rdd2.collect()
>>结果:  
[('a', 3),
 ('fleece', 7),
 ('had', 2),
 ('lamb', 5),
 ('little', 4),
 ('Mary', 1),
 ('was', 8),
 ('white', 9),
 ('whose', 6)]

Action

  • 返回的【不是】RDD,可以将结果保存输出,所有的Action算子【立即】执行,比如wordcount中的【saveAsTextFile】。

动作含义
reduce(func)通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
collect()在驱动程序中,以数组的形式返回数据集的所有元素
count()返回RDD的元素个数
first()返回RDD的第一个元素(类似于take(1))
take(n)返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed])返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path)将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey()针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func)在数据集的每一个元素上,运行函数func进行更新。
foreachPartition(func)在数据集的每一个分区上,运行函数func

 

#countByValue算子
x = sc.parallelize([1, 3, 1, 2, 3])
y = x.countByValue()
print(type(y))
<class 'collections.defaultdict'>
print(y)
>>结果:  defaultdict(<class 'int'>, {1: 2, 3: 2, 2: 1})
    
#collect算子
rdd = sc.parallelize([1,3,5,2,6,7,11,9,10],3)
rdd.map(lambda x: x + 1).collect()
>>结果:  [2, 4, 6, 3, 7, 8, 12, 10, 11]
x=rdd.map(lambda x: x + 1).collect()
print(type(x))
>>结果:  <class 'list'>
    
#reduce算子
rdd1 = sc.parallelize([1,2,3,4,5])
rdd1.collect()
>>结果:   [1, 2, 3, 4, 5]
x=rdd1.reduce(lambda x,y:x+y)
print(x)
15

#fold算子
rdd1 = sc.parallelize([1,2,3,4,5], 3)
rdd.glom().collect()
>>结果:   [[1, 3, 5], [2, 6, 7], [11, 9, 10]]
rdd1 = sc.parallelize([1,2,3,4,5], 3)
rdd1.glom().collect()
>>结果:   [[1], [2, 3], [4, 5]]
rdd1.fold(10,lambda x,y:x+y)
>>结果:   55
    
#first算子
sc.parallelize([2, 3, 4]).first()
>>结果:   2
    
#take算子
sc.parallelize([2, 3, 4, 5, 6]).take(2)
>>结果:   [2, 3]
sc.parallelize([2, 3, 4, 5, 6]).take(10)
>>结果:   [2, 3, 4, 5, 6]
sc.parallelize([5,3,1,1,6]).take(2) 
>>结果:   [5, 3]
sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
>>结果:   [91, 92, 93]
    
#top算子
x = sc.parallelize([1, 3, 1, 2, 3])
x.top(3)
>>结果:   [3, 3, 2]
    
#count算子
sc.parallelize([2, 3, 4]).count()
>>结果:   3
    
#foreach算子
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark and python"] )
words.foreach(lambda x:print(x))
>>结果:  
pyspark
pyspark and spark and python
akka
spark vs hadoop
hadoop
spark
scala
java

#saveAsTextFile算子
data = sc.parallelize([1,2,3], 2)
data.glom().collect()
>>结果:   [[1], [2, 3]]
data.saveAsTextFile("hdfs://node1:8020/output/file1")
  • collecet

  • reduce

  • first

  • take

  • takeSample

  • takeOrdered

  • top

  • count 上面Executor都会将执行的结果统一发送回Driver

唯独foreach和saveAsTextFile是不会统一发送回Driver的.

举报

相关推荐

0 条评论