Skip to main content

TDengine Rust Connector

Crates.io Crates.io docs.rs

taos 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。

该 Rust 连接器的源码托管在 GitHub

连接方式

taos 提供两种建立连接的方式。一般我们推荐使用 Websocket 连接

  • 原生连接,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。
  • Websocket 连接,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。

你可以通过不同的 “特性(即 Cargo 关键字 features)” 来指定使用哪种连接器(默认同时支持)。

连接方式的详细介绍请参考:连接器建立连接的方式

支持的平台

原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 Websocket 连接支持所有能运行 Rust 的平台。

版本历史

Rust 连接器版本TDengine 版本主要功能
v0.12.03.2.3.0 or laterWS 支持压缩。
v0.11.03.2.0.0TMQ 功能优化。
v0.10.03.1.0.0WS endpoint 变更。
v0.9.23.0.7.0STMT:ws 下获取 tag_fields、col_fields。
v0.8.123.0.5.0消息订阅:获取消费进度及按照指定进度开始消费。
v0.8.03.0.4.0支持无模式写入。
v0.7.63.0.3.0支持在请求中使用 req_id。
v0.6.03.0.0.0基础功能。

Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。

处理错误

在报错后,可以获取到错误的具体信息:

match conn.exec(sql) {
Ok(_) => {
Ok(())
}
Err(e) => {
eprintln!("ERROR: {:?}", e);
Err(e)
}
}

TDengine DataType 和 Rust DataType

TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对应类型转换如下:

TDengine DataTypeRust DataType
TIMESTAMPTimestamp
INTi32
BIGINTi64
FLOATf32
DOUBLEf64
SMALLINTi16
TINYINTi8
BOOLbool
BINARYVec<u8>
NCHARString
JSONserde_json::Value

注意:JSON 类型仅在 tag 中支持。

安装步骤

安装前准备

  • 安装 Rust 开发工具链
  • 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考安装客户端驱动

安装连接器

根据选择的连接方式,按照如下说明在 Rust 项目中添加 taos 依赖:

Cargo.toml 文件中添加 taos

[dependencies]
# use default feature
taos = "*"

建立连接

TaosBuilder 通过 DSN 连接描述字符串创建一个连接构造器。

let builder = TaosBuilder::from_dsn("taos://")?;

现在您可以使用该对象创建连接:

let conn = builder.build()?;

连接对象可以创建多个:

let conn1 = builder.build()?;
let conn2 = builder.build()?;

DSN 描述字符串基本结构如下:

<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver| protocol | | username | password | host | port | database | params |

各部分意义见下表:

  • driver: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
    • taos: 表名使用 TDengine 连接器驱动。
    • tmq: 使用 TMQ 订阅数据。
    • http/ws: 使用 Websocket 创建连接。
    • https/wss: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
  • protocol: 显示指定以何种方式建立连接,例如:taos+ws://localhost:6041 指定以 Websocket 方式建立连接。
  • username/password: 用于创建连接的用户名及密码。
  • host/port: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(taos://),原生连接默认为 localhost:6030,Websocket 连接默认为 localhost:6041
  • database: 指定默认连接的数据库名,可选参数。
  • params:其他可选参数。

一个完整的 DSN 描述字符串示例如下:

taos+ws://localhost:6041/test

表示使用 Websocket(ws)方式通过 6041 端口连接服务器 localhost,并指定默认数据库为 test

这使得用户可以通过 DSN 指定连接方式:

use taos::*;

// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();

// use websocket protocol.
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();

建立连接后,您可以进行相关数据库操作:

async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
// prepare database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;

let inserted = taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;

assert_eq!(inserted, 6);
let mut result = taos.query("select * from `meters`").await?;

for field in result.fields() {
println!("got field: {}", field.name());
}

let values = result.
}

查询数据可以通过两种方式:使用内建类型或 serde 序列化框架。

    // Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
for (name, value) in row {
println!("got value of {}: {}", name, value);
}
}

// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}

let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;

dbg!(records);
Ok(())

使用示例

