文章目录
- 0x00 文章内容
- 0x01 行存储与列存储
- 1. 行存储
- 2. 列存储
- 2. Avro与Parquet
- 0x02 编码实现Avro格式的读写
- 1. 编码实现读写Avro文件
- 2. 查看读写Avro文件结果
- 3. 编码实现读写Avro文件(HDFS)
- 4. 查看读写Avro文件结果(HDFS)
- 0xFF 总结
0x00 文章内容
- 行存储与列存储
- 编码实现Avro格式的读写
0x01 行存储与列存储
比如现在有一张表,数据如下:

分别用行存储于列存储。
1. 行存储
a. 行存储的存储方式

传统数据库就是行存储,如MySQL等。
2. 列存储
a. 列存储的存储方式

其中,这里对行进行了一个split,两行为一个split。思想与HBase的Region分区类似。
- HBase理论参考文章:浅显易懂入门大数据系列:四、HBase(超详细)的 五、HBase的存储结构
- 并且了解行存储与列存储的优缺点。
2. Avro与Parquet
a. Avro是行存储,Parquet是列存储。
b. 还需要清楚的是Avro与Parquet格式都是有Schema的,即结构。类似于我们传统数据库的字段,所以在写的时候需要指定。
0x02 编码实现Avro格式的读写
1. 编码实现读写Avro文件
a. 引入Avro相关jar包
<dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.0</version>
    </dependency>b. 引入Avro的Schema文件,编辑,放于src/main/data目录下,命名为:person.avsc
{"namespace": "com.shaonaiyi.hadoop.filetype.avro",
  "type": "record",
  "name": "Person",
  "fields": [
      {"name": "name", "type": "string"},
      {"name": "age",  "type": ["int", "null"]},
      {"name": "favorite_number",  "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
  ]
 }我们准备使用此定义好结构的文件生成一个对应的Java实体类,所以这里定义了实体类存放的位置,这里是:com.shaonaiyi.hadoop.filetype.avro

c. 我们准备使用Maven插件工具生成Java类,此处引入插件:
<plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.7.7</version>
        <executions>
            <execution>
                <phase>generate-sources</phase>
                <goals>
                    <goal>schema</goal>
                </goals>
                <configuration>
                    <sourceDirectory>${project.basedir}/src/main/data</sourceDirectory>
                    <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                </configuration>
            </execution>
        </executions>
    </plugin>
 d. 生成Java类(clean->compile)

 执行完,会发现已经生成了一个Person类,可能会报错,我们将@Override注释掉即可,因为之前写过一些代码,所以报错了,不管它。

 e. Person类里面这行就是我们所需要的Schema,对应着我们的person.avsc
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"com.shaonaiyi.hadoop.filetype.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");g. 完整的写Avro文件代码
package com.shaonaiyi.hadoop.filetype.avro;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import java.io.File;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 16:17
 * @Description 编码实现写Avro文件
 */
public class AvroFileWriter {
    public static void main(String[] args) throws IOException {
        GenericRecord record1 = new GenericData.Record(Person.SCHEMA$);
        record1.put("name", "shaonaiyi");
        record1.put("age", 18);
        record1.put("favorite_number", 7);
        record1.put("favorite_color", "red");
        GenericRecord record2 = new GenericData.Record(Person.SCHEMA$);
        record2.put("name", "shaonaier");
        record2.put("age", 17);
        record2.put("favorite_number", 1);
        record2.put("favorite_color", "yellow");
        File file = new File("person.avro");
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(Person.SCHEMA$);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer);
        dataFileWriter.create(Person.SCHEMA$, file);
        dataFileWriter.append(record1);
        dataFileWriter.append(record2);
        dataFileWriter.close();
    }
}h. 完整的读Avro文件代码
package com.shaonaiyi.hadoop.filetype.avro;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import java.io.File;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 16:48
 * @Description 编码实现读Avro文件
 */
public class AvroFileReader {
    public static void main(String[] args) throws IOException {
        File file = new File("person.avro");
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, reader);
        GenericRecord record = null;
        while (dataFileReader.hasNext()) {
            record = dataFileReader.next();
            System.out.println("name:" + record.get("name").toString());
            System.out.println("age:" + record.get("age").toString());
            System.out.println("favorite_number:" + record.get("favorite_number").toString());
            System.out.println("favorite_color:" + record.get("favorite_color"));
            System.out.println("-----------------------------------");
        }
    }
}2. 查看读写Avro文件结果
a. 写Avro文件

