Contents
- Spark examples
- 1. SparkPi
- 2. Word count
- Spark functions
- map: map each line of text to an array of doubles
- collect: parallelize a list into an RDD and collect the results
- filter: filter elements by a predicate
- flatMap: split lines into words and build (key, value) pairs
- union: combine two RDDs
- join: join two pair RDDs by key
- lookup: look up the values for a key
- groupByKey: group values by key
- sortByKey: sort by key; the argument false means descending order
Spark examples
1. SparkPi
hadoop-master:/opt/spark-2.1.0-bin-hadoop2.7$ vim SparkPi
hadoop-master:/opt/spark-2.1.0-bin-hadoop2.7$ run-example SparkPi 10 > SparkPi.txt

import scala.math.random

import org.apache.spark.sql.SparkSession

object SparkPi {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / (n - 1))
    spark.stop()
  }
}

Each task scatters random points over the unit square; the fraction that falls inside the unit circle approximates pi/4, so the program reports 4.0 * count / (n - 1) as the estimate (1 until n yields n - 1 sample points).

hadoop-master:/opt/spark-2.1.0-bin-hadoop2.7$ more SparkPi.txt 
Pi is roughly 3.1427711427711427
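The same Monte Carlo estimate can also be tried interactively in spark-shell, using the SparkContext sc that the shell provides (a minimal sketch; the sample count of 100000 is an arbitrary choice):

scala> val n = 100000
scala> val count = sc.parallelize(1 to n).map { _ => val x = math.random * 2 - 1; val y = math.random * 2 - 1; if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _)
scala> println("Pi is roughly " + 4.0 * count / n)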
2. Word count

Compile JavaWordCount.java in /opt/spark-2.1.0-bin-hadoop2.7/examples/src/main/java/org/apache/spark/examples.
For details see: http://www.360doc.com/content/17/0323/17/41381374_639514974.shtml
Run:
hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ bin/spark-submit --master spark://ddai-master:7077 --class org.apache.spark.examples.JavaWordCount --executor-memory 2g examples/jars/spark-examples_2.11-2.1.0.jar /input > WordCount.txt
Code walkthrough
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }
    //SparkSession provides a single, unified entry point to Spark's functionality;
    //it is obtained through SparkSession.builder() (a builder/factory-style API).
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaWordCount")
      .getOrCreate();
    //Once the SparkSession exists we can use it to read data:
    //read().textFile() loads the input and javaRDD() exposes it as an RDD with one line per element.
    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
    //FlatMapFunction<String, String>: the first type parameter is the input type,
    //the second is the element type of the returned iterator; each line is split into words.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String s) {
        return Arrays.asList(SPACE.split(s)).iterator();
      }
    });
    //mapToPair: turn each word into a (word, 1) pair.
    //PairFunction<String, String, Integer>: the first type parameter is the input type,
    //the second and third are the key and value types of the resulting pair.
    JavaPairRDD<String, Integer> ones = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      });
    //reduceByKey: group the pairs by key and sum the values to produce per-word counts.
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    //collect() returns the contents of the RDD to the driver as a List.
    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    //Shut down the Spark session.
    spark.stop();
  }
}

Spark functions

hadoop-master:~$ hdfs dfs -mkdir /spark
hadoop-master:~$ hdfs dfs -put spam.data /spark
hadoop-master:~$ hdfs dfs -text /spark/spam.data
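The map example below assumes this file has already been read into an RDD named inFile; a minimal sketch of that (assumed) definition:

scala> val inFile = sc.textFile("/spark/spam.data")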

map: map each line of text to an array of doubles
scala> val nums = inFile.map(x=>x.split(' ').map(_.toDouble))
 nums: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[2] at map at <console>:26
scala> nums.first()
 res1: Array[Double] = Array(0.0, 0.64, 0.64, 0.0, 0.32, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.64, 0.0, 0.0, 0.0, 0.32, 0.0, 1.29, 1.93, 0.0, 0.96, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.778, 0.0, 0.0, 3.756, 61.0, 278.0, 1.0)
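As a further illustration of map (a hypothetical extra step, not part of the original transcript), the number of columns in each record can be inspected by mapping every array to its length:

scala> nums.map(_.length).first()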
collect: parallelize a list into an RDD and collect the results
scala> val rdd = sc.parallelize(List(1,2,3,4,5))
 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
 scala> val mapRdd = rdd.map(2*_)
 mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26
 scala> mapRdd.collect
 res2: Array[Int] = Array(2, 4, 6, 8, 10)
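collect brings the entire RDD back to the driver, so for large data sets it is safer to fetch only a few elements with take (a minimal sketch):

scala> mapRdd.take(3)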
filter: filter elements by a predicate
scala> val filterRdd = mapRdd.filter(_>5)
 filterRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:28
 scala> filterRdd.collect
 res3: Array[Int] = Array(6, 8, 10)
flatMap: split lines into words and build (key, value) pairs
scala> val rdd = sc.textFile("/input")
 rdd: org.apache.spark.rdd.RDD[String] = /input MapPartitionsRDD[30] at textFile at <console>:24
 scala> rdd.cache
 res12: rdd.type = /input MapPartitionsRDD[30] at textFile at <console>:24
 scala> val wordCount = rdd.flatMap(_.split(' ')).map(x=>(x,1)).reduceByKey(_+_)
 wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:26
 scala> wordCount.collect
 res13: Array[(String, Int)] = Array((little.,1), (between,1), (station,1), (too,2), (hadoop,3), (a,1), (good,1), (much,1), (is,1), (hello,2), (bye,1), (world,1), (Happiness,1), (and,1), (way,1))
 scala> wordCount.saveAsTextFile("/output/w")
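saveAsTextFile writes one part file per partition under /output/w; the saved result can be read back as plain text lines (a minimal sketch):

scala> sc.textFile("/output/w").collect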

union: combine two RDDs
scala> val rdd1 = sc.parallelize(List(('a',1),('a',2)))
 rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
 scala> val rdd2 = sc.parallelize(List(('b',1),('b',2)))
 rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
 scala> rdd1 union rdd2
 res4: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[8] at union at <console>:29
scala> res4.collect
 res7: Array[(Char, Int)] = Array((a,1), (a,2), (b,1), (b,2))
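Note that union keeps duplicate elements; appending distinct removes them (a minimal sketch):

scala> (rdd1 union rdd1).distinct.collect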
join: join two pair RDDs by key
scala> val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
 rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24
 scala> val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8)))
 rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
 scala> val rdd3 = rdd1 join rdd2
 rdd3: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[13] at join at <console>:28
 scala> rdd3.collect
 res8: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6)))
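join performs an inner join on the key, so keys present in only one RDD are dropped; pair RDDs also provide leftOuterJoin, rightOuterJoin and fullOuterJoin to keep them (a minimal sketch):

scala> rdd1.leftOuterJoin(rdd2).collect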
lookup: look up the values for a key
scala> var rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
 rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:24
 scala> rdd1.lookup('a')
 res9: Seq[Int] = WrappedArray(1, 2)
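Looking up a key that is not present simply returns an empty Seq (a minimal sketch):

scala> rdd1.lookup('c')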
groupByKey: group values by key
scala> var wc = sc.textFile("/input").flatMap(_.split(' ')).map((_,1)).groupByKey
 wc: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[21] at groupByKey at <console>:24
 scala> wc.collect
 res10: Array[(String, Iterable[Int])] = Array((little.,CompactBuffer(1)), (between,CompactBuffer(1)), (station,CompactBuffer(1)), (too,CompactBuffer(1, 1)), (hadoop,CompactBuffer(1, 1, 1)), (a,CompactBuffer(1)), (good,CompactBuffer(1)), (much,CompactBuffer(1)), (is,CompactBuffer(1)), (hello,CompactBuffer(1, 1)), (bye,CompactBuffer(1)), (world,CompactBuffer(1)), (Happiness,CompactBuffer(1)), (and,CompactBuffer(1)), (way,CompactBuffer(1)))
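The grouped values can then be aggregated per key; summing them reproduces the word count (a minimal sketch, though reduceByKey is usually preferred because it combines values map-side before the shuffle):

scala> wc.mapValues(_.sum).collect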
sortByKey: sort by key; the argument false means descending order
scala> var wc = sc.textFile("/input").flatMap(_.split(' ')).map((_,1)).sortByKey(false)
 wc: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at sortByKey at <console>:24
 scala> wc.collect
 res11: Array[(String, Int)] = Array((world,1), (way,1), (too,1), (too,1), (station,1), (much,1), (little.,1), (is,1), (hello,1), (hello,1), (hadoop,1), (hadoop,1), (hadoop,1), (good,1), (bye,1), (between,1), (and,1), (a,1), (Happiness,1))
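To sort by count rather than by word, swap key and value first (a minimal sketch, reusing the wordCount RDD from the flatMap example above):

scala> wordCount.map(_.swap).sortByKey(false).collect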










