目录
前言
本文是作者大数据系列中的一文,专栏地址:
https://blog.csdn.net/joker_zjn/category_12631789.html?spm=1001.2014.3001.5482
该系列会成体系的聊一聊整个大数据的技术栈,绝对干货,欢迎订阅。
1.下载安装
前置环境:
- Hadoop 3.1.3
- Java JDK 1.8
下载地址:
Downloads | Apache Spark
往下拉找到Spark release archives.
由于前面我们已经搭建好了hadoop环境,所以这里选择with out hadoop的版本。
配置config目录下有一个配置模板spark-env.sh.template:
将这个模板修改或者复制为spark-env.sh然后在里面:
因为Spark只是个计算引擎,具体要去操作对应的分部署文件系统的,所以将Spark的类路径指向了hadoop。也就是通过这个配置将Spark要操作的数据源设置为了HDFS。
启动:
bin目录下:
./run-exmaple SparkPi
这是一个Spark自带的demo,如果跑起来不报错,说明就没什么问题了。
2.RDD操作
可以用Spark自带的Spark shell来进行RDD操作:
./bin/spark-shell
RDD操作分为两类:
- 转换,就是只是返回中间数据集的操作。
- 动作,就是有具体单个返回值的操作。
map - 应用于RDD的每个元素,产生一个新的RDD。
filter - 根据函数条件过滤RDD中的元素。
flatMap - 对RDD中的每个元素应用函数并展平结果。
mapPartitions - 对每个分区应用一个函数。
union - 合并两个RDD。
distinct - 返回RDD中不重复的元素。
join - 对两个键值对RDD进行内连接。
reduce - 通过函数聚合RDD中的所有元素。
collect - 返回RDD的所有元素到Driver作为数组。
count - 返回RDD中元素的数量。
first - 返回RDD的第一个元素。
take(n) - 返回RDD的前n个元素。
saveAsTextFile - 将RDD的内容保存为文本文件。
foreach - 对RDD的每个元素应用函数,常用于副作用操作。
3.JAVA编程示例
依赖:
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
编码:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class WordCountFromHDFS {
public static void main(String[] args) {
if (args.length != 1) {
System.err.println("Usage: WordCountFromHDFS <input path>");
System.exit(1);
}
// 初始化Spark配置
SparkConf conf = new SparkConf().setAppName("WordCountFromHDFS").setMaster("local"); // 本地模式运行,根据实际情况可改为yarn等
// 创建SparkContext实例
JavaSparkContext sc = new JavaSparkContext(conf);
// HDFS文件路径,这里直接从命令行参数获取
String inputPath = args[0];
// 从HDFS读取文件内容
JavaRDD<String> lines = sc.textFile(inputPath);
// 每行分割成单词,然后扁平化,最后统计每个单词出现的次数
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());
JavaPairRDD<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
// 收集结果并打印
List<Tuple2<String, Integer>> results = wordCounts.collect();
for (Tuple2<String, Integer> result : results) {
System.out.println(result._1() + ": " + result._2());
}
// 停止SparkContext
sc.stop();
}
}
4.Spark SQL
park SQL是Spark的一个组件,它从Spark 1.3.0版本开始被引入,并在后续版本中不断得到增强和发展。Spark SQL允许用户使用SQL或者DataFrame API来处理结构化和半结构化的数据。下面做个小小的演示。
假设我们有一个CSV文件位于HDFS上,我们可以用以下命令加载它:
创建临时视图:
执行sql:
连表查询:
当然Spark SQL也有对应的JAVA API,支持编程的方式来操作,用到的时候查一下就是,此处就不展开了。