0
点赞
收藏
分享

微信扫一扫

python实战spark(二) RDD常见操作


文章目录

  • ​​RDD操作​​
  • ​​Transformations​​
  • ​​Actions​​
  • ​​Shuffle 操作​​
  • ​​RDD持久化​​
  • ​​level的选择​​
  • ​​删除分区​​
  • ​​共享变量​​
  • ​​广播变量​​
  • ​​累加器​​
  • ​​部署到集群​​
  • ​​Java / Scala启动Spark作业​​
  • ​​单元测试​​

RDD操作

接第一篇的更新。

Transformations

  1. ​map(func)​​ 传递每一个源的数据,返回一个新的分布式的数据集。
  2. ​filter(func)​​ 选择func返回true的源元素来返回一个新的数据集。
  3. ​flatMap(func)​​​ 与map类似,但是每个输入项可以映射到0或多个输出项(func返回一个Seq,而不是单个项),​​flatMap(func)​​可以看作将返回的迭代器“拍扁”。
  4. ​mapPartitions(func)​​​ 类似于map,在RDD的每个分区(块)上单独运行,所以在类型为T的RDD上运行时,func的类型必须是​​Iterator<T> => Iterator<U>​​。
  5. ​mapPartitionsWithIndex(func)​​​ 类似于​​mappartition​​​,但也为func提供了一个表示分区索引的整数值,所以func的类型必须是​​(Int, Iterator<T>) =>Iterator<U>​​,当运行在类型为T的RDD上。
  6. ​sample(withReplacement, fraction, seed)​​​使用给定的随机数生成器种子对数据的一部分进行采样,​​withReplacement​​表示是否进行替换。
  7. ​union(otherDataset)​​返回一个新数据集,该数据集包含源数据集中的元素和参数的数据集的并集,该操作不会对数据去重。
  8. ​intersection(otherDataset)​​返回一个新的RDD,其中包含源数据集中的元素和参数数据集的交集,该操作会移除重复的元素。
  9. ​distinct([numPartitions]))​​返回一个包含源数据集的不同元素的新数据集。
  10. ​groupByKey([numPartitions])​​​当调用一个(K, V)对的数据集时,返回一个(K,Iterable)对的数据集。注意:如果分组是为了对每个键执行聚合(例如求和或平均值),那么使用​​reduceByKey​​​或​​aggregateByKey​​​将产生更好的性能。注意:默认情况下,输出中的并行度取决于父RDD的分区数。您可以传递一个可选的​​numpartition​​参数来设置不同数量的任务。
  11. ​reduceByKey(func, [numPartitions])​​​调用时(K,V)的数据集对,返回一个数据集(K、V)对,其中每个键的value通过​​func​​聚合,task的数量通过一个可选的第二个参数是配置
  12. ​aggregateByKey(zeroValue)(seqOp, combOp,[numPartitions])​​​时,返回一个(K, U)对的数据集,其中每个键的值使用给定的combine函数和一个中立的“0”值进行聚合。允许与输入值类型不同的聚合值类型,以避免不必要的分配。与​​groupByKey​​​类似,​​reduce​​ task的数量可以通过第二个可选参数配置。
  13. ​sortByKey([ascending], [numPartitions])​​返回按键升序或降序的(K, V)对数据集。
  14. ​join(otherDataset, [numPartitions])​​​当调用类型(K, V)和(K, W)的数据集时,返回一个 (K, (V, W))对。通过​​leftOuterJoin, rightOuterJoin, and fullOuterJoin​​​支持​​Outer joins​
  15. ​cogroup(otherDataset, [numPartitions])​​​ 返回(K,(Iterable,Iterable))元组的数据集。这个操作也称为​​groupWith​
  16. ​cartesian(otherDataset)​​计算T和U两个数据集的笛卡尔积,返回一个(T, U)对(所有对元素)的数据集。
  17. ​pipe(command, [envVars])​​通过shell命令将RDD的每个分区导入管道,例如Perl或bash脚本。将RDD元素写入进程的stdin,并将输出到stdout的行作为字符串的RDD返回。
  18. ​coalesce(numPartitions)​​将RDD中的分区数量减少到numpartition。适用于过滤大型数据集后更有效地运行操作。
  19. ​repartition(numPartitions)​​​随机地重新​​shuffle​​RDD中的数据,以创建更多或更少的分区,并在它们之间进行平衡。这总是通过网络打乱所有的数据。
  20. ​repartitionAndSortWithinPartitions(partitioner)​​​根据给定的分区程序重新划分RDD,并在每个结果分区中根据键对记录进行排序。这比调用重划分然后在每个分区内排序更有效,因为它可以将排序向下推到​​shuffle​​机制中。

Actions

