Skip to main content

数据订阅

为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。

与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。

消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。

为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。

下面为关于数据订阅的一些说明,需要对TDengine的架构有一些了解,结合各个语言链接器的接口使用。(可使用时再了解)

  • 一个消费组消费同一个topic下的所有数据,不同消费组之间相互独立;
  • 一个消费组消费同一个topic所有的vgroup,消费组可由多个消费者组成,但一个vgroup仅被一个消费者消费,如果消费者数量超过了vgroup数量,多余的消费者不消费数据;
  • 在服务端每个vgroup仅保存一个offset,每个vgroup的offset是单调递增的,但不一定连续。各个vgroup的offset之间没有关联;
  • 每次poll服务端会返回一个结果block,该block属于一个vgroup,可能包含多个wal版本的数据,可以通过 offset 接口获得是该block第一条记录的offset;
  • 一个消费组如果从未commit过offset,当其成员消费者重启重新拉取数据时,均从参数auto.offset.reset设定值开始消费;在一个消费者生命周期中,客户端本地记录了最近一次拉取数据的offset,不会拉取重复数据;
  • 消费者如果异常终止(没有调用tmq_close),需等约12秒后触发其所属消费组rebalance,该消费者在服务端状态变为LOST,约1天后该消费者自动被删除;正常退出,退出后就会删除消费者;新增消费者,需等约2秒触发rebalance,该消费者在服务端状态变为ready;
  • 消费组rebalance会对该组所有ready状态的消费者成员重新进行vgroup分配,消费者仅能对自己负责的vgroup进行assignment/seek/commit/poll操作;
  • 消费者可利用 position 获得当前消费的offset,并seek到指定offset,重新消费;
  • seek将position指向指定offset,不执行commit操作,一旦seek成功,可poll拉取指定offset及以后的数据;
  • seek 操作之前须调用 assignment 接口获取该consumer的vgroup ID和offset范围。seek 操作会检测vgroup ID 和 offset是否合法,如非法将报错;
  • position是获取当前的消费位置,是下次要取的位置,不是当前消费到的位置
  • commit是提交消费位置,不带参数的话,是提交当前消费位置(下次要取的位置,不是当前消费到的位置),带参数的话,是提交参数里的位置(也即下次退出重启后要取的位置)
  • seek是设置consumer消费位置,seek到哪,position就返回哪,都是下次要取的位置
  • seek不会影响commit,commit不影响seek,相互独立,两个是不同的概念
  • begin接口为wal 第一条数据的offset,end 接口为wal 最后一条数据的offset + 1
  • offset接口获取的是记录所在结果block块里的第一条数据的offset,当seek至该offset时,将消费到这个block里的全部数据。参见第四点;
  • 由于存在 WAL 过期删除机制,即使seek 操作成功,poll数据时有可能offset已失效。如果poll 的offset 小于 WAL 最小版本号,将会从WAL最小版本号消费;
  • 数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 WAL_RETENTION_PERIODWAL_RETENTION_SIZE ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似;

本文档不对消息队列本身的知识做更多的介绍,如果需要了解,请自行搜索。

说明: 从3.2.0.0版本开始,数据订阅支持vnode迁移和分裂。 由于数据订阅依赖wal文件,而在vnode迁移和分裂的过程中,wal并不会同步过去,所以迁移或分裂后,之前没消费完的wal数据后消费不到。所以请保证之前把数据全部消费完后,再进行vnode迁移或分裂,否则,消费会丢失数据。

主要数据结构和 API

不同语言下, TMQ 订阅相关的 API 及数据结构如下(注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer):

void subscribe(Collection<String> topics) throws SQLException;

void unsubscribe() throws SQLException;

Set<String> subscription() throws SQLException;

ConsumerRecords<V> poll(Duration timeout) throws SQLException;

Set<TopicPartition> assignment() throws SQLException;
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;

void seek(TopicPartition partition, long offset) throws SQLException;
void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;

void commitSync() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException;

void close() throws SQLException;

写入数据

首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:

DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');

创建 topic

TDengine 使用 SQL 创建一个 topic:

CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
  • topic创建个数有上限,通过参数 tmqMaxTopicNum 控制,默认 20 个

TMQ 支持多种订阅类型:

列订阅

语法:

CREATE TOPIC topic_name as subquery

通过 SELECT 语句订阅(包括 SELECT *,或 SELECT ts, c1 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:

  • 该类型 TOPIC 一旦创建则订阅数据的结构确定。
  • 被订阅或用于计算的列或标签不可被删除(ALTER table DROP)、修改(ALTER table MODIFY)。
  • 若发生表结构变更,新增的列不出现在结果中。

超级表订阅

语法:

CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]

SELECT * from stbName 订阅的区别是:

  • 不会限制用户的表结构变更。
  • 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
  • with meta 参数可选,选择时将返回创建超级表,子表等语句,主要用于taosx做超级表迁移
  • where_condition 参数可选,选择时将用来过滤符合条件的子表,订阅这些子表。where 条件里不能有普通列,只能是tag或tbname,where条件里可以用函数,用来过滤tag,但是不能是聚合函数,因为子表tag值无法做聚合。也可以是常量表达式,比如 2 > 1(订阅全部子表),或者 false(订阅0个子表)
  • 返回数据不包含标签。

数据库订阅

语法:

CREATE TOPIC topic_name [with meta] AS DATABASE db_name;

通过该语句可创建一个包含数据库所有表数据的订阅

  • with meta 参数可选,选择时将返回创建数据库里所有超级表,子表的语句,主要用于taosx做数据库迁移