b. 读Avro文件

3. 编码实现读写Avro文件(HDFS)
a. 引入所需要的jar包
<dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-mapred</artifactId>
        <version>1.8.0</version>
    </dependency>b. 写Avro文件到HDFS完整代码
package com.shaonaiyi.hadoop.filetype.avro;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 17:15
 * @Description 编码实现写Avro文件到HDFS
 */
public class MRAvroFileWriter {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException, InterruptedException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置job的相关属性
//        job.setOutputKeyClass(NullWritable.class);
//        job.setOutputValueClass(Text.class);
//        job.setOutputFormatClass(TextOutputFormat.class);
        //job.setOutputKeyClass(AvroKey.class);
        //job.setOutputValueClass(Person.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        //AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroJob.setOutputKeySchema(job, Person.SCHEMA$);
        //3 设置输出路径
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro"));
        //FileOutputFormat.setCompressOutput(job, true);
        //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        //4 构建JobContext
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        //5 构建taskContext
        TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.REDUCE, 0, 0);
        TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
        //6 构建OutputFormat实例
        OutputFormat format = job.getOutputFormatClass().newInstance();
        //7 设置OutputCommitter
        OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
        committer.setupJob(jobContext);
        committer.setupTask(hadoopAttemptContext);
        //8 获取writer写数据,写完关闭writer
        RecordWriter<AvroKey, Person> writer = format.getRecordWriter(hadoopAttemptContext);
//        writer.write(null, new Text("shao"));
//        writer.write(null, new Text("nai"));
//        writer.write(null, new Text("yi"));
//        writer.write(null, new Text("bigdata-man"));
        Person person = new Person();
        person.setName("jeffy");
        person.setAge(20);
        person.setFavoriteNumber(10);
        person.setFavoriteColor("red");
        writer.write(new AvroKey(person), null);
        writer.close(hadoopAttemptContext);
        //9 committer提交job和task
        committer.commitTask(hadoopAttemptContext);
        committer.commitJob(jobContext);
    }
}与写Text格式(文章链接跳转:Hadoop支持的文件格式之Text)时类似,主要不同如下:

c. 从HDFS上读Avro文件完整代码
package com.shaonaiyi.hadoop.filetype.avro;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 17:29
 * @Description 编码实现从HDFS上读Avro文件
 */
public class MRAvroFileReader {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置需要读取的文件全路径
        FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro");
        //3 获取读取文件的格式
//        TextInputFormat inputFormat = TextInputFormat.class.newInstance();
        AvroKeyInputFormat inputFormat = AvroKeyInputFormat.class.newInstance();
        //4 获取需要读取文件的数据块的分区信息
        //4.1 获取文件被分成多少数据块了
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);
        //读取每一个数据块的数据
        inputSplits.forEach(new Consumer<InputSplit>() {
            @Override
            public void accept(InputSplit inputSplit) {
                TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
                TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
//                RecordReader reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                RecordReader<AvroKey<Person>, NullWritable> reader = null;
                try {
//                    reader.initialize(inputSplit, hadoopAttemptContext);
//                    System.out.println("<key,value>");
//                    System.out.println("-----------");
//                    while (reader.nextKeyValue()) {
//                        System.out.println("<"+reader.getCurrentKey() + "," + reader.getCurrentValue()+ ">" );
//                    }
                    reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                    reader.initialize(inputSplit, hadoopAttemptContext);
                    while (reader.nextKeyValue()) {
                        Person person = reader.getCurrentKey().datum();
                        System.out.println("key=>" + person);
                        System.out.println("value=>" + reader.getCurrentValue());
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}与读Text格式(文章链接跳转:Hadoop支持的文件格式之Text)时类似,主要不同如下:

4. 查看读写Avro文件结果(HDFS)
a. 写文件结果

b. 读文件结果,我们在代码里没有设置值

0xFF 总结
- Hadoop支持的文件格式系列:
 Hadoop支持的文件格式之TextHadoop支持的文件格式之AvroHadoop支持的文件格式之ParquetHadoop支持的文件格式之SequenceFile
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | 简书 |
福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。