下表列出了Spark支持的一些常见​​action​​操作

  1. ​reduce(func)​​使用函数func(它接受两个参数并返回一个)聚合数据集的元素。这个函数应该是交换的和结合的,这样才能被正确地并行计算。
  2. ​collect()​​​ 在​​driver​​中以数组的形式返回数据集的所有元素。这通常应用在筛选器或其他操作返回足够小的数据子集之后。
  3. ​count()​​返回数据集元素的计数。
  4. ​first()​​返回数据集第一个元素。
  5. ​take(n)​​返回数组的前n个元素。
  6. ​takeSample(withReplacement,num,[seed])​​返回一个数组,其中包含数据集的随机num元素样本,替换or不替换,可以预先指定随机数生成器种子。
  7. ​takeOrdered(n, [ordering])​​使用RDD的自然顺序或自定义比较器返回RDD的前n个元素。
  8. ​saveAsTextFile(path)​​​将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS或任何其他hadoop支持的文件系统的目录中。Spark将对每个元素调用​​toString​​,将其转换为文件中的一行文本。
  9. ​saveAsSequenceFile(path)​​​(Java and Scala)将数据集的元素作为Hadoop ​​SequenceFile​​写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统的给定路径中。这在实现Hadoop的可写接口的键值对的RDDs上是可用的。在Scala中,它也可用于隐式转换为可写的类型(Spark包括对Int、Double、String等基本类型的转换)。
  10. ​saveAsObjectFile(path)​​​(Java and Scala) 使用Java序列化以简单的格式编写数据集的元素,然后可以使用​​SparkContext.objectFile()​​加载这些元素。
  11. ​countByKey()​​ 仅在类型(K, V)的RDDs上可用。返回(K, Int)对的hashmap和每个键的计数。
  12. ​foreach(func)​​​ 对数据集的每个元素运行函数func。这通常是为了解决诸如更新累加器或与外部存储系统交互等问题。
    注意:在foreach()之外修改除累加器之外的其他变量可能会导致未定义的行为。有关更多细节,请参见理解闭包。
    Spark RDD API还公开了一些​​​action​​​的异步版本,比如​​foreachAsync​​​ ,它立即向调用者返回一个​​FutureAction​​,而不是在操作完成时阻塞。这可以用于管理或等待操作的异步执行。

举两个常用的例子:
​​​aggregate()​​​,该函数可以把我们从返回值必须与新操作的RDD类型相同的限制中解放出来,也就是说​​aggregate()​​​可以代替​​map()​​​后面解​​fold()​​​的方式。
下面例子是计算平均值,使用​​​aggregate()​​第一个参数提供期待返回的类型的初始值,通过一个函数把RDD中的元素合并起来放入累加器,第二个函数将累加器两两合并。

sumCount = nums.aggregate((0,0),
(lambda acc,value: (acc[0]+value,acc[1]+1),
(lambda acc1,acc2: (acc1[0]+acc2[0],acc1[1]+acc2[1]))))
return sumCount[0]/sumCount[1]

​combineByKey()​​​ 第一步创建累加器的初始值(每个分区第一次出现某个key时发生)
第二步值的合并(合并当前值与该分区之前出现的值)
第三步分区的合并

sumCount = nums.combineByKey((lambda x: (x,1)),
(lambda x, y: (x[0]+y,x[1]+1)),
(lambda x,y: (x[0]+y[0],x[1]+y[1])))

Shuffle 操作

某些操作在触发事件称为shuffle。shuffle是Spark用于重新分布数据的机制,以便在分区之间以不同的方式进行分组。这通常涉及到跨执行器和机器复制数据,使shuffle成为一个复杂而昂贵的操作。
为了理解在shuffle期间会发生什么,我们可以考虑​​​reduceByKey​​​操作的例子。reduceByKey操作生成一个新的RDD,其中单个键的所有的value都被组合成一个tuple。挑战在于,单个键的所有值不一定都位于相同的分区或同一台机器上,但它们必须位于同一位置才能计算结果。
在Spark中,数据通常不会跨分区分布到特定操作所需的位置。在计算期间,单个任务将在单个分区上操作——因此,要组织单个reduceByKey reduce任务执行的所有数据,需要执行all-to-all操作。它必须从所有分区中读取所有键的值,然后将各个分区的值放在一起,以计算每个键的最终结果——称为shuffle。

尽管新shuffle的数据的每个分区中的元素集是确定的,分区本身的排序也是确定的,但是这些元素的排序是不确定的。如果一个人想要在洗牌之后得到可预测的有序数据,那么可以使用:
​​​mappartition​​​来对每个分区进行排序,例如.sorted
​​​repartitionAndSortWithinPartitions​​​可以有效地对分区进行排序,同时进行重新分区
​​​sortBy​​创建一个全局有序的RDD

