TDengine Rust Connector
taos
是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。
该 Rust 连接器的源码托管在 GitHub。
版本支持
请参考版本历史及支持列表
连接方式
taos
提供原生连接和WebSocket连接两种方式,我们使用 WebSocket 连接方式访问 TDengine Cloud 实例。
它通过 taosAdapter 的 WebSocket 接口连接实例
关于如何建立连接的详细介绍请参考:开发指南-建立连接-Rust
安装
安装前准备
安装 Rust 开发工具链
添加 taos 依赖
taos
连接器使用WebSocket 方式连接TDengine Cloud 实例。需要在 Rust 项目中添加 taos 依赖,并启用ws
和ws-rustls
特性。
在 Cargo.toml
文件中添加 taos 并启用特性,以下两种方式都可以:
- 启用默认特性
[dependencies]
taos = { version = "*"} - 禁用默认特性,并启用 ws 和 ws-rustls 特性
[dependencies]
taos = { version = "*", default-features = false, features = ["ws", "ws-rustls"] }
使用示例
在执行下面样例代码的之前,您必须先在 TDengine Cloud - 数据浏览器 页面创建一个名为 power 的数据库
建立连接
TaosBuilder 通过 DSN 连接描述字符串创建一个连接构造器。DSN 由下面的格式组成wss://<host>?token=<token>
。
let builder = TaosBuilder::from_dsn(DSN)?;
现在您可以使用该对象创建连接:
let conn = builder.build()?;
连接对象可以创建多个:
let conn1 = builder.build()?;
let conn2 = builder.build()?;
插入数据
exec
方法执行某个非查询类 SQL 语句,例如 CREATE
,ALTER
,INSERT
等。
taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
taos.exec("INSERT INTO
power.d1001 USING power.meters
TAGS('California.SanFrancisco', 2)
VALUES (NOW, 12.3, 219, 0.31000) (NOW - 1s, 12.60000, 218, 0.33000) (NOW - 3s, 12.30000, 221, 0.31000)
power.d1002 USING power.meters
TAGS('California.SanFrancisco', 3)
VALUES ('2018-10-03 14:39:16.650', 23.4, 218, 0.25000)
").await?;
exec_many
方法同时(顺序)执行多个 SQL 语句。
taos.use_database("power").await?;
taos.exec_many([
"CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)",
"CREATE TABLE IF NOT EXISTS d0 USING power.meters TAGS('California.LosAngles',0)",
"INSERT INTO d0 values(now - 10s, 10, 116, 0.32)",
"INSERT INTO d1 USING meters TAGS('California.SanFrancisco', 4) values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34)",
]).await?;
查询数据
query
方法执行查询语句,返回 ResultSet 对象。
在这个例子里面,我们使用查询方法来执行 SQL ,然后获取到 ResultSet 对象。
let mut result = taos.query("SELECT * FROM power.meters limit 5").await?;
获取列的元数据。
ResultSet 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度, 列信息使用 [.fields()] 方法获取:
let fields = result.fields();
for column in fields {
println!("name: {}, type: {:?} , bytes: {}", column.name(), column.ty(), column.bytes());
}
获取前5行数据并输出每一行数据:
let rows = result.rows();
rows.try_for_each(|row| async {
println!("{}", row.into_value_iter().join(","));
Ok(())
}).await?;
逐行逐列获取数据
let mut query_restult = taos.query("SELECT * FROM power.meters limit 5").await?;
let mut ressult_rows = query_restult.rows();
let mut nrows = 0;
while let Some(row) = ressult_rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
使用 serde 序列化框架
use anyhow::Result;
use chrono::{DateTime, Local};
use serde::Deserialize;
use taos::*;
#[derive(Debug, Deserialize)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let dsn = std::env::var("TDENGINE_CLOUD_DSN")?;
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build().await?;
let records: Vec<Record> = taos.query("select * from power.meters limit 5")
.await?
.deserialize()
.try_collect()
.await?;
println!("length of records: {}", records.len());
records.iter().for_each(|record| {
println!("{:?}", record);
});
Ok(())
}
连接池
在复杂应用中,建议启用连接池。taos 的连接池使用 r2d2 实现。
如下,可以生成一个默认参数的连接池。
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
在应用代码中,使用 pool.get()?
来获取一个连接对象 taos
。
let taos = pool.get()?;