创建数据库和表

    let db = "power";
// create database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;

// create table
taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
]).await?;

查看源码

注意:如果不使用 use db 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 db.tb。

插入数据

    let inserted = taos.exec("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ").await?;

println!("inserted: {} rows", inserted);

查看源码

查询数据

    let mut result = taos.query("SELECT * FROM power.meters").await?;

for field in result.fields() {
println!("got field: {}", field.name());
}

let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}

查看源码

执行带有 req_id 的 SQL

reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:

  • 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
  • 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
  • 故障诊断:当一个请求失败时,可以通过查看与该请求关联的 reqId 来找出问题发生的位置

如果用户不设置reqId,连接器也会内部随机生成一个,但是还是建议用户设置,可以更好的跟用户请求关联起来。

    let result = taos.query_with_req_id("SELECT * FROM power.meters", 0).await?;

查看源码

通过参数绑定写入数据

TDengine 的 Rust 连接器实现了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。

参数绑定接口详见API参考

use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let taos = TaosBuilder::from_dsn("taos://")?.build().await?;

taos.exec("DROP DATABASE IF EXISTS power").await?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;

let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;

const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d{}", i);
let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)];
stmt.set_tbname_tags(&table_name, &tags).await?;
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_floats(vec![10.3 + j as f32]),
ColumnView::from_ints(vec![219 + j as i32]),
ColumnView::from_floats(vec![0.31 + j as f32]),
];
stmt.bind(&values).await?;
}
stmt.add_batch().await?;
}

// execute.
let rows = stmt.execute().await?;
assert_eq!(rows, NUM_TABLES * NUM_ROWS);
Ok(())
}

查看源码

无模式写入

TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见无模式写入

use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;

use crate::AsyncQueryable;
use crate::AsyncTBuilder;
use crate::TaosBuilder;

async fn put() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let dsn =
std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
log::debug!("dsn: {:?}", &dsn);

let client = TaosBuilder::from_dsn(dsn)?.build().await?;

let db = "power";

client.exec(format!("drop database if exists {db}")).await?;

client
.exec(format!("create database if not exists {db}"))
.await?;

// should specify database before insert
client.exec(format!("use {db}")).await?;

// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000",
]
.map(String::from)
.to_vec();

let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());

// SchemalessProtocol::Telnet
let data = [
"meters.current 1648432611249 10.3 location=California.SanFrancisco group=2",
]
.map(String::from)
.to_vec();

let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());

// SchemalessProtocol::Json
let data = [
r#"[{"metric": "meters.current", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
]
.map(String::from)
.to_vec();

let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());

client.exec(format!("drop database if exists {db}")).await?;

Ok(())
}

查看源码

执行带有 req_id 的无模式写入

此 req_id 可用于请求链路追踪。

let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.data(data)
.req_id(100u64)
.build()?;

client.put(&sml_data)?

数据订阅

TDengine 通过消息队列 TMQ 启动一个订阅。

创建 Topic

    taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;

查看源码

创建 Consumer

创建消费者:

    dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string());

let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;

查看源码

订阅消费数据

消费者可订阅一个或多个 TOPIC

    consumer.subscribe(["topic_meters"]).await?;

查看源码

TMQ 消息队列是一个 futures::Stream 类型,可以使用相应 API 对每个消息进行消费,并通过 .commit 进行已消费标记。

    {
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));

while let Some((offset, message)) = stream.try_next().await? {

let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);

match message {
MessageSet::Meta(meta) => {
log::info!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;

let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::info!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::info!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;

let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}

while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}

查看源码

获取消费进度:

版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0

    let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);

查看源码

指定订阅 Offset

按照指定的进度消费:

版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0

            let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}

查看源码

关闭订阅

    consumer.unsubscribe().await;

查看源码

对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,group.id 是必须的。

  • group.id: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
  • client.id: 可选的订阅客户端识别项。
  • auto.offset.reset: 可选初始化订阅起点, earliest 为从头开始订阅, latest 为仅从最新数据开始订阅,默认值根据 TDengine 版本有所不同,详细参见 数据订阅。注意,此选项在同一个 group.id 中仅生效一次。
  • enable.auto.commit: 当设置为 true 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
  • auto.commit.interval.ms: 自动标记的时间间隔。

