数据订阅
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT
语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。
为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
下面为关于数据订阅的一些说明,需要对TDengine的架构有一些了解,结合各个语言链接器的接口使用。(可使用时再了解)
- 一个消费组消费同一个topic下的所有数据,不同消费组之间相互独立;
- 一个消费组消费同一个topic所有的vgroup,消费组可由多个消费者组成,但一个vgroup仅被一个消费者消费,如果消费者数量超过了vgroup数量,多余的消费者不消费数据;
- 在服务端每个vgroup仅保存一个offset,每个vgroup的offset是单调递增的,但不一定连续。各个vgroup的offset之间没有关联;
- 每次poll服务端会返回一个结果block,该block属于一个vgroup,可能包含多个wal版本的数据,可以通过 offset 接口获得是该block第一条记录的offset;
- 一个消费组如果从未commit过offset,当其成员消费者重启重新拉取数据时,均从参数auto.offset.reset设定值开始消费;在一个消费者生命周期中,客户端本地记录了最近一次拉取数据的offset,不会拉取重复数据;
- 消费者如果异常终止(没有调用tmq_close),需等约12秒后触发其所属消费组rebalance,该消费者在服务端状态变为LOST,约1天后该消费者自动被删除;正常退出,退出后就会删除消费者;新增消费者,需等约2秒触发rebalance,该消费者在服务端状态变为ready;
- 消费组rebalance会对该组所有ready状态的消费者成员重新进行vgroup分配,消费者仅能对自己负责的vgroup进行assignment/seek/commit/poll操作;
- 消费者可利用 position 获得当前消费的offset,并seek到指定offset,重新消费;
- seek将position指向指定offset,不执行commit操作,一旦seek成功,可poll拉取指定offset及以后的数据;
- seek 操作之前须调用 assignment 接口获取该consumer的vgroup ID和offset范围。seek 操作会检测vgroup ID 和 offset是否合法,如非法将报错;
- position是获取当前的消费位置,是下次要取的位置,不是当前消费到的位置
- commit是提交消费位置,不带参数的话,是提交当前消费位置(下次要取的位置,不是当前消费到的位置),带参数的话,是提交参数里的位置(也即下次退出重启后要取的位置)
- seek是设置consumer消费位置,seek到哪,position就返回哪,都是下次要取的位置
- seek不会影响commit,commit不影响seek,相互独立,两个是不同的概念
- begin接口为wal 第一条数据的offset,end 接口为wal 最后一条数据的offset + 1
- offset接口获取的是记录所在结果block块里的第一条数据的offset,当seek至该offset时,将消费到这个block里的全部数据。参见第四点;
- 由于存在 WAL 过期删除机制,即使seek 操作成功,poll数据时有可能offset已失效。如果poll 的offset 小于 WAL 最小版本号,将会从WAL最小版本号消费;
- 数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置
WAL_RETENTION_PERIOD
或WAL_RETENTION_SIZE
,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似;
本文档不对消息队列本身的知识做更多的介绍,如果需要了解,请自行搜索。
说明: 从3.2.0.0版本开始,数据订阅支持vnode迁移和分裂。 由于数据订阅依赖wal文件,而在vnode迁移和分裂的过程中,wal并不会同步过去,所以迁移或分裂后,之前没消费完的wal数据后消费不到。所以请保证之前把数据全部消费完后,再进行vnode迁移或分裂,否则,消费会丢失数据。
主要数据结构和 API
不同语言下, TMQ 订阅相关的 API 及数据结构如下(注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer):
- C
- Java
- Python
- Go
- Rust
- Node.JS
- C#
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));
typedef enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
typedef struct tmq_topic_assignment {
int32_t vgId;
int64_t currentOffset;
int64_t begin;
int64_t end;
} tmq_topic_assignment;
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT const char *tmq_err2str(int32_t code);
下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
void subscribe(Collection<String> topics) throws SQLException;
void unsubscribe() throws SQLException;
Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
Set<TopicPartition> assignment() throws SQLException;
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;
void seek(TopicPartition partition, long offset) throws SQLException;
void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
void commitSync() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException;
void close() throws SQLException;
class Consumer:
def subscribe(self, topics):
pass
def unsubscribe(self):
pass
def poll(self, timeout: float = 1.0):
pass
def assignment(self):
pass
def seek(self, partition):
pass
def close(self):
pass
def commit(self, message):
pass
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
// 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
// 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
func (c *Consumer) Poll(timeoutMs int) tmq.Event
// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error
func (c *Consumer) Close() error
impl TBuilder for TmqBuilder
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
fn build(&self) -> Result<Self::Target, Self::Error>
impl AsAsyncConsumer for Consumer
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self,
topics: I,
) -> Result<(), Self::Error>;
fn stream(
&self,
) -> Pin<
Box<
dyn '_
+ Send
+ futures::Stream<
Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
>,
>,
>;
async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
async fn unsubscribe(self);
可在 https://docs.rs/taos 上查看详细 API 说明。
function TMQConsumer(config)
function subscribe(topic)
function consume(timeout)
function subscription()
function unsubscribe()
function commit(msg)
function close()
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
virtual IConsumer Build()
Consumer(ConsumerBuilder builder)
void Subscribe(IEnumerable<string> topics)
void Subscribe(string topic)
ConsumeResult Consume(int millisecondsTimeout)
List<string> Subscription()
void Unsubscribe()
void Commit(ConsumeResult consumerResult)
void Close()
写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
创建 topic
TDengine 使用 SQL 创建一个 topic:
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
- topic创建个数有上限,通过参数 tmqMaxTopicNum 控制,默认 20 个
TMQ 支持多种订阅类型:
列订阅
语法:
CREATE TOPIC topic_name as subquery
通过 SELECT
语句订阅(包括 SELECT *
,或 SELECT ts, c1
等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
- 被订阅或用于计算的列或标签不可被删除(
ALTER table DROP
)、修改(ALTER table MODIFY
)。 - 若发生表结构变更,新增的列不出现在结果中。
超级表订阅
语法:
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
与 SELECT * from stbName
订阅的区别是:
- 不会限制用户的表结构变更。
- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
- with meta 参数可选,选择时将返回创建超级表,子表等语句,主要用于taosx做超级表迁移
- where_condition 参数可选,选择时将用来过滤符合条件的子表,订阅这些子表。where 条件里不能有普通列,只能是tag或tbname,where条件里可以用函数,用来过滤tag,但是不能是聚合函数,因为子表tag值无法做聚合。也可以是常量表达式,比如 2 > 1(订阅全部子表),或者 false(订阅0个子表)
- 返回数据不包含标签。
数据库订阅
语法:
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
通过该语句可创建一个包含数据库所有表数据的订阅
- with meta 参数可选,选择时将返回创建数据库里所有超级表,子表的语句,主要用于taosx做数据库迁移
创建消费者 consumer
消费者需要通过一系列配置选项创建,基础配置项如下表所示:
参数名称 | 类型 | 参数说明 | 备注 |
---|---|---|---|
td.connect.ip | string | 服务端的 IP 地址 | |
td.connect.user | string | 用户名 | |
td.connect.pass | string | 密码 | |
td.connect.port | integer | 服务端的端口号 | |
group.id | string | 消费组 ID,同一消费组共享消费进度 | 必填项。最大长度:192。 每个topic最多可建立100个 consumer group |
client.id | string | 客户端 ID | 最大长度:192。 |
auto.offset.reset | enum | 消费组订阅的初始位置 | earliest : default(version < 3.2.0.0);从头开始订阅; latest : default(version >= 3.2.0.0);仅从最新数据开始订阅; none : 没有提交的 offset 无法订阅 |
enable.auto.commit | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true |
auto.commit.interval.ms | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
msg.with.table.name | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)(从3.2.0.0版本该参数废弃,恒为true) | 默认关闭 |
enable.replay | boolean | 是否开启数据回放功能 | 默认关闭 |
对于不同编程语言,其设置方式如下:
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
对于 Java 程序,还可以使用如下配置项:
参数名称 | 类型 | 参数说明 |
---|---|---|
td.connect.type | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
bootstrap.servers | string | 连接地址,如 localhost:6030 |
value.deserializer | string | 值解析方法,使用此方法应实现 com.taosdata.jdbc.tmq.Deserializer 接口或继承 com.taosdata.jdbc.tmq.ReferenceDeserializer 类 |
value.deserializer.encoding | string | 指定字符串解析的字符集 |
需要注意:此处使用 bootstrap.servers
替代 td.connect.ip
和 td.connect.port
,以提供与 Kafka 一致的接口。
Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);
/* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
conf := &tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
}
consumer, err := NewConsumer(conf)
let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "latest");
let tmq = TmqBuilder::from_dsn(dsn)?;
let mut consumer = tmq.build()?;
Python 语言下引入 taos
库的 Consumer
类,创建一个 Consumer 示例:
from taos.tmq import Consumer
# Syntax: `consumer = Consumer(configs)`
#
# Example:
consumer = Consumer(
{
"group.id": "local",
"client.id": "1",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
}
)
// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
let consumer = taos.consumer({
'enable.auto.commit': 'true',
'auto.commit.interval.ms','1000',
'group.id': 'tg2',
'td.connect.user': 'root',
'td.connect.pass': 'taosdata',
'auto.offset.reset','latest',
'msg.with.table.name': 'true',
'td.connect.ip','127.0.0.1',
'td.connect.port','6030'
});
using TDengineTMQ;
// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、
// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数
var cfg = new ConsumerConfig
{
EnableAutoCommit = "true"
AutoCommitIntervalMs = "1000"
GourpId = "TDengine-TMQ-C#",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
AutoOffsetReset = "latest"
MsgWithTableName = "true",
TDConnectIp = "127.0.0.1",
TDConnectPort = "6030"
};
var consumer = new ConsumerBuilder(cfg).Build();
上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
数据回放功能说明:
- 订阅增加 replay 功能,按照数据写入的时间回放。
比如,如下时间写入三条数据则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。
2023/09/22 00:00:00.000
2023/09/22 00:00:05.000
2023/09/22 00:00:08.000 - 仅列订阅支持数据回放
- 回放需要保证独立时间线
- 如果是子表订阅或者普通表订阅,只有一个vnode上有数据,保证是一个时间线
- 如果超级表订阅,则需保证该 DB 只有一个vnode,否则报错(因为多个vnode上订阅出的数据不在一个时间线上)
- 超级表和库订阅不支持回放
- 增加 enable.replay 参数,true表示开启订阅回放功能,false表示不开启订阅回放功能,默认不开启。
- 回放不支持进度保存,所以回放参数 enable.replay = true 时,auto commit 自动关闭
- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差
订阅 topics
一个 consumer 支持同时订阅多个 topic。
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
consumer.subscribe(["tmq_meters"]).await?;
consumer.subscribe(['topic1', 'topic2'])
// 创建订阅 topics 列表
let topics = ['topic_test']
// 启动订阅
consumer.subscribe(topics);
// 创建订阅 topics 列表
List<String> topics = new List<string>();
topics.add("tmq_topic");
// 启动订阅
consumer.Subscribe(topics);
消费
以下代码展示了不同语言下如何对 TMQ 消息进行消费。
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
// 消费数据
while (running) {
TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
msg_process(msg);
}
这里是一个 while 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
while(running){
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
processMsg(meter);
}
}
for {
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.Value())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
while True:
res = consumer.poll(100)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
while(true){
msg = consumer.consume(200);
// process message(consumeResult)
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
}
// 消费数据
while (true)
{
var consumerRes = consumer.Consume(100);
// process ConsumeResult
ProcessMsg(consumerRes);
consumer.Commit(consumerRes);
}
结束消费
消费结束后,应当取消订阅。
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
/* 取消订阅 */
tmq_unsubscribe(tmq);
/* 关闭消费者对象 */
tmq_consumer_close(tmq);
/* 取消订阅 */
consumer.unsubscribe();
/* 关闭消费 */
consumer.close();
/* Unsubscribe */
_ = consumer.Unsubscribe()
/* Close consumer */
_ = consumer.Close()
consumer.unsubscribe().await;
# 取消订阅
consumer.unsubscribe()
# 关闭消费
consumer.close()
consumer.unsubscribe();
consumer.close();
// 取消订阅
consumer.Unsubscribe();
// 关闭消费
consumer.Close();
删除 topic
如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。
/* 删除 topic */
DROP TOPIC topic_name;
状态查看
1、topics:查询已经创建的 topic
SHOW TOPICS;
2、consumers:查询 consumer 的状态及其订阅的 topic
SHOW CONSUMERS;
3、subscriptions:查询 consumer 与 vgroup 之间的分配关系
SHOW SUBSCRIPTIONS;
示例代码
以下是各语言的完整示例代码。
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static char dbName[64] = "tmqdb";
static char stbName[64] = "stb";
static char topicName[64] = "topicname";
static int32_t msg_process(TAOS_RES* msg) {
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content: %s\n", buf);
}
return rows;
}
static int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes;
// drop database if exists
printf("create database\n");
pRes = taos_query(pConn, "drop database if exists tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create database
pRes = taos_query(pConn, "create database tmqdb wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create super table
printf("create super table\n");
pRes = taos_query(
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create sub tables
printf("create sub tables\n");
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// insert data
printf("insert data into sub tables\n");
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
int32_t create_topic() {
printf("create topic\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "use tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
tmq_conf_res_t code;
tmq_conf_t* conf = tmq_conf_new();
code = tmq_conf_set(conf, "enable.auto.commit", "true");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "group.id", "cgrpName");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "client.id", "user defined name");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.user", "root");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topicList = tmq_list_new();
int32_t code = tmq_list_append(topicList, "topicname");
if (code) {
return NULL;
}
return topicList;
}
void basic_consume_loop(tmq_t* tmq) {
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg);
} else {
break;
}
}
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
int main(int argc, char* argv[]) {
int32_t code;
if (init_env() < 0) {
return -1;
}
if (create_topic() < 0) {
return -1;
}
tmq_t* tmq = build_consumer();
if (NULL == tmq) {
fprintf(stderr, "%% build_consumer() fail!\n");
return -1;
}
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
return -1;
}
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
}
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
} else {
fprintf(stderr, "%% Consumer closed\n");
}
return 0;
}
- 本地连接
- WebSocket 连接
package com.taos.example;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
public class SubscribeDemo {
private static final String TOPIC = "tmq_topic";
private static final String DB_NAME = "meters";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try {
// prepare
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
Connection connection = DriverManager.getConnection(jdbcUrl);
try (Statement statement = connection.createStatement()) {
statement.executeUpdate("drop topic if exists " + TOPIC);
statement.executeUpdate("drop database if exists " + DB_NAME);
statement.executeUpdate("create database " + DB_NAME + " wal_retention_period 3600");
statement.executeUpdate("use " + DB_NAME);
statement.executeUpdate(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')");
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
statement.executeUpdate(
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// create topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
}
// create consumer
Properties properties = new Properties();
properties.getProperty(TMQConstants.CONNECT_TYPE, "jni");
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
properties.setProperty(TMQConstants.CONNECT_USER, "root");
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
properties.setProperty(TMQConstants.GROUP_ID, "test1");
properties.setProperty(TMQConstants.CLIENT_ID, "1");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Meters> r : meters) {
Meters meter = r.value();
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
timer.cancel();
}
}
package com.taos.example;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
package com.taos.example;
import java.sql.Timestamp;
public class Meters {
private Timestamp ts;
private float current;
private int voltage;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public float getCurrent() {
return current;
}
public void setCurrent(float current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
@Override
public String toString() {
return "Meters{" +
"ts=" + ts +
", current=" + current +
", voltage=" + voltage +
", groupid=" + groupid +
", location='" + location + '\'' +
'}';
}
}
[查看源码](https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/Meters.java)
package com.taos.example;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
public class WebsocketSubscribeDemo {
private static final String TOPIC = "tmq_topic_ws";
private static final String DB_NAME = "meters_ws";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try {
// prepare
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://127.0.0.1:6041/?user=root&password=taosdata&batchfetch=true";
try (Connection connection = DriverManager.getConnection(jdbcUrl);
Statement statement = connection.createStatement()) {
statement.executeUpdate("drop topic if exists " + TOPIC);
statement.executeUpdate("drop database if exists " + DB_NAME);
statement.executeUpdate("create database " + DB_NAME + " wal_retention_period 3600");
statement.executeUpdate("use " + DB_NAME);
statement.executeUpdate(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')");
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
statement.executeUpdate(
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// create topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
}
// create consumer
Properties properties = new Properties();
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041");
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
properties.setProperty(TMQConstants.CONNECT_USER, "root");
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
properties.setProperty(TMQConstants.GROUP_ID, "test2");
properties.setProperty(TMQConstants.CLIENT_ID, "1");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Meters> r : meters) {
Meters meter = (Meters) r.value();
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
timer.cancel();
}
}
package com.taos.example;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
package com.taos.example;
import java.sql.Timestamp;
public class Meters {
private Timestamp ts;
private float current;
private int voltage;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public float getCurrent() {
return current;
}
public void setCurrent(float current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
@Override
public String toString() {
return "Meters{" +
"ts=" + ts +
", current=" + current +
", voltage=" + voltage +
", groupid=" + groupid +
", location='" + location + '\'' +
'}';
}
}
package main
import (
"fmt"
"os"
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)
func main() {
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec("create database if not exists example_tmq wal_retention_period 3600")
if err != nil {
panic(err)
}
_, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
if err != nil {
panic(err)
}
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
}
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
_, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
if err != nil {
panic(err)
}
go func() {
for {
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
time.Sleep(time.Microsecond * 100)
}
}()
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.String())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
err = consumer.Unsubscribe()
if err != nil {
panic(err)
}
err = consumer.Close()
if err != nil {
panic(err)
}
}
use std::time::Duration;
use chrono::{DateTime, Local};
use taos::*;
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
}
async fn prepare(taos: Taos) -> anyhow::Result<()> {
let inserted = taos.exec_many([
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "tmq";
// prepare database
taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"),
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"),
format!("USE `{db}`"),
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
// create topic for subscription
format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`")
])
.await?;
let task = tokio::spawn(prepare(taos));
tokio::time::sleep(Duration::from_secs(1)).await;
// subscribe
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
let mut consumer = tmq.build()?;
consumer.subscribe(["tmq_meters"]).await?;
consumer
.stream()
.try_for_each(|(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
consumer.commit(offset).await?;
Ok(())
})
.await?;
consumer.unsubscribe().await;
task.await??;
Ok(())
}
from taos.tmq import Consumer
import taos
def init_tmq_env(db, topic):
conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
conn.execute("create database if not exists {} wal_retention_period 3600".format(db))
conn.select_db(db)
conn.execute(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')")
conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')")
conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
def cleanup(db, topic):
conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
if __name__ == '__main__':
init_tmq_env("tmq_test", "tmq_test_topic") # init env
consumer = Consumer(
{
"group.id": "tg2",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
consumer.subscribe(["tmq_test_topic"])
try:
while True:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
finally:
consumer.unsubscribe()
consumer.close()
cleanup("tmq_test", "tmq_test_topic")
const taos = require("@tdengine/client");
const conn = taos.connect({ host: "localhost", database: "power" });
var cursor = conn.cursor();
function runConsumer() {
// create topic
cursor.execute("create topic topic_name_example as select * from meters");
let consumer = taos.consumer({
'group.id': 'tg2',
'td.connect.user': 'root',
'td.connect.pass': 'taosdata',
'msg.with.table.name': 'true',
'enable.auto.commit': 'true'
});
// subscribe the topic just created.
consumer.subscribe("topic_name_example");
// get subscribe topic list
let topicList = consumer.subscription();
console.log(topicList);
for (let i = 0; i < 5; i++) {
let msg = consumer.consume(100);
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
consumer.commit(msg);
console.log(`=======consumer ${i} done`)
}
consumer.unsubscribe();
consumer.close();
// drop topic
cursor.execute("drop topic topic_name_example");
}
try {
runConsumer();
} finally {
setTimeout(() => {
cursor.close();
conn.close();
}, 2000);
}
using System;
using TDengineTMQ;
using TDengineDriver;
using System.Runtime.InteropServices;
namespace TMQExample
{
internal class SubscribeDemo
{
static void Main(string[] args)
{
IntPtr conn = GetConnection();
string topic = "topic_example";
//create topic
IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from meters");
if (TDengine.ErrorNo(res) != 0 )
{
throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
}
var cfg = new ConsumerConfig
{
GourpId = "group_1",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
MsgWithTableName = "true",
TDConnectIp = "127.0.0.1",
};
// create consumer
var consumer = new ConsumerBuilder(cfg)
.Build();
// subscribe
consumer.Subscribe(topic);
// consume
for (int i = 0; i < 5; i++)
{
var consumeRes = consumer.Consume(300);
// print consumeResult
foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
{
Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
kv.Value.Metas.ForEach(meta =>
{
Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
});
Console.WriteLine("");
kv.Value.Datas.ForEach(data =>
{
Console.WriteLine(data.ToString());
});
}
consumer.Commit(consumeRes);
Console.WriteLine("\n================ {0} done ", i);
}
// retrieve topic list
List<string> topics = consumer.Subscription();
topics.ForEach(t => Console.WriteLine("topic name:{0}", t));
// unsubscribe
consumer.Unsubscribe();
// close consumer after use.Otherwise will lead memory leak.
consumer.Close();
TDengine.Close(conn);
}
static IntPtr GetConnection()
{
string host = "localhost";
short port = 6030;
string username = "root";
string password = "taosdata";
string dbname = "power";
var conn = TDengine.Connect(host, username, password, dbname, port);
if (conn == IntPtr.Zero)
{
throw new Exception("Connect to TDengine failed");
}
else
{
Console.WriteLine("Connect to TDengine success");
}
return conn;
}
}
}