0
点赞
收藏
分享

微信扫一扫

Hadoop支持的文件格式之Avro

菜菜捞捞 2022-01-31 阅读 67



文章目录


  • ​​0x00 文章内容​​
  • ​​0x01 行存储与列存储​​

  • ​​1. 行存储​​
  • ​​2. 列存储​​
  • ​​2. Avro与Parquet​​

  • ​​0x02 编码实现Avro格式的读写​​

  • ​​1. 编码实现读写Avro文件​​
  • ​​2. 查看读写Avro文件结果​​
  • ​​3. 编码实现读写Avro文件(HDFS)​​
  • ​​4. 查看读写Avro文件结果(HDFS)​​

  • ​​0xFF 总结​​


0x00 文章内容


  1. 行存储与列存储
  2. 编码实现Avro格式的读写

0x01 行存储与列存储

比如现在有一张表,数据如下:

Hadoop支持的文件格式之Avro_java

分别用行存储于列存储。

1. 行存储

a. 行存储的存储方式

Hadoop支持的文件格式之Avro_java_02

传统数据库就是行存储,如MySQL等。

2. 列存储

a. 列存储的存储方式

Hadoop支持的文件格式之Avro_文件格式_03

其中,这里对行进行了一个split,两行为一个split。思想与HBase的Region分区类似。


  • HBase理论参考文章:​​浅显易懂入门大数据系列:四、HBase(超详细)​​​的 ​​五、HBase的存储结构​
  • 并且了解行存储与列存储的优缺点。

2. Avro与Parquet

a. Avro是行存储,Parquet是列存储。

b. 还需要清楚的是Avro与Parquet格式都是有Schema的,即结构。类似于我们传统数据库的字段,所以在写的时候需要指定。

0x02 编码实现Avro格式的读写

1. 编码实现读写Avro文件

a. 引入Avro相关jar包

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.0</version>
</dependency>

b. 引入Avro的Schema文件,编辑,放于​​src/main/data​​​目录下,命名为:​​person.avsc​

{"namespace": "com.shaonaiyi.hadoop.filetype.avro",
"type": "record",
"name": "Person",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": ["int", "null"]},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}

我们准备使用此定义好结构的文件生成一个对应的Java实体类,所以这里定义了实体类存放的位置,这里是:​​com.shaonaiyi.hadoop.filetype.avro​

Hadoop支持的文件格式之Avro_java_04

c. 我们准备使用Maven插件工具生成Java类,此处引入插件:

<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.7.7</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/data</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>

Hadoop支持的文件格式之Avro_文件格式_05

d. 生成Java类(​​clean​​->​​compile​​)

Hadoop支持的文件格式之Avro_文件格式_06

执行完,会发现已经生成了一个​​Person​​类,可能会报错,我们将​​@Override​​注释掉即可,因为之前写过一些代码,所以报错了,不管它。

Hadoop支持的文件格式之Avro_apache_07

e. ​​Person​​类里面这行就是我们所需要的​​Schema​​,对应着我们的​​person.avsc​

public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"com.shaonaiyi.hadoop.filetype.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");

g. 完整的写Avro文件代码

package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;

/**
* @Author shaonaiyi@163.com
* @Date 2019/12/17 16:17
* @Description 编码实现写Avro文件
*/
public class AvroFileWriter {

public static void main(String[] args) throws IOException {
GenericRecord record1 = new GenericData.Record(Person.SCHEMA$);
record1.put("name", "shaonaiyi");
record1.put("age", 18);
record1.put("favorite_number", 7);
record1.put("favorite_color", "red");

GenericRecord record2 = new GenericData.Record(Person.SCHEMA$);
record2.put("name", "shaonaier");
record2.put("age", 17);
record2.put("favorite_number", 1);
record2.put("favorite_color", "yellow");

File file = new File("person.avro");
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(Person.SCHEMA$);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.create(Person.SCHEMA$, file);
dataFileWriter.append(record1);
dataFileWriter.append(record2);
dataFileWriter.close();

}

}

h. 完整的读Avro文件代码

package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

import java.io.File;
import java.io.IOException;
/**
* @Author shaonaiyi@163.com
* @Date 2019/12/17 16:48
* @Description 编码实现读Avro文件
*/
public class AvroFileReader {

public static void main(String[] args) throws IOException {

File file = new File("person.avro");
DatumReader<GenericRecord> reader = new GenericDatumReader<>();
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, reader);

GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next();
System.out.println("name:" + record.get("name").toString());
System.out.println("age:" + record.get("age").toString());
System.out.println("favorite_number:" + record.get("favorite_number").toString());
System.out.println("favorite_color:" + record.get("favorite_color"));
System.out.println("-----------------------------------");
}

}

}
2. 查看读写Avro文件结果

a. 写Avro文件

Hadoop支持的文件格式之Avro_文件格式_08

b. 读Avro文件

Hadoop支持的文件格式之Avro_hadoop_09

