0
点赞
收藏
分享

微信扫一扫

Apache IoTDB开发系统整合之MapReduce TsFile

Xin_So 2023-09-04 阅读 38

TsFile-Hadoop-Connector User Guide

关于 TsFile-Hadoop-Connector

TsFile-Hadoop-Connector 实现了 Hadoop 对 Tsfile 类型的外部数据源的支持。这使用户能够通过Hadoop读取,写入和查询Tsfile。

使用此连接器,咱们就可以:

  • 将单个 TsFile 从本地文件系统或 hdfs 加载到 Hadoop 中
  • 将特定目录中的所有文件(从本地文件系统或HDFS加载到Hadoop中)
  • 将数据从 Hadoop 写入 TsFile

系统要求

Hadoop Version

Java Version

TsFile Version

2.7.3

1.8

0.10.0

数据类型对应

TsFile data type

Hadoop writable

BOOLEAN

BooleanWritable

INT32

IntWritable

INT64

LongWritable

FLOAT

FloatWritable

DOUBLE

DoubleWritable

TEXT

Text

TSFInput格式说明

TSFInputFormat 从 tsfile 中提取数据,并将其格式化为 .MapWritable

假设我们要提取名为该设备的数据,该设备具有三个名为 、、 的传感器。d1s1s2s3

s1的类型为 ,的类型为 ,的类型为 。BOOLEANs2DOUBLEs3TEXT

结构将如下所示:MapWritable

  1. {
  2. "time_stamp": 10000000,
  3. "device_id": d1,
  4. "s1": true,
  5. "s2": 3.14,
  6. "s3": "middle"
  7. }

在 Hadoop 的 Map 作业中,你可以按键获取任何你想要的值,如下所示:

mapwritable.get(new Text("s1"))

注意:中的所有键的类型均为 。MapWritableText

阅读示例:计算总和

首先,我们应该告诉 InputFormat 我们想要从 tsfile 获得什么样的数据。

  1. // configure reading time enable
  2. TSFInputFormat.setReadTime(job, true);
  3. // configure reading deviceId enable
  4. TSFInputFormat.setReadDeviceId(job, true);
  5. // configure reading which deltaObjectIds
  6. String[] deviceIds = {"device_1"};
  7. TSFInputFormat.setReadDeviceIds(job, deltaObjectIds);
  8. // configure reading which measurementIds
  9. String[] measurementIds = {"sensor_1", "sensor_2", "sensor_3"};
  10. TSFInputFormat.setReadMeasurementIds(job, measurementIds);

然后,应指定映射器和化简器的输出键和值

  1. // set inputformat and outputformat
  2. job.setInputFormatClass(TSFInputFormat.class);
  3. // set mapper output key and value
  4. job.setMapOutputKeyClass(Text.class);
  5. job.setMapOutputValueClass(DoubleWritable.class);
  6. // set reducer output key and value
  7. job.setOutputKeyClass(Text.class);
  8. job.setOutputValueClass(DoubleWritable.class);

然后,和类是你如何处理类产生的。mapperreducerMapWritableTSFInputFormat

  1. public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, DoubleWritable> {

  2. @Override
  3. protected void map(NullWritable key, MapWritable value,
  4. Mapper<NullWritable, MapWritable, Text, DoubleWritable>.Context context)
  5. throws IOException, InterruptedException {

  6. Text deltaObjectId = (Text) value.get(new Text("device_id"));
  7. context.write(deltaObjectId, (DoubleWritable) value.get(new Text("sensor_3")));
  8. }
  9. }

  10. public static class TSReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

  11. @Override
  12. protected void reduce(Text key, Iterable<DoubleWritable> values,
  13. Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
  14. throws IOException, InterruptedException {

  15. double sum = 0;
  16. for (DoubleWritable value : values) {
  17. sum = sum + value.get();
  18. }
  19. context.write(key, new DoubleWritable(sum));
  20. }
  21. }

注意:有关完整的代码,请参阅以下链接:https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb//hadoop/tsfile/TSFMRReadExample.java

写入示例:将平均值写入 Tsfile

除了 ,Hadoop-map-reduce 作业的其余配置代码几乎与上面相同。OutputFormatClass

  1. job.setOutputFormatClass(TSFOutputFormat.class);
  2. // set reducer output key and value
  3. job.setOutputKeyClass(NullWritable.class);
  4. job.setOutputValueClass(HDFSTSRecord.class);

然后,和类是你如何处理类产生的。mapperreducerMapWritableTSFInputFormat

  1. public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, MapWritable> {
  2. @Override
  3. protected void map(NullWritable key, MapWritable value,
  4. Mapper<NullWritable, MapWritable, Text, MapWritable>.Context context)
  5. throws IOException, InterruptedException {

  6. Text deltaObjectId = (Text) value.get(new Text("device_id"));
  7. long timestamp = ((LongWritable)value.get(new Text("timestamp"))).get();
  8. if (timestamp % 100000 == 0) {
  9. context.write(deltaObjectId, new MapWritable(value));
  10. }
  11. }
  12. }

  13. /**
  14. * This reducer calculate the average value.
  15. */
  16. public static class TSReducer extends Reducer<Text, MapWritable, NullWritable, HDFSTSRecord> {

  17. @Override
  18. protected void reduce(Text key, Iterable<MapWritable> values,
  19. Reducer<Text, MapWritable, NullWritable, HDFSTSRecord>.Context context) throws IOException, InterruptedException {
  20. long sensor1_value_sum = 0;
  21. long sensor2_value_sum = 0;
  22. double sensor3_value_sum = 0;
  23. long num = 0;
  24. for (MapWritable value : values) {
  25. num++;
  26. sensor1_value_sum += ((LongWritable)value.get(new Text("sensor_1"))).get();
  27. sensor2_value_sum += ((LongWritable)value.get(new Text("sensor_2"))).get();
  28. sensor3_value_sum += ((DoubleWritable)value.get(new Text("sensor_3"))).get();
  29. }
  30. HDFSTSRecord tsRecord = new HDFSTSRecord(1L, key.toString());
  31. DataPoint dPoint1 = new LongDataPoint("sensor_1", sensor1_value_sum / num);
  32. DataPoint dPoint2 = new LongDataPoint("sensor_2", sensor2_value_sum / num);
  33. DataPoint dPoint3 = new DoubleDataPoint("sensor_3", sensor3_value_sum / num);
  34. tsRecord.addTuple(dPoint1);
  35. tsRecord.addTuple(dPoint2);
  36. tsRecord.addTuple(dPoint3);
  37. context.write(NullWritable.get(), tsRecord);
  38. }
  39. }

注意:有关完整的代码,请参阅以下链接:https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb//hadoop/tsfile/TSMRWriteExample.java



举报

相关推荐

0 条评论