0
点赞
收藏
分享

微信扫一扫

Spark编程

彭维盛 2022-09-14 阅读 243


进入Spark的命令:spark-shell

Spark编程_spark


RDD是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点的存放元素的集合。RDD有三种不同的创建方法,1、对程序中存放的基本数据结构中的集合进行并行化,2、通过对已有RDD转化得到新的RDD,3、直接读取外部存储的数据集。

从内存中已有数据创建RDD

两种常用方法:1、转化Seq集合为RDD,2、从已有的RDD转化为新的RDD
SparkContext类中有两个方法:parallelize和makeRDD。通过这两种方法可将单机数据创建为分布式RDD。
parallelize
有两个参数输出,(1)要转化的集合,必须是Seq集合(2)分区数,不设分区数,则默认为该Application分配到的资源的CPU数

scala> val data=Array(1,2,3,4,5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val disdata=sc.parallelize(data)
disdata: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29

scala> disdata.partitions.size
res0: Int = 1

默认分区数为1。
makeRDD
两种实现方法:第一种和parallelize完全一致。另一种接收的参数类型是Seq[(T,Seq[String])],生成的RDD中保存的是T的值。

从外部存储创建RDD

第一种方法常用语测试,这种方法常用于实践
从HDFS文件创建RDD
首先将本地文件上传到HDFS

hadoop fs -put /home/cloudera/Desktop/words.txt /user/hadoop
hadoop fs -ls /user/hadoop

读取文件

scala> val test = sc.textFile("/user/hadoop/words.txt")
test: org.apache.spark.rdd.RDD[String] = /user/hadoop/words.txt MapPartitionsRDD[3] at textFile at <console>:27

我把textFile打成了testFile,还看了半天找不出错。。。
从Linux本地文件创建RDD

scala> val test=sc.textFile("file:///home/cloudera/Desktop/words.txt")
test: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/Desktop/words.txt MapPartitionsRDD[5] at textFile at <console>:27

使用map转换数据

val distData=sc.parallelize(List(1,3,45,3,76))
val sq_dist=distData.map(x=>x*x)

使用sortBy()排序

(1)第一个参数是一个函数f(T)=>K,左边是要被排序对象的每一个元素,右边是返回的值是元素中要进行排序的值。
(2)第二个参数是ascending,决定排序是升序还是降序,默认是true也就是升序。
(3)第三个参数是numPartitions,决定排序后的分区个数,默认排序后的分区个数和排序之前的个数相等。

scala> val data=sc.parallelize(List((1,3),(45,3),(7,6)))
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val sort_data=data.sortBy(x=>x._2,false,1)
sort_data: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at sortBy at <console>:29

使用collect()查询

collect()函数是一个行动操作,把RDD所有元素转换成数组并返回到Driver端,适用于小数据处理后的返回
(1)collect():Array[T]直接返回RDD中的所有元素,返回类型是一个数组

scala> data.collect
res3: Array[(Int, Int)] = Array((1,3), (45,3), (7,6))

scala> sort_data.collect
res4: Array[(Int, Int)] = Array((7,6), (1,3), (45,3))

(2)

collectU:ClassTag:RDD[U]

需要提供一个偏函数

scala> val one:PartialFunction[Int,String]={case 1 =>"one";case _ => "other"}
one: PartialFunction[Int,String] = <function1>

scala> val data=sc.parallelize(List(2,3,1))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27

scala> data.collect(one).collect
res5: Array[String] = Array(other, other, one)

将数组中数值为1的替换成one,其他替换成other

使用flatMap转换数据

flatMap的操作是将函数应用于RDD之中的每一个元素,将返回的迭代器(数组、列表等)中的所有元素构成新的RDD。使用flatMap就是先map再flat。

scala> val test=sc.parallelize(List("How are you","I am fine","What about you")) 
test: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> test.collect
res6: Array[String] = Array(How are you, I am fine, What about you)

scala> test.flatMap(x=>x.split(" ")).collect
res8: Array[String] = Array(How, are, you, I, am, fine, What, about, you)

scala> test.map(x=>x.split(" ")).collect
res9: Array[Array[String]] = Array(Array(How, are, you), Array(I, am, fine), Array(What, about, you))

使用take()方式查询某几个值

take(N)用来获取RDD的前N个元素,返回类型为数组。

scala> val data=sc.parallelize(1 to 10)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:27

scala> data.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> data.take(5)
res11: Array[Int] = Array(1, 2, 3, 4, 5)

使用union()合并多个RDD

union是一种转换操作,用于将两个RDD的元素合并成一个,不进行去重操作。

scala> val rdd1=sc.parallelize(List(('a',1),('b',2),('c',3)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:27

scala> val rdd2=sc.parallelize(List(('a',1),('d',4),('e',5)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:27

scala> rdd1.union(rdd2).collect
res12: Array[(Char, Int)] = Array((a,1), (b,2), (c,3), (a,1), (d,4), (e,5))

使用filter()进行过滤

filter()是一种转换操作,用来过滤RDD中的元素。filter需要一个参数,参数是一个用来过滤的函数,该函数的返回值是布尔类型。最后结果是一个存储符合过滤条件的所有元素的新RDD。

scala> rdd1.filter(x=>x._2>1).collect
res13: Array[(Char, Int)] = Array((b,2), (c,3))

使用distinct()进行去重

用于RDD的数据去重

scala> val rdd=sc.makeRDD(List(('a',1),('b',1),('a',1),('c',1)))
rdd: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[15] at makeRDD at <console>:27

scala> rdd.distinct.collect
res15: Array[(Char, Int)] = Array((b,1), (a,1), (c,1))

简单的集合操作

intersection()求并集

val rdd1=sc.parallelize(List(('a',1),('b',1),('a',1),('c',1)))
val rdd2=sc.parallelize(List(('a',1),('b',1),('d',1)))
rdd1.intersection(rdd2).collect

输出:

Spark编程_apache_02


subtract()求差集

rdd1.subtract(rdd2).collect

Spark编程_apache_03


cartesian()求笛卡尔积

val rdd1=sc.parallelize(List(1,3,5,3))
val rdd2=sc.parallelize(List(2,4,5,1))
rdd1.cartesian(rdd2).collect

Spark编程_scala_04


举报

相关推荐

0 条评论