创建消费者 consumer

消费者需要通过一系列配置选项创建,基础配置项如下表所示:

参数名称类型参数说明备注
td.connect.ipstring服务端的 IP 地址
td.connect.userstring用户名
td.connect.passstring密码
td.connect.portinteger服务端的端口号
group.idstring消费组 ID,同一消费组共享消费进度
必填项。最大长度:192。
每个topic最多可建立100个 consumer group
client.idstring客户端 ID最大长度:192。
auto.offset.resetenum消费组订阅的初始位置
earliest: default(version < 3.2.0.0);从头开始订阅;
latest: default(version >= 3.2.0.0);仅从最新数据开始订阅;
none: 没有提交的 offset 无法订阅
enable.auto.commitboolean是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit默认值为 true
auto.commit.interval.msinteger消费记录自动提交消费位点时间间隔,单位为毫秒默认值为 5000
msg.with.table.nameboolean是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)(从3.2.0.0版本该参数废弃,恒为true)默认关闭
enable.replayboolean是否开启数据回放功能默认关闭

对于不同编程语言,其设置方式如下:

对于 Java 程序,还可以使用如下配置项:

参数名称类型参数说明
td.connect.typestring连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni"
bootstrap.serversstring连接地址,如 localhost:6030
value.deserializerstring值解析方法,使用此方法应实现 com.taosdata.jdbc.tmq.Deserializer 接口或继承 com.taosdata.jdbc.tmq.ReferenceDeserializer
value.deserializer.encodingstring指定字符串解析的字符集

需要注意:此处使用 bootstrap.servers 替代 td.connect.iptd.connect.port,以提供与 Kafka 一致的接口。

Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");

TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);

/* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}

上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。

数据回放功能说明:

  • 订阅增加 replay 功能,按照数据写入的时间回放。 比如,如下时间写入三条数据
      2023/09/22 00:00:00.000
    2023/09/22 00:00:05.000
    2023/09/22 00:00:08.000
    则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。
  • 仅列订阅支持数据回放
    • 回放需要保证独立时间线
    • 如果是子表订阅或者普通表订阅,只有一个vnode上有数据,保证是一个时间线
    • 如果超级表订阅,则需保证该 DB 只有一个vnode,否则报错(因为多个vnode上订阅出的数据不在一个时间线上)
  • 超级表和库订阅不支持回放
  • 增加 enable.replay 参数,true表示开启订阅回放功能,false表示不开启订阅回放功能,默认不开启。
  • 回放不支持进度保存,所以回放参数 enable.replay = true 时,auto commit 自动关闭
  • 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差

订阅 topics

一个 consumer 支持同时订阅多个 topic。

List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);

消费

以下代码展示了不同语言下如何对 TMQ 消息进行消费。

while(running){
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
processMsg(meter);
}
}

结束消费

消费结束后,应当取消订阅。

/* 取消订阅 */
consumer.unsubscribe();

/* 关闭消费 */
consumer.close();

删除 topic

如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。

/* 删除 topic */
DROP TOPIC topic_name;

状态查看

1、topics:查询已经创建的 topic

SHOW TOPICS;

2、consumers:查询 consumer 的状态及其订阅的 topic

SHOW CONSUMERS;

3、subscriptions:查询 consumer 与 vgroup 之间的分配关系

SHOW SUBSCRIPTIONS;

示例代码

以下是各语言的完整示例代码。

package com.taos.example;

import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class SubscribeDemo {
private static final String TOPIC = "tmq_topic";
private static final String DB_NAME = "meters";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);

public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try {
// prepare
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
Connection connection = DriverManager.getConnection(jdbcUrl);
try (Statement statement = connection.createStatement()) {
statement.executeUpdate("drop topic if exists " + TOPIC);
statement.executeUpdate("drop database if exists " + DB_NAME);
statement.executeUpdate("create database " + DB_NAME + " wal_retention_period 3600");
statement.executeUpdate("use " + DB_NAME);
statement.executeUpdate(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')");
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
statement.executeUpdate(
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// create topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
}

// create consumer
Properties properties = new Properties();
properties.getProperty(TMQConstants.CONNECT_TYPE, "jni");
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
properties.setProperty(TMQConstants.CONNECT_USER, "root");
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
properties.setProperty(TMQConstants.GROUP_ID, "test1");
properties.setProperty(TMQConstants.CLIENT_ID, "1");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");

// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Meters> r : meters) {
Meters meter = r.value();
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
timer.cancel();
}
}

查看源码

package com.taos.example;

import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}

查看源码

package com.taos.example;

import java.sql.Timestamp;

public class Meters {
private Timestamp ts;
private float current;
private int voltage;
private int groupid;
private String location;

public Timestamp getTs() {
return ts;
}

public void setTs(Timestamp ts) {
this.ts = ts;
}

public float getCurrent() {
return current;
}

public void setCurrent(float current) {
this.current = current;
}

public int getVoltage() {
return voltage;
}

public void setVoltage(int voltage) {
this.voltage = voltage;
}

public int getGroupid() {
return groupid;
}

public void setGroupid(int groupid) {
this.groupid = groupid;
}

public String getLocation() {
return location;
}

public void setLocation(String location) {
this.location = location;
}

@Override
public String toString() {
return "Meters{" +
"ts=" + ts +
", current=" + current +
", voltage=" + voltage +
", groupid=" + groupid +
", location='" + location + '\'' +
'}';
}
}



[查看源码](https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/Meters.java)