3. 编码实现读写Avro文件(HDFS)

a. 引入所需要的jar包

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.8.0</version>
</dependency>

b. 写Avro文件到HDFS完整代码

package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import java.io.IOException;

/**
* @Author shaonaiyi@163.com
* @Date 2019/12/17 17:15
* @Description 编码实现写Avro文件到HDFS
*/
public class MRAvroFileWriter {

public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException, InterruptedException {

//1 构建一个job实例
Configuration hadoopConf = new Configuration();
Job job = Job.getInstance(hadoopConf);

//2 设置job的相关属性
// job.setOutputKeyClass(NullWritable.class);
// job.setOutputValueClass(Text.class);
// job.setOutputFormatClass(TextOutputFormat.class);

//job.setOutputKeyClass(AvroKey.class);
//job.setOutputValueClass(Person.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
//AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
AvroJob.setOutputKeySchema(job, Person.SCHEMA$);

//3 设置输出路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro"));

//FileOutputFormat.setCompressOutput(job, true);
//FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

//4 构建JobContext
JobID jobID = new JobID("jobId", 123);
JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);

//5 构建taskContext
TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.REDUCE, 0, 0);
TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);

//6 构建OutputFormat实例
OutputFormat format = job.getOutputFormatClass().newInstance();

//7 设置OutputCommitter
OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
committer.setupJob(jobContext);
committer.setupTask(hadoopAttemptContext);

//8 获取writer写数据,写完关闭writer
RecordWriter<AvroKey, Person> writer = format.getRecordWriter(hadoopAttemptContext);

// writer.write(null, new Text("shao"));
// writer.write(null, new Text("nai"));
// writer.write(null, new Text("yi"));
// writer.write(null, new Text("bigdata-man"));

Person person = new Person();
person.setName("jeffy");
person.setAge(20);
person.setFavoriteNumber(10);


person.setFavoriteColor("red");
writer.write(new AvroKey(person), null);

writer.close(hadoopAttemptContext);

//9 committer提交job和task
committer.commitTask(hadoopAttemptContext);
committer.commitJob(jobContext);

}

}

与写Text格式(文章链接跳转:​​Hadoop支持的文件格式之Text​​)时类似,主要不同如下:

Hadoop支持的文件格式之Avro_java_10

c. 从HDFS上读Avro文件完整代码

package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

/**
* @Author shaonaiyi@163.com
* @Date 2019/12/17 17:29
* @Description 编码实现从HDFS上读Avro文件
*/
public class MRAvroFileReader {

public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {
//1 构建一个job实例
Configuration hadoopConf = new Configuration();

Job job = Job.getInstance(hadoopConf);

//2 设置需要读取的文件全路径
FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro");

//3 获取读取文件的格式
// TextInputFormat inputFormat = TextInputFormat.class.newInstance();
AvroKeyInputFormat inputFormat = AvroKeyInputFormat.class.newInstance();

//4 获取需要读取文件的数据块的分区信息
//4.1 获取文件被分成多少数据块了
JobID jobID = new JobID("jobId", 123);
JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);

List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);

//读取每一个数据块的数据
inputSplits.forEach(new Consumer<InputSplit>() {
@Override
public void accept(InputSplit inputSplit) {
TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
// RecordReader reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
RecordReader<AvroKey<Person>, NullWritable> reader = null;
try {
// reader.initialize(inputSplit, hadoopAttemptContext);
// System.out.println("<key,value>");
// System.out.println("-----------");
// while (reader.nextKeyValue()) {
// System.out.println("<"+reader.getCurrentKey() + "," + reader.getCurrentValue()+ ">" );
// }
reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
reader.initialize(inputSplit, hadoopAttemptContext);
while (reader.nextKeyValue()) {
Person person = reader.getCurrentKey().datum();
System.out.println("key=>" + person);
System.out.println("value=>" + reader.getCurrentValue());
}
reader.close();

} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

}
}

与读Text格式(文章链接跳转:​​Hadoop支持的文件格式之Text​​)时类似,主要不同如下:

Hadoop支持的文件格式之Avro_文件格式_11

4. 查看读写Avro文件结果(HDFS)

a. 写文件结果

Hadoop支持的文件格式之Avro_apache_12

b. 读文件结果,我们在代码里没有设置值

Hadoop支持的文件格式之Avro_hadoop_13

0xFF 总结

  1. Hadoop支持的文件格式系列:
    ​​​Hadoop支持的文件格式之Text​​​​Hadoop支持的文件格式之Avro​​​​Hadoop支持的文件格式之Parquet​​​​Hadoop支持的文件格式之SequenceFile​​

全栈工程师、市场洞察者、专栏编辑

| ​​公众号​​​ | ​​微信​​​ | ​​微博​​​ | ​​​简书​​ |

福利:

​​邵奈一的技术博客导航​​

​​邵奈一​​ 原创不易,如转载请标明出处。


举报

相关推荐

0 条评论