0
点赞
收藏
分享

微信扫一扫

TDengine 中的数据订阅是如何实现的

小 T 导读:为方便业务场景,简化数据处理架构,TDengine 提供了数据订阅功能。本文将详细介绍这一功能如何使用。






一提到数据订阅,大家肯定第一时间就会想到 Apache Kafka。所以在使用时序数据库(Time Serie s Database,简称 TSDB)产品时,如果需要数据订阅功能,往往需要引入 Kafka,来搭建一套数据处理架构。






为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序来处理数据,TDengine 提供了类似消息队列产品的数据订阅和消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统就不再需要集成像 Kafka 这样的消息队列产品了,从而简化系统设计的复杂度,降低运营维护成本。






接下来,我们看看如何使用 TDengine 中的数据订阅功能。





















基本原理


















和 Kafka 一样,我们也需要定义 topic,但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 ​​SELECT​​ 语句。






用户可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及使用标量函数或自定义函数对数据进行计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是由应用完成,可以有效减少传输的数据量,并降低应用的复杂度。






消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group),一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但是,不同消费者组中的消费者,即使消费同一个 topic,也并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。


TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。


TDengine 利用 WAL (Write-Ahead-Log)  技术来提供基本的数据可靠性。从内部设计上讲,为了实现订阅功能,TDengine 会为 WAL 文件自动创建索引,以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小。通过以上方式,我们将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 的压缩率要远高于 WAL,为节省存储空间,我们不推荐保留太长时间,一般来说,不超过几天)。



对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。


代码示例




以 Java 语言为例,TDengine 提供了下列 API。



void subscribe(Collection<String> topics) throws SQLException;
void unsubscribe() throws SQLException;
Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitSync() throws SQLException;
void close() throws SQLException;



我们看一个完整的例子。



package com.taos.example;


import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;


public class SubscribeDemo {
private static final String TOPIC = "tmq_topic";
private static final String DB_NAME = "meters";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);


public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try {
// 连接数据库
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
Connection connection = DriverManager.getConnection(jdbcUrl);
try (Statement statement = connection.createStatement()) {
// 在 TDengine 中创建 database 和超级表、子表,并插入示例数据
statement.executeUpdate("drop topic if exists " + TOPIC);
statement.executeUpdate("drop database if exists " + DB_NAME);
statement.executeUpdate("create database " + DB_NAME);
statement.executeUpdate("use " + DB_NAME);
statement.executeUpdate(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')");
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
statement.executeUpdate(
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// 创建 topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
}


// 创建消费者
Properties properties = new Properties();
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.GROUP_ID, "test");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");


// 轮询数据
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
timer.cancel();
}
}






逻辑是不是非常清晰和简单?复制示例代码,稍作修改,就可以为自己的业务实现数据订阅功能了。赶快下载 TDengine 3.0 并体验吧。

TDengine 中的数据订阅是如何实现的_数据


举报

相关推荐

0 条评论