TDengine Rust Connector
taos
是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。
该 Rust 连接器的源码托管在 GitHub。
连接方式
taos
提供两种建立连接的方式。一般我们推荐使用 Websocket 连接。
- 原生连接,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。
- Websocket 连接,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。
你可以通过不同的 “特性(即 Cargo 关键字 features
)” 来指定使用哪种连接器(默认同时支持)。
连接方式的详细介绍请参考:连接器建立连接的方式
支持的平台
原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 Websocket 连接支持所有能运行 Rust 的平台。
版本历史
Rust 连接器版本 | TDengine 版本 | 主要功能 |
---|---|---|
v0.12.0 | 3.2.3.0 or later | WS 支持压缩。 |
v0.11.0 | 3.2.0.0 | TMQ 功能优化。 |
v0.10.0 | 3.1.0.0 | WS endpoint 变更。 |
v0.9.2 | 3.0.7.0 | STMT:ws 下获取 tag_fields、col_fields。 |
v0.8.12 | 3.0.5.0 | 消息订阅:获取消费进度及按照指定进度开始消费。 |
v0.8.0 | 3.0.4.0 | 支持无模式写入。 |
v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 |
v0.6.0 | 3.0.0.0 | 基础功能。 |
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
处理错误
在报错后,可以获取到错误的具体信息:
match conn.exec(sql) {
Ok(_) => {
Ok(())
}
Err(e) => {
eprintln!("ERROR: {:?}", e);
Err(e)
}
}
TDengine DataType 和 Rust DataType
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对应类型转换如下:
TDengine DataType | Rust DataType |
---|---|
TIMESTAMP | Timestamp |
INT | i32 |
BIGINT | i64 |
FLOAT | f32 |
DOUBLE | f64 |
SMALLINT | i16 |
TINYINT | i8 |
BOOL | bool |
BINARY | Vec<u8> |
NCHAR | String |
JSON | serde_json::Value |
注意:JSON 类型仅在 tag 中支持。
安装步骤
安装前准备
- 安装 Rust 开发工具链
- 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考安装客户端驱动
安装连接器
根据选择的连接方式,按照如下说明在 Rust 项目中添加 taos 依赖:
- 同时支持
- 仅 Websocket
- 仅原生连接
在 Cargo.toml
文件中添加 taos:
[dependencies]
# use default feature
taos = "*"
在 Cargo.toml
文件中添加 taos,并启用 ws
特性。
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
当仅启用 ws
特性时,可同时指定 r2d2
使得在同步(blocking/sync)模式下使用 r2d2 作为连接池:
[dependencies]
taos = { version = "*", default-features = false, features = ["r2d2", "ws"] }
在 Cargo.toml
文件中添加 taos,并启用 native
特性:
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
建立连接
TaosBuilder 通过 DSN 连接描述字符串创建一个连接构造器。
let builder = TaosBuilder::from_dsn("taos://")?;
现在您可以使用该对象创建连接:
let conn = builder.build()?;
连接对象可以创建多个:
let conn1 = builder.build()?;
let conn2 = builder.build()?;
DSN 描述字符串基本结构如下:
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver| protocol | | username | password | host | port | database | params |
各部分意义见下表:
- driver: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
- taos: 表名使用 TDengine 连接器驱动。
- tmq: 使用 TMQ 订阅数据。
- http/ws: 使用 Websocket 创建连接。
- https/wss: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
- protocol: 显示指定以何种方式建立连接,例如:
taos+ws://localhost:6041
指定以 Websocket 方式建立连接。 - username/password: 用于创建连接的用户名及密码。
- host/port: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(
taos://
),原生连接默认为localhost:6030
,Websocket 连接默认为localhost:6041
。 - database: 指定默认连接的数据库名,可选参数。
- params:其他可选参数。
一个完整的 DSN 描述字符串示例如下:
taos+ws://localhost:6041/test
表示使用 Websocket(ws
)方式通过 6041
端口连接服务器 localhost
,并指定默认数据库为 test
。
这使得用户可以通过 DSN 指定连接方式:
use taos::*;
// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
// use websocket protocol.
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
建立连接后,您可以进行相关数据库操作:
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
// prepare database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
let inserted = taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
let mut result = taos.query("select * from `meters`").await?;
for field in result.fields() {
println!("got field: {}", field.name());
}
let values = result.
}
查询数据可以通过两种方式:使用内建类型或 serde 序列化框架。
// Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
for (name, value) in row {
println!("got value of {}: {}", name, value);
}
}
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
dbg!(records);
Ok(())
使用示例
创建数据库和表
let db = "power";
// create database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
// create table
taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
]).await?;
注意:如果不使用
use db
指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 db.tb。
插入数据
let inserted = taos.exec("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ").await?;
println!("inserted: {} rows", inserted);
查询数据
let mut result = taos.query("SELECT * FROM power.meters").await?;
for field in result.fields() {
println!("got field: {}", field.name());
}
let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
执行带有 req_id 的 SQL
reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:
- 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
- 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
- 故障诊断:当一个请求失败时,可以通过查看与该请求关联的 reqId 来找出问题发生的位置
如果用户不设置reqId,连接器也会内部随机生成一个,但是还是建议用户设置,可以更好的跟用户请求关联起来。
let result = taos.query_with_req_id("SELECT * FROM power.meters", 0).await?;
通过参数绑定写入数据
TDengine 的 Rust 连接器实现了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
参数绑定接口详见API参考
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let taos = TaosBuilder::from_dsn("taos://")?.build().await?;
taos.exec("DROP DATABASE IF EXISTS power").await?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d{}", i);
let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)];
stmt.set_tbname_tags(&table_name, &tags).await?;
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_floats(vec![10.3 + j as f32]),
ColumnView::from_ints(vec![219 + j as i32]),
ColumnView::from_floats(vec![0.31 + j as f32]),
];
stmt.bind(&values).await?;
}
stmt.add_batch().await?;
}
// execute.
let rows = stmt.execute().await?;
assert_eq!(rows, NUM_TABLES * NUM_ROWS);
Ok(())
}
无模式写入
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见无模式写入。
use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use crate::AsyncQueryable;
use crate::AsyncTBuilder;
use crate::TaosBuilder;
async fn put() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let dsn =
std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
log::debug!("dsn: {:?}", &dsn);
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
client.exec(format!("drop database if exists {db}")).await?;
client
.exec(format!("create database if not exists {db}"))
.await?;
// should specify database before insert
client.exec(format!("use {db}")).await?;
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
// SchemalessProtocol::Telnet
let data = [
"meters.current 1648432611249 10.3 location=California.SanFrancisco group=2",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
// SchemalessProtocol::Json
let data = [
r#"[{"metric": "meters.current", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
client.exec(format!("drop database if exists {db}")).await?;
Ok(())
}
执行带有 req_id 的无模式写入
此 req_id 可用于请求链路追踪。
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.data(data)
.req_id(100u64)
.build()?;
client.put(&sml_data)?
数据订阅
TDengine 通过消息队列 TMQ 启动一个订阅。
创建 Topic
taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;
创建 Consumer
创建消费者:
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
订阅消费数据
消费者可订阅一个或多个 TOPIC
。
consumer.subscribe(["topic_meters"]).await?;
TMQ 消息队列是一个 futures::Stream 类型,可以使用相 应 API 对每个消息进行消费,并通过 .commit
进行已消费标记。
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::info!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::info!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::info!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
获取消费进度:
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
指定订阅 Offset
按照指定的进度消费:
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
关闭订阅
consumer.unsubscribe().await;
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,group.id
是必须的。
group.id
: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。client.id
: 可选的订阅客户端识别项。auto.offset.reset
: 可选初始化订阅起点, earliest 为从头开始订阅, latest 为仅从最新数据开始订阅,默认值根据 TDengine 版本有所不同,详细参见 数据订阅。注意,此选项在同一个group.id
中仅生效一次。enable.auto.commit
: 当设置为true
时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。auto.commit.interval.ms
: 自动标记的时间间隔。
完整示例
完整订阅示例参见 GitHub 示例文件.
与连接池使用
在复杂应用中,建 议启用连接池。taos 的连接池默认(异步模式)使用 deadpool 实现。
如下,可以生成一个默认参数的连接池。
let pool: Pool<TaosBuilder> = TaosBuilder::from_dsn("taos:///")
.unwrap()
.pool()
.unwrap();
同样可以使用连接池的构造器,对连接池参数进行设置:
let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).unwrap().0)
.max_size(88) // 最大连接数
.build()
.unwrap();
在应用代码中,使用 pool.get()?
来获取一个连接对象 Taos。
let taos = pool.get()?;
更多示例程序
示例程序源码位于 TDengine/examples/rust
下:
请参考:rust example