File Formats Supported by Hadoop: Parquet



Table of Contents

  • 0x00 Contents
  • 0x01 Row Storage vs. Column Storage
    • 1. Avro and Parquet
  • 0x02 Reading and Writing Parquet in Code
    • 1. Reading and writing a Parquet file in code
    • 2. Checking the read/write results
    • 3. Reading and writing Parquet files in code (HDFS)
    • 4. Checking the read/write results (HDFS)
  • 0x03 Bonus
  • 0xFF Summary


0x00 Contents

  1. Row storage vs. column storage
  2. Reading and writing Parquet in code
  3. Bonus

0x01 Row Storage vs. Column Storage

1. Avro and Parquet

a. See the section "0x01 Row Storage vs. Column Storage" in the earlier article of this series, Hadoop支持的文件格式之Avro (File Formats Supported by Hadoop: Avro).

0x02 Reading and Writing Parquet in Code

1. Reading and writing a Parquet file in code

a. Add the Parquet dependencies

<!-- Parquet dependencies -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-column</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>

b. Complete code for writing a Parquet file (to HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:14
 * @Description Writing a Parquet file in code
 */
public class ParquetFileWriter {

    public static void main(String[] args) throws IOException {
        // Define the Parquet schema by parsing a message-type string
        MessageType schema = MessageTypeParser.parseMessageType("message Person {\n" +
                " required binary name;\n" +
                " required int32 age;\n" +
                " required int32 favorite_number;\n" +
                " required binary favorite_color;\n" +
                "}");

        Configuration configuration = new Configuration();
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/parquet/data.parquet");
        GroupWriteSupport writeSupport = new GroupWriteSupport();
        GroupWriteSupport.setSchema(schema, configuration);
        // Codec, block size, page size, dictionary page size, dictionary/validation flags, writer version
        ParquetWriter<Group> writer = new ParquetWriter<Group>(path, writeSupport,
                CompressionCodecName.SNAPPY,
                ParquetWriter.DEFAULT_BLOCK_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,
                ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                ParquetProperties.WriterVersion.PARQUET_1_0, configuration);

        // Build one record (Group) matching the schema and write it
        GroupFactory groupFactory = new SimpleGroupFactory(schema);
        Group group = groupFactory.newGroup()
                .append("name", "shaonaiyi")
                .append("age", 18)
                .append("favorite_number", 7)
                .append("favorite_color", "red");

        writer.write(group);

        writer.close();
    }

}

c. Complete code for reading a Parquet file (from HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:18
 * @Description Reading a Parquet file in code
 */
public class ParquetFileReader {

    public static void main(String[] args) throws IOException {
        // Path of the file written by ParquetFileWriter above
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/parquet/data.parquet");
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader<Group> reader = new ParquetReader<>(path, readSupport);

        // Read the single record that was written and print its fields
        Group result = reader.read();
        System.out.println("name:" + result.getString("name", 0));
        System.out.println("age:" + result.getInteger("age", 0));
        System.out.println("favorite_number:" + result.getInteger("favorite_number", 0));
        System.out.println("favorite_color:" + result.getString("favorite_color", 0));

        reader.close();
    }

}
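The demo above writes and reads a single record. If the file held several records, a small loop dropped into the same main method (a sketch reusing the same GroupReadSupport) would keep calling read() until it returns null:

// Sketch: read every record in the file instead of just the first one
ParquetReader<Group> loopReader = new ParquetReader<>(path, new GroupReadSupport());
Group row;
while ((row = loopReader.read()) != null) { // read() returns null at end of file
    System.out.println("name:" + row.getString("name", 0) + ", age:" + row.getInteger("age", 0));
}
loopReader.close();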

2. Checking the read/write results

a. Writing the Parquet file

(screenshot: result of running the Parquet writer)

b. Reading the Parquet file

(screenshot: result of running the Parquet reader)

3. Reading and writing Parquet files in code (HDFS)

a. Add the parquet-avro dependency, which bridges Parquet and Avro

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.8.1</version>
</dependency>

Looking at the code above, defining the schema the following way is rather unfriendly:

MessageType schema = MessageTypeParser.parseMessageType("message Person {\n" +
        " required binary name;\n" +
        " required int32 age;\n" +
        " required int32 favorite_number;\n" +
        " required binary favorite_color;\n" +
        "}");

So we can bridge Parquet with Avro and simply reuse the Avro schema instead.
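For reference, here is a sketch of what the Avro schema behind the generated Person class most likely looks like, reconstructed from the fields this article sets on it. The authoritative .avsc comes from the Avro article of this series, so treat the exact types as an assumption:

import org.apache.avro.Schema;

public class PersonSchemaSketch {
    public static void main(String[] args) {
        // Assumption: field names/types inferred from Person's setters used below;
        // the real schema is defined in the earlier Avro article.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Person\"," +
                "\"namespace\":\"com.shaonaiyi.hadoop.filetype.avro\"," +
                "\"fields\":[" +
                "{\"name\":\"name\",\"type\":\"string\"}," +
                "{\"name\":\"age\",\"type\":\"int\"}," +
                "{\"name\":\"favorite_number\",\"type\":\"int\"}," +
                "{\"name\":\"favorite_color\",\"type\":\"string\"}]}");
        System.out.println(schema.toString(true)); // pretty-print the schema as JSON
    }
}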

b. Complete code for writing a Parquet file (HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.parquet.avro.AvroParquetOutputFormat;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:47
 * @Description Writing a Parquet file (HDFS) in code
 */
public class MRAvroParquetFileWriter {

    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException, InterruptedException {
        // 1. Build a Job instance
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);

        // 2. Set the job's output properties
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Person.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        //AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);

        // 3. Set the output path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet"));

        // 4. Build the JobContext
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);

        // 5. Build the TaskAttemptContext
        TaskAttemptID attemptId = new TaskAttemptID("attemptId", 123, TaskType.REDUCE, 0, 0);
        TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);

        // 6. Instantiate the OutputFormat
        OutputFormat format = job.getOutputFormatClass().newInstance();

        // 7. Set up the OutputCommitter
        OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
        committer.setupJob(jobContext);
        committer.setupTask(hadoopAttemptContext);

        // 8. Get a RecordWriter, write the data, then close the writer
        RecordWriter<Void, Person> writer = format.getRecordWriter(hadoopAttemptContext);
        Person person = new Person();
        person.setName("shaonaiyi");
        person.setAge(18);
        person.setFavoriteNumber(7);
        person.setFavoriteColor("red");
        writer.write(null, person);
        writer.close(hadoopAttemptContext);

        // 9. Commit the task and the job with the committer
        committer.commitTask(hadoopAttemptContext);
        committer.commitJob(jobContext);
    }

}

c. Complete code for reading a Parquet file (HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.parquet.avro.AvroParquetInputFormat;

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:52
 * @Description Reading a Parquet file (HDFS) in code
 */
public class MRAvroParquetFileReader {

    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {
        // 1. Build a Job instance
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);

        // 2. Set the full path of the file(s) to read
        FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet");

        // 3. Instantiate the input format used to read the file
        AvroParquetInputFormat inputFormat = AvroParquetInputFormat.class.newInstance();
        AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);
        //AvroJob.setInputKeySchema(job, Person.SCHEMA$);

        // 4. Get the split information for the file's data blocks
        // 4.1 Find out how many splits the file was divided into
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);

        // Read the records of every split
        inputSplits.forEach(new Consumer<InputSplit>() {
            @Override
            public void accept(InputSplit inputSplit) {
                TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
                TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
                RecordReader<NullWritable, Person> reader = null;
                try {
                    reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                    reader.initialize(inputSplit, hadoopAttemptContext);
                    while (reader.nextKeyValue()) {
                        System.out.println(reader.getCurrentKey());
                        Person person = reader.getCurrentValue();
                        System.out.println(person);
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

}

4. Checking the read/write results (HDFS)

a. Writing the Parquet file (HDFS)

(screenshot: result of the write run)

b. Reading the Parquet file (HDFS); the key was never set, so it prints as null

(screenshot: result of the read run)

0x03 Bonus

  1. A demo that writes a Parquet file via the Avro bridge and reads it back

package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 11:11
 * @Description Demo: writing and reading a Parquet file via the Avro bridge
 */
public class AvroParquetDemo {

    public static void main(String[] args) throws IOException {
        // Build an Avro record to write
        Person person = new Person();
        person.setName("shaonaiyi");
        person.setAge(18);
        person.setFavoriteNumber(7);
        person.setFavoriteColor("red");

        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet2");

        // Write the record as Parquet, using the Avro schema directly
        ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
                .withSchema(Person.SCHEMA$)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
        writer.write(person);
        writer.close();

        // Read the record back and print its fields
        ParquetReader<Object> avroParquetReader = AvroParquetReader.builder(path).build();
        Person record = (Person) avroParquetReader.read();
        System.out.println("name:" + record.getName());
        System.out.println("age:" + record.get("age").toString());
        System.out.println("favorite_number:" + record.get("favorite_number").toString());
        System.out.println("favorite_color:" + record.get("favorite_color"));
        avroParquetReader.close();
    }

}

  2. The console prints the record that was read back
     (screenshot: console output)
  3. The data is also present on HDFS
     (screenshot: HDFS directory listing)

0xFF Summary

  1. How to use these formats in a MapReduce job (a fuller driver sketch follows this list):

job.setInputFormatClass(AvroParquetInputFormat.class);
AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);

job.setOutputFormatClass(AvroParquetOutputFormat.class);
AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);

  2. The article 网站用户行为分析项目之会话切割(二) (Session Cutting in a Website User-Behavior Analysis Project, Part 2) saves its statistics ("9. 保存统计结果", saving the results) in exactly this Parquet format.
  3. The other articles in the "File Formats Supported by Hadoop" series cover Text, Avro, Parquet (this article), and SequenceFile.

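To tie point 1 of the summary together, here is a minimal map-only driver sketch. It assumes the Avro-generated Person class and the HDFS input path used earlier in this article; the class name, the output directory, and the zero-reducer pass-through setup are illustrative, not taken from the original post.

package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;

/**
 * Sketch only: a map-only job that reads Person records from one Parquet
 * directory and writes them unchanged to another (name and paths are illustrative).
 */
public class AvroParquetMRDriver {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "avro-parquet-passthrough");
        job.setJarByClass(AvroParquetMRDriver.class);

        // Input: Parquet files decoded into Avro Person records
        job.setInputFormatClass(AvroParquetInputFormat.class);
        AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);
        FileInputFormat.setInputPaths(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet"));

        // Output: Avro Person records encoded back into Parquet
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet-out"));

        // No reducers: the default identity Mapper passes each record straight through
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Person.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Packaged and submitted with hadoop jar, this only copies the records from one directory to another, which is enough to confirm that both the input and the output format are wired correctly.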
邵奈一 (Shao Naiyi): original content; please credit the source when reposting.

