无模式写入
在物联网应用中,为了实现自动化管理、业务分析和设备监控等多种功能,通常需要采集大量的数据项。然而,由于应用逻辑的版本升级和设备自身的硬件调整等原因,数据采集项可能会频繁发生变化。为了应对这种挑战,TDengine 提供了无模式(schemaless)写入方式,旨在简化数据记录过程。
采用无模式写入方式,用户无须预先创建超级表或子表,因为 TDengine 会根据实际写入的数据自动创建相应的存储结构。此外,在必要时,无模式写入方式还能自动添加必要的数据列或标签列,确保用户写入的数据能够被正确存储。
值得注意的是,通过无模式写入方式创建的超级表及其对应的子表与通过 SQL 直接创建的超级表和子表在功能上没有区别,用户仍然可以使用 SQL 直接向其中写入数据。然而,由于无模式写入方式生成的表名是基于标签值按照固定的映射规则生成的,因此这些表名可能缺乏可读性,不易于理解。
采用无模式写入方式时会自动创建表,无须手动创建表。手动建表的话可能会出现未知的错误。
无模式写入行协议
TDengine 的无模式写入行协议兼容 InfluxDB 的行协议、OpenTSDB 的 TELNET 行协议和 OpenTSDB 的 JSON 格式协议。InfluxDB、OpenTSDB 的标准写入协议请参考各自的官方文档。
下面首先以 InfluxDB 的行协议为基础,介绍 TDengine 扩展的协议内容。该协议允许用户采用更加精细的方式控制(超级表)模式。采用一个字符串来表达一个数据行,可以向写入 API 中一次传入多行字符串来实现多个数据行的批量写入,其格式约定如下。
measurement,tag_set field_set timestamp
各参数说明如下。
- measurement 为数据表名,与 tag_set 之间使用一个英文逗号来分隔。
- tag_set 格式形如
<tag_key>=<tag_value>, <tag_key>=<tag_value>
,表示标签列数据,使用英文逗号分隔,与 field_set 之间使用一个半角空格分隔。 - field_set 格式形如
<field_key>=<field_value>, <field_key>=<field_value>
,表示普通列,同样使用英文逗号来分隔,与 timestamp 之间使用一个半角空格分隔。 - timestamp 为本行数据对应的主键时间戳。
- 无模式写入不支持含第二主键列的表的数据写入。
tag_set 中的所有的数据自动转化为 nchar 数据类型,并不需要使用双引号。 在无模式写入数据行协议中,field_set 中的每个数据项都需要对自身的数据类型进行描述,具体要求如下。
- 如果两边有英文双引号,表示 varchar 类型,例如 "abc"。
- 如果两边有英文双引号而且带有 L 或 l 前缀,表示 nchar 类型,例如 L" 报错信息 "。
- 如果两边有英文双引号而且带有 G 或 g 前缀,表示 geometry 类型,例如 G"Point(4.343 89.342)"。
- 如果两边有英文双引号而且带有 B 或 b 前缀,表示 varbinary 类型,双引号内可以为 \x 开头的十六进制或者字符串,例如 B"\x98f46e" 和 B"hello"。
- 对于空格、等号(=)、逗号(,)、双引号(")、反斜杠(\),前面需要使用反斜杠(\)进行转义(均为英文半角符号)。无模式写入协议的域转义规则如下表所示。
序号 | 域 | 需转义字符 |
---|---|---|
1 | 超级表名 | 逗号,空格 |
2 | 标签名 | 逗号,等号,空格 |
3 | 标签值 | 逗号,等号,空格 |
4 | 列名 | 逗号,等号,空格 |
5 | 列值 | 双引号,反斜杠 |
如果使用两个连续的反斜杠,则第 1 个反斜杠作为转义符,当只有一个反斜杠时则无须转义。无模式写入协议的反斜杠转义规则如下表所示。
序号 | 反斜杠 | 转义为 |
---|---|---|
1 | \ | \ |
2 | \\ | \ |
3 | \\\ | \\ |
4 | \\\\ | \\ |
5 | \\\\\ | \\\ |
6 | \\\\\\ | \\\ |
数值类型将通过后缀来区分数据类型。无模式写入协议的数值类型转义规则如下表所示。
序号 | 后缀 | 映射类型 | 大小(字节) |
---|---|---|---|
1 | 无或 f64 | double | 8 |
2 | f32 | float | 4 |
3 | i8/u8 | TinyInt/UTinyInt | 1 |
4 | i16/u16 | SmallInt/USmallInt | 2 |
5 | i32/u32 | Int/UInt | 4 |
6 | i64/i/u64/u | BigInt/BigInt/UBigInt/UBigInt | 8 |
- t、T、true、True、TRUE、f、F、false、False 将直接作为 BOOL 型来处理。
例如如下数据行表示:向名为 st 的超级表下的 t1 标签为 "3"(NCHAR)、t2 标签为 "4"(NCHAR)、t3 标签为 "t3"(NCHAR)的数据子表,写入 c1 列为 3(BIGINT)、c2 列为 false(BOOL)、c3 列为 "passit"(BINARY)、c4 列为 4(DOUBLE)、主键时间戳为 1626006833639000000 的一行数据。
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000
需要注意的是,如果描述数据类型后缀时出现大小写错误,或者为数据指定的数据类型有误,均可能引发报错提示而导致数据写入失败。
TDengine 提供数据写入的幂等性保证,即您可以反复调用 API 进行出错数据的写入操作。但是不提供多行数据写入的原子性保证。即在多行数据一批次写入过程中,会出现部分数据写入成功,部分数据写入失败的情况。
无模式写入处理规则
无模式写入按照如下原则来处理行数据:
-
将使用如下规则来生成子表名:首先将 measurement 的名称和标签的 key 和 value 组合成为如下的字符串
"measurement,tag_key1=tag_value1,tag_key2=tag_value2"
-
需要注意的是,这里的 tag_key1、tag_key2 并不是用户输入的标签的原始顺序,而是使用了标签名称按照字符串升序排列后的结果。所以,tag_key1 并不是在行协议中输入的第一个标签。 排列完成以后计算该字符串的 MD5 散列值 "md5_val"。然后将计算的结果与字符串组合生成表名:"t_md5_val"。其中的 "t_" 是固定的前缀,每个通过该映射关系自动生成的表都具有该前缀。
-
如果不想用自动生成的表名,有两种指定子表名的方式(第一种优先级更高)。
- 通过在 taos.cfg 里配置 smlAutoChildTableNameDelimiter 参数来指定(
@ # 空格 回车 换行 制表符
除外),例如配置 smlAutoChildTableNameDelimiter=- 插入数据为 st,t0=cpu1,t1=4 c1=3 1626006833639000000 则创建的表名为 cpu1-4。 - 通过在 taos.cfg 里配置 smlChildTableName 参数来指定,例如配置 smlChildTableName=tname 插入数据为 st,tname=cpu1,t1=4 c1=3 1626006833639000000 则创建的表名为 cpu1,注意如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略。
- 通过在 taos.cfg 里配置 smlAutoChildTableNameDelimiter 参数来指定(
-
-
如果解析行协议获得的超级表不存在,则会创建这个超级表(不建议手动创建超级表,不然插入数据可能异常)。
-
如果解析行协议获得子表不存在,则 schemaless 会按照步骤 1 或 2 确定的子表名来创建子表。
-
如果数据行中指定的标签列或普通列不存在,则在超级表中增加对应的标签列或普通列(只增不减)。
-
如果超级表中存在一些标签列或普通列未在一个数据行中被指定取值,那么这些列的值在这一行中会被置为 NULL。
-
对 BINARY 或 NCHAR 列,如果数据行中所提供值的长度超出了列类型的限制,自动增加该列允许存储的字符长度上限(只增不减),以保证数据的完整保存。
-
整个处理过程中遇到的错误会中断写入过程,并返回错误代码。
-
为了提高写入的效率,默认假设同一个超级表中 field_set 的顺序是一样的(第一条数据包含所有的 field,后面的数据按照这个顺序),如果顺序不一样,需要配置参数 smlDataFormat 为 false,否则,数据写入按照相同顺序写入,库中数据会异常,从 3.0.3.0 开始,自动检测顺序是否一致,该配置废弃。
-
由于 sql 建表表名不支持点号(.),所以 schemaless 也对点号(.)做了处理,如果 schemaless 自动建表的表名如果有点号(.),会自动替换为下划线(_)。如果手动指定子表名的话,子表名里有点号(.),同样转化为下划线(_)。
-
taos.cfg 增加 smlTsDefaultName 配置(字符串类型),只在 client 端起作用,配置后,schemaless 自动建表的时间列名字可以通过该配置设置。不配置的话,默认为 _ts。
-
无模式写入的数据超级表或子表名区分大小写。
-
无模式写入仍然遵循 TDengine 对数据结构的底层限制,例如每行数据的总长度不能超过 48KB(从 3.0.5.0 版本开始为 64KB),标签值的总长度不超过 16KB。
时间分辨率识别
无模式写入支持 3 个指定的模式,如下表所示:
序号 | 值 | 说明 |
---|---|---|
1 | SML_LINE_PROTOCOL | InfluxDB 行协议(Line Protocol) |
2 | SML_TELNET_PROTOCOL | OpenTSDB 文本行协议 |
3 | SML_JSON_PROTOCOL | JSON 协议格式 |
在 SML_LINE_PROTOCOL 解析模式下,需要用户指定输入的时间戳的时间分辨率。可用的时间分辨率如下表所示:
序号 | 时间分辨率定义 | 含义 |
---|---|---|
1 | TSDB_SML_TIMESTAMP_NOT_CONFIGURED | 未定义(无效) |
2 | TSDB_SML_TIMESTAMP_HOURS | 小时 |
3 | TSDB_SML_TIMESTAMP_MINUTES | 分钟 |
4 | TSDB_SML_TIMESTAMP_SECONDS | 秒 |
5 | TSDB_SML_TIMESTAMP_MILLI_SECONDS | 毫秒 |
6 | TSDB_SML_TIMESTAMP_MICRO_SECONDS | 微秒 |
7 | TSDB_SML_TIMESTAMP_NANO_SECONDS | 纳秒 |
在 SML_TELNET_PROTOCOL 和 SML_JSON_PROTOCOL 模式下,根据时间戳的长度来确定时间精度(与 OpenTSDB 标准操作方式相同),此时会忽略用户指定的时间分辨率。
数据模式映射规则
InfluxDB 行协议的数据将被映射成具有模式的数据,其中,measurement 映射为超级表名称,tag_set 中的标签名称映射为数据模式中的标签名,field_set 中的名称映射为列名称。例如下面的数据。
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000
该行数据映射生成一个超级表:st, 其包含了 3 个类型为 nchar 的标签,分别是:t1、t2、t3。五个数据列,分别是 ts(timestamp)、c1 (bigint)、c3(binary)、c2 (bool)、c4 (bigint)。映射成为如下 SQL 语句:
create stable st (_ts timestamp, c1 bigint, c2 bool, c3 binary(6), c4 bigint) tags(t1 nchar(1), t2 nchar(1), t3 nchar(2))
数据模式变更处理
本节将说明不同行数据写入情况下,对于数据模式的影响。
在使用行协议写入一个明确的标识的字段类型的时候,后续更改该字段的类型定义,会出现明确的数据模式错误,即会触发写入 API 报告错误。如下所示,
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4 1626006833639000000
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4i 1626006833640000000
第一行的数据类型映射将 c4 列定义为 Double, 但是第二行的数据又通过数值后缀方式声明该列为 bigInt, 由此会触发无模式写入的解析错误。
如果列前面的行协议将数据列声明为了 binary, 后续的要求长度更长的 binary 长度,此时会触发超级表模式的变更。
st,t1=3,t2=4,t3=t3 c1=3i64,c5="pass" 1626006833639000000
st,t1=3,t2=4,t3=t3 c1=3i64,c5="passit" 1626006833640000000
第一行中行协议解析会声明 c5 列是一个 binary(4)的字段,第二次行数据写入会提取列 c5 仍然是 binary 列,但是其宽度为 6,此时需要将 binary 的宽度增加到能够容纳 新字符串的宽度。
st,t1=3,t2=4,t3=t3 c1=3i64 1626006833639000000
st,t1=3,t2=4,t3=t3 c1=3i64,c6="passit" 1626006833640000000
第二行数据相对于第一行来说增加了一个列 c6,类型为 binary(6)。那么此时会自动增加一个列 c6, 类型为 binary(6)。
无模式写入示例
下面以智能电表为例,介绍各语言连接器使用无模式写入接口写入数据的代码样例,包含了三种协议: InfluxDB 的行协议、OpenTSDB 的 TELNET 行协议和 OpenTSDB 的 JSON 格式协议。
- 因为无模式写入自动建表规则与之前执行 SQL 样例中不同,因此运行代码样例前请确保
meters
、metric_telnet
和metric_json
表不存在。 - OpenTSDB 的 TELNET 行协议和 OpenTSDB 的 JSON 格式协议只支持一个数据列,因此我们采用了其他示例。
WebSocket 连接
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
public class SchemalessWsTest {
private static final String host = "127.0.0.1";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
public static void main(String[] args) throws SQLException {
final String url = "jdbc:TAOS-WS://" + host + ":6041?user=root&password=taosdata";
try (Connection connection = DriverManager.getConnection(url)) {
init(connection);
AbstractConnection conn = connection.unwrap(AbstractConnection.class);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS);
System.out.println("Inserted data with schemaless successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
}
}
}
执行带有 reqId 的无模式写入,最后一个参数 reqId 可用于请求链路追踪。
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
import taosws
host = "localhost"
port = 6041
def prepare():
conn = None
try:
conn = taosws.connect(user="root",
password="taosdata",
host=host,
port=port)
# create database
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS power")
assert rowsAffected == 0
except Exception as err:
print(f"Failed to create db and table, db addrr:{host}:{port} ; ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()
def schemaless_insert():
conn = None
lineDemo = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
]
telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
jsonDemo = [
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
try:
conn = taosws.connect(user="root",
password="taosdata",
host=host,
port=port,
database='power')
conn.schemaless_insert(
lines = lineDemo,
protocol = taosws.PySchemalessProtocol.Line,
precision = taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=1,
)
conn.schemaless_insert(
lines=telnetDemo,
protocol=taosws.PySchemalessProtocol.Telnet,
precision=taosws.PySchemalessPrecision.Microsecond,
ttl=1,
req_id=2,
)
conn.schemaless_insert(
lines=jsonDemo,
protocol=taosws.PySchemalessProtocol.Json,
precision=taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=3,
)
print("Inserted data with schemaless successfully.");
except Exception as err:
print(f"Failed to insert data with schemaless, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()
if __name__ == "__main__":
prepare()
schemaless_insert()
package main
import (
"database/sql"
"fmt"
"log"
"time"
"github.com/taosdata/driver-go/v3/common"
_ "github.com/taosdata/driver-go/v3/taosWS"
"github.com/taosdata/driver-go/v3/ws/schemaless"
)
func main() {
host := "127.0.0.1"
lineDemo := "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
telnetDemo := "metric_telnet 1707095283260 4 host=host0 interface=eth0"
jsonDemo := "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
taosDSN := fmt.Sprintf("root:taosdata@ws(%s:6041)/", host)
db, err := sql.Open("taosWS", taosDSN)
if err != nil {
log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
}
defer db.Close()
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
s, err := schemaless.NewSchemaless(schemaless.NewConfig("ws://localhost:6041", 1,
schemaless.SetDb("power"),
schemaless.SetReadTimeout(10*time.Second),
schemaless.SetWriteTimeout(10*time.Second),
schemaless.SetUser("root"),
schemaless.SetPassword("taosdata"),
))
if err != nil {
log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
}
// insert influxdb line protocol
err = s.Insert(lineDemo, schemaless.InfluxDBLineProtocol, "ms", 0, common.GetReqID())
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data:" + lineDemo + ", ErrMessage: " + err.Error())
}
// insert opentsdb telnet line protocol
err = s.Insert(telnetDemo, schemaless.OpenTSDBTelnetLineProtocol, "ms", 0, common.GetReqID())
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data: " + telnetDemo + ", ErrMessage: " + err.Error())
}
// insert opentsdb json format protocol
err = s.Insert(jsonDemo, schemaless.OpenTSDBJsonFormatProtocol, "s", 0, common.GetReqID())
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data: " + jsonDemo + ", ErrMessage: " + err.Error())
}
fmt.Println("Inserted data with schemaless successfully.")
}
use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use taos::AsyncQueryable;
use taos::AsyncTBuilder;
use taos::TaosBuilder;
use taos::taos_query;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let host = "localhost";
let dsn = format!("ws://{}:6041/power", host);
log::debug!("dsn: {:?}", &dsn);
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
client
.exec(format!("create database if not exists {db}"))
.await?;
// should specify database before insert
client.exec(format!("use {db}")).await?;
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Telnet
let data = [
"metric_telnet 1707095283260 4 host=host0 interface=eth0",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Json
let data = [
r#"[{
"metric": "metric_json",
"timestamp": 1626846400,
"value": 10.3,
"tags": {
"groupid": 2,
"location": "California.SanFrancisco",
"id": "d1001"
}
}]"#
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
println!("Inserted data with schemaless successfully.");
Ok(())
}
const taos = require("@tdengine/websocket");
let influxdbData = ["meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"];
let jsonData = ["{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"]
let telnetData = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"];
async function createConnect() {
let dsn = 'ws://localhost:6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
await wsSql.exec('USE power');
return wsSql;
}
async function test() {
let wsSql = null;
let wsRows = null;
let ttl = 0;
try {
wsSql = await createConnect()
await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
console.log("Inserted data with schemaless successfully.")
}
catch (err) {
console.error(`Failed to insert data with schemaless, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
taos.destroy();
}
}
test()
public static void Main(string[] args)
{
var host = "127.0.0.1";
var lineDemo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
var telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
var jsonDemo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
try
{
var builder =
new ConnectionStringBuilder(
$"protocol=WebSocket;host={host};port=6041;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
// create database
client.Exec("CREATE DATABASE IF NOT EXISTS power");
// use database
client.Exec("USE power");
// insert influx line protocol data
client.SchemalessInsert(new[] { lineDemo }, TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert opentsdb telnet protocol data
client.SchemalessInsert(new[] { telnetDemo }, TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert json data
client.SchemalessInsert(new[] { jsonDemo }, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED, 0, ReqId.GetReqId());
}
Console.WriteLine("Inserted data with schemaless successfully.");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert data with schemaless, ErrCode: " + e.Code +
", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert data with schemaless, ErrMessage: " + e.Message);
throw;
}
}
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
// create database
WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
ws_close(taos);
return -1;
}
ws_free_result(result);
// use database
result = ws_query(taos, "USE power");
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, ws_errstr(result));
ws_close(taos);
return -1;
}
ws_free_result(result);
// schemaless demo data
char *line_demo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 "
"1626006833639";
char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
char *json_demo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, "
"\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
// influxdb line protocol
char *lines[] = {line_demo};
int totalLines = 0;
result = ws_schemaless_insert_raw(taos, line_demo, strlen(line_demo), &totalLines, WS_TSDB_SML_LINE_PROTOCOL,
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo,
code, ws_errstr(result));
ws_close(taos);
return -1;
}
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", totalLines);
ws_free_result(result);
// opentsdb telnet protocol
totalLines = 0;
result = ws_schemaless_insert_raw(taos, telnet_demo, strlen(telnet_demo), &totalLines, WS_TSDB_SML_TELNET_PROTOCOL,
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo,
code, ws_errstr(result));
ws_close(taos);
return -1;
}
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", totalLines);
ws_free_result(result);
// opentsdb json protocol
char *jsons[1] = {0};
// allocate memory for json data. can not use static memory.
totalLines = 0;
result = ws_schemaless_insert_raw(taos, json_demo, strlen(json_demo), &totalLines, WS_TSDB_SML_JSON_PROTOCOL,
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
free(jsons[0]);
fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo,
code, ws_errstr(result));
ws_close(taos);
return -1;
}
free(jsons[0]);
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", totalLines);
ws_free_result(result);
// close & clean
ws_close(taos);
return 0;
不支持
原生连接
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
public class SchemalessJniTest {
private static final String host = "127.0.0.1";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
public static void main(String[] args) throws SQLException {
final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
init(connection);
AbstractConnection conn = connection.unwrap(AbstractConnection.class);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
System.out.println("Inserted data with schemaless successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
}
}
}
执行带有 reqId 的无模式写入,最后一个参数 reqId 可用于请求链路追踪。
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
import taos
lineDemo = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
]
telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
jsonDemo = [
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
host = "localhost"
port = 6030
try:
conn = taos.connect(
user="root",
password="taosdata",
host=host,
port=port
)
conn.execute("CREATE DATABASE IF NOT EXISTS power")
# change database. same as execute "USE db"
conn.select_db("power")
conn.schemaless_insert(
lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
)
conn.schemaless_insert(
telnetDemo, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.MICRO_SECONDS
)
conn.schemaless_insert(
jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
)
print("Inserted data with schemaless successfully.");
except Exception as err:
print(f"Failed to insert data with schemaless, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()
package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
func main() {
host := "127.0.0.1"
lineDemo := "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
telnetDemo := "metric_telnet 1707095283260 4 host=host0 interface=eth0"
jsonDemo := "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
conn, err := af.Open(host, "root", "taosdata", "", 0)
if err != nil {
log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
}
defer conn.Close()
_, err = conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
_, err = conn.Exec("USE power")
if err != nil {
log.Fatalln("Failed to use database power, ErrMessage: " + err.Error())
}
// insert influxdb line protocol
err = conn.InfluxDBInsertLines([]string{lineDemo}, "ms")
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data:" + lineDemo + ", ErrMessage: " + err.Error())
}
// insert opentsdb telnet protocol
err = conn.OpenTSDBInsertTelnetLines([]string{telnetDemo})
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data:" + telnetDemo + ", ErrMessage: " + err.Error())
}
// insert opentsdb json protocol
err = conn.OpenTSDBInsertJsonPayload(jsonDemo)
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data:" + jsonDemo + ", ErrMessage: " + err.Error())
}
fmt.Println("Inserted data with schemaless successfully.")
}
use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use taos::AsyncQueryable;
use taos::AsyncTBuilder;
use taos::TaosBuilder;
use taos::taos_query;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let host = "localhost";
let dsn = format!("taos://{}:6030", host);
log::debug!("dsn: {:?}", &dsn);
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
client
.exec(format!("create database if not exists {db}"))
.await?;
// should specify database before insert
client.exec(format!("use {db}")).await?;
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Telnet
let data = [
"metric_telnet 1707095283260 4 host=host0 interface=eth0",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Json
let data = [
r#"[{
"metric": "metric_json",
"timestamp": 1626846400,
"value": 10.3,
"tags": {
"groupid": 2,
"location": "California.SanFrancisco",
"id": "d1001"
}
}]"#
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
println!("Inserted data with schemaless successfully.");
Ok(())
}
不支持
public static void Main(string[] args)
{
var host = "127.0.0.1";
var lineDemo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
var telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
var jsonDemo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
try
{
var builder =
new ConnectionStringBuilder(
$"host={host};port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
// create database
client.Exec("CREATE DATABASE IF NOT EXISTS power");
// use database
client.Exec("USE power");
// insert influx line protocol data
client.SchemalessInsert(new[]{lineDemo}, TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert opentsdb telnet protocol data
client.SchemalessInsert(new[]{telnetDemo}, TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert json data
client.SchemalessInsert(new []{jsonDemo}, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED, 0, ReqId.GetReqId());
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert data with schemaless, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert data with schemaless, ErrMessage:" + e.Message);
throw;
}
}
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
return -1;
}
// create database
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result);
// use database
result = taos_query(taos, "USE power");
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result);
// schemaless demo data
char *line_demo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 "
"1626006833639";
char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
char *json_demo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, "
"\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
// influxdb line protocol
char *lines[] = {line_demo};
result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
int rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", rows);
taos_free_result(result);
// opentsdb telnet protocol
char *telnets[] = {telnet_demo};
result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", rows);
taos_free_result(result);
// opentsdb json protocol
char *jsons[1] = {0};
// allocate memory for json data. can not use static memory.
size_t size = 1024;
jsons[0] = malloc(size);
if (jsons[0] == NULL) {
fprintf(stderr, "Failed to allocate memory: %zu bytes.\n", size);
taos_close(taos);
taos_cleanup();
return -1;
}
(void)strncpy(jsons[0], json_demo, 1023);
result = taos_schemaless_insert(taos, jsons, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
code = taos_errno(result);
if (code != 0) {
free(jsons[0]);
fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
free(jsons[0]);
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", rows);
taos_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
return 0;
不支持
查询写入的数据
运行上节的样例代码,会在 power 数据库中自动建表,我们可以通过 TDengine CLI 或者应用程序来查询数据。下面给出用 TDengine CLI 查询超级表和 meters 表数据的样例。
taos> show power.stables;
stable_name |
=================================
meter_current |
stb0_0 |
meters |
Query OK, 3 row(s) in set (0.002527s)
taos> select * from power.meters limit 1 \G;
*************************** 1.row ***************************
_ts: 2021-07-11 20:33:53.639
current: 10.300000199999999
voltage: 219
phase: 0.310000000000000
groupid: 2
location: California.SanFrancisco
Query OK, 1 row(s) in set (0.004501s)