小谈:
本篇将会讲到Spark RDD中的一些算子,这些算子都是关于映射的。
具体的是三个算子map,mapPartitions,MapPartitionWithIndex.这三个算子,讲的比较少,因为这两天一直在玩,先来一点开胃小菜,之后会将Spark中的读取外来文件的分区设定以及集合的分区设定在一篇文章里面讲一遍,其他的算子也会尽量每次讲解六七个。
map
我们可以用map做很多的事情,可以把URL集合中的每个URL对应的主机名字提取出来,也可以简单的只对各个数字求平均值,map()算子的返回值类型不需要和输入类型一样。
首先看一下map()算子的函数签名
def map[U: ClassTag](f: T => U): RDD[U]
参数f是一个函数,可以接受一个函数,当RDD执行map方法的时候,会遍历RDD中的每一个元素,并且每一个元素都会经过map这个操作,然后产生一个新的RDD。
举一个例子
创建一个1-4数组成的RDD,两个分区,将说有的元素*2得到一个新的RDD
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4),2)
val value1 = value.map(_ * 2)
println(value1.partitions.size)
value1.collect().foreach(println(_))
需要注意的是,如果上面在Master设置的是Local,如果在读取List的时候没有进行指定分区个数,那么系统默认是1个分区,如果指定了分区个数,那么分区个数就是设置的那么分区个数。如果是Local[*] 那么不设置话,默认为本地的cpu核数,比如我的电脑cpu的核数是12,那么就会有12分区,在saveAsTextFile的时候就会有12个文件。
上面说了,map会一次只处理一个分区的里面的一个元素。
处理完之后,依旧是原来的分区。
在map的底层用到了MapPartitionsRDD
可以看到里面重写了getPartitions方法,
protected[spark] def firstParent[U: ClassTag]: RDD[U] = { dependencies.head.rdd.asInstanceOf[RDD[U]] }
上面的dependencies.head.rdd指的就是上级依赖,在RDD中还有依赖这一说,在之后博客会讲到,也就是新创的RDD依赖于之前的RDD。map转换之后的RDD依赖转换之前的RDD。新的RDD会获取到旧的RDD的分区,这就是为什么转换之后,分区的元素依旧没变
mapPartitions
mapPartitions会以整个分区进行运算,相对于map来说,可以一次性处理一整个分区的个数,但是如果数据过大,可能内存不够报错
看一下函数签名
要求输入的参数是迭代器类型。
在1-4数组里面筛选出来可以整除2的
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4),2)
val value1 = value.mapPartitions(datas => { datas.filter(_ % 2 == 0) })
value1.collect().foreach(println(_))
上面这个只是入门,下面来一个训练,既然是按照分区来计算的,那么求每个分区的最大值
先图解一下,
两个分区,每调用一次就会对一个分区进行处理,找到该分区的最大值。根据思路,就可以写出代码
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4),2)
val value1 = value.mapPartitions(iter => { List(iter.max).iterator })
value1.collect().foreach(println(_)) }
mapPartitions要求输入参数是iter,返回值类型也是iter。
可以对每一个iter,这里的iter是指一个分区里面的数字组成的iter。在iter里面找到最大值。找到最大值之后再封装成iter类型
MapPartitionsWithIndex
这个算子要求输入的参数是(index,iter)
第一个参数就是分区的下标,第二个参数就是每个分区里面的数据。
每一个分区下表和每一个分区里面的元素组成了一个元组
下面来一个案例,筛选出分区为2的元素
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
//1 2 一个分区 3 4 一个分区
val value = sparkContext.makeRDD(List(1, 2, 3, 4),2)
val value1 = value.mapPartitionsWithIndex((index, iter) => {
//如果是第二个分区,那么就返回iter,如果不是,就返回空的
if (index == 1) { iter } else { Nil.iterator } })
value1.collect().foreach(println(_))
第二个分区,也就是 3 4 ,所以最后输出的结果就是3和4.
下面图解一下
上面图解所对应的代码如下,
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4),2)
val value1 = value.mapPartitionsWithIndex((index, iter) => {
iter.map(num => (index, num)) })
value1.collect().foreach(println(_))
总结:
今天介绍的就是关于map的算子,关于Spark的算子后很多,介绍完Spark中的算子之后,就会讲解累加器和分区器。
后续就是Spark SQL了,关于SparkSQL应该就会在二十天之后了把。
到了Spark SQL的时候了,就会跟Hive集成了。这就是前面我们学习Hive的作用,不仅是为了数仓的学习,也是为了Spark的学习
明天也会更新Spark系列和Scala系列