Introduction
This post looks at the index and data files generated during the shuffle process: what data they store and how to read them.
Maven
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Reduce test
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;
import scala.Tuple2;

@Test
public void reduceTest() {
    JavaSparkContext context = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("LocalTest"));
    // directory layout, e.g. C:\Users\xx\AppData\Local\Temp\spark-2ff5b976-7017-4622-8f19-ba09c545740c\userFiles-c98c2979-d9cd-4b9b-b511-3782e30c19cc
    System.out.println("driver dir: " + context.env().driverTmpDir().get());
    List<Tuple2<Integer, String>> datas = new ArrayList<>();
    Random random = new Random();
    for (int i = 0; i < 10000; i++) {
        datas.add(new Tuple2<>(random.nextInt(100), "list_" + i));
    }
    for (int i = 0; i < 5000; i++) {
        datas.add(new Tuple2<>(random.nextInt(50), "list_" + i));
    }
    List<Tuple2<Integer, String>> collect = context.parallelizePairs(datas, 5)
            .reduceByKey((v1, v2) -> v1 + "==>" + v2)
            .collect();
    //List<Tuple2<Integer, String>> collect2 = context.parallelizePairs(datas, 5).reduceByKey((v1, v2) -> v1 + "-->" + v2).collect();
    // block forever so the temporary shuffle files are not cleaned up while we inspect them
    while (true) {
    }
}
Running this test case creates Spark's runtime files in the local temp directory, with the layout shown below. The .data and .index files are generated under the blockmgr directory.
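As a convenience, here is a minimal sketch (not part of the original post) for locating those files programmatically. It assumes the default local[*] setup, where blockmgr-* directories are created under java.io.tmpdir; the test name listShuffleFiles is made up for illustration.
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;
import org.junit.Test;

@Test
public void listShuffleFiles() throws Exception {
    // blockmgr-* directories are created under java.io.tmpdir by the local[*] run above
    File tmp = new File(System.getProperty("java.io.tmpdir"));
    File[] blockMgrDirs = tmp.listFiles(f -> f.isDirectory() && f.getName().startsWith("blockmgr-"));
    if (blockMgrDirs == null) {
        return;
    }
    for (File blockMgrDir : blockMgrDirs) {
        try (Stream<Path> paths = Files.walk(blockMgrDir.toPath())) {
            paths.map(Path::toFile)
                 .filter(f -> f.getName().endsWith(".data") || f.getName().endsWith(".index"))
                 .forEach(f -> System.out.println(f.getAbsolutePath() + " (" + f.length() + " bytes)"));
        }
    }
}
Copying the listed .index/.data pairs into a scratch folder is what the tests below assume.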
Reading the index file
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.junit.Test;

@Test
public void readShuffleIndex() throws IOException {
    String dir = "C:\\Users\\xx\\Desktop\\新建文件夹";
    File[] indexFiles = new File(dir).listFiles(f -> f.getName().endsWith(".index"));
    for (File indexFile : indexFiles) {
        try (DataInputStream ds = new DataInputStream(new FileInputStream(indexFile))) {
            // print the first three offsets; the index file is just a sequence of longs
            System.out.printf("%s, offset:%s -> %s -> %s %n", indexFile.getName(), ds.readLong(), ds.readLong(), ds.readLong());
        }
    }
}
The index file is binary: it is simply a sequence of longs (numPartitions + 1 offsets into the corresponding .data file, so partition i occupies the byte range [offset[i], offset[i+1])), which is why a plain DataInputStream is enough to read it. For convenience the .index and .data files were copied out of the temp directory into a single folder for these tests. The relevant source is IndexShuffleBlockResolver.getBlockData().
Test case output
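To make the per-partition layout explicit, here is a hedged sketch (not from the original post) that reads every offset in an .index file and derives each partition's byte range, essentially the computation IndexShuffleBlockResolver.getBlockData() performs; the test name readAllOffsets is made up.
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.junit.Test;

@Test
public void readAllOffsets() throws IOException {
    String dir = "C:\\Users\\xx\\Desktop\\新建文件夹";
    File[] indexFiles = new File(dir).listFiles(f -> f.getName().endsWith(".index"));
    for (File indexFile : indexFiles) {
        // the index file holds (numPartitions + 1) longs, so the file size tells us how many offsets to read
        int numOffsets = (int) (indexFile.length() / Long.BYTES);
        try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
            long prev = in.readLong(); // the first offset is always 0
            System.out.println(indexFile.getName());
            for (int i = 1; i < numOffsets; i++) {
                long cur = in.readLong();
                // partition (i - 1) occupies bytes [prev, cur) of the .data file
                System.out.printf("  partition %d: offset=%d length=%d %n", i - 1, prev, cur - prev);
                prev = cur;
            }
        }
    }
}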
Reading the data file
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockId;
import org.junit.Test;
import scala.collection.Iterator;

@Test
public void readShuffleData() throws IOException {
    String dir = "C:\\Users\\xx\\Desktop\\新建文件夹";
    JavaSparkContext context = new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("LocalTest"));
    SerializerManager sm = context.env().blockManager().serializerManager();
    File[] dataFiles = new File(dir).listFiles(f -> f.getName().endsWith(".data"));
    // the block id is the file name without the .data suffix, e.g. shuffle_0_0_0
    BlockId blockId = BlockId.apply(dataFiles[0].getName().replace(".data", ""));
    // SerializerManager decompresses and deserializes the stream; the null ClassTag falls back to the default serializer
    Iterator<Object> objectIterator = sm.dataDeserializeStream(blockId, new FileInputStream(dataFiles[0]), null);
    while (objectIterator.hasNext()) {
        System.out.println(objectIterator.next());
    }
}
The data file is compressed, so it has to be read through the serialization and compression utilities provided by the Spark environment.
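Building on the index offsets, the sketch below (an illustration with assumptions, not code from the original post) reads a single reduce partition out of the .data file by slicing the byte range taken from the .index file, roughly mirroring what IndexShuffleBlockResolver.getBlockData() does; the test name readSinglePartition and the reduceId value are made up for the demo.
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockId;
import org.junit.Test;
import scala.collection.Iterator;

@Test
public void readSinglePartition() throws IOException {
    String dir = "C:\\Users\\xx\\Desktop\\新建文件夹";
    int reduceId = 0; // which reduce partition to extract; arbitrary choice for the demo
    JavaSparkContext context = new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("LocalTest"));
    SerializerManager sm = context.env().blockManager().serializerManager();

    File dataFile = new File(dir).listFiles(f -> f.getName().endsWith(".data"))[0];
    File indexFile = new File(dir, dataFile.getName().replace(".data", ".index"));

    long offset;
    long nextOffset;
    try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
        in.skipBytes(reduceId * Long.BYTES); // jump to the offset entry of the wanted partition
        offset = in.readLong();
        nextOffset = in.readLong();
    }

    // slice the partition's byte range out of the .data file
    byte[] bytes = new byte[(int) (nextOffset - offset)];
    try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r")) {
        raf.seek(offset);
        raf.readFully(bytes);
    }

    BlockId blockId = BlockId.apply(dataFile.getName().replace(".data", ""));
    // decompress + deserialize only this slice; null ClassTag falls back to the default serializer, as in the test above
    Iterator<Object> it = sm.dataDeserializeStream(blockId, new ByteArrayInputStream(bytes), null);
    while (it.hasNext()) {
        System.out.println(it.next());
    }
    context.stop();
}
Each map task's .data file contains one such slice per reduce partition, so changing reduceId selects a different slice.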