Skip to main content

TDengine Rust Connector

Crates.io Crates.io docs.rs

taos 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。

taos 提供两种建立连接的方式。一种是原生连接,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 Websocket 连接,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 features)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。

该 Rust 连接器的源码托管在 GitHub

支持的平台

原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 Websocket 连接支持所有能运行 Rust 的平台。

版本历史

Rust 连接器版本TDengine 版本主要功能
v0.8.103.0.5.0 or later消息订阅:获取消费进度及按照指定进度开始消费。
v0.8.03.0.4.0支持无模式写入。
v0.7.63.0.3.0支持在请求中使用 req_id。
v0.6.03.0.0.0基础功能。

Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。

安装

安装前准备

  • 安装 Rust 开发工具链
  • 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考安装客户端驱动

添加 taos 依赖

根据选择的连接方式,按照如下说明在 Rust 项目中添加 taos 依赖:

Cargo.toml 文件中添加 taos

[dependencies]
# use default feature
taos = "*"

建立连接

TaosBuilder 通过 DSN 连接描述字符串创建一个连接构造器。

let builder = TaosBuilder::from_dsn("taos://")?;

现在您可以使用该对象创建连接:

let conn = builder.build()?;

连接对象可以创建多个:

let conn1 = builder.build()?;
let conn2 = builder.build()?;

DSN 描述字符串基本结构如下:

<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver| protocol | | username | password | host | port | database | params |

各部分意义见下表:

  • driver: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
    • taos: 表名使用 TDengine 连接器驱动。
    • tmq: 使用 TMQ 订阅数据。
    • http/ws: 使用 Websocket 创建连接。
    • https/wss: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
  • protocol: 显示指定以何种方式建立连接,例如:taos+ws://localhost:6041 指定以 Websocket 方式建立连接。
  • username/password: 用于创建连接的用户名及密码。
  • host/port: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(taos://),原生连接默认为 localhost:6030,Websocket 连接默认为 localhost:6041
  • database: 指定默认连接的数据库名,可选参数。
  • params:其他可选参数。

一个完整的 DSN 描述字符串示例如下:

taos+ws://localhost:6041/test

表示使用 Websocket(ws)方式通过 6041 端口连接服务器 localhost,并指定默认数据库为 test

这使得用户可以通过 DSN 指定连接方式:

use taos::*;

// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();

// use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;

建立连接后,您可以进行相关数据库操作:

async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
// prepare database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;

let inserted = taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;

assert_eq!(inserted, 6);
let mut result = taos.query("select * from `meters`").await?;

for field in result.fields() {
println!("got field: {}", field.name());
}

let values = result.
}

查询数据可以通过两种方式:使用内建类型或 serde 序列化框架。

    // Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
for (name, value) in row {
println!("got value of {}: {}", name, value);
}
}

// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}

let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;

dbg!(records);
Ok(())

使用示例

写入数据

SQL 写入

use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "ws://";
let taos = TaosBuilder::from_dsn(dsn)?.build()?;

taos.exec_many([
"DROP DATABASE IF EXISTS power",
"CREATE DATABASE power",
"USE power",
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
]).await?;

let inserted = taos.exec("INSERT INTO
power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000)
('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3)
VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2)
VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3)
VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)").await?;

assert_eq!(inserted, 8);
Ok(())
}

查看源码

STMT 写入

use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let taos = TaosBuilder::from_dsn("taos://")?.build()?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;

let mut stmt = Stmt::init(&taos)?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
// bind table name and tags
stmt.set_tbname_tags(
"d1001",
&[
Value::VarChar("California.SanFransico".into()),
Value::Int(2),
],
)?;
// bind values.
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249]),
ColumnView::from_floats(vec![10.3]),
ColumnView::from_ints(vec![219]),
ColumnView::from_floats(vec![0.31]),
];
stmt.bind(&values)?;
// bind one more row
let values2 = vec![
ColumnView::from_millis_timestamp(vec![1648432611749]),
ColumnView::from_floats(vec![12.6]),
ColumnView::from_ints(vec![218]),
ColumnView::from_floats(vec![0.33]),
];
stmt.bind(&values2)?;

stmt.add_batch()?;

// execute.
let rows = stmt.execute()?;
assert_eq!(rows, 2);
Ok(())
}

查看源码

Schemaless 写入

use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;

use crate::AsyncQueryable;
use crate::AsyncTBuilder;
use crate::TaosBuilder;

async fn put_line() -> anyhow::Result<()> {
// std::env::set_var("RUST_LOG", "taos=trace");
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();

let dsn =
std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
log::debug!("dsn: {:?}", &dsn);

let client = TaosBuilder::from_dsn(dsn)?.build().await?;

let db = "demo_schemaless_ws";

client.exec(format!("drop database if exists {db}")).await?;

client
.exec(format!("create database if not exists {db}"))
.await?;

// should specify database before insert
client.exec(format!("use {db}")).await?;

let data = [
"measurement,host=host1 field1=2i,field2=2.0 1577837300000",
"measurement,host=host1 field1=2i,field2=2.0 1577837400000",
"measurement,host=host1 field1=2i,field2=2.0 1577837500000",
"measurement,host=host1 field1=2i,field2=2.0 1577837600000",
]
.map(String::from)
.to_vec();

// demo with all fields
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());

// demo with default ttl
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.req_id(101u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());

// demo with default ttl and req_id
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.build()?;
assert_eq!(client.put(&sml_data).await?, ());

// demo with default precision
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.data(data)
.req_id(103u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());

client.exec(format!("drop database if exists {db}")).await?;

Ok(())
}

查看源码

查询数据

use taos::sync::*;

