Spark Data Analysis



Table of Contents

  • Spark examples
  • 1. SparkPi
  • 2. Word count
  • Spark functions
  • map: map text lines to doubles
  • collect: turn a list into an RDD and collect the results
  • filter: filter data
  • flatMap: split lines into words and turn them into (key, value) pairs
  • union: union of two RDDs
  • join: join two RDDs
  • lookup: look up values by key
  • groupByKey: group values by key
  • sortByKey: sort by key (the argument false means descending)

Spark examples

1. SparkPi

hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ vim SparkPi
hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ run-example SparkPi 10 > SparkPi.txt
import scala.math.random

import org.apache.spark.sql.SparkSession

object SparkPi {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1        // random point in [-1, 1] x [-1, 1]
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0   // 1 if the point falls inside the unit circle
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / (n - 1))
    spark.stop()
  }
}
hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ more SparkPi.txt 

Pi is roughly 3.1427711427711427
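
SparkPi estimates π with a Monte Carlo method: each task draws random points in the square [-1, 1] × [-1, 1] and counts how many land inside the unit circle; the circle covers π/4 of the square, so four times that fraction approximates π. A minimal sketch of the same idea in plain Scala, without Spark (the sample size here is an arbitrary choice):

import scala.math.random

// Draw n random points in the unit square and count those inside the unit circle.
val n = 1000000
val inside = (1 to n).count { _ =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  x * x + y * y < 1
}
println("Pi is roughly " + 4.0 * inside / n)

Accuracy improves only slowly with the number of samples, which is why the distributed version spreads 100000 * slices points across the cluster.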

2. Word count

Compile JavaWordCount.java from the directory /opt/spark-2.1.0-bin-hadoop2.7/examples/src/main/java/org/apache/spark/examples.

For details, see: http://www.360doc.com/content/17/0323/17/41381374_639514974.shtml

Run:


hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ bin/spark-submit --master spark://ddai-master:7077 --class org.apache.spark.examples.JavaWordCount --executor-memory 2g examples/jars/spark-examples_2.11-2.1.0.jar /input > WordCount.txt


Code walkthrough

package org.apache.spark.examples;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }
    // SparkSession is the unified entry point to Spark's functionality;
    // its design follows the factory design pattern.
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaWordCount")
      .getOrCreate();
    // With the SparkSession we can read data: textFile() reads the input,
    // and javaRDD() returns an RDD with one element per input line.
    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
    // FlatMapFunction<String, String>: the first type parameter is the input type,
    // the second is the output element type; each line is mapped to multiple words.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String s) {
        return Arrays.asList(SPACE.split(s)).iterator();
      }
    });
    // map step: turn each word into a (word, 1) pair.
    // PairFunction<String, String, Integer>: the first type parameter is the input type,
    // the second and third are the key and value types of the returned pair.
    JavaPairRDD<String, Integer> ones = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      });
    // reduce step: group by key and add the values to produce the counts.
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    // Trigger the computation and bring the whole result back to the driver.
    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?, ?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    // Shut down the Spark session.
    spark.stop();
  }
}
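
For comparison, the same pipeline is much shorter with the Scala RDD API in spark-shell; this is only a sketch, assuming the /input path used by the spark-submit command above:

scala> val counts = sc.textFile("/input").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
scala> counts.collect.foreach { case (w, c) => println(w + ": " + c) }

The flatMap/map/reduceByKey steps correspond one-to-one to the FlatMapFunction, PairFunction and Function2 classes in the Java version.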

Spark functions


hadoop@ddai-master:~$ hdfs dfs -mkdir /spark
hadoop@ddai-master:~$ hdfs dfs -put spam.data /spark
hadoop@ddai-master:~$ hdfs dfs -text /spark/spam.data

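The screenshots from the original post are not reproduced here; they presumably show spam.data being loaded into an RDD in spark-shell. A minimal sketch, assuming the /spark/spam.data path created by the hdfs commands above:

scala> val inFile = sc.textFile("/spark/spam.data")
scala> inFile.first()   // one space-separated line of numbers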

map: map text lines to doubles


scala> val nums = inFile.map(x=>x.split(' ').map(_.toDouble))
nums: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[2] at map at <console>:26

scala> nums.first()
res1: Array[Double] = Array(0.0, 0.64, 0.64, 0.0, 0.32, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.64, 0.0, 0.0, 0.0, 0.32, 0.0, 1.29, 1.93, 0.0, 0.96, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.778, 0.0, 0.0, 3.756, 61.0, 278.0, 1.0)
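
Each line is parsed into an Array[Double], so nums is an RDD of numeric rows. A couple of quick sanity checks on the parsed data (a sketch; the outputs are not reproduced here):

scala> nums.count()                        // number of rows in spam.data
scala> nums.map(_.length).distinct.collect // every row should have the same number of columns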


collect: turn a list into an RDD and collect the results


scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> val mapRdd = rdd.map(2*_)
mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26
scala> mapRdd.collect
res2: Array[Int] = Array(2, 4, 6, 8, 10)
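
collect brings the entire RDD back to the driver, which is fine for five elements but can exhaust driver memory on large data sets; take(n) is a lighter-weight way to inspect a few elements. A sketch using the same mapRdd:

scala> mapRdd.take(3)   // Array(2, 4, 6)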


filter: filter data


scala> val filterRdd = mapRdd.filter(_>5)
filterRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:28
scala> filterRdd.collect
res3: Array[Int] = Array(6, 8, 10)


flatMap: split lines into words and turn them into (key, value) pairs


scala> val rdd = sc.textFile("/input")
rdd: org.apache.spark.rdd.RDD[String] = /input MapPartitionsRDD[30] at textFile at <console>:24
scala> rdd.cache
res12: rdd.type = /input MapPartitionsRDD[30] at textFile at <console>:24
scala> val wordCount = rdd.flatMap(_.split(' ')).map(x=>(x,1)).reduceByKey(_+_)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:26
scala> wordCount.collect
res13: Array[(String, Int)] = Array((little.,1), (between,1), (station,1), (too,2), (hadoop,3), (a,1), (good,1), (much,1), (is,1), (hello,2), (bye,1), (world,1), (Happiness,1), (and,1), (way,1))
scala> wordCount.saveAsTextFile("/output/w")
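
To see the most frequent words first, the pair RDD can be ordered by its count with sortBy before collecting; a sketch using the wordCount RDD above:

scala> wordCount.sortBy(_._2, ascending = false).take(5)
// e.g. Array((hadoop,3), (too,2), (hello,2), ...) based on the counts shown above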



union: union of two RDDs


scala> val rdd1 = sc.parallelize(List(('a',1),('a',2)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(('b',1),('b',2)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd1 union rdd2
res4: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[8] at union at <console>:29

scala> res4.collect
res7: Array[(Char, Int)] = Array((a,1), (a,2), (b,1), (b,2))
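
union simply concatenates the two RDDs and keeps duplicates; chain distinct when set semantics are wanted. A sketch reusing rdd1:

scala> (rdd1 union rdd1).collect            // Array((a,1), (a,2), (a,1), (a,2))
scala> (rdd1 union rdd1).distinct.collect   // Array((a,1), (a,2)), order may vary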


join: join two RDDs


scala> val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd3 = rdd1 join rdd2
rdd3: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[13] at join at <console>:28
scala> rdd3.collect
res8: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6)))
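
join is an inner join, and for each key it produces every combination of values from the two sides, which is why each key yields four pairs above. When keys may be missing on one side, leftOuterJoin wraps the right-hand value in an Option; cogroup returns the grouped values from both RDDs. A sketch reusing rdd1 and rdd2:

scala> rdd1.leftOuterJoin(rdd2).first()   // e.g. (b,(3,Some(7)))
scala> rdd1.cogroup(rdd2).first()         // e.g. (b,(CompactBuffer(3, 4),CompactBuffer(7, 8)))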


lookup: look up values by key


scala> var rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:24
scala> rdd1.lookup('a')
res9: Seq[Int] = WrappedArray(1, 2)


groupByKey: group values by key


scala> var wc = sc.textFile("/input").flatMap(_.split(' ')).map((_,1)).groupByKey
wc: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[21] at groupByKey at <console>:24
scala> wc.collect
res10: Array[(String, Iterable[Int])] = Array((little.,CompactBuffer(1)), (between,CompactBuffer(1)), (station,CompactBuffer(1)), (too,CompactBuffer(1, 1)), (hadoop,CompactBuffer(1, 1, 1)), (a,CompactBuffer(1)), (good,CompactBuffer(1)), (much,CompactBuffer(1)), (is,CompactBuffer(1)), (hello,CompactBuffer(1, 1)), (bye,CompactBuffer(1)), (world,CompactBuffer(1)), (Happiness,CompactBuffer(1)), (and,CompactBuffer(1)), (way,CompactBuffer(1)))
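
groupByKey only groups the ones; to recover the counts the buffers still have to be summed, for example with mapValues. For plain counting, reduceByKey is generally preferred because it combines values on the map side before the shuffle. A sketch using the wc RDD above:

scala> wc.mapValues(_.sum).collect   // same result as the reduceByKey word count earlier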


sortByKey: sort by key (the argument false means descending)


scala> var wc = sc.textFile("/input").flatMap(_.split(' ')).map((_,1)).sortByKey(false)
wc: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at sortByKey at <console>:24
scala> wc.collect
res11: Array[(String, Int)] = Array((world,1), (way,1), (too,1), (too,1), (station,1), (much,1), (little.,1), (is,1), (hello,1), (hello,1), (hadoop,1), (hadoop,1), (hadoop,1), (good,1), (bye,1), (between,1), (and,1), (a,1), (Happiness,1))
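
Note that here sortByKey is applied to the raw (word, 1) pairs, so duplicate keys such as (too,1) appear more than once; aggregating first and sorting afterwards gives one entry per word. A sketch:

scala> sc.textFile("/input").flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).sortByKey(false).collect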



