0
点赞
收藏
分享

微信扫一扫

【技术干货】代码示例:使用 Apache Flink 连接 TDengine

mjjackey 2022-05-30 阅读 51

小 T 导读:想用 Flink 对接 TDengine?保姆级教程来了。

0、前言

TDengine 是由涛思数据开发并开源的一款高性能、分布式、支持 SQL 的时序数据库(Time-Series Database)。

除了核心的时序数据库功能外,TDengine 还提供缓存、数据订阅、流式计算等大数据平台所需要的系列功能。但是很多小伙伴出于架构的考虑,还是需要将数据导出到 Apache Flink、Apache Spark 等平台进行计算分析。

为了帮助大家对接,我们特别推出了保姆级课程,包学包会。
4.png

1、技术实现

Apache Flink 提供了 SourceFunction 和 SinkFunction,用来提供 Flink 和外部数据源的连接,其中 SouceFunction 为从数据源读取数据,SinkFunction 为将数据写入数据源。 与此同时,Flink 提供了 RichSourceFunction 和 RichSinkFunction 这两个类(继承自AbstractRichFunction),提供了额外的初始化(open(Configuration))和销毁方法(close())。 通过重写这两个方法,可以避免每次读写数据时都重新建立连接。

2、代码实现

完整源码:https://github.com/liuyq-617/TD-Flink
代码逻辑:

1) 自定义类 SourceFromTDengine

用途:数据源连接,数据读取
```package com.taosdata.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import com.taosdata.model.Sensor;
import java.sql.*;
import java.util.Properties;
public class SourceFromTDengine extends RichSourceFunction<Sensor> {
Statement statement;
private Connection connection;
private String property;
public SourceFromTDengine(){
super();
}

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    String driver = "com.taosdata.jdbc.rs.RestfulDriver";
    String host = "u05";
    String username = "root";
    String password = "taosdata";
    String prop = System.getProperty("java.library.path");
    Logger LOG = LoggerFactory.getLogger(SourceFromTDengine.class);
    LOG.info("java.library.path:{}", prop);
    System.out.println(prop);
    Class.forName( driver );
    Properties properties = new Properties();
    connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
            , properties);
    statement = connection.createStatement();
}
@Override
public void close() throws Exception {
    super.close();
    if (connection != null) {
        connection.close();
    }
    if (statement != null) {
        statement.close();
    }
}
@Override
public void run(SourceContext<Sensor> sourceContext) throws Exception {
    try {
        String sql = "select * from tt.meters";
        ResultSet resultSet = statement.executeQuery(sql);
        while (resultSet.next()) {
            Sensor sensor = new Sensor( resultSet.getLong(1),
                    resultSet.getInt( "vol" ),
                    resultSet.getFloat( "current" ),
                    resultSet.getString( "location" ).trim());
            sourceContext.collect( sensor );
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
@Override
public void cancel() {
}

}

### 2) 自定义类 SinkToTDengine
用途:数据源连接,数据写入

SinkToTDengine
```package com.taosdata.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.taosdata.model.Sensor;
import java.sql.*;
import java.util.Properties;
public class SinkToTDengine extends RichSinkFunction<Sensor> {
    Statement statement;
    private Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = "com.taosdata.jdbc.rs.RestfulDriver";
        String host = "TAOS-FQDN";
        String username = "root";
        String password = "taosdata";
        String prop = System.getProperty("java.library.path");
        System.out.println(prop);
        Class.forName( driver );
        Properties properties = new Properties();
        connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
                , properties);
        statement = connection.createStatement();

    }
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (statement != null) {
            statement.close();
        }
    }
    @Override
    public void invoke(Sensor sensor, Context context) throws Exception {
        try {
            String sql = String.format("insert into sinktest.%s using sinktest.meters tags('%s') values(%d,%d,%f)", 
                                sensor.getLocation(),
                                sensor.getLocation(),
                                sensor.getTs(),
                                sensor.getVal(),
                                sensor.getCurrent()
                                );
            statement.executeUpdate(sql);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3) 自定义类 Sensor

用途:定义数据结构,用来接受数据
```package com.taosdata.model;
public class Sensor {
public long ts;
public int val;
public float current;
public String location;
public Sensor() {
}
public Sensor(long ts, int val, float current, String location) {
this.ts = ts;
this.val = val;
this.current = current;
this.location = location;
}
public long getTs() {
return ts;
}
public void setTs(long ts) {
this.ts = ts;
}
public int getVal() {
return val;
}
public void setVal(int val) {
this.val = val;
}
public float getCurrent() {
return current;
}
public void setCurrent(float current) {
this.current = current;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;}
@Override
br/>}
@Override
return "Sensor{" +
"ts=" + ts +
", val=" + val +
", current=" + current +
", location='" + location + '\'' +
'}';
}
}

### 4) 主程序类 ReadFromTDengine
用途:调用 Flink 进行读取和写入数据
```package com.taosdata;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.taosdata.model.Sensor;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
public class ReadFromTDengine {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Sensor> SensorList = env.addSource( new com.taosdata.flink.SourceFromTDengine() );
        SensorList.print();
        SensorList.addSink( new com.taosdata.flink.SinkToTDengine() );       
        env.execute();
    }
}

3、简单测试 RESTful 接口

1) 环境准备:

a) Flink 安装&启动:

  • wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
  • tar zxf flink-1.14.3-bin-scala_2.12.tgz -C /usr/local
  • /usr/local/flink-1.14.3/bin/start-cluster.sh

b) TDengine Database 环境准备:
创建原始数据:

  • create database tt;
  • create table meters (ts TIMESTAMP,vol INT,current FLOAT) TAGS (location BINARY(20));
  • insert into beijing using meters tags(‘beijing’) values(now,220,30.2);

创建目标数据库表:

  • create database sinktest;
  • create table meters (ts TIMESTAMP,vol INT,current FLOAT) TAGS (location BINARY(20));

2) 打包编译:

源码位置: https://github.com/liuyq-617/TD-Flink

mvn clean package

3) 程序启动:

flink run target/test-flink-1.0-SNAPSHOT-dist.jar

读取数据

  • vi log/flink-root-taskexecutor-0-xxxxx.out
  • 查看到数据打印:Sensor{ts=1645166073101, val=220, current=5.7, location=’beijing’}

写入数据

  • show sinktest.tables;
  • 已经创建了beijing 子表
  • select * from sinktest.beijing;
  • 可以查询到刚插入的数据

4、使用 JNI 方式

举一反三的小伙伴此时已经猜到,只要把 JDBC URL 修改一下就可以了。

但是 Flink 每次分派作业时都在使用一个新的 ClassLoader,而我们在计算节点上就会得到“Native library already loaded in another classloader”错误。

为了避免此问题,可以将 JDBC 的 jar 包放到 Flink 的 lib 目录下,不去调用 dist 包就可以了。

  • cp taos-jdbcdriver-2.0.37-dist.jar /usr/local/flink-1.14.3/lib
  • flink run target/test-flink-1.0-SNAPSHOT.jar

5、小结

通过在项目中引入 SourceFromTDengine 和 SinkToTDengine 两个类,即可完成在 Flink 中对 TDengine 的读写操作。后面我们会有文章介绍 Spark 和 TDengine 的对接。

举报

相关推荐

0 条评论