完整示例

完整订阅示例参见 GitHub 示例文件.

与连接池使用

在复杂应用中,建议启用连接池。taos 的连接池默认(异步模式)使用 deadpool 实现。

如下,可以生成一个默认参数的连接池。

let pool: Pool<TaosBuilder> = TaosBuilder::from_dsn("taos:///")
.unwrap()
.pool()
.unwrap();

同样可以使用连接池的构造器,对连接池参数进行设置:

let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).unwrap().0)
.max_size(88) // 最大连接数
.build()
.unwrap();

在应用代码中,使用 pool.get()? 来获取一个连接对象 Taos

let taos = pool.get()?;

更多示例程序

示例程序源码位于 TDengine/examples/rust 下:

请参考:rust example

常见问题

请参考 FAQ

API 参考

Taos 对象提供了多个数据库操作的 API:

  1. exec: 执行某个非查询类 SQL 语句,例如 CREATEALTERINSERT 等。

    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
  2. exec_many: 同时(顺序)执行多个 SQL 语句。

    taos.exec_many([
    "CREATE DATABASE test",
    "USE test",
    "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
  3. query:执行查询语句,返回 [ResultSet] 对象。

    let mut q = taos.query("select * from log.logs").await?;

    [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):

    列信息使用 [.fields()] 方法获取:

    let cols = q.fields();
    for col in cols {
    println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
    }

    逐行获取数据:

    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
    for (col, (name, value)) in row.enumerate() {
    println!(
    "[{}] got value in col {} (named `{:>8}`): {}",
    nrows, col, name, value
    );
    }
    nrows += 1;
    }

    或使用 serde 序列化框架。

    #[derive(Debug, Deserialize)]
    struct Record {
    // deserialize timestamp to chrono::DateTime<Local>
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // int to i32
    voltage: Option<i32>,
    phase: Option<f32>,
    groupid: i32,
    // binary/varchar to String
    location: String,
    }

    let records: Vec<Record> = taos
    .query("select * from `meters`")
    .await?
    .deserialize()
    .try_collect()
    .await?;

需要注意的是,需要使用 Rust 异步函数和异步运行时。

Taos 提供部分 SQL 的 Rust 方法化以减少 format! 代码块的频率:

  • .describe(table: &str): 执行 DESCRIBE 并返回一个 Rust 数据结构。
  • .create_database(database: &str): 执行 CREATE DATABASE 语句。
  • .use_database(database: &str): 执行 USE 语句。

除此之外,该结构也是参数绑定和行协议接口的入口,使用方法请参考具体的 API 说明。

参数绑定接口

与 C 接口类似,Rust 提供参数绑定接口。首先,通过 Taos 对象创建一个 SQL 语句的参数绑定对象 Stmt

let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;

参数绑定对象提供了一组接口用于实现参数绑定:

.set_tbname(name)

用于绑定表名。

let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;

.set_tags(&[tag])

当 SQL 语句使用超级表时,用于绑定子表表名和标签值:

let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;

.bind(&[column])

用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:

let params = vec![
ColumnView::from_millis_timestamp(vec![164000000000]),
ColumnView::from_bools(vec![true]),
ColumnView::from_tiny_ints(vec![i8::MAX]),
ColumnView::from_small_ints(vec![i16::MAX]),
ColumnView::from_ints(vec![i32::MAX]),
ColumnView::from_big_ints(vec![i64::MAX]),
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
ColumnView::from_unsigned_ints(vec![u32::MAX]),
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
ColumnView::from_floats(vec![f32::MAX]),
ColumnView::from_doubles(vec![f64::MAX]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;

.execute()

执行 SQL。Stmt 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 .add_batch 加入到执行队列中。

stmt.execute()?;

// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;

一个可运行的示例请见 GitHub 上的示例

其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。