fn main() -> anyhow::Result<()> {
let taos = TaosBuilder::from_dsn("ws:///power")?.build()?;
let mut result = taos.query("SELECT ts, current FROM meters LIMIT 2")?;
// print column names
let meta = result.fields();
println!("{}", meta.iter().map(|field| field.name()).join("\t"));

// print rows
let rows = result.rows();
for row in rows {
let row = row?;
for (_name, value) in row {
print!("{}\t", value);
}
println!();
}
Ok(())
}

// output(suppose you are in +8 timezone):
// ts current
// 2018-10-03T14:38:05+08:00 10.3
// 2018-10-03T14:38:15+08:00 12.6

查看源码

API 参考

连接构造器

通过 DSN 来构建一个连接器构造器。

let cfg = TaosBuilder::default().build()?;

使用 builder 对象创建多个连接:

let conn: Taos = cfg.build();

连接池

在复杂应用中,建议启用连接池。taos 的连接池默认(异步模式)使用 deadpool 实现。

如下,可以生成一个默认参数的连接池。

let pool: Pool<TaosBuilder> = TaosBuilder::from_dsn("taos:///")
.unwrap()
.pool()
.unwrap();

同样可以使用连接池的构造器,对连接池参数进行设置:

let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).unwrap().0)
.max_size(88) // 最大连接数
.build()
.unwrap();

在应用代码中,使用 pool.get()? 来获取一个连接对象 Taos

let taos = pool.get()?;

连接

Taos 对象提供了多个数据库操作的 API:

  1. exec: 执行某个非查询类 SQL 语句,例如 CREATEALTERINSERT 等。

    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
  2. exec_many: 同时(顺序)执行多个 SQL 语句。

    taos.exec_many([
    "CREATE DATABASE test",
    "USE test",
    "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
  3. query:执行查询语句,返回 [ResultSet] 对象。

    let mut q = taos.query("select * from log.logs").await?;

    [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):

    列信息使用 [.fields()] 方法获取:

    let cols = q.fields();
    for col in cols {
    println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
    }

    逐行获取数据:

    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
    for (col, (name, value)) in row.enumerate() {
    println!(
    "[{}] got value in col {} (named `{:>8}`): {}",
    nrows, col, name, value
    );
    }
    nrows += 1;
    }

    或使用 serde 序列化框架。

    #[derive(Debug, Deserialize)]
    struct Record {
    // deserialize timestamp to chrono::DateTime<Local>
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // int to i32
    voltage: Option<i32>,
    phase: Option<f32>,
    groupid: i32,
    // binary/varchar to String
    location: String,
    }

    let records: Vec<Record> = taos
    .query("select * from `meters`")
    .await?
    .deserialize()
    .try_collect()
    .await?;

需要注意的是,需要使用 Rust 异步函数和异步运行时。

Taos 提供部分 SQL 的 Rust 方法化以减少 format! 代码块的频率:

  • .describe(table: &str): 执行 DESCRIBE 并返回一个 Rust 数据结构。
  • .create_database(database: &str): 执行 CREATE DATABASE 语句。
  • .use_database(database: &str): 执行 USE 语句。

除此之外,该结构也是 参数绑定行协议接口 的入口,使用方法请参考具体的 API 说明。

参数绑定接口

与 C 接口类似,Rust 提供参数绑定接口。首先,通过 Taos 对象创建一个 SQL 语句的参数绑定对象 Stmt

let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;

参数绑定对象提供了一组接口用于实现参数绑定:

.set_tbname(name)

用于绑定表名。

let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;

.set_tags(&[tag])

当 SQL 语句使用超级表时,用于绑定子表表名和标签值:

let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;

.bind(&[column])

用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:

let params = vec![
ColumnView::from_millis_timestamp(vec![164000000000]),
ColumnView::from_bools(vec![true]),
ColumnView::from_tiny_ints(vec![i8::MAX]),
ColumnView::from_small_ints(vec![i16::MAX]),
ColumnView::from_ints(vec![i32::MAX]),
ColumnView::from_big_ints(vec![i64::MAX]),
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
ColumnView::from_unsigned_ints(vec![u32::MAX]),
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
ColumnView::from_floats(vec![f32::MAX]),
ColumnView::from_doubles(vec![f64::MAX]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;

.execute()

执行 SQL。Stmt 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 .add_batch 加入到执行队列中。

stmt.execute()?;

// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;

一个可运行的示例请见 GitHub 上的示例

订阅

TDengine 通过消息队列 TMQ 启动一个订阅。

从 DSN 开始,构建一个 TMQ 连接器。

let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;

创建消费者:

let mut consumer = tmq.build()?;

消费者可订阅一个或多个 TOPIC

consumer.subscribe(["tmq_meters"]).await?;

TMQ 消息队列是一个 futures::Stream 类型,可以使用相应 API 对每个消息进行消费,并通过 .commit 进行已消费标记。

{
let mut stream = consumer.stream();

while let Some((offset, message)) = stream.try_next().await? {
// get information from offset

// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");

if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}

获取消费进度:

版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0

let assignments = consumer.assignments().await.unwrap();

按照指定的进度消费:

版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0

consumer.offset_seek(topic, vgroup_id, offset).await;

停止订阅:

consumer.unsubscribe().await;

对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,group.id 是必须的。

  • group.id: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
  • client.id: 可选的订阅客户端识别项。
  • auto.offset.reset: 可选初始化订阅起点, earliest 为从头开始订阅, latest 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 group.id 中仅生效一次。
  • enable.auto.commit: 当设置为 true 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
  • auto.commit.interval.ms: 自动标记的时间间隔。

完整订阅示例参见 GitHub 示例文件.

其他相关结构体 API 使用说明请移步 Rust 文档托管网页:https://docs.rs/taos