Skip to main content

数据订阅

为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。

数据订阅介绍

主题

与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 SELECT 语句,具体的语法参见 CREATE TOPIC。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。

如下图,每个 topic 涉及到的数据表可能分布在多个 vnode(相当于 kafka 里的 partition) 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中,WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。

img_5.png

TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 CREATE DATABASE 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。

对于 SELECT 语句形式的 topic,在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。

生产者

写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。

消费者

消费者组

消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode)。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。

为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode;有 3 个消费者的话,2 个消费者各消费 1 个 vnode,1 个消费者消费 2 个 vnode;有 5 个消费者的话,4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图:

img_6.png

在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。

一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。

消费进度

在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset),消费进度可以手动提交,也可以通过参数(auto.commit.interval.ms)设置为周期性自动提交。

首次消费数据时通过订阅参数(auto.offset.reset)来确定消费位置为最新数据(latest)还是最旧数据(earliest)。

消费进度在一个 vnode 上对于同一个 topic 和 消费者组是唯一的。所以如果同一个 topic 和 消费者组在一个 vnode 上的消费者退出了,并且提交了消费进度。然后同一个 topic 和 消费者组里重新建了一个新的消费者消费这个 vnode,那么这个新消费者将继承之前的消费进度继续消费。

如果之前的消费者没有提交消费进度,那个新的消费者将根据订阅参数(auto.offset.reset)设置的值来确定起始消费位置。

不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。

img_7.png

作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。

说明

从3.2.0.0版本开始,数据订阅支持vnode迁移和分裂。

由于数据订阅依赖wal文件,而在vnode迁移和分裂的过程中,wal并不会同步过去,所以迁移或分裂后,之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后,再进行vnode迁移或分裂,否则,消费会丢失数据。

数据订阅语法说明

具体的语法参见 数据订阅

数据订阅相关参数

消费参数主要用于消费者创建时指定,基础配置项如下表所示:

参数名称类型参数说明备注
td.connect.ipstring服务端的 IP 地址
td.connect.userstring用户名
td.connect.passstring密码
td.connect.portinteger服务端的端口号
group.idstring消费组 ID,同一消费组共享消费进度
必填项。最大长度:192。
每个topic最多可建立100个 consumer group
client.idstring客户端 ID最大长度:192。
auto.offset.resetenum消费组订阅的初始位置
earliest: default(version < 3.2.0.0);从头开始订阅;
latest: default(version >= 3.2.0.0);仅从最新数据开始订阅;
none: 没有提交的 offset 无法订阅
enable.auto.commitboolean是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit默认值为 true
auto.commit.interval.msinteger消费记录自动提交消费位点时间间隔,单位为毫秒默认值为 5000
msg.with.table.nameboolean是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)(从3.2.0.0版本该参数废弃,恒为true)默认关闭
enable.replayboolean是否开启数据回放功能默认关闭

数据订阅主要 API 接口

不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节数据订阅部分,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer):

void subscribe(Collection<String> topics) throws SQLException;

void unsubscribe() throws SQLException;

Set<String> subscription() throws SQLException;

ConsumerRecords<V> poll(Duration timeout) throws SQLException;

Set<TopicPartition> assignment() throws SQLException;
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;

void seek(TopicPartition partition, long offset) throws SQLException;
void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;

void commitSync() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException;

void close() throws SQLException;

数据订阅示例

写入数据

首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:

DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');

创建 topic

使用 SQL 创建一个 topic:

CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;

创建消费者 consumer

对于不同编程语言,其设置方式如下:

对于 Java 程序,还可以使用如下配置项:

参数名称类型参数说明
td.connect.typestring连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni"
bootstrap.serversstring连接地址,如 localhost:6030
value.deserializerstring值解析方法,使用此方法应实现 com.taosdata.jdbc.tmq.Deserializer 接口或继承 com.taosdata.jdbc.tmq.ReferenceDeserializer
value.deserializer.encodingstring指定字符串解析的字符集

需要注意:此处使用 bootstrap.servers 替代 td.connect.iptd.connect.port,以提供与 Kafka 一致的接口。

Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");

TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);

/* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}

上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。

订阅 topics

一个 consumer 支持同时订阅多个 topic。

List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);

消费

以下代码展示了不同语言下如何对 TMQ 消息进行消费。

while(running){
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
processMsg(meter);
}
}

结束消费

消费结束后,应当取消订阅。

/* 取消订阅 */
consumer.unsubscribe();

/* 关闭消费 */
consumer.close();

完整示例代码

以下是各语言的完整示例代码。

package com.taos.example;

import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class SubscribeDemo {
private static final String TOPIC = "tmq_topic";
private static final String DB_NAME = "meters";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);

public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try {
// prepare
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
Connection connection = DriverManager.getConnection(jdbcUrl);
try (Statement statement = connection.createStatement()) {
statement.executeUpdate("drop topic if exists " + TOPIC);
statement.executeUpdate("drop database if exists " + DB_NAME);
statement.executeUpdate("create database " + DB_NAME + " wal_retention_period 3600");
statement.executeUpdate("use " + DB_NAME);
statement.executeUpdate(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')");
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
statement.executeUpdate(
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// create topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
}

// create consumer
Properties properties = new Properties();
properties.getProperty(TMQConstants.CONNECT_TYPE, "jni");
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
properties.setProperty(TMQConstants.CONNECT_USER, "root");
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
properties.setProperty(TMQConstants.GROUP_ID, "test1");
properties.setProperty(TMQConstants.CLIENT_ID, "1");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");

// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Meters> r : meters) {
Meters meter = r.value();
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
timer.cancel();
}
}

查看源码

package com.taos.example;

import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}

查看源码

package com.taos.example;

import java.sql.Timestamp;

public class Meters {
private Timestamp ts;
private float current;
private int voltage;
private int groupid;
private String location;

public Timestamp getTs() {
return ts;
}

public void setTs(Timestamp ts) {
this.ts = ts;
}

public float getCurrent() {
return current;
}

public void setCurrent(float current) {
this.current = current;
}

public int getVoltage() {
return voltage;
}

public void setVoltage(int voltage) {
this.voltage = voltage;
}

public int getGroupid() {
return groupid;
}

public void setGroupid(int groupid) {
this.groupid = groupid;
}

public String getLocation() {
return location;
}

public void setLocation(String location) {
this.location = location;
}

@Override
public String toString() {
return "Meters{" +
"ts=" + ts +
", current=" + current +
", voltage=" + voltage +
", groupid=" + groupid +
", location='" + location + '\'' +
'}';
}
}

查看源码

数据订阅高级功能

数据回放

  • 订阅支持 replay 功能,按照数据写入的时间回放。 比如,如下时间写入三条数据
          2023/09/22 00:00:00.000
    2023/09/22 00:00:05.000
    2023/09/22 00:00:08.000
    则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。
  • 仅查询订阅支持数据回放
    • 回放需要保证独立时间线
    • 如果是子表订阅或者普通表订阅,只有一个vnode上有数据,保证是一个时间线
    • 如果超级表订阅,则需保证该 DB 只有一个vnode,否则报错(因为多个vnode上订阅出的数据不在一个时间线上)
  • 超级表和库订阅不支持回放
  • enable.replay 参数,true表示开启订阅回放功能,false表示不开启订阅回放功能,默认不开启。
  • 回放不支持进度保存,所以回放参数 enable.replay = true 时,auto commit 自动关闭
  • 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差