0
点赞
收藏
分享

微信扫一扫

Spark算子

自由的美人鱼 2022-01-11 阅读 103

Transformation算子

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

列举部分算子:

转换含义
map(func)返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func)返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func)类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func)类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]
sample(withReplacement, fraction, seed)根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset)对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset)对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks]))对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks])在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks])在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值zeroValue:中立值,定义返回value的类型,并参与运算seqOp:用来在同一个partition中合并值combOp:用来在不同partiton中合并值
sortByKey([ascending], [numTasks])在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks])与sortByKey类似,但是更灵活
join(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset)笛卡尔积
pipe(command, [envVars])将一些shell命令用于Spark中生成新的RDD
coalesce(numPartitions**)**重新分区
repartition(numPartitions)重新分区
repartitionAndSortWithinPartitions(partitioner)重新分区和排序

Action算子

在RDD上运行计算,并返回结果给Driver或写入文件系统

动作含义
reduce(func)通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
collect()在驱动程序中,以数组的形式返回数据集的所有元素
count()返回RDD的元素个数
first()返回RDD的第一个元素(类似于take(1))
take(n)返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed])返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])takeOrdered和top类似,只不过以和top相反的顺序返回元素
saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path)将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey()针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func)在数据集的每一个元素上,运行函数func进行更新。

RDD算子比较

map flatMap mapPartitions mapPartitionsWithIndex

Spark中,最基本的原则,就是每个task处理一个RDD的partition。

MapPartitions操作的优点:

如果是普通的map,比如一个partition中有1万条数据;ok,那么你的function要执行和计算1万次。

但是,使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有
的partition数据。只要执行一次就可以了,性能比较高。

MapPartitions的缺点:可能会OOM。

如果是普通的map操作,一次function的执行就处理一条数据;那么如果内存不够用的情况下,
比如处理了1千条数据了,那么这个时候内存不够了,那么就可以将已经处理完的1千条数据从
内存里面垃圾回收掉,或者用其他方法,腾出空间来吧。

mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。

所以说普通的map操作通常不会导致内存的OOM异常。

在项目中,自己先去估算一下RDD的数据量,以及每个partition的量,还有自己分配给每个executor
的内存资源。看看一下子内存容纳所有的partition数据,行不行。如果行,可以试一下,能跑通就好。
性能肯定是有提升的。

//map和partition的区别:
scala> val rdd2 = rdd1.mapPartitions(_.map(_*10))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] ...

scala> rdd2.collect
res1: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70)

scala> rdd1.map(_ * 10).collect
res3: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70)

介绍mapPartition和map的区别,引出下面的内容:

mapPartitionsWithIndex
val func = (index: Int, iter: Iterator[(Int)]) => {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func).collect 

4.2.2 reduceByKey和groupByKey,aggregateByKey

reduceByKey(func, numPartitions=None)

Merge the values for each key using an associative reduce function. This will also perform the merginglocally on each mapperbefore sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified.

也就是,reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。

groupByKey(numPartitions=None)

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.** Note**: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.

也就是,groupByKey也是对每个key进行操作,但只生成一个sequence。需要特别注意“Note”中的话,它告诉我们:如果需要对sequence进行aggregation操作(注意,groupByKey本身不能自定义操作函数),那么,选择reduceByKey/aggregateByKey更好。这是因为groupByKey不能自定义函数,我们需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。

val words = Array("one", "two", "two", "three", "three", "three")  
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))  
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)  
val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))

reduceByKey 先在分区上进行合并,然后shuffle,最终得到一个结果。

4.2.3 coalesce, repartition

这两个算子都是用来调整分区个数的,其中repartition等价于 coalesce(numPartitions, shuffle = true).

def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T]
def repartition ( numPartitions : Int ): RDD [T]

使用

scala> val rdd1 = sc.parallelize(1 to 10,10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27    
scala> rdd1.partitions.length
res0: Int = 10

scala> val rdd2 = rdd1.coalesce(3,false)
rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:29

scala> rdd2.partitions.length
res1: Int = 3


repartition(重新分配分区), coalesce((合并)重新分配分区并设置是否shuffle),
	partitionBy(根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区)
val rdd1 = sc.parallelize(1 to 10, 10)
rdd1.repartition(5) --分区调整为5个
rdd1.partitions.length =5
coalesce:调整分区数量,参数一:要合并成几个分区,参数二:是否shuffle,false不会shuffle
val rdd2 = rdd1.coalesce(2, false)
val rdd1 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)), 3)
var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2.partitions.length
举报

相关推荐

0 条评论