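Data Subscription
TDengine provides data subscription and consumption interfaces similar to those of message queue products. In many scenarios, a time-series big data platform built on TDengine no longer needs a separate message queue product, which simplifies application design and reduces operations and maintenance costs. This chapter describes the data subscription APIs of each language connector and how to use them. For the basics of data subscription, see Data Subscription.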
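Creating a Topic
Use the TDengine CLI, or see the Execute SQL section to run the topic-creation SQL from a program: CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters
The SQL above creates a topic named topic_meters. Each record in the messages obtained through this topic consists of the columns selected by the query SELECT ts, current, voltage, phase, groupid, location FROM meters.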
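As a rough illustration of how the topic's query determines the fields of each consumed record, the Python sketch below assembles the same statement from a column list. The helper name build_create_topic_sql is hypothetical and not part of any TDengine connector. It only builds a string, which you would then run through the TDengine CLI or a connector's SQL execution call.

```python
# Hypothetical helper (not part of any TDengine connector): it only assembles
# the CREATE TOPIC statement shown above from its parts.
def build_create_topic_sql(topic, columns, table):
    """Return a CREATE TOPIC statement whose SELECT list is `columns`."""
    if not columns:
        raise ValueError("a topic query must select at least one column")
    select_list = ", ".join(columns)
    return f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT {select_list} FROM {table}"


# The selected columns are also the fields of every record a consumer receives.
columns = ["ts", "current", "voltage", "phase", "groupid", "location"]
sql = build_create_topic_sql("topic_meters", columns, "meters")
print(sql)
```

Keeping the column list in one place makes it easier to keep the topic definition and the record type used by the consumer in step.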
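Note: In the TDengine connector implementation, subscription queries have the following restrictions.
- Query statement restriction: a subscription query can only use a SELECT statement. Other types of SQL are not supported, such as subscribing to a database, subscribing to a supertable (other than through SELECT), INSERT, UPDATE, or DELETE.
- Raw data only: a subscription query can only query raw data, not aggregated or computed results.
- Time order restriction: a subscription query can only query data in ascending time order.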
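Creating a Consumer
The concept of a consumer in TDengine is similar to that in Kafka: a consumer receives a data stream by subscribing to topics. A consumer can be configured with many parameters, such as the connection type, the server address, and automatic offset commit, to suit different data processing needs. Some language connectors also support advanced features such as automatic reconnection and compressed data transfer, which help keep data reception efficient and stable.
Creation Parameters
Many parameters are available when creating a consumer. They flexibly support various connection types, offset commit modes, compression, reconnection, deserialization, and other features. The common basic configuration items that apply to all language connectors are listed below: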
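td.connect.ip
- Description: FQDN of the server
- Type: string
- Remarks: can be an IP address or a host name
td.connect.user
- Description: user name
- Type: string
td.connect.pass
- Description: password
- Type: string
td.connect.port
- Description: port number of the server
- Type: integer
group.id
- Description: consumer group ID; consumers in the same group share consumption progress
- Type: string
- Remarks: required. Maximum length: 192; longer values are truncated. Each topic can have at most 100 consumer groups.
client.id
- Description: client ID
- Type: string
- Remarks: maximum length 255; longer values are truncated
auto.offset.reset
- Description: initial position of the consumer group's subscription
- Type: enum
- Remarks: earliest: default for versions earlier than 3.2.0.0, subscribe from the beginning; latest: default for version 3.2.0.0 and later, subscribe only from the latest data; none: subscription is not possible without a committed offset.
enable.auto.commit
- Description: whether to commit the consumption offset automatically
- Type: boolean
- Remarks: true: commit automatically, the client application does not need to commit; false: the client application must commit by itself. Default: true.
auto.commit.interval.ms
- Description: interval at which the consumption offset is committed automatically
- Type: integer
- Remarks: in milliseconds. Default: 5000.
msg.with.table.name
- Description: whether the table name can be parsed from the message
- Type: boolean
- Remarks: not applicable to column subscriptions (for a column subscription, tbname can be written into the subquery statement as a column). Off by default. Deprecated as of v3.2.0.0.
enable.replay
- Description: whether to enable data replay
- Type: boolean
- Remarks: off by default
session.timeout.ms
- Description: timeout after the consumer's heartbeat is lost
- Type: integer
- Remarks: when the timeout expires, the rebalance logic is triggered; once it succeeds, the consumer is removed. Default: 12000. Range: [6000, 1800000]. Supported since v3.3.3.0.
max.poll.interval.ms
- Description: maximum interval between two poll calls of a consumer
- Type: integer
- Remarks: if this time is exceeded, the consumer is considered offline and the rebalance logic is triggered; once it succeeds, the consumer is removed. Default: 300000. Range: [1000, INT32_MAX]. Supported since v3.3.3.0.
fetch.max.wait.ms
- Description: maximum time the server takes to return data for a single request
- Type: integer
- Remarks: default: 1000. Range: [1, INT32_MAX]. Supported since v3.3.6.0.
min.poll.rows
- Description: minimum number of rows the server returns for a single request
- Type: integer
- Remarks: default: 4096. Range: [1, INT32_MAX]. Supported since v3.3.6.0.
msg.consume.rawdata
- Description: when consuming, data is pulled in binary form and cannot be parsed. Internal parameter, used only for taosX data migration.
- Type: integer
- Remarks: default: 0, meaning disabled; any non-zero value enables it. Supported since v3.3.6.0.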
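The limits in the list above can be checked before a consumer is created. The following Python sketch does so for a plain configuration dictionary. The helper check_consumer_config is hypothetical and not part of any connector. It assumes that values are passed as strings, as in the connector examples, and that the length limits are counted in characters, which the list above does not specify.

```python
INT32_MAX = 2**31 - 1

# Ranges copied from the parameter list above.
INT_RANGES = {
    "session.timeout.ms": (6000, 1800000),
    "max.poll.interval.ms": (1000, INT32_MAX),
    "fetch.max.wait.ms": (1, INT32_MAX),
    "min.poll.rows": (1, INT32_MAX),
}
OFFSET_RESET_VALUES = {"earliest", "latest", "none"}


def check_consumer_config(conf):
    """Return a list of problems found in a consumer config dict (hypothetical helper)."""
    problems = []
    group_id = conf.get("group.id")
    if not group_id:
        problems.append("group.id is required")
    elif len(group_id) > 192:
        problems.append("group.id is longer than 192 and would be truncated")
    if len(conf.get("client.id", "")) > 255:
        problems.append("client.id is longer than 255 and would be truncated")
    reset = conf.get("auto.offset.reset")
    if reset is not None and reset not in OFFSET_RESET_VALUES:
        problems.append("auto.offset.reset must be earliest, latest, or none")
    for key, (low, high) in INT_RANGES.items():
        if key in conf and not low <= int(conf[key]) <= high:
            problems.append(f"{key} must be in [{low}, {high}]")
    return problems


good = {"group.id": "group1", "client.id": "client1",
        "auto.offset.reset": "latest", "session.timeout.ms": "12000"}
bad = {"client.id": "client1", "session.timeout.ms": "100"}
print(check_consumer_config(good))
print(check_consumer_config(bad))
```

Reporting truncation as a problem, rather than silently accepting it, is a design choice of this sketch: two long group IDs that differ only after the limit would otherwise end up as the same consumer group.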
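The creation parameters for each language connector are as follows:
- Java: The Java connector creates a consumer from a Properties object. For the list of parameters that can be set, see Consumer Parameters. For other parameters, see the common basic configuration items above.
- Python: Provides the td.connect.websocket.scheme parameter to specify the protocol type. Other parameters are the same as the common basic configuration items.
- Go: Properties supported when creating a consumer:
  - ws.url: WebSocket connection address.
  - ws.message.channelLen: length of the WebSocket message channel buffer. Default: 0.
  - ws.message.timeout: WebSocket message timeout. Default: 5m.
  - ws.message.writeWait: WebSocket write timeout. Default: 10s.
  - ws.message.enableCompression: whether to enable WebSocket compression. Default: false.
  - ws.autoReconnect: whether to reconnect automatically. Default: false.
  - ws.reconnectIntervalMs: reconnection interval in milliseconds. Default: 2000.
  - ws.reconnectRetryCount: number of reconnection retries. Default: 3.
  For other parameters, see the common basic configuration items above.
- Rust: The Rust connector creates a consumer from a DSN. For the list of parameters that can be set, see DSN. For other parameters, see the common basic configuration items above.
- Node.js: Provides the WS_URL parameter to specify the address of the server to connect to. Other parameters are the same as the common basic configuration items.
- C#: Properties supported when creating a consumer:
  - useSSL: whether to use an SSL connection. Default: false.
  - token: token for connecting to TDengine Cloud.
  - ws.message.enableCompression: whether to enable WebSocket compression. Default: false.
  - ws.autoReconnect: whether to reconnect automatically. Default: false.
  - ws.reconnect.retry.count: number of reconnection retries. Default: 3.
  - ws.reconnect.interval.ms: reconnection interval in milliseconds. Default: 2000.
  For other parameters, see the common basic configuration items above.
- C:
  - WebSocket connection: because a DSN is used, the four configuration items td.connect.ip, td.connect.port, td.connect.user, and td.connect.pass are not needed. Everything else is the same as the common configuration items.
  - Native connection: same as the common basic configuration items.
- REST API: Not supported.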
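The note on the C connector's WebSocket connection says that a DSN replaces the four td.connect.* items. The Python sketch below shows one way to picture that split: it moves those four items into a connection string and leaves the rest of the configuration untouched. The helper split_dsn_config is hypothetical, and the layout scheme://user:password@host:port is an assumption made for illustration; check the DSN documentation of the connector you use for the exact format it accepts.

```python
from urllib.parse import quote

# The four connection items that a DSN replaces, per the note above.
DSN_KEYS = ("td.connect.ip", "td.connect.port", "td.connect.user", "td.connect.pass")


def split_dsn_config(conf, scheme="ws"):
    """Split a common-style config into (dsn, remaining_conf).

    Hypothetical helper. The DSN layout used here is an assumption.
    """
    # Percent-encode credentials so characters such as '@' or ':' stay unambiguous.
    user = quote(conf["td.connect.user"], safe="")
    password = quote(conf["td.connect.pass"], safe="")
    host = conf["td.connect.ip"]
    port = conf["td.connect.port"]
    dsn = f"{scheme}://{user}:{password}@{host}:{port}"
    remaining = {k: v for k, v in conf.items() if k not in DSN_KEYS}
    return dsn, remaining


conf = {
    "td.connect.ip": "localhost",
    "td.connect.port": "6041",
    "td.connect.user": "root",
    "td.connect.pass": "taosdata",
    "group.id": "group1",
    "client.id": "client1",
}
dsn, remaining = split_dsn_config(conf)
print(dsn)
print(remaining)
```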
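WebSocket Connection
This section shows how each language connector creates a consumer over a WebSocket connection: specify the address of the server to connect to, enable auto commit, start consuming from the latest message, and set group.id, client.id, and other information. Some language connectors also support deserialization parameters.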
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
import taosws

def create_consumer():
    try:
        consumer = taosws.Consumer(conf={
            "td.connect.websocket.scheme": tdConnWsScheme,
            "group.id": groupId,
            "client.id": clientId,
            "auto.offset.reset": autoOffsetReset,
            "td.connect.ip": host,
            "td.connect.port": port,
            "enable.auto.commit": autoCommitState,
            "auto.commit.interval.ms": autoCommitIntv,
        })
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}.")
        return consumer
    except Exception as err:
        print(f"Failed to create websocket consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
// create consumer
wsUrl := "ws://127.0.0.1:6041"
groupID = "group1"
clientID = "client1"
host = "127.0.0.1"
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": wsUrl,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"group.id": groupID,
"client.id": clientID,
})
if err != nil {
log.Fatalf(
"Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
host,
groupID,
clientID,
err.Error(),
)
}
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
let dsn = "ws://localhost:6041".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let group_id = "group1".to_string();
let client_id = "client1".to_string();
dsn.params
.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params
.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params
.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params
.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), group_id.clone());
dsn.params
.insert("client.id".to_string(), client_id.clone());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = match builder.build().await {
Ok(consumer) => {
println!(
"Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.",
dsn, group_id, client_id
);
consumer
}
Err(err) => {
eprintln!("Failed to create websocket consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters'
const topics = [topic];
const groupId = "group1";
const clientId = "client1";
async function createConsumer() {
let groupId = "group1";
let clientId = "client1";
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, groupId],
[taos.TMQConstants.CLIENT_ID, clientId],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
try {
const conn = await taos.tmqConnect(configMap);
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
return conn;
} catch (err) {
console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
// consumer config
_host = "127.0.0.1";
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "td.connect.port", "6041" },
{ "auto.offset.reset", "latest" },
{ "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" },
{ "group.id", _groupId },
{ "client.id", _clientId },
{ "td.connect.ip", _host },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
};
IConsumer<Dictionary<string, object>> consumer = null!;
try
{
// create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine(
$"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to create websocket consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to create websocket consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
ws_tmq_t* build_consumer(const ConsumerConfig* config) {
ws_tmq_conf_res_t code;
ws_tmq_t* tmq = NULL;
// create a configuration object
ws_tmq_conf_t* conf = ws_tmq_conf_new();
// set the configuration parameters
code = ws_tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "group.id", config->group_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "client.id", config->client_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
// create a consumer object
tmq = ws_tmq_consumer_new(conf, "taos://localhost:6041", NULL, 0);
_end:
// destroy the configuration object
ws_tmq_conf_destroy(conf);
return tmq;
}
ws_tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n", config.td_connect_host,
config.group_id, config.client_id);
}
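Call the build_consumer function to try to obtain the consumer instance tmq. On success, a success log is printed; on failure, a failure log is printed.
REST API: Not supported.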
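Native Connection
This section shows how each language connector creates a consumer over a native connection: specify the address of the server to connect to, enable auto commit, start consuming from the latest message, and set group.id, client.id, and other information. Some language connectors also support deserialization parameters.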
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
from taos.tmq import Consumer

def create_consumer():
    try:
        consumer = Consumer(
            {
                "group.id": groupId,
                "client.id": clientId,
                "td.connect.user": user,
                "td.connect.pass": password,
                "enable.auto.commit": autoCommitState,
                "auto.commit.interval.ms": autoCommitIntv,
                "auto.offset.reset": autoOffsetReset,
                "td.connect.ip": host,
                "td.connect.port": str(port),
            }
        )
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}")
        return consumer
    except Exception as err:
        print(f"Failed to create native consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
// create consumer
groupID = "group1"
clientID = "client1"
host = "127.0.0.1"
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"group.id": groupID,
"client.id": clientID,
})
if err != nil {
log.Fatalf(
"Failed to create native consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
host,
groupID,
clientID,
err.Error(),
)
}
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
let dsn = "taos://localhost:6030".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let group_id = "group1".to_string();
let client_id = "client1".to_string();
dsn.params
.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params
.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params
.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params
.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), group_id.clone());
dsn.params
.insert("client.id".to_string(), client_id.clone());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = match builder.build().await {
Ok(consumer) => {
println!(
"Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.",
dsn, group_id, client_id
);
consumer
}
Err(err) => {
eprintln!("Failed to create native consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
Node.js: Not supported.
// consumer config
_host = "127.0.0.1";
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.port", "6030" },
{ "auto.offset.reset", "latest" },
{ "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" },
{ "group.id", _groupId },
{ "client.id", _clientId },
{ "td.connect.ip", _host },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
};
IConsumer<Dictionary<string, object>> consumer = null!;
try
{
// create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine(
$"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
tmq_t* build_consumer(const ConsumerConfig* config) {
tmq_conf_res_t code;
tmq_t* tmq = NULL;
// create a configuration object
tmq_conf_t* conf = tmq_conf_new();
// set the configuration parameters
code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "group.id", config->group_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "client.id", config->client_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
// set the callback function for auto commit
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
// create a consumer object
tmq = tmq_consumer_new(conf, NULL, 0);
_end:
// destroy the configuration object
tmq_conf_destroy(conf);
return tmq;
}
tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
}
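Call the build_consumer function to try to obtain the consumer instance tmq. On success, a success log is printed; on failure, a failure log is printed.
REST API: Not supported.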
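Subscribing and Consuming Data
After a consumer subscribes to topics, it can start receiving and processing the messages in those topics. Sample code for subscribing and consuming data is shown below:
WebSocket Connection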
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
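- The subscribe method takes the list of topics (that is, their names) to subscribe to. Multiple topics can be subscribed to at once.
- Each call to poll fetches one message, and a message may contain multiple records.
- ResultBean is a custom inner class whose field names and data types correspond one-to-one to the column names and data types, so the deserializer class specified by the value.deserializer property can produce objects of type ResultBean.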
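The one-to-one correspondence between record fields and topic columns can be sketched in Python as well. The example below is only an analogy for ResultBean, not connector code: MeterRecord and to_record are hypothetical names, the column order follows the topic_meters query, and the sample row is made up for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class MeterRecord:
    """One field per column selected by the topic query (hypothetical type)."""
    ts: datetime
    current: Optional[float]
    voltage: Optional[int]
    phase: Optional[float]
    groupid: int
    location: str


def to_record(column_names, row):
    """Pair column names with row values and build a MeterRecord."""
    return MeterRecord(**dict(zip(column_names, row)))


columns = ["ts", "current", "voltage", "phase", "groupid", "location"]
# Made-up sample row, in the same order as the columns.
row = (datetime(2024, 1, 1, 8, 0, 0), 10.3, 219, 0.31, 3, "California.SanFrancisco")
record = to_record(columns, row)
print(record.voltage, record.location)
```

If the topic query changes its column list, the record type has to change with it, which is why the two are usually defined side by side.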
def subscribe(consumer):
    try:
        consumer.subscribe([topic])
        print("Subscribe topics successfully")
        for i in range(50):
            records = consumer.poll(timeout=1.0)
            if records:
                for block in records:
                    for row in block:
                        print(f"data: {row}")
    except Exception as err:
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
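- The subscribe method takes the list of topics (that is, their names) to subscribe to. Multiple topics can be subscribed to at once.
- Each call to poll fetches one message, and a message may contain multiple records.
- records contains multiple blocks, and each block may contain multiple records.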
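The message, block, and row nesting described above can be tried out without a server. In the sketch below, a plain nested list stands in for the object returned by one poll call; the stand-in and the helper iter_rows are hypothetical, and the sample values are made up. Only the iteration pattern mirrors the connector example.

```python
def iter_rows(message):
    """Yield every row from every block of one polled message."""
    for block in message:
        for row in block:
            yield row


# Stand-in for what one poll() call might return: two blocks, three rows in total.
fake_message = [
    [("d1001", 10.3), ("d1001", 10.5)],
    [("d1002", 11.2)],
]
rows = list(iter_rows(fake_message))
print(len(rows))
```

Flattening into a single row iterator keeps the processing logic independent of how the server happened to group rows into blocks.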
topic = "topic_meters"
err = consumer.Subscribe(topic, nil)
if err != nil {
log.Fatalf(
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ {
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("data:%v\n", e)
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
case tmqcommon.Error:
log.Fatalf(
"Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.Error(),
)
}
}
}
let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;
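- A consumer can subscribe to one or more TOPICs. It is generally recommended that a consumer subscribe to only one TOPIC.
- The TMQ message queue is a futures::Stream type. You can use the corresponding API to consume each message and mark it as consumed with .commit.
- Record is a custom struct whose field names and data types correspond one-to-one to the column names and data types, so objects of type Record can be deserialized through serde.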
const taos = require("@tdengine/websocket");
const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters'
const topics = [topic];
const groupId = "group1";
const clientId = "client1";
async function createConsumer() {
let groupId = "group1";
let clientId = "client1";
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, groupId],
[taos.TMQConstants.CLIENT_ID, clientId],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
try {
const conn = await taos.tmqConnect(configMap);
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
return conn;
} catch (err) {
console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(createDB);
await wsSql.exec(createStable);
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
await wsSql.close();
}
async function insert() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
let wsSql = await taos.sqlConnect(conf);
for (let i = 0; i < 1; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
}
await wsSql.close();
}
async function subscribe(consumer) {
try {
await consumer.subscribe(['topic_meters']);
let res = new Map();
while (res.size == 0) {
res = await consumer.poll(100);
await consumer.commit();
}
let assignment = await consumer.assignment();
await consumer.seekToBeginning(assignment);
console.log("Assignment seek to beginning successfully");
} catch (err) {
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
async function test() {
let consumer = null;
try {
await prepare();
consumer = await createConsumer();
const allPromises = [];
allPromises.push(subscribe(consumer));
allPromises.push(insert());
await Promise.all(allPromises);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
}
catch (err) {
console.error(`Failed to consume, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (consumer) {
await consumer.close();
console.log("Consumer closed successfully.");
}
taos.destroy();
}
}
test()
_topic = "topic_meters";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine($"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// build a topic list used to subscribe
ws_tmq_list_t* build_topic_list() {
// create a empty topic list
ws_tmq_list_t* topicList = ws_tmq_list_new();
// append topic name to the list
int32_t code = ws_tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
ws_tmq_list_destroy(topicList);
fprintf(stderr,
"Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(NULL));
return NULL;
}
// if success, return the list
return topicList;
}
void basic_consume_loop(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
int32_t msg_process(WS_RES* msg) {
int32_t rows = 0;
const char* topicName = ws_tmq_get_topic_name(msg);
const char* dbName = ws_tmq_get_db_name(msg);
int32_t vgroupId = ws_tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
WS_ROW row = ws_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
ws_tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n", topic_name, config.group_id,
config.client_id);
return -1;
}
if ((code = ws_tmq_subscribe(tmq, topic_list))) {
fprintf(stderr,
"Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
ws_tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
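Steps for subscribing and consuming data:
- Call the build_topic_list function to create a topic list, topic_list.
- If topic_list is NULL, creation failed and the function returns -1.
- Use the ws_tmq_subscribe function to subscribe tmq to the specified topic list. If the subscription fails, print an error message.
- Destroy the topic list topic_list to release resources.
- Call the basic_consume_loop function to start the basic consumption loop and process the subscribed messages.
REST API: Not supported.
Native Connection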
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
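- The subscribe method takes the list of topics (that is, their names) to subscribe to. Multiple topics can be subscribed to at once.
- Each call to poll fetches one message, and a message may contain multiple records.
- ResultBean is a custom inner class whose field names and data types correspond one-to-one to the column names and data types, so the deserializer class specified by the value.deserializer property can produce objects of type ResultBean.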
def subscribe(consumer):
    try:
        # subscribe to the topics
        consumer.subscribe(["topic_meters"])
        print("Subscribe topics successfully")
        for i in range(50):
            records = consumer.poll(1)
            if records:
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err
                val = records.value()
                if val:
                    for block in val:
                        data = block.fetchall()
                        print(f"data: {data}")
    except Exception as err:
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
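- The subscribe method takes the list of topics (that is, their names) to subscribe to. Multiple topics can be subscribed to at once.
- Each call to poll fetches one message, and a message may contain multiple records.
- records contains multiple blocks, and each block may contain multiple records.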
topic = "topic_meters"
err = consumer.Subscribe(topic, nil)
if err != nil {
log.Fatalf(
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ {
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("data:%v\n", e)
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
case tmqcommon.Error:
log.Fatalf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, e.Error())
}
}
}
let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;
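- A consumer can subscribe to one or more topics, although subscribing each consumer to a single topic is generally recommended.
- The TMQ message queue is a futures::Stream, so each message can be consumed with the corresponding stream APIs and marked as consumed with .commit.
- Record is a custom struct whose field names and data types correspond one-to-one with the column names and data types, so serde can deserialize rows into objects of type Record.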
Not supported
_topic = "topic_meters";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// build a topic list used to subscribe
tmq_list_t* build_topic_list() {
// create an empty topic list
tmq_list_t* topicList = tmq_list_new();
// append topic name to the list
int32_t code = tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
tmq_list_destroy(topicList);
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return NULL;
}
// if success, return the list
return topicList;
}
void basic_consume_loop(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
taos_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
int32_t msg_process(TAOS_RES* msg) {
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n",
topic_name, config.group_id, config.client_id);
return -1;
}
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
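Steps for subscribing to and consuming data:
- Call build_topic_list to create the topic list topic_list.
- If topic_list is NULL, creation failed and the function returns -1.
- Call tmq_subscribe to subscribe the consumer tmq to the topic list. If the subscription fails, print an error message.
- Destroy topic_list to release its resources.
- Call basic_consume_loop to start the basic consume loop and process the subscribed messages.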
Not supported
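Specifying the Subscription Offset
A consumer can start reading the messages in a partition from a specific offset, which lets it re-read messages or skip messages that have already been processed. The examples below show how each language connector specifies the subscription offset.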
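As a rough mental model of seeking, the sketch below uses a small in-memory partition. InMemoryPartition and its methods are hypothetical and exist only for this illustration; they are not a connector API. Offsets here are simple list indexes starting at 0, which is an assumption of the sketch and not how a real server numbers them.

```python
from dataclasses import dataclass


@dataclass
class InMemoryPartition:
    """Hypothetical stand-in for one partition (vgroup) of a topic."""
    messages: list
    position: int = 0  # offset of the next message to be read

    @property
    def begin(self) -> int:
        return 0

    @property
    def end(self) -> int:
        return len(self.messages)

    def seek(self, offset: int) -> None:
        """Move the read position; reject offsets outside [begin, end]."""
        if not self.begin <= offset <= self.end:
            raise ValueError(f"offset {offset} outside [{self.begin}, {self.end}]")
        self.position = offset

    def poll(self):
        """Return the next message, or None when the reader has caught up."""
        if self.position >= self.end:
            return None
        message = self.messages[self.position]
        self.position += 1
        return message


partition = InMemoryPartition(messages=["m0", "m1", "m2", "m3"])
first_pass = [partition.poll(), partition.poll()]  # reads m0 and m1

partition.seek(partition.begin)  # re-read from the start
replayed = partition.poll()      # m0 again

partition.seek(3)                # skip ahead past m1 and m2
skipped_to = partition.poll()    # m3

print(first_pass, replayed, skipped_to)
```

Seeking to begin replays data that was already delivered, so downstream processing should tolerate duplicates when this pattern is used.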
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
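- Subscribe to the topic and get the consumer's current partition assignment with consumer.assignment.
- Poll with consumer.poll until some data has been received.
- Call consumer.seekToBeginning to reset the offsets of all assigned partitions to the beginning, and print a message once the reset succeeds.
- After the seek, the next consumer.poll call reads from the beginning again (this final poll is not shown in the snippet above).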
try:
    assignments = consumer.assignment()
    for assignment in assignments:
        topic = assignment.topic()
        print(f"topic: {topic}")
        for assign in assignment.assignments():
            print(
                f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}")
            consumer.seek(topic, assign.vg_id(), assign.begin())
            print("Assignment seek to beginning successfully.")
except Exception as err:
    print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
// get assignment
partitions, err := consumer.Assignment()
if err != nil {
log.Fatalf(
"Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ {
// seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
log.Fatalf(
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic,
groupID,
clientID,
partitions[i].Partition,
0,
err.Error(),
)
}
}
fmt.Println("Assignment seek to beginning successfully")
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);
match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);
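- Call consumer.assignments() to get the consumer's current partition assignments and print the initial state.
- For each assignment, extract the topic, the vgroup ID (vgroup_id), the current offset (current), the begin offset (begin), and the end offset (end), and print them.
- Call consumer.offset_seek to move the offset to the begin position. If the call fails, print the error information and return the error.
- After the offsets of all partitions have been adjusted, get and print the assignments again to confirm the state after the seek.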
async function subscribe(consumer) {
try {
await consumer.subscribe(['topic_meters']);
let res = new Map();
while (res.size == 0) {
res = await consumer.poll(100);
await consumer.commit();
}
let assignment = await consumer.assignment();
await consumer.seekToBeginning(assignment);
console.log("Assignment seek to beginning successfully");
} catch (err) {
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
void consume_repeatly(ws_tmq_t* tmq) {
int32_t numOfAssignment = 0;
ws_tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = ws_tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
ws_tmq_topic_assignment* p = &pAssign[i];
code = ws_tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr,
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, ws_tmq_errstr(tmq));
break;
}
}
if (code == 0) fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
ws_tmq_free_assignment(pAssign, numOfAssignment);
// let's consume the messages again
basic_consume_loop(tmq);
}
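- Call ws_tmq_get_topic_assignment to get the assignment information for the given topic, including the number of assignments and the details of each one.
- If getting the assignment information fails, print an error message and return.
- For each assignment, call ws_tmq_offset_seek to set the consumer's offset to the earliest offset.
- If setting the offset fails, print an error message.
- Free the assignment array to release its resources.
- Call basic_consume_loop to start a new consume loop and process the messages.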
Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
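- Subscribe to the topic and get the consumer's current partition assignment with consumer.assignment.
- Poll with consumer.poll until some data has been received.
- Call consumer.seekToBeginning to reset the offsets of all assigned partitions to the beginning, and print a message once the reset succeeds.
- After the seek, the next consumer.poll call reads from the beginning again (this final poll is not shown in the snippet above).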
try:
    assignments = consumer.assignment()
    if assignments:
        for partition in assignments:
            partition.offset = 0
            consumer.seek(partition)
            print("Assignment seek to beginning successfully.")
except Exception as err:
    print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
// get assignment
partitions, err := consumer.Assignment()
if err != nil {
log.Fatalf("Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, err.Error())
}
fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ {
// seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
log.Fatalf(
"Failed to execute seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic,
groupID,
clientID,
partitions[i].Partition,
0,
err.Error(),
)
}
}
fmt.Println("Assignment seek to beginning successfully")
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);
match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);
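- Call consumer.assignments() to get the consumer's current partition assignments and print the initial state.
- For each assignment, extract the topic, the vgroup ID (vgroup_id), the current offset (current), the begin offset (begin), and the end offset (end), and print them.
- Call consumer.offset_seek to move the offset to the begin position. If the call fails, print the error information and return the error.
- After the offsets of all partitions have been adjusted, get and print the assignments again to confirm the state after the seek.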
Not supported
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
void consume_repeatly(tmq_t* tmq) {
int32_t numOfAssignment = 0;
tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment* p = &pAssign[i];
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr, "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, tmq_err2str(code));
break;
}
}
if (code == 0)
fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
tmq_free_assignment(pAssign);
// let's consume the messages again
basic_consume_loop(tmq);
}
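- Call tmq_get_topic_assignment to get the assignment information for the given topic, including the number of assignments and the details of each one.
- If getting the assignment information fails, print an error message and return.
- For each assignment, call tmq_offset_seek to set the consumer's offset to the earliest offset.
- If setting the offset fails, print an error message.
- Free the assignment array to release its resources.
- Call basic_consume_loop to start a new consume loop and process the messages.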
Not supported
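Committing Offsets
After a consumer has read and processed a message, it can commit the offset, which indicates that the consumer has successfully processed all messages up to that offset. Offsets can be committed automatically (periodically, according to the configuration) or manually (the application decides when to commit).
If the enable.auto.commit property is set to false when the consumer is created, offsets can be committed manually.
Note: Before committing manually, make sure the message has been processed successfully; otherwise, a message whose processing failed will not be consumed again. With automatic commit, the progress of the previous message may be committed during the current poll call, so make sure each message is fully processed before the next poll or message fetch.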
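The reason to commit only after processing succeeds can be seen in a small in-memory simulation. InMemoryLog and consume are hypothetical names used only in this sketch; they model one partition and the committed offset of one consumer group, and are not part of any connector.

```python
class InMemoryLog:
    """Hypothetical partition plus the committed offset of one consumer group."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # offset of the first message not yet acknowledged

    def poll(self, position):
        """Return (next_position, message), or None when nothing is left."""
        if position >= len(self.messages):
            return None
        return position + 1, self.messages[position]

    def commit(self, position):
        self.committed = position


def consume(log, handler):
    """Process messages from the committed offset; commit only after success."""
    position = log.committed  # a restarted consumer resumes from here
    handled = []
    while (polled := log.poll(position)) is not None:
        next_position, message = polled
        try:
            handler(message)
        except Exception:
            break  # do not commit: the message stays eligible for redelivery
        handled.append(message)
        log.commit(next_position)
        position = next_position
    return handled


log = InMemoryLog(["a", "b", "c"])
failures = {"b": 0}


def flaky(message):
    # Fail the first time "b" is seen to simulate a processing error.
    if message == "b" and failures["b"] == 0:
        failures["b"] += 1
        raise RuntimeError("transient failure")


first_run = consume(log, flaky)      # handles "a", then stops at "b"
committed_after_first = log.committed
second_run = consume(log, flaky)     # resumes at "b", which was never committed
print(first_run, committed_after_first, second_run)
```

Had the commit been issued before calling the handler, the failed message "b" would have been marked as consumed and skipped on restart.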
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
def commit_offset(consumer):
    try:
        for i in range(50):
            records = consumer.poll(timeout=1.0)
            if records:
                for block in records:
                    for row in block:
                        print(f"data: {row}")
                # after processing the data, commit the offset manually
                consumer.commit(records)
                print("Commit offset manually successfully.")
    except Exception as err:
        print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async {
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
The consumer.commit method can be used to commit the consumption progress manually.
try {
await consumer.subscribe(['topic_meters']);
for (let i = 0; i < 50; i++) {
let res = await consumer.poll(100);
for (let [key, value] of res) {
// Add your data processing logic here
console.log(`data: ${key} ${value}`);
}
await consumer.commit();
console.log("Commit offset manually successfully.");
}
} catch (err) {
console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
void manual_commit(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = ws_tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr,
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
// free the message
ws_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
}
The ws_tmq_commit_sync function can be used to commit the consumption progress manually.
Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
try:
    for i in range(50):
        records = consumer.poll(1)
        if records:
            err = records.error()
            if err is not None:
                print(f"Poll data error, {err}")
                raise err
            val = records.value()
            if val:
                for block in val:
                    print(block.fetchall())
            # after processing the data, commit the offset manually
            consumer.commit(records)
            print("Commit offset manually successfully.")
except Exception as err:
    print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async {
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
The consumer.commit method can be used to commit the consumption progress manually.
Not supported
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
void manual_commit(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr, "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
// free the message
taos_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
taos_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
}
The tmq_commit_sync function can be used to commit the consumption progress manually.
Not supported
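Unsubscribing and Closing the Consumer
A consumer can unsubscribe from its topics to stop receiving messages. When a consumer is no longer needed, close the consumer instance to release resources and disconnect from the TDengine server.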
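The unsubscribe-then-close ordering can be wrapped so that it runs even when message processing fails. The sketch below is one possible shape, using a context manager. FakeConsumer is a hypothetical stand-in that only records the calls it receives, and subscription is an illustrative helper, not a connector API.

```python
from contextlib import contextmanager


class FakeConsumer:
    """Hypothetical consumer that only records lifecycle calls."""

    def __init__(self):
        self.calls = []

    def subscribe(self, topics):
        self.calls.append(("subscribe", tuple(topics)))

    def unsubscribe(self):
        self.calls.append(("unsubscribe",))

    def close(self):
        self.calls.append(("close",))


@contextmanager
def subscription(consumer, topics):
    """Subscribe on entry; always unsubscribe and then close on exit."""
    consumer.subscribe(topics)
    try:
        yield consumer
    finally:
        try:
            consumer.unsubscribe()
        finally:
            consumer.close()  # runs even if unsubscribe() raised


consumer = FakeConsumer()
try:
    with subscription(consumer, ["topic_meters"]):
        raise RuntimeError("processing failed")  # simulate a failure mid-consume
except RuntimeError:
    pass

print(consumer.calls)
```

The nested finally mirrors the try/finally structure of the connector examples: a failure while unsubscribing should not prevent the consumer from being closed.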
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
try:
    consumer.unsubscribe()
    print("Consumer unsubscribed successfully.")
except Exception as err:
    print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
finally:
    if consumer:
        consumer.close()
        print("Consumer closed successfully.")
// unsubscribe
err = consumer.Unsubscribe()
if err != nil {
log.Fatalf(
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");
Note: After unsubscribing, the consumer is closed and cannot be reused. To subscribe to a new topic, create a new consumer.
let consumer = null;
try {
await prepare();
consumer = await createConsumer();
const allPromises = [];
allPromises.push(subscribe(consumer));
allPromises.push(insert());
await Promise.all(allPromises);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
}
catch (err) {
console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (consumer) {
await consumer.close();
console.log("Consumer closed successfully.");
}
taos.destroy();
}
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// unsubscribe the topic
code = ws_tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr,
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = ws_tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
Not supported
Native connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
try:
    consumer.unsubscribe()
    print("Consumer unsubscribed successfully.")
except Exception as err:
    print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
finally:
    if consumer:
        consumer.close()
        print("Consumer closed successfully.")
// unsubscribe
err = consumer.Unsubscribe()
if err != nil {
log.Fatalf(
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");
Note: After a consumer unsubscribes, it is closed and cannot be reused. To subscribe to a new topic, create a new consumer.
Not supported
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// unsubscribe the topic
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
Not supported
Complete examples
WebSocket connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
Complete code example
public class WsConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
static private String groupId = "group1";
static private String clientId = "client1";
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", groupId);
config.setProperty("client.id", clientId);
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed successfully.");
}
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("prepareData executed successfully.");
});
try {
TaosConsumer<ResultBean> consumer = getConsumer();
pollExample(consumer);
System.out.println("pollExample executed successfully.");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully.");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully.");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
return;
}
stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}
Note: Adjust the value of the value.deserializer configuration parameter to match the package path used in your test environment.
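Java refers to a static nested class by its binary name, which joins the outer and nested class names with `$` rather than `.`. This is why the value in the example ends in `WsConsumerLoopFull$ResultDeserializer`. As a small language-neutral illustration (written in Python; the helper `nested_class_name` and the package `com.mycompany.demo` are hypothetical), the value could be derived from the package path like this:

```python
def nested_class_name(package: str, outer_class: str, nested_class: str) -> str:
    """Build the binary name Java uses for a static nested class."""
    # Java joins an outer class and its nested class with '$', not '.'.
    return f"{package}.{outer_class}${nested_class}"


# Value used by the example above.
print(nested_class_name("com.taos.example", "WsConsumerLoopFull", "ResultDeserializer"))

# Hypothetical case: the same class moved into the package com.mycompany.demo.
print(nested_class_name("com.mycompany.demo", "WsConsumerLoopFull", "ResultDeserializer"))
```

The resulting string is what would be passed to `config.setProperty("value.deserializer", ...)` in the Java example.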
Complete code example
#!/usr/bin/python3
import taosws
db = "power"
topic = "topic_meters"
user = "root"
password = "taosdata"
host = "localhost"
port = 6041
groupId = "group1"
clientId = "1"
tdConnWsScheme = "ws"
autoOffsetReset = "latest"
autoCommitState = "true"
autoCommitIntv = "1000"
def prepareMeta():
    conn = None
    try:
        conn = taosws.connect(user=user, password=password, host=host, port=port)
        # create database
        rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
        assert rowsAffected == 0
        # change database.
        rowsAffected = conn.execute(f"USE {db}")
        assert rowsAffected == 0
        # create super table
        rowsAffected = conn.execute(
            "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(64))"
        )
        assert rowsAffected == 0
        # create table
        rowsAffected = conn.execute(
            "CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angeles')")
        assert rowsAffected == 0
        # ANCHOR: create_topic
        # create topic
        conn.execute(
            f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
        )
        # ANCHOR_END: create_topic
        sql = """
            INSERT INTO
            power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
            VALUES (NOW + 1a, 10.30000, 219, 0.31000)
            (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
            power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
            VALUES (NOW + 1a, 10.30000, 218, 0.25000)
            """
        affectedRows = conn.execute(sql)
        print(f"Inserted {affectedRows} rows into power.meters successfully.")
    except Exception as err:
        print(f"Failed to prepareMeta, host: {host}:{port}, db: {db}, topic: {topic}, ErrMessage:{err}.")
        raise err
    finally:
        if conn:
            conn.close()
# ANCHOR: create_consumer
def create_consumer():
    try:
        consumer = taosws.Consumer(conf={
            "td.connect.websocket.scheme": tdConnWsScheme,
            "group.id": groupId,
            "client.id": clientId,
            "auto.offset.reset": autoOffsetReset,
            "td.connect.ip": host,
            "td.connect.port": port,
            "enable.auto.commit": autoCommitState,
            "auto.commit.interval.ms": autoCommitIntv,
        })
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}.")
        return consumer
    except Exception as err:
        print(f"Failed to create websocket consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: create_consumer
def seek_offset(consumer):
    # ANCHOR: assignment
    try:
        assignments = consumer.assignment()
        for assignment in assignments:
            # use a distinct local name so the module-level `topic` stays usable in the error handler
            assigned_topic = assignment.topic()
            print(f"topic: {assigned_topic}")
            for assign in assignment.assignments():
                print(
                    f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}")
                consumer.seek(assigned_topic, assign.vg_id(), assign.begin())
                print("Assignment seek to beginning successfully.")
    except Exception as err:
        print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
    # ANCHOR_END: assignment
# ANCHOR: subscribe
def subscribe(consumer):
    try:
        consumer.subscribe([topic])
        print("Subscribe topics successfully")
        for i in range(50):
            records = consumer.poll(timeout=1.0)
            if records:
                for block in records:
                    for row in block:
                        print(f"data: {row}")
    except Exception as err:
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: subscribe
# ANCHOR: commit_offset
def commit_offset(consumer):
    try:
        for i in range(50):
            records = consumer.poll(timeout=1.0)
            if records:
                for block in records:
                    for row in block:
                        print(f"data: {row}")
                # after processing the data, commit the offset manually
                consumer.commit(records)
                print("Commit offset manually successfully.")
    except Exception as err:
        print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: commit_offset
#
def unsubscribe(consumer):
    # ANCHOR: unsubscribe
    try:
        consumer.unsubscribe()
        print("Consumer unsubscribed successfully.")
    except Exception as err:
        print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
    finally:
        if consumer:
            consumer.close()
            print("Consumer closed successfully.")
    # ANCHOR_END: unsubscribe
if __name__ == "__main__":
    consumer = None
    try:
        prepareMeta()
        consumer = create_consumer()
        subscribe(consumer)
        seek_offset(consumer)
        commit_offset(consumer)
    finally:
        if consumer:
            unsubscribe(consumer)
Complete code example
package main
import (
"database/sql"
"fmt"
"log"
"time"
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
_ "github.com/taosdata/driver-go/v3/taosWS"
"github.com/taosdata/driver-go/v3/ws/tmq"
)
var done = make(chan struct{})
var groupID string
var clientID string
var host string
var topic string
func main() {
// init env
taosDSN := "root:taosdata@ws(127.0.0.1:6041)/"
conn, err := sql.Open("taosWS", taosDSN)
if err != nil {
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
}
defer func() {
conn.Close()
}()
initEnv(conn)
// ANCHOR: create_consumer
// create consumer
wsUrl := "ws://127.0.0.1:6041"
groupID = "group1"
clientID = "client1"
host = "127.0.0.1"
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": wsUrl,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"group.id": groupID,
"client.id": clientID,
})
if err != nil {
log.Fatalf(
"Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
host,
groupID,
clientID,
err.Error(),
)
}
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
// ANCHOR_END: create_consumer
// ANCHOR: subscribe
topic = "topic_meters"
err = consumer.Subscribe(topic, nil)
if err != nil {
log.Fatalf(
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ {
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("data:%v\n", e)
// ANCHOR: commit_offset
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
// ANCHOR_END: commit_offset
case tmqcommon.Error:
log.Fatalf(
"Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.Error(),
)
}
}
}
// ANCHOR_END: subscribe
// ANCHOR: seek
// get assignment
partitions, err := consumer.Assignment()
if err != nil {
log.Fatalf(
"Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ {
// seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
log.Fatalf(
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic,
groupID,
clientID,
partitions[i].Partition,
0,
err.Error(),
)
}
}
fmt.Println("Assignment seek to beginning successfully")
// ANCHOR_END: seek
// ANCHOR: close
// unsubscribe
err = consumer.Unsubscribe()
if err != nil {
log.Fatalf(
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
// ANCHOR_END: close
<-done
}
func initEnv(conn *sql.DB) {
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil {
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
}
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
if err != nil {
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
}
}
done <- struct{}{}
}()
}
Complete code example
use chrono::DateTime;
use chrono::Local;
use std::str::FromStr;
use std::thread;
use std::time::Duration;
use taos::*;
use tokio::runtime::Runtime;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Info)
.init();
use taos_query::prelude::*;
// ANCHOR: create_consumer_dsn
let dsn = "ws://localhost:6041".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
// ANCHOR_END: create_consumer_dsn
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
// prepare database and table
taos.exec_many([
"drop topic if exists topic_meters",
"drop database if exists power",
"create database if not exists power WAL_RETENTION_PERIOD 86400",
"use power",
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))",
"create table if not exists power.d001 using power.meters tags(1,'location')",
])
.await?;
// ANCHOR: create_topic
taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;
// ANCHOR_END: create_topic
// ANCHOR: create_consumer_ac
let group_id = "group1".to_string();
let client_id = "client1".to_string();
dsn.params
.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params
.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params
.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params
.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), group_id.clone());
dsn.params
.insert("client.id".to_string(), client_id.clone());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = match builder.build().await {
Ok(consumer) => {
println!(
"Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.",
dsn, group_id, client_id
);
consumer
}
Err(err) => {
eprintln!("Failed to create websocket consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
// ANCHOR_END: create_consumer_ac
let handle = thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
tokio::time::sleep(Duration::from_secs(1)).await;
let taos_insert = TaosBuilder::from_dsn(&dsn).unwrap().build().await.unwrap();
for i in 0..50 {
let insert_sql = format!(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW, 10.30000, {}, 0.31000)"#, i);
if let Err(e) = taos_insert.exec(insert_sql).await {
eprintln!("Failed to execute insert: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
});
// ANCHOR: consume
let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;
// ANCHOR_END: consume
// ANCHOR: consumer_commit_manually
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async {
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: seek_offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);
match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset
// ANCHOR: unsubscribe
consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");
// ANCHOR_END: unsubscribe
tokio::time::sleep(Duration::from_secs(1)).await;
handle.join().unwrap();
taos.exec_many(["drop topic topic_meters", "drop database power"])
.await?;
Ok(())
}
Complete code example
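The seek step in the listing above rewinds every vgroup to its `begin` offset so that already-consumed rows are delivered again. The following is a minimal sketch of that idea using a toy in-memory model. `VgroupAssignment`, `seek`, and `poll` here are hypothetical names for illustration only, not connector APIs, and the assumption that `end` points just past the newest record is part of the model rather than a statement about TDengine.

```python
from dataclasses import dataclass


@dataclass
class VgroupAssignment:
    """Hypothetical stand-in for one vgroup's assignment (not a connector type)."""
    vgroup_id: int
    begin: int    # earliest offset still available
    end: int      # assumed: offset just past the newest record
    current: int  # next offset the consumer will read


def seek(assignment, offset):
    """Move the read position, rejecting offsets outside [begin, end]."""
    if not assignment.begin <= offset <= assignment.end:
        raise ValueError(
            f"offset {offset} outside [{assignment.begin}, {assignment.end}]")
    assignment.current = offset


def poll(assignment, log, max_records=10):
    """Return up to max_records records starting at the current position."""
    start = assignment.current
    stop = min(start + max_records, assignment.end)
    assignment.current = stop
    return [log[i] for i in range(start, stop)]


# Five records at offsets 100..104; the consumer has already read all of them.
log = {i: f"row-{i}" for i in range(100, 105)}
a = VgroupAssignment(vgroup_id=2, begin=100, end=105, current=105)

first = poll(a, log)      # nothing new: the position is already at `end`
seek(a, a.begin)          # rewind, as the listing does with `begin`
replayed = poll(a, log)   # the same five rows are delivered again
print(first, replayed)
```

In this model, seeking changes only the read position; the stored records are untouched, which is why the second poll returns the full range again.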
const taos = require("@tdengine/websocket");
// ANCHOR: create_consumer
const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters';
const topics = [topic];
const groupId = "group1";
const clientId = "client1";
async function createConsumer() {
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, groupId],
[taos.TMQConstants.CLIENT_ID, clientId],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
try {
// declare the variable locally instead of creating an implicit global
let conn = await taos.tmqConnect(configMap);
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
return conn;
} catch (err) {
console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
// ANCHOR_END: create_consumer
async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(createDB);
await wsSql.exec(createStable);
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
await wsSql.close();
}
const delay = function(ms) {
return new Promise(function(resolve) {
setTimeout(resolve, ms);
});
};
async function insert() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
let wsSql = await taos.sqlConnect(conf);
for (let i = 0; i < 50; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
await delay(100);
}
await wsSql.close();
}
async function subscribe(consumer) {
// ANCHOR: commit
try {
await consumer.subscribe(['topic_meters']);
for (let i = 0; i < 50; i++) {
let res = await consumer.poll(100);
for (let [key, value] of res) {
// Add your data processing logic here
console.log(`data: ${key} ${value}`);
}
await consumer.commit();
console.log("Commit offset manually successfully.");
}
} catch (err) {
console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
// ANCHOR_END: commit
}
async function test() {
// ANCHOR: unsubscribe
let consumer = null;
try {
await prepare();
consumer = await createConsumer();
const allPromises = [];
allPromises.push(subscribe(consumer));
allPromises.push(insert());
await Promise.all(allPromises);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
}
catch (err) {
console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (consumer) {
await consumer.close();
console.log("Consumer closed successfully.");
}
taos.destroy();
}
// ANCHOR_END: unsubscribe
}
test()
Complete code example
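The commit loop in the listing above calls `commit()` only after the polled records have been processed. A consequence of that ordering is that a consumer which stops between processing and committing will receive the same records again after it restarts. The sketch below illustrates this with a toy in-memory broker. `ToyBroker` and `consume` are hypothetical helpers written for this illustration and do not correspond to any connector API.

```python
class ToyBroker:
    """In-memory stand-in for a topic; hypothetical, not a TDengine API."""

    def __init__(self, records):
        self.records = list(records)
        self.committed = 0  # offset a restarted consumer resumes from

    def poll(self, position, batch_size):
        return self.records[position:position + batch_size]


def consume(broker, batch_size, crash_after_batches=None):
    """Poll from the committed offset, process, then commit.

    Returns the records that were processed during this run.
    """
    processed = []
    position = broker.committed
    batches = 0
    while True:
        batch = broker.poll(position, batch_size)
        if not batch:
            break
        processed.extend(batch)          # stands in for the data processing logic
        position += len(batch)
        batches += 1
        if crash_after_batches is not None and batches == crash_after_batches:
            return processed             # simulated crash: processed, not committed
        broker.committed = position      # commit only after processing succeeded
    return processed


broker = ToyBroker(["r0", "r1", "r2", "r3", "r4", "r5"])
first_run = consume(broker, batch_size=2, crash_after_batches=2)
second_run = consume(broker, batch_size=2)
print(first_run)
print(second_run)
```

The second run starts from the last committed offset, so the batch that was processed but never committed shows up in both runs. Processing logic that cannot tolerate such repeats needs to be idempotent.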
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
namespace TMQExample
{
internal class SubscribeDemo
{
private static string _host = "";
private static string _groupId = "";
private static string _clientId = "";
private static string _topic = "";
public static void Main(string[] args)
{
try
{
var builder =
new ConnectionStringBuilder(
"protocol=WebSocket;host=127.0.0.1;port=6041;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
client.Exec("CREATE DATABASE IF NOT EXISTS power");
client.Exec("USE power");
client.Exec(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC IF NOT EXISTS topic_meters as SELECT * from power.meters");
var consumer = CreateConsumer();
// insert data
Task.Run(InsertData);
// consume message
Consume(consumer);
// seek
Seek(consumer);
// commit
CommitOffset(consumer);
// close
Close(consumer);
Console.WriteLine("Done");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(e.Message);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(e.Message);
throw;
}
}
static void InsertData()
{
var builder =
new ConnectionStringBuilder(
"protocol=WebSocket;host=127.0.0.1;port=6041;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec(
"INSERT into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
static IConsumer<Dictionary<string, object>> CreateConsumer()
{
// ANCHOR: create_consumer
// consumer config
_host = "127.0.0.1";
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "td.connect.port", "6041" },
{ "auto.offset.reset", "latest" },
{ "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" },
{ "group.id", _groupId },
{ "client.id", _clientId },
{ "td.connect.ip", _host },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
};
IConsumer<Dictionary<string, object>> consumer = null!;
try
{
// create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine(
$"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to create websocket consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to create websocket consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: create_consumer
return consumer;
}
static void Consume(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: subscribe
_topic = "topic_meters";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine($"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: subscribe
}
static void Seek(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: seek
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: seek
}
static void CommitOffset(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: commit_offset
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
// ANCHOR_END: commit_offset
}
static void Close(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: close
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// ANCHOR_END: close
}
}
}
Complete code example
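The consumer configuration in the listing above is passed as plain strings, so a mistyped value is only discovered when the consumer is built. As a rough illustration, the sketch below checks a configuration dictionary against the limits listed in the parameter table of this chapter (required `group.id`, maximum lengths, the `auto.offset.reset` values, and the ranges for `session.timeout.ms` and `max.poll.interval.ms`). `check_consumer_config` is a hypothetical helper, not part of any connector, and it covers only those documented limits.

```python
INT32_MAX = 2**31 - 1

# Limits copied from the parameter table in this chapter.
RANGES = {
    "session.timeout.ms": (6000, 1800000),
    "max.poll.interval.ms": (1000, INT32_MAX),
}
MAX_LEN = {"group.id": 192, "client.id": 255}
OFFSET_RESET = {"earliest", "latest", "none"}


def check_consumer_config(cfg):
    """Return a list of problems; an empty list means none were found."""
    problems = []
    if not cfg.get("group.id"):
        problems.append("group.id is required")
    for key, limit in MAX_LEN.items():
        if len(cfg.get(key, "")) > limit:
            problems.append(f"{key} exceeds the documented maximum length of {limit}")
    reset = cfg.get("auto.offset.reset")
    if reset is not None and reset not in OFFSET_RESET:
        problems.append(f"auto.offset.reset must be one of {sorted(OFFSET_RESET)}")
    for key, (low, high) in RANGES.items():
        if key in cfg and not low <= int(cfg[key]) <= high:
            problems.append(f"{key} must be in [{low}, {high}]")
    return problems


good = {
    "group.id": "group1",
    "client.id": "client1",
    "auto.offset.reset": "latest",
    "enable.auto.commit": "true",
    "auto.commit.interval.ms": "1000",
}
bad = {"client.id": "client1", "session.timeout.ms": "500"}

print(check_consumer_config(good))
print(check_consumer_config(bad))
```

Running such a check before building the consumer gives an error message that names the offending key, which is easier to act on than a generic construction failure.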
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// to compile (links the WebSocket client library declared in taosws.h): gcc -o tmq_demo tmq_demo.c -ltaosws -lpthread
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "taosws.h"
volatile int thread_stop = 0;
static int running = 1;
static int count = 0;
const char* topic_name = "topic_meters";
typedef struct {
const char* enable_auto_commit;
const char* auto_commit_interval_ms;
const char* group_id;
const char* client_id;
const char* td_connect_host;
const char* td_connect_port;
const char* td_connect_user;
const char* td_connect_pass;
const char* auto_offset_reset;
} ConsumerConfig;
ConsumerConfig config = {.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6041",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"};
void* prepare_data(void* arg) {
int code = 0;
char* dsn = "ws://localhost:6041";
WS_TAOS* pConn = ws_connect(dsn);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return NULL;
}
WS_RES* pRes;
int i = 1;
while (!thread_stop) {
char buf[200] = {0};
i++;
snprintf(
buf, sizeof(buf),
"INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + %da, 10.30000, "
"219, 0.31000)",
i);
pRes = ws_query(pConn, buf);
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
}
ws_free_result(pRes);
sleep(1);
}
fprintf(stdout, "Prepare data thread exit\n");
return NULL;
}
// ANCHOR: msg_process
int32_t msg_process(WS_RES* msg) {
int32_t rows = 0;
const char* topicName = ws_tmq_get_topic_name(msg);
const char* dbName = ws_tmq_get_db_name(msg);
int32_t vgroupId = ws_tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
WS_ROW row = ws_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
// ANCHOR_END: msg_process
WS_TAOS* init_env() {
int code = 0;
char* dsn = "ws://localhost:6041";
WS_TAOS* pConn = ws_connect(dsn);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return NULL;
}
WS_RES* pRes;
// drop topic if exists
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
pRes = ws_query(pConn, "DROP DATABASE IF EXISTS power");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
// create database
pRes = ws_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
// create super table
pRes =
ws_query(pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
return pConn;
END:
ws_free_result(pRes);
ws_close(pConn);
return NULL;
}
void deinit_env(WS_TAOS* pConn) {
if (pConn) ws_close(pConn);
}
int32_t create_topic(WS_TAOS* pConn) {
WS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = ws_query(pConn, "USE power");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
// free the result before returning to avoid a leak
ws_free_result(pRes);
return -1;
}
ws_free_result(pRes);
pRes = ws_query(
pConn,
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
// free the result before returning to avoid a leak
ws_free_result(pRes);
return -1;
}
ws_free_result(pRes);
return 0;
}
int32_t drop_topic(WS_TAOS* pConn) {
WS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = ws_query(pConn, "USE power");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
// free the result before returning to avoid a leak
ws_free_result(pRes);
return -1;
}
ws_free_result(pRes);
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
// free the result before returning to avoid a leak
ws_free_result(pRes);
return -1;
}
ws_free_result(pRes);
return 0;
}
void tmq_commit_cb_print(ws_tmq_t* tmq, int32_t code, void* param) {
count += 1;
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count);
}
// ANCHOR: create_consumer_1
ws_tmq_t* build_consumer(const ConsumerConfig* config) {
ws_tmq_conf_res_t code;
ws_tmq_t* tmq = NULL;
// create a configuration object
ws_tmq_conf_t* conf = ws_tmq_conf_new();
// set the configuration parameters
code = ws_tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "group.id", config->group_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "client.id", config->client_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
// create a consumer object
tmq = ws_tmq_consumer_new(conf, "taos://localhost:6041", NULL, 0);
_end:
// destroy the configuration object
ws_tmq_conf_destroy(conf);
return tmq;
}
// ANCHOR_END: create_consumer_1
// ANCHOR: build_topic_list
// build a topic list used to subscribe
ws_tmq_list_t* build_topic_list() {
// create a empty topic list
ws_tmq_list_t* topicList = ws_tmq_list_new();
// append topic name to the list
int32_t code = ws_tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
ws_tmq_list_destroy(topicList);
fprintf(stderr,
"Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(NULL));
return NULL;
}
// if success, return the list
return topicList;
}
// ANCHOR_END: build_topic_list
// ANCHOR: basic_consume_loop
void basic_consume_loop(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// stop after consuming 50 messages
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
// ANCHOR_END: basic_consume_loop
// ANCHOR: consume_repeatly
void consume_repeatly(ws_tmq_t* tmq) {
int32_t numOfAssignment = 0;
ws_tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = ws_tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
ws_tmq_topic_assignment* p = &pAssign[i];
code = ws_tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr,
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, ws_tmq_errstr(tmq));
break;
}
}
if (code == 0) fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
ws_tmq_free_assignment(pAssign, numOfAssignment);
// let's consume the messages again
basic_consume_loop(tmq);
}
// ANCHOR_END: consume_repeatly
// ANCHOR: manual_commit
void manual_commit(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = ws_tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr,
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
// free the message
ws_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// stop after consuming 50 messages
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
}
// ANCHOR_END: manual_commit
int main(int argc, char* argv[]) {
int32_t code;
pthread_t thread_id;
WS_TAOS* pConn = init_env();
if (pConn == NULL) {
fprintf(stderr, "Failed to init env.\n");
return -1;
}
if (create_topic(pConn) < 0) {
fprintf(stderr, "Failed to create topic.\n");
return -1;
}
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
fprintf(stderr, "Failed to create thread.\n");
return -1;
}
// ANCHOR: create_consumer_2
ws_tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n", config.td_connect_host,
config.group_id, config.client_id);
}
// ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3
ws_tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n", topic_name, config.group_id,
config.client_id);
return -1;
}
if ((code = ws_tmq_subscribe(tmq, topic_list))) {
fprintf(stderr,
"Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
ws_tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
// ANCHOR_END: subscribe_3
consume_repeatly(tmq);
manual_commit(tmq);
// ANCHOR: unsubscribe_and_close
// unsubscribe the topic
code = ws_tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr,
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = ws_tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
// ANCHOR_END: unsubscribe_and_close
thread_stop = 1;
pthread_join(thread_id, NULL);
if (drop_topic(pConn) < 0) {
fprintf(stderr, "Failed to drop topic.\n");
return -1;
}
deinit_env(pConn);
return 0;
}
Not supported
Native connection
Complete code example
public class ConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
static private String groupId = "group1";
static private String clientId = "client1";
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
// reuse the class-level IDs so that log messages match the actual configuration
config.setProperty("group.id", groupId);
config.setProperty("client.id", clientId);
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed successfully.");
}
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("prepareData executed successfully.");
});
try {
TaosConsumer<ResultBean> consumer = getConsumer();
pollExample(consumer);
System.out.println("pollExample executed successfully.");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully.");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully.");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
}
// always stop the producer thread, even if consuming failed; returning early would leave the executor thread running
stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}
Note: the value of the value.deserializer configuration parameter here should be adjusted to match the package path used in your test environment.
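The note above matters because value.deserializer is a class name given as a string, so it has to change whenever the deserializer class moves to another package or enclosing class. As a minimal sketch, the snippet below derives that string with `Class.getName()` instead of hard-coding it. `DeserializerNameDemo` and its nested `ResultDeserializer` are hypothetical stand-ins for illustration, not connector classes; in a real project the nested class would be the deserializer defined in your own example code.

```java
import java.util.Properties;

public class DeserializerNameDemo {
    // Hypothetical stand-in for the deserializer class of the full example.
    public static class ResultDeserializer {
    }

    // Returns the binary class name. For a nested class this has the form
    // "<package>.<Outer>$<Inner>", e.g. "com.example.Outer$ResultDeserializer".
    public static String deserializerName() {
        return ResultDeserializer.class.getName();
    }

    // Builds consumer properties with the derived name, so the setting stays
    // correct when the class is moved to another package.
    public static Properties buildConfig() {
        Properties config = new Properties();
        config.setProperty("value.deserializer", deserializerName());
        return config;
    }

    public static void main(String[] args) {
        // Print the value that would be passed to the consumer configuration.
        System.out.println(buildConfig().getProperty("value.deserializer"));
    }
}
```

Deriving the name from the class object keeps the configuration in step with refactoring, at the cost of a compile-time dependency on the deserializer class.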
Complete code example
#!/usr/bin/python3
import taos
db = "power"
topic = "topic_meters"
user = "root"
password = "taosdata"
host = "localhost"
port = 6030
groupId = "group1"
clientId = "1"
tdConnWsScheme = "ws"
autoOffsetReset = "latest"
autoCommitState = "true"
autoCommitIntv = "1000"
def prepareMeta():
    conn = None
    try:
        conn = taos.connect(host=host, user=user, password=password, port=port)
        conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
        # change database. same as execute "USE db"
        conn.select_db(db)
        # create super table
        conn.execute(
            "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
        )
        # ANCHOR: create_topic
        # create topic
        conn.execute(
            f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
        )
        # ANCHOR_END: create_topic
        sql = """
            INSERT INTO
            power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
                VALUES (NOW + 1a, 10.30000, 219, 0.31000)
                (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
            power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
                VALUES (NOW + 1a, 10.30000, 218, 0.25000)
            """
        affectedRows = conn.execute(sql)
        print(f"Inserted {affectedRows} rows into power.meters successfully.")
    except Exception as err:
        print(f"Failed to prepareMeta, host: {host}:{port}, db: {db}, topic: {topic}, ErrMessage:{err}.")
        raise err
    finally:
        if conn:
            conn.close()
# ANCHOR: create_consumer
from taos.tmq import Consumer
def create_consumer():
    try:
        consumer = Consumer(
            {
                "group.id": groupId,
                "client.id": clientId,
                "td.connect.user": user,
                "td.connect.pass": password,
                "enable.auto.commit": autoCommitState,
                "auto.commit.interval.ms": autoCommitIntv,
                "auto.offset.reset": autoOffsetReset,
                "td.connect.ip": host,
                "td.connect.port": str(port),
            }
        )
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}")
        return consumer
    except Exception as err:
        print(f"Failed to create native consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: create_consumer
# ANCHOR: subscribe
def subscribe(consumer):
    try:
        # subscribe to the topics
        consumer.subscribe(["topic_meters"])
        print("Subscribe topics successfully")
        for i in range(50):
            records = consumer.poll(1)
            if records:
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err
                val = records.value()
                if val:
                    for block in val:
                        data = block.fetchall()
                        print(f"data: {data}")
    except Exception as err:
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: subscribe
def commit_offset(consumer):
    # ANCHOR: commit_offset
    try:
        for i in range(50):
            records = consumer.poll(1)
            if records:
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err
                val = records.value()
                if val:
                    for block in val:
                        print(block.fetchall())
                # after processing the data, commit the offset manually
                consumer.commit(records)
                print("Commit offset manually successfully.")
    except Exception as err:
        print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
    # ANCHOR_END: commit_offset
def seek_offset(consumer):
    # ANCHOR: assignment
    try:
        assignments = consumer.assignment()
        if assignments:
            for partition in assignments:
                partition.offset = 0
                consumer.seek(partition)
                print("Assignment seek to beginning successfully.")
    except Exception as err:
        print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
    # ANCHOR_END: assignment
def unsubscribe(consumer):
    # ANCHOR: unsubscribe
    try:
        consumer.unsubscribe()
        print("Consumer unsubscribed successfully.")
    except Exception as err:
        print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
    finally:
        if consumer:
            consumer.close()
            print("Consumer closed successfully.")
    # ANCHOR_END: unsubscribe
if __name__ == "__main__":
    consumer = None
    try:
        prepareMeta()
        consumer = create_consumer()
        subscribe(consumer)
        seek_offset(consumer)
        commit_offset(consumer)
    finally:
        if consumer:
            unsubscribe(consumer)
Complete code example
package main
import (
"database/sql"
"fmt"
"log"
"time"
"github.com/taosdata/driver-go/v3/af/tmq"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
var done = make(chan struct{})
var groupID string
var clientID string
var host string
var topic string
func main() {
// init env
taosDSN := "root:taosdata@tcp(127.0.0.1:6030)/"
conn, err := sql.Open("taosSql", taosDSN)
if err != nil {
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
}
defer func() {
conn.Close()
}()
initEnv(conn)
// ANCHOR: create_consumer
// create consumer
groupID = "group1"
clientID = "client1"
host = "127.0.0.1"
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"group.id": groupID,
"client.id": clientID,
})
if err != nil {
log.Fatalf(
"Failed to create native consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
host,
groupID,
clientID,
err.Error(),
)
}
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
// ANCHOR_END: create_consumer
// ANCHOR: subscribe
topic = "topic_meters"
err = consumer.Subscribe(topic, nil)
if err != nil {
log.Fatalf(
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ {
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("data:%v\n", e)
// ANCHOR: commit_offset
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
// ANCHOR_END: commit_offset
case tmqcommon.Error:
log.Fatalf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, e.Error())
}
}
}
// ANCHOR_END: subscribe
// ANCHOR: seek
// get assignment
partitions, err := consumer.Assignment()
if err != nil {
log.Fatalf("Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, err.Error())
}
fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ {
// seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
log.Fatalf(
"Failed to execute seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic,
groupID,
clientID,
partitions[i].Partition,
0,
err.Error(),
)
}
}
fmt.Println("Assignment seek to beginning successfully")
// ANCHOR_END: seek
// ANCHOR: close
// unsubscribe
err = consumer.Unsubscribe()
if err != nil {
log.Fatalf(
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
// ANCHOR_END: close
<-done
}
func initEnv(conn *sql.DB) {
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil {
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
}
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
_, err := conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
if err != nil {
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
}
}
done <- struct{}{}
}()
}
Complete code example
use chrono::DateTime;
use chrono::Local;
use std::str::FromStr;
use std::thread;
use std::time::Duration;
use taos::*;
use tokio::runtime::Runtime;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Info)
.init();
use taos_query::prelude::*;
// ANCHOR: create_consumer_dsn
let dsn = "taos://localhost:6030".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
// ANCHOR_END: create_consumer_dsn
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
// prepare database and table
taos.exec_many([
"drop topic if exists topic_meters",
"drop database if exists power",
"create database if not exists power WAL_RETENTION_PERIOD 86400",
"use power",
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))",
"create table if not exists power.d001 using power.meters tags(1,'location')",
])
.await?;
// ANCHOR: create_topic
taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;
// ANCHOR_END: create_topic
// ANCHOR: create_consumer_ac
let group_id = "group1".to_string();
let client_id = "client1".to_string();
dsn.params
.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params
.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params
.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params
.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), group_id.clone());
dsn.params
.insert("client.id".to_string(), client_id.clone());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = match builder.build().await {
Ok(consumer) => {
println!(
"Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.",
dsn, group_id, client_id
);
consumer
}
Err(err) => {
eprintln!("Failed to create native consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
// ANCHOR_END: create_consumer_ac
let handle = thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
tokio::time::sleep(Duration::from_secs(1)).await;
let taos_insert = TaosBuilder::from_dsn(&dsn).unwrap().build().await.unwrap();
for i in 0..50 {
let insert_sql = format!(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW, 10.30000, {}, 0.31000)"#, i);
if let Err(e) = taos_insert.exec(insert_sql).await {
eprintln!("Failed to execute insert: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
});
// ANCHOR: consume
let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;
// ANCHOR_END: consume
// ANCHOR: consumer_commit_manually
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async {
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: seek_offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);
match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset
// ANCHOR: unsubscribe
consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");
// ANCHOR_END: unsubscribe
tokio::time::sleep(Duration::from_secs(1)).await;
handle.join().unwrap();
taos.exec_many(["drop topic topic_meters", "drop database power"])
.await?;
Ok(())
}
Not supported
Complete code example
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
namespace TMQExample
{
internal class SubscribeDemo
{
private static string _host = "";
private static string _groupId = "";
private static string _clientId = "";
private static string _topic = "";
public static void Main(string[] args)
{
try
{
var builder = new ConnectionStringBuilder("host=127.0.0.1;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
client.Exec("CREATE DATABASE IF NOT EXISTS power");
client.Exec("USE power");
client.Exec(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC IF NOT EXISTS topic_meters as SELECT * from power.meters");
var consumer = CreateConsumer();
// insert data
Task.Run(InsertData);
// consume message
Consume(consumer);
// seek
Seek(consumer);
// commit
CommitOffset(consumer);
// close
Close(consumer);
Console.WriteLine("Done");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(e.Message);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(e.Message);
throw;
}
}
static void InsertData()
{
var builder = new ConnectionStringBuilder("host=127.0.0.1;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec(
"INSERT into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
static IConsumer<Dictionary<string, object>> CreateConsumer()
{
// ANCHOR: create_consumer
// consumer config
_host = "127.0.0.1";
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.port", "6030" },
{ "auto.offset.reset", "latest" },
{ "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" },
{ "group.id", _groupId },
{ "client.id", _clientId },
{ "td.connect.ip", _host },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
};
IConsumer<Dictionary<string, object>> consumer = null!;
try
{
// create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine(
$"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: create_consumer
return consumer;
}
static void Consume(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: subscribe
_topic = "topic_meters";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: subscribe
}
static void Seek(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: seek
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: seek
}
static void CommitOffset(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: commit_offset
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
// ANCHOR_END: commit_offset
}
static void Close(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: close
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// ANCHOR_END: close
}
}
}
Complete code example
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// to compile: gcc -o tmq_demo tmq_demo.c -ltaos -lpthread
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "taos.h"
volatile int thread_stop = 0;
static int running = 1;
static int count = 0;
const char* topic_name = "topic_meters";
typedef struct {
const char* enable_auto_commit;
const char* auto_commit_interval_ms;
const char* group_id;
const char* client_id;
const char* td_connect_host;
const char* td_connect_port;
const char* td_connect_user;
const char* td_connect_pass;
const char* auto_offset_reset;
} ConsumerConfig;
ConsumerConfig config = {
.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6030",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"
};
void* prepare_data(void* arg) {
const char* host = "localhost";
const char* user = "root";
const char* password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return NULL;
}
TAOS_RES* pRes;
int i = 1;
while (!thread_stop) {
char buf[200] = {0};
i++;
snprintf(
buf, sizeof(buf),
"INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + %da, 10.30000, "
"219, 0.31000)",
i);
pRes = taos_query(pConn, buf);
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
}
taos_free_result(pRes);
sleep(1);
}
fprintf(stdout, "Prepare data thread exit\n");
return NULL;
}
// ANCHOR: msg_process
int32_t msg_process(TAOS_RES* msg) {
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
// ANCHOR_END: msg_process
TAOS* init_env() {
const char* host = "localhost";
const char* user = "root";
const char* password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return NULL;
}
TAOS_RES* pRes;
// drop topic if exists
pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "DROP DATABASE IF EXISTS power");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
// create database
pRes = taos_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
// create super table
pRes = taos_query(
pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
return pConn;
END:
taos_free_result(pRes);
taos_close(pConn);
return NULL;
}
void deinit_env(TAOS* pConn) {
if (pConn)
taos_close(pConn);
}
int32_t create_topic(TAOS* pConn) {
TAOS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
// free the result before returning so it is not leaked on the error path
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
// free the result before returning so it is not leaked on the error path
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t drop_topic(TAOS* pConn) {
TAOS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
// release the result before returning to avoid a leak
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
// release the result before returning to avoid a leak
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
return 0;
}
// callback invoked when an auto commit completes; counts and logs each invocation
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
count += 1;
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, (void*)tmq, param, count);
}
// ANCHOR: create_consumer_1
tmq_t* build_consumer(const ConsumerConfig* config) {
tmq_conf_res_t code;
tmq_t* tmq = NULL;
// create a configuration object
tmq_conf_t* conf = tmq_conf_new();
if (conf == NULL) {
return NULL;
}
// set the configuration parameters
code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "group.id", config->group_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "client.id", config->client_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
// set the callback function for auto commit
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
// create a consumer object
tmq = tmq_consumer_new(conf, NULL, 0);
// destroy the configuration object
tmq_conf_destroy(conf);
return tmq;
}
// ANCHOR_END: create_consumer_1
// ANCHOR: build_topic_list
// build a topic list used to subscribe
tmq_list_t* build_topic_list() {
// create an empty topic list
tmq_list_t* topicList = tmq_list_new();
// append topic name to the list
int32_t code = tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
tmq_list_destroy(topicList);
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return NULL;
}
// if success, return the list
return topicList;
}
// ANCHOR_END: build_topic_list
// ANCHOR: basic_consume_loop
void basic_consume_loop(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
taos_free_result(tmqmsg);
}
if (msgCnt > 50) {
// stop once more than 50 messages have been consumed
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d messages consumed, %d rows in total.\n", msgCnt, totalRows);
}
// ANCHOR_END: basic_consume_loop
// ANCHOR: consume_repeatly
void consume_repeatly(tmq_t* tmq) {
int32_t numOfAssignment = 0;
tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment* p = &pAssign[i];
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr, "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, tmq_err2str(code));
break;
}
}
if (code == 0)
fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
tmq_free_assignment(pAssign);
// let's consume the messages again
basic_consume_loop(tmq);
}
// ANCHOR_END: consume_repeatly
// ANCHOR: manual_commit
void manual_commit(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = tmq_commit_sync(tmq, tmqmsg);
// free the message whether or not the commit succeeded
taos_free_result(tmqmsg);
if (code) {
fprintf(stderr, "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
break;
}
fprintf(stdout, "Commit offset manually successfully.\n");
}
if (msgCnt > 50) {
// stop once more than 50 messages have been consumed
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d messages consumed, %d rows in total.\n", msgCnt, totalRows);
}
// ANCHOR_END: manual_commit
int main(int argc, char* argv[]) {
int32_t code;
pthread_t thread_id;
TAOS* pConn = init_env();
if (pConn == NULL) {
fprintf(stderr, "Failed to init env.\n");
return -1;
}
if (create_topic(pConn) < 0) {
fprintf(stderr, "Failed to create topic.\n");
return -1;
}
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
fprintf(stderr, "Failed to create thread.\n");
return -1;
}
// ANCHOR: create_consumer_2
tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
}
// ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n",
topic_name, config.group_id, config.client_id);
return -1;
}
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
// ANCHOR_END: subscribe_3
consume_repeatly(tmq);
manual_commit(tmq);
// ANCHOR: unsubscribe_and_close
// unsubscribe the topic
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
// ANCHOR_END: unsubscribe_and_close
thread_stop = 1;
pthread_join(thread_id, NULL);
if (drop_topic(pConn) < 0) {
fprintf(stderr, "Failed to drop topic.\n");
return -1;
}
deinit_env(pConn);
return 0;
}
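The build_consumer function in the sample above repeats the same set-and-check block for every configuration key. A minimal sketch of a table-driven alternative follows. It is deliberately independent of taos.h so that it compiles on its own: ConfItem, conf_setter_fn, apply_conf_items and demo_setter are hypothetical names introduced here for illustration and are not part of the TDengine client library. In a real program the setter would presumably be a thin wrapper that calls tmq_conf_set and returns 0 when the result equals TMQ_CONF_OK.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// One configuration entry, e.g. {"group.id", "group1"} (hypothetical helper type).
typedef struct {
  const char* key;
  const char* value;
} ConfItem;

// Setter callback: returns 0 on success, non-zero on failure.
// Assumption: a real setter would wrap tmq_conf_set() and compare with TMQ_CONF_OK.
typedef int (*conf_setter_fn)(void* conf, const char* key, const char* value);

// Apply every item in order and stop at the first failure.
// Returns -1 when all items were applied, otherwise the index of the failing item.
int apply_conf_items(void* conf, const ConfItem* items, size_t n, conf_setter_fn set) {
  assert(set != NULL);
  assert(items != NULL || n == 0);
  for (size_t i = 0; i < n; ++i) {
    if (set(conf, items[i].key, items[i].value) != 0) {
      return (int)i;
    }
  }
  return -1;
}

// Stand-in setter used only for demonstration: it treats conf as an int counter,
// increments it for each accepted item, and rejects NULL or empty values.
int demo_setter(void* conf, const char* key, const char* value) {
  if (key == NULL || value == NULL || strlen(value) == 0) {
    return 1;
  }
  *(int*)conf += 1;
  return 0;
}
```

With this shape the caller checks a single return value and destroys the configuration object in one place, instead of repeating the cleanup in every branch. The index of the failing item can also be included in the error message to show which key was rejected.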
Not supported