TDengine Rust Connector
taos
是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。
该 Rust 连接器的源码托管在 GitHub。
连接方式
taos
提供两种建立连接的方式。一般我们推荐使用 Websocket 连接。
- 原生连接,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。
- Websocket 连接,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。
你可以通过不同的 “特性(即 Cargo 关键字 features
)” 来指定使用哪种连接器(默认同时支持)。
连接方式的详细介绍请参考:连接器建立连接的方式
支持的平台
原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 Websocket 连接支持所有能运行 Rust 的平台。
版本历史
Rust 连接器版本 | TDengine 版本 | 主要功能 |
---|---|---|
v0.12.0 | 3.2.3.0 or later | WS 支持压缩。 |
v0.11.0 | 3.2.0.0 | TMQ 功能优化。 |
v0.10.0 | 3.1.0.0 | WS endpoint 变更。 |
v0.9.2 | 3.0.7.0 | STMT:ws 下获取 tag_fields、col_fields。 |
v0.8.12 | 3.0.5.0 | 消息订阅:获取消费进度及按照指定进度开始消费。 |
v0.8.0 | 3.0.4.0 | 支持无模式写入。 |
v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 |
v0.6.0 | 3.0.0.0 | 基础功能。 |
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
处理错误
在报错后,可以获取到错误的具体信息:
match conn.exec(sql) {
Ok(_) => {
Ok(())
}
Err(e) => {
eprintln!("ERROR: {:?}", e);
Err(e)
}
}
TDengine DataType 和 Rust DataType
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对应类型转换如下:
TDengine DataType | Rust DataType |
---|---|
TIMESTAMP | Timestamp |
INT | i32 |
BIGINT | i64 |
FLOAT | f32 |
DOUBLE | f64 |
SMALLINT | i16 |
TINYINT | i8 |
BOOL | bool |
BINARY | Vec<u8> |
NCHAR | String |
JSON | serde_json::Value |
注意:JSON 类型仅在 tag 中支持。
安装步骤
安装前准备
- 安装 Rust 开发工具链
- 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考安装客户端驱动
安装连接器
根据选择的连接方式,按照如下说明在 Rust 项目中添加 taos 依赖:
- 同时支持
- 仅 Websocket
- 仅原生连接
在 Cargo.toml
文件中添加 taos:
[dependencies]
# use default feature
taos = "*"
在 Cargo.toml
文件中添加 taos,并启用 ws
特性。
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
当仅启用 ws
特性时,可同时指定 r2d2
使得在同步(blocking/sync)模式下使用 r2d2 作为连接池:
[dependencies]
taos = { version = "*", default-features = false, features = ["r2d2", "ws"] }
在 Cargo.toml
文件中添加 taos,并启用 native
特性:
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
建立连接
TaosBuilder 通过 DSN 连接描述字符串创建一个连接构造器。
let builder = TaosBuilder::from_dsn("taos://")?;
现在您可以使用该对象创建连接:
let conn = builder.build()?;
连接对象可以创建多个:
let conn1 = builder.build()?;
let conn2 = builder.build()?;
DSN 描述字符串基本结构如下:
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver| protocol | | username | password | host | port | database | params |
各部分意义见下表:
- driver: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
- taos: 表名使用 TDengine 连接器驱动。
- tmq: 使用 TMQ 订阅数据。
- http/ws: 使用 Websocket 创建连接。
- https/wss: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
- protocol: 显示指定以何种方式建立连接,例如:
taos+ws://localhost:6041
指定以 Websocket 方式建立连接。 - username/password: 用于创建连接的用户名及密码。
- host/port: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(
taos://
),原生连接默认为localhost:6030
,Websocket 连接默认为localhost:6041
。 - database: 指定默认连接的数据库名,可选参数。
- params:其他可选参数。
一个完整的 DSN 描述字符串示例如下:
taos+ws://localhost:6041/test
表示使用 Websocket(ws
)方式通过 6041
端口连接服务器 localhost
,并指定默认数据库为 test
。
这使得用户可以通过 DSN 指定连接方式:
use taos::*;
// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
// use websocket protocol.
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
建立连接后,您可以进行相关数据库操作:
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
// prepare database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
let inserted = taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
let mut result = taos.query("select * from `meters`").await?;
for field in result.fields() {
println!("got field: {}", field.name());
}
let values = result.
}
查询数据可以通过两种方式:使用内建类型或 serde 序列化框架。
// Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
for (name, value) in row {
println!("got value of {}: {}", name, value);
}
}
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
dbg!(records);
Ok(())
使用示例
创建数据库和表
let db = "power";
// create database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
// create table
taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
]).await?;
注意:如果不使用
use db
指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 db.tb。
插入数据
let inserted = taos.exec("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ").await?;
println!("inserted: {} rows", inserted);
查询数据
let mut result = taos.query("SELECT * FROM power.meters").await?;
for field in result.fields() {
println!("got field: {}", field.name());
}
let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
执行带有 req_id 的 SQL
reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:
- 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
- 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
- 故障诊断:当一个请求失败时,可以通过查看与该请求关联的 reqId 来找出问题发生的位置
如果用户不设置reqId,连接器也会内部随机生成一个,但是还是建议用户设置,可以更好的跟用户请求关联起来。
let result = taos.query_with_req_id("SELECT * FROM power.meters", 0).await?;
通过参数绑定写入数据
TDengine 的 Rust 连接器实现了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
参数绑定接口详见API参考
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let taos = TaosBuilder::from_dsn("taos://")?.build().await?;
taos.exec("DROP DATABASE IF EXISTS power").await?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d{}", i);
let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)];
stmt.set_tbname_tags(&table_name, &tags).await?;
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_floats(vec![10.3 + j as f32]),
ColumnView::from_ints(vec![219 + j as i32]),
ColumnView::from_floats(vec![0.31 + j as f32]),
];
stmt.bind(&values).await?;
}
stmt.add_batch().await?;
}
// execute.
let rows = stmt.execute().await?;
assert_eq!(rows, NUM_TABLES * NUM_ROWS);
Ok(())
}
无模式写入
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见无模式写入。
use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use crate::AsyncQueryable;
use crate::AsyncTBuilder;
use crate::TaosBuilder;
async fn put() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let dsn =
std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
log::debug!("dsn: {:?}", &dsn);
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
client.exec(format!("drop database if exists {db}")).await?;
client
.exec(format!("create database if not exists {db}"))
.await?;
// should specify database before insert
client.exec(format!("use {db}")).await?;
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
// SchemalessProtocol::Telnet
let data = [
"meters.current 1648432611249 10.3 location=California.SanFrancisco group=2",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
// SchemalessProtocol::Json
let data = [
r#"[{"metric": "meters.current", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
client.exec(format!("drop database if exists {db}")).await?;
Ok(())
}
执行带有 req_id 的无模式写入
此 req_id 可用于请求链路追踪。
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.data(data)
.req_id(100u64)
.build()?;
client.put(&sml_data)?
数据订阅
TDengine 通过消息队列 TMQ 启动一个订阅。
创建 Topic
taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;
创建 Consumer
创建消费者:
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
订阅消费数据
消费者可订阅一个或多个 TOPIC
。
consumer.subscribe(["topic_meters"]).await?;
TMQ 消息队列是一个 futures::Stream 类型,可以使用相应 API 对每个消息进行消费,并通过 .commit
进行已消费标记。
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::info!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::info!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::info!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
获取消费进度:
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
指定订阅 Offset
按照指定的进度消费:
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
关闭订阅
consumer.unsubscribe().await;
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,group.id
是必须的。
group.id
: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。client.id
: 可选的订阅客户端识别项。auto.offset.reset
: 可选初始化订阅起点, earliest 为从头开始订阅, latest 为仅从最新数据开始订阅,默认值根据 TDengine 版本有所不同,详细参见 数据订阅。注意,此选项在同一个group.id
中仅生效一次。enable.auto.commit
: 当设置为true
时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。auto.commit.interval.ms
: 自动标记的时间间隔。
完整示例
完整订阅示例参见 GitHub 示例文件.
与连接池使用
在复杂应用中,建议启用连接池。taos 的连接池默认(异步模式)使用 deadpool 实现。
如下,可以生成一个默认参数的连接池。
let pool: Pool<TaosBuilder> = TaosBuilder::from_dsn("taos:///")
.unwrap()
.pool()
.unwrap();
同样可以使用连接池的构造器,对连接池参数进行设置:
let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).unwrap().0)
.max_size(88) // 最大连接数
.build()
.unwrap();
在应用代码中,使用 pool.get()?
来获取一个连接对象 Taos。
let taos = pool.get()?;
更多示例程序
示例程序源码位于 TDengine/examples/rust
下:
请参考:rust example
常见问题
请参考 FAQ
API 参考
Taos 对象提供了多个数据库操作的 API:
-
exec
: 执行某个非查询类 SQL 语句,例如CREATE
,ALTER
,INSERT
等。let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
-
exec_many
: 同时(顺序)执行多个 SQL 语句。taos.exec_many([
"CREATE DATABASE test",
"USE test",
"CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
]).await?; -
query
:执行查询语句,返回 [ResultSet] 对象。let mut q = taos.query("select * from log.logs").await?;
[ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):
列信息使用 [.fields()] 方法获取:
let cols = q.fields();
for col in cols {
println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
}逐行获取数据:
let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}或使用 serde 序列化框架。
#[derive(Debug, Deserialize)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
需要注意的是,需要使用 Rust 异步函数和异步运行时。
Taos 提供部分 SQL 的 Rust 方法化以减少 format!
代码块的频率:
.describe(table: &str)
: 执行DESCRIBE
并返回一个 Rust 数据结构。.create_database(database: &str)
: 执行CREATE DATABASE
语句。.use_database(database: &str)
: 执行USE
语句。
除此之外,该结构也是参数绑定和行协议接口的入口,使用方法请参考具体的 API 说明。
参数绑定接口
与 C 接口类似,Rust 提供参数绑定接口。首先,通过 Taos 对象创建一个 SQL 语句的参数绑定对象 Stmt:
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
参数绑定对象提供了一组接口用于实现参数绑定:
.set_tbname(name)
用于绑定表名。
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
.set_tags(&[tag])
当 SQL 语句使用超级表时,用于绑定子表表名和标签值:
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
.bind(&[column])
用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
let params = vec![
ColumnView::from_millis_timestamp(vec![164000000000]),
ColumnView::from_bools(vec![true]),
ColumnView::from_tiny_ints(vec![i8::MAX]),
ColumnView::from_small_ints(vec![i16::MAX]),
ColumnView::from_ints(vec![i32::MAX]),
ColumnView::from_big_ints(vec![i64::MAX]),
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
ColumnView::from_unsigned_ints(vec![u32::MAX]),
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
ColumnView::from_floats(vec![f32::MAX]),
ColumnView::from_doubles(vec![f64::MAX]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(¶ms)?.add_batch()?.execute()?;
.execute()
执行 SQL。Stmt 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 .add_batch
加入到执行队列中。
stmt.execute()?;
// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
一个可运行的示例请见 GitHub 上的示例。
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。