Getting Started with Spark, Step 1: WordCount in Java and Scala
This is the first step in the Spark getting-started series: writing a WordCount program.
We implement it in both Java and Scala so we can compare how much code each language requires.
Data file: the program reads the contents of the file below and counts how many times each word appears (a sample of the expected output is shown right after the data).
java scala python android
spark storm spout bolt
kafka MQ
elasticsearch logstash kibana
hive hbase mysql oracle sqoop
hadoop hdfs map reduce
java scala python android
spark storm spout bolt
kafka MQ
java scala python android
spark storm spout bolt
kafka MQ
elasticsearch logstash kibana
hive hbase mysql oracle sqoop
hive hbase mysql oracle sqoop
hadoop hdfs map reduce
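For reference, each version below prints one (word, count) pair per line. A hand-computed partial sample of the expected output (foreach prints in no guaranteed order):
(java,3)
(scala,3)
(kafka,3)
(elasticsearch,2)
(hadoop,2)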
Code implementation
- Written in Java
package top.wintp.java_spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
import java.util.Iterator;
import scala.Tuple2;
/**
* @author: pyfysf
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/10/26
*/
public class SparkWordCount {
public static void main(String[] args) {
// Verbose version, using anonymous inner classes
// Create the SparkConf
SparkConf conf = new SparkConf();
conf.setAppName("spark_demo_java");
conf.setMaster("local");
// Create the JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// Read the input file
JavaRDD<String> lines = sc.textFile("./data/words.txt");
// Split each line into words
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split("\\s+")).iterator();
}
});
// Map each word to a (word, 1) pair
JavaPairRDD<String, Integer> pairWord = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
// Aggregate the counts by key
JavaPairRDD<String, Integer> result = pairWord.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i, Integer i2) throws Exception {
return i + i2;
}
});
// Print the results
result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
System.out.println(stringIntegerTuple2);
}
});
sc.stop();
}
}
- Simplifying the Java code with lambda expressions
package top.wintp.java_spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import scala.Tuple2;
/**
* @author: pyfysf
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/10/26
*/
public class SparkWordCount {
public static void main(String[] args) {
// Lambda-expression version
SparkConf conf = new SparkConf();
conf.setAppName("spark_demo_java");
conf.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("./data/words.txt");
JavaRDD<String> words = lines.flatMap((String line) -> Arrays.asList(line.split("\\s+")).iterator());
JavaPairRDD<String, Integer> pairWords = words.mapToPair((String s) -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> result = pairWords.reduceByKey(Integer::sum);
result.foreach((Tuple2<String, Integer> res) -> System.out.println(res));
sc.stop();
}
}
- Written in Scala
package top.wintp.scala_spark
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author: pyfysf
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/10/26
*/
object SparkWordCount {
def main(args: Array[String]): Unit = {
// Full version
// Create the configuration object
val conf = new SparkConf()
// Set the run mode
conf.setMaster("local")
// Set the application name
conf.setAppName("sparkTest")
// Create the SparkContext
val sc = new SparkContext(conf)
// Read the input file
val lines = sc.textFile("./data/words.txt")
// Split each line into words
val words = lines.flatMap((line: String) => {
line.split("\\s+")
})
// Map each word to a (word, 1) pair
val pairWord = words.map((tmp: String) => {
(tmp, 1)
})
// Aggregate the counts by key
val result = pairWord.reduceByKey((v1: Int, v2: Int) => {
v1 + v2
})
// Print the results
result.foreach(println)
// Release resources
sc.stop()
}
}
- Simplifying the code with Scala's language features
package top.wintp.scala_spark
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author: pyfysf
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/10/26
*/
object SparkWordCount {
def main(args: Array[String]): Unit = {
// Concise version
val conf = new SparkConf().setAppName("sparkDemo").setMaster("local")
val sc = new SparkContext(conf)
val result = sc.textFile("./data/words.txt")
.flatMap(_.split("\\s+"))
.map((_, 1))
.reduceByKey(_ + _)
result.foreach(println)
sc.stop()
}
}
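A small follow-up sketch (not part of the original example): reusing sc and the input file from the concise version above, the counts can also be sorted by frequency and written to disk. The ./output/wordcount path is a hypothetical output directory and must not already exist when the job runs.
// Sketch: sort the (word, count) pairs by count, descending
val sortedCounts = sc.textFile("./data/words.txt")
.flatMap(_.split("\\s+"))
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, ascending = false)
// Bring a few results back to the driver and print them
sortedCounts.take(5).foreach(println)
// Save all results to a (hypothetical) output directory
sortedCounts.saveAsTextFile("./output/wordcount")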
I recommend mastering both the Java and the Scala versions, and especially the concise Scala pipeline.
Attached pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>top.wintp.spark</groupId>
<artifactId>SparkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>SparkDemo</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- Spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<!-- SparkSQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<!-- SparkSQL ON Hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!--SparkStreaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.1</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- SparkStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<!-- Needed for producing data to Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<!-- Needed for connecting to Redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.6.1</version>
</dependency>
<!-- Scala libraries -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.7</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.7</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- When a Maven project contains both Java and Scala code, configuring the maven-scala-plugin lets both kinds of sources be compiled and packaged together -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Plugin needed for building a jar with Maven -->
<!-- <plugin>-->
<!-- <artifactId>maven-assembly-plugin</artifactId>-->
<!-- <version>2.4</version>-->
<!-- <configuration>-->
<!-- <!– Setting this to false removes the "-jar-with-dependencies" suffix from MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar –>-->
<!-- <!–<appendAssemblyId>false</appendAssemblyId>–>-->
<!-- <descriptorRefs>-->
<!-- <descriptorRef>jar-with-dependencies</descriptorRef>-->
<!-- </descriptorRefs>-->
<!-- <archive>-->
<!-- <manifest>-->
<!-- <mainClass>com.bjsxt.scalaspark.sql.windows.OverFunctionOnHive</mainClass>-->
<!-- </manifest>-->
<!-- </archive>-->
<!-- </configuration>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <id>make-assembly</id>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>assembly</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<!-- The assembly plugin above bundles the dependencies into a single jar; the approach below uses Maven's native jar packaging and does not include the dependencies in the final jar -->
<!--<plugin>-->
<!--<groupId>org.apache.maven.plugins</groupId>-->
<!--<artifactId>maven-jar-plugin</artifactId>-->
<!--<version>2.4</version>-->
<!--<configuration>-->
<!--<archive>-->
<!--<manifest>-->
<!--<addClasspath>true</addClasspath>-->
<!--<!– Prefix of the path where all dependency jars are located when the main class looks up its dependencies at runtime –>-->
<!--<classpathPrefix>/alljars/lib</classpathPrefix>-->
<!--<mainClass>com.bjsxt.javaspark.sql.CreateDataSetFromHive</mainClass>-->
<!--</manifest>-->
<!--</archive>-->
<!--</configuration>-->
<!--</plugin>-->
<!-- Copy the dependency jars to the lib directory -->
<!--<plugin>-->
<!--<groupId>org.apache.maven.plugins</groupId>-->
<!--<artifactId>maven-dependency-plugin</artifactId>-->
<!--<executions>-->
<!--<execution>-->
<!--<id>copy</id>-->
<!--<phase>package</phase>-->
<!--<goals>-->
<!--<goal>copy-dependencies</goal>-->
<!--</goals>-->
<!--<configuration>-->
<!--<outputDirectory>-->
<!--<!– Copy the dependency jars to target/lib –>-->
<!--${project.build.directory}/lib-->
<!--</outputDirectory>-->
<!--</configuration>-->
<!--</execution>-->
<!--</executions>-->
<!--</plugin>-->
</plugins>
</build>
</project>