可能导致混乱的操作包括​​repartition​​​和​​coalesce​​​操作、​​ByKey​​​操作(除计数外)(如​​groupByKey​​​和​​reduceByKey​​​)以及​​join​​​操作(如​​cogroup​​​和​​join​​)。

由于涉及到磁盘I/O、数据序列化和网络I/O,Shuffle是一项昂贵的操作。要为shuffle组织数据,Spark生成task集——map task用于组织数据,而reduce task集用于聚合数据。这个术语来自MapReduce,与Spark的map和reduce操作没有直接关系。

在内部,来自单个map任务的结果被保存在内存中,直到它们不能匹配为止。然后,根据目标分区对这些分区进行排序,并写入到单个文件。在reduce端,任务读取相关的已排序块。

某些shuffle操作会消耗大量堆内存,因为它们使用内存中的数据​​structure​​​来组织传输之前或之后的记录。具体来说,​​reduceByKey​​​和​​aggregateByKey​​​在map端创建这些​​structure​​​,而​​ByKey​​​操作在reduce端生成这些​​structure​​。当数据不适合内存时,Spark会将这些表溢出到磁盘,导致磁盘I/O的额外开销和增加的垃圾收集。

Shuffle还会在磁盘上生成大量的中间文件。从Spark 1.3开始,这些文件将一直保留到不再使用相应的RDDs并进行垃圾收集。这样做是为了在重新计算沿袭时不需要重新创建shuffle文件。如果应用程序保留对这些rds的引用,或者GC不经常启动,那么垃圾收集可能只会在很长一段时间之后才会发生。这意味着长时间运行的Spark作业可能会消耗大量磁盘空间。临时存储目录由​​spark.local.dir​​指定。配置Spark上下文时的配置参数。

可以通过调整各种配置参数来调整洗牌行为。请参阅Spark配置指南中的“Shuffle行为”一节。

