跳到主要内容

TDengine Rust Connector

Crates.io Crates.io docs.rs

taos 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。

该 Rust 连接器的源码托管在 GitHub

版本支持

请参考版本历史及支持列表

连接方式

taos 提供原生连接WebSocket连接两种方式,我们使用 WebSocket 连接方式访问 TDengine Cloud 实例。 它通过 taosAdapter 的 WebSocket 接口连接实例

关于如何建立连接的详细介绍请参考:开发指南-建立连接-Rust

安装

安装前准备

安装 Rust 开发工具链

添加 taos 依赖

taos 连接器使用WebSocket 方式连接TDengine Cloud 实例。需要在 Rust 项目中添加 taos 依赖,并启用wsws-rustls特性。

Cargo.toml 文件中添加 taos 并启用特性,以下两种方式都可以:

  • 启用默认特性
    [dependencies]
    taos = { version = "*"}
  • 禁用默认特性,并启用 ws 和 ws-rustls 特性
    [dependencies]
    taos = { version = "*", default-features = false, features = ["ws", "ws-rustls"] }

使用示例

IMPORTANT

在执行下面样例代码的之前,您必须先在 TDengine Cloud - 数据浏览器 页面创建一个名为 power 的数据库

建立连接

TaosBuilder 通过 DSN 连接描述字符串创建一个连接构造器。DSN 由下面的格式组成wss://<host>?token=<token>

let builder = TaosBuilder::from_dsn(DSN)?;

现在您可以使用该对象创建连接:

let conn = builder.build()?;

连接对象可以创建多个:

let conn1 = builder.build()?;
let conn2 = builder.build()?;

插入数据

exec 方法执行某个非查询类 SQL 语句,例如 CREATEALTERINSERT 等。

exec
taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
taos.exec("INSERT INTO
power.d1001 USING power.meters
TAGS('California.SanFrancisco', 2)
VALUES (NOW, 12.3, 219, 0.31000) (NOW - 1s, 12.60000, 218, 0.33000) (NOW - 3s, 12.30000, 221, 0.31000)
power.d1002 USING power.meters
TAGS('California.SanFrancisco', 3)
VALUES ('2018-10-03 14:39:16.650', 23.4, 218, 0.25000)
").await?;

查看源码

exec_many 方法同时(顺序)执行多个 SQL 语句。

exec_many
taos.use_database("power").await?;
taos.exec_many([
"CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)",
"CREATE TABLE IF NOT EXISTS d0 USING power.meters TAGS('California.LosAngles',0)",
"INSERT INTO d0 values(now - 10s, 10, 116, 0.32)",
"INSERT INTO d1 USING meters TAGS('California.SanFrancisco', 4) values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34)",
]).await?;

查看源码

查询数据

query 方法执行查询语句,返回 ResultSet 对象。

在这个例子里面,我们使用查询方法来执行 SQL ,然后获取到 ResultSet 对象。

let mut result = taos.query("SELECT * FROM power.meters limit 5").await?;

获取列的元数据。

ResultSet 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度, 列信息使用 [.fields()] 方法获取:

let fields = result.fields();
for column in fields {
println!("name: {}, type: {:?} , bytes: {}", column.name(), column.ty(), column.bytes());
}

获取前5行数据并输出每一行数据:

let rows = result.rows();
rows.try_for_each(|row| async {
println!("{}", row.into_value_iter().join(","));
Ok(())
}).await?;

查看源码

逐行逐列获取数据

let mut query_restult = taos.query("SELECT * FROM power.meters limit 5").await?;
let mut ressult_rows = query_restult.rows();
let mut nrows = 0;
while let Some(row) = ressult_rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}

查看源码

使用 serde 序列化框架

use anyhow::Result;
use chrono::{DateTime, Local};
use serde::Deserialize;
use taos::*;

#[derive(Debug, Deserialize)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}

#[tokio::main]
async fn main() -> Result<()> {
let dsn = std::env::var("TDENGINE_CLOUD_DSN")?;
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build().await?;

let records: Vec<Record> = taos.query("select * from power.meters limit 5")
.await?
.deserialize()
.try_collect()
.await?;
println!("length of records: {}", records.len());
records.iter().for_each(|record| {
println!("{:?}", record);
});
Ok(())
}



查看源码

连接池

在复杂应用中,建议启用连接池。taos 的连接池使用 r2d2 实现。

如下,可以生成一个默认参数的连接池。

let pool = TaosBuilder::from_dsn(dsn)?.pool()?;

在应用代码中,使用 pool.get()? 来获取一个连接对象 taos

let taos = pool.get()?;

API 参考