RDD持久化

Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当您持久化一个RDD时,每个节点在内存中存储它计算的任何分区,并在数据集(或从中派生的数据集)的其他操作中复用。这使得将来的操作要快得多(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

可以通过​​persist()​​​或​​cache()​​方法将RDD标记为持久的。第一次在操作中计算它时,它将保存在节点的内存中。Spark的缓存是容错的——如果一个RDD的任何分区丢失了,它将使用最初创建它的转换自动重新计算。

此外,每个持久化的RDD都可以使用不同的存储级别存储,例如,允许您将数据集持久化到磁盘上,将数据集持久化到内存中,但是作为序列化的Java对象(节省空间),可以跨节点复制数据集。通过传递一个​​StorageLevel​​​对象(Scala、Java、Python)来设置这些级别。​​cache()​​​方法是使用默认存储级别的简写,即​​StorageLevel.MEMORY_ONLY (store deserialized objects in memory).​​。存储级别的完整设置为:

  1. MEMORY_ONLY 将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,那么一些分区将不会被缓存,而是在需要它们时动态地重新计算。这是默认级别。
  2. MEMORY_AND_DISK 将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,那么将不适合的分区存储在磁盘上,并在需要时从那里读取它们。
  3. MEMORY_ONLY_SER 将RDD存储为序列化的Java对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,特别是在使用快速序列化器时,但读取时需要更多cpu。
  4. MEMORY_AND_DISK_SER 但是将不适合内存的分区溢出到磁盘,而不是每次需要时动态地重新计算它们。
  5. DISK_ONLY 存储RDD分区在磁盘
  6. MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 与上述level相似,但复制每一个分区到两个集群节点
  7. OFF_HEAP (experimental) 与MEMORY_ONLY_SER相似,但存储数据到堆外内存,需要启用堆外内存。

注意:在Python中,存储的对象将始终使用Pickle库进行序列化,因此是否选择序列化级别并不重要。Python中可用的存储级别包括MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY和DISK_ONLY_2。

甚至在没有用户调用persist的情况下,Spark也会自动持久化一些洗牌操作中的中间数据(例如reduceByKey)。这样做是为了避免在节点转移期间失败时重新计算整个输入。我们仍然建议用户在计划重用结果RDD时调用persist。

level的选择

Spark的存储级别意味着在内存使用和CPU效率之间提供不同的权衡。我们建议通过以下步骤来选择一个:

如果您的RDDs与默认存储级别(MEMORY_ONLY)匹配得很好,那么就让它们保持这种状态。这是cpu效率最高的选项,允许RDDs上的操作尽可能快地运行。

如果没有,尝试使用MEMORY_ONLY_SER并选择一个快速序列化库,使对象更节省空间,但访问速度仍然相当快。(Java和Scala)

除非计算数据集的函数非常昂贵,或者它们过滤了大量数据,否则不要泄漏数据到磁盘。否则,重新计算分区的速度可能与从磁盘读取分区的速度一样快。

如果需要快速的故障恢复,请使用复制的存储级别(例如,如果使用Spark为来自web应用程序的请求提供服务)。通过重新计算丢失的数据,所有存储级别都提供了完全的容错能力,复制的存储级别允许您在RDD上继续运行任务,而不必等待重新计算丢失的分区。

删除分区

Spark自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧的数据分区。如果您希望手动删除一个RDD,而不是等待它从缓存中消失,那么可以使用​​RDD.unpersist()​​方法。

共享变量

通常,当传递给Spark操作(如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,而对远程机器上的变量的更新不会传播回驱动程序。在任务之间支持通用的读写共享变量是低效的。但是,Spark确实为两种常见的使用模式提供了两种有限的共享变量类型:广播变量和累加器。

广播变量

广播变量允许程序员在每台机器上缓存一个只读变量,而不是将其副本与任务一起发送。例如,可以使用它们以有效的方式为每个节点提供一个大型输入数据集的副本。Spark还尝试使用有效的广播算法来分发广播变量,以降低通信成本。

Spark操作通过一组stage执行,由分布式“shuffle”操作分隔。Spark自动广播每个阶段中任务所需的公共数据。以这种方式广播的数据以序列化的形式缓存,并在运行每个任务之前反序列化。这意味着,只有当跨多个stage的任务需要相同的数据,或者以反序列化的形式缓存数据很重要时,显式地创建广播变量才有用。

Broadcast变量是通过调用​​SparkContext.broadcast(v)​​从变量v创建的。broadcast变量是v的包装器,它的值可以通过调用value方法来访问。代码如下所示:

>>>broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>>broadcastVar.value
[1, 2, 3]

创建broadcast变量之后,应该使用广播变量而不是集群上运行的任何函数中的值v,这样v就不会被多次发送到节点。此外,对象v在广播后不应进行修改,以确保所有节点得到广播变量的相同值(例如,如果稍后将该变量发送到新节点)。

累加器

累加器是只能通过关联和交换操作“添加”的变量,因此可以有效地并行支持。它们可用于实现计数器(如在MapReduce中)或和。Spark本身支持数字类型的累加器,也可以添加对新类型的支持。

作为用户,您可以创建已命名或未命名的累加器。如下图所示,在web UI中将显示一个指定的累加器(在这个实例计数器中),修改该累加器的stage。Spark在“Tasks”表中显示由任务修改的每个累加器的值。
在UI中跟踪累加器对于理解运行阶段的进度是很有用的(不过目前不支持python)

通过调用​​SparkContext.accumulator(v)​​,从初始值v创建累加器。然后,可以使用add方法或+=操作符将运行在集群上的任务添加到集群中。然而,他们无法读取它的值。只有驱动程序可以读取累加器的值,使用它的值方法。

下面的代码显示了一个累加器,用于将数组中的元素相加:

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

虽然这段代码使用了内置的对Int类型的累加器的支持,但是程序员也可以通过继承AccumulatorParam来创建他们自己的类型。AccumulatorParam接口有两个方法:zero用于为数据类型提供“零值”,addInPlace用于将两个值相加。例如,假设我们有一个向量类表示数学向量,我们可以这样写:
​​​class VectorAccumulatorParam (AccumulatorParam):​

class VectorAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Vector.zeros(initialValue.size)

def addInPlace(self, v1, v2):
v1 += v2
return v1
# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

对于仅在操作内部执行的累加器更新,Spark保证每个任务对累加器的更新只应用一次,即重新启动的任务不会更新值。在转换中,用户应该知道,如果重新执行任务或作业阶段,每个任务的更新可能会应用多次。

累加器不改变Spark的惰性评价模型。如果在RDD上的操作中更新它们,那么它们的值只更新一次,计算RDD作为操作的一部分。因此,不能保证在map()这样的延迟转换中执行累加器更新。下面的代码片段演示了这个属性:

accum = sc.accumulator(0)
def g(x):
accum.add(x)
return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.

部署到集群

应用程序提交指南描述了如何向集群提交应用程序。简而言之,一旦您将应用程序打包到JAR(用于Java/Scala)或一组.py或.zip文件(用于Python)中,​​bin/spark-submit​​脚本就允许您将其提交给任何受支持的集群管理器。

Java / Scala启动Spark作业

​org.apache.spark.launcher​​包提供了使用简单的Java API将Spark作业作为子进程启动的类。

单元测试

Spark对任何流行的单元测试框架都很友好。只需在您的测试中创建一个主URL设置为local的SparkContext,运行您的操作,然后调用​​SparkContext.stop()​​​将其销毁。确保在​​finally​​​块或测试框架的​​tearDown​​​方法中停止​​context​​,因为Spark不支持在同一个程序中同时运行两个上下文。

​​Spark官方文档​​


举报

相关推荐

0 条评论