TDengine Node.js Connector
@tdengine/websocket
是 TDengine 的官方 Node.js 语言连接器。Node.js 开发人员可以通过它开发存取 TDengine 数据库的的应用软件。
Node.js 连接器源码托管在 GitHub。
连接方式
Node.js 连接器目前仅支持 Websocket 连接器, 其通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例。
连接方式的详细介绍请参考:连接器建立连接的方式
支持的平台
支持 Node.js 14及以上版本。
版本历史
Node.js 连接器 版本 | 主要变化 | TDengine 版本 |
---|---|---|
3.1.0 | 新版本发布,支持 WebSocket 连接 | 3.2.0.0 及更高版本 |
支持的功能特性
- 连接管理
- SQL写入
- SQL查询
- 参数绑定
- 数据订阅
- 无模式写入
处理异常
在调用连接器 api 报错后,通过 try catch 可以获取到错误的信息和错误码。
错误说明:Node.js 连接器错误码在 100 到 110 之间,之外的错误为 TDengine 其他功能模块的报错。
具体的连接器错误码请参考:
Error Code | Description | Suggested Actions |
---|---|---|
100 | invalid variables | 参数不合法,请检查相应接口规范,调整参数类型及大小。 |
101 | invalid url | url 错误,请检查 url 是否填写正确。 |
102 | received server data but did not find a callback for processing | 接收到服务端数据但没有找到上层回调 |
103 | invalid message type | 接收到的消息类型无法识别,请检查服务端是否正常。 |
104 | connection creation failed | 连接创建失败,请检查网络是否正常。 |
105 | websocket request timeout | 请求超时 |
106 | authentication fail | 认证失败,请检查用户名,密码是否正确。 |
107 | unknown sql type in tdengine | 请检查 TDengine 支持的 Data Type 类型。 |
108 | connection has been closed | 连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。 |
109 | fetch block data parse fail | 获取到的查询数据,解析失败 |
110 | websocket connection has reached its maximum limit | Websocket 连接达到上限 |
类型映射
下表为 TDengine DataType 和 Node.js DataType 之间的映射关系
TDengine DataType | Node.js DataType |
---|---|
TIMESTAMP | bigint |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | bigint |
TINYINT UNSIGNED | number |
SMALLINT UNSIGNED | number |
INT UNSIGNED | number |
BIGINT UNSIGNED | bigint |
FLOAT | number |
DOUBLE | number |
BOOL | boolean |
BINARY | string |
NCHAR | string |
JSON | string |
VARBINARY | ArrayBuffer |
GEOMETRY | ArrayBuffer |
注意:JSON 类型仅在 tag 中支持。
安装步骤
安装前准备
- 安装 Node.js 开发环境, 使用14以上版本。下载链接: https://nodejs.org/en/download/
使用 npm 安装 Node.js 连接器
npm install @tdengine/websocket
安装验证
验证方法:
-
新建安装验证目录,例如:
~/tdengine-test
,下载 GitHub 上 nodejsChecker.js 源代码到本地。 -
在命令行中执行以下命令。
npm init -y
npm install @tdengine/websocket
node nodejsChecker.js
- 执行以上步骤后,在命令行会输出 nodeChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。
建立连接
安装并引用 @tdengine/websocket
包。
注意:
- 链接器使用结束后,需要调用 taos.destroy() 释放连接器资源
const taos = require("@tdengine/websocket");
//数据库操作......
taos.destroy();
WSConfig配置Websocket参数如下:
getToken(): string | undefined | null;
setToken(token: string): void;
getUser(): string | undefined | null;
setUser(user: string): void;
getPwd(): string | undefined | null;
setPwd(pws: string): void;
getDb(): string | undefined | null;
setDb(db: string): void;
getUrl(): string;
setUrl(url: string): void;
setTimeOut(ms: number): void;
getTimeOut(): number | undefined | null;
async function createConnect() {
let dsn = 'ws://localhost:6041';
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
return await taos.sqlConnect(conf);
}
使用示例
创建数据库和表
async function createDbAndTable(wsSql) {
let wsSql = null;
try {
wsSql = await createConnect();
await wsSql.exec('CREATE DATABASE IF NOT EXISTS POWER ' +
'KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
await wsSql.exec('USE power');
await wsSql.exec('CREATE STABLE IF NOT EXISTS meters ' +
'(_ts timestamp, current float, voltage int, phase float) ' +
'TAGS (location binary(64), groupId int);');
taosResult = await wsSql.exec('describe meters');
console.log(taosResult);
} catch (err) {
console.error(err.code, err.message);
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
注意:如果不使用
USE power
指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。
插入数据
async function insertData(wsSql) {
let wsSql = null;
try {
wsSql = await createConnect();
let insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
taosResult = await wsSql.exec(insertQuery);
console.log(taosResult);
} catch (err) {
console.error(err.code, err.message);
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
NOW 为系统内部函数,默认为客户端所在计算机当前时间。
NOW + 1s
代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。
查询数据
async function queryData() {
let wsRows = null;
let wsSql = null;
try {
wsSql = await createConnect();
wsRows = await wsSql.query('select * from meters');
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let result = wsRows.getData();
console.log('queryRes.Scan().then=>', result);
}
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
}
}
查询到的数据
wsRow:meta:=> [
{ name: 'ts', type: 'TIMESTAMP', length: 8 },
{ name: 'current', type: 'FLOAT', length: 4 },
{ name: 'voltage', type: 'INT', length: 4 },
{ name: 'phase', type: 'FLOAT', length: 4 },
{ name: 'location', type: 'VARCHAR', length: 64},
{ name: 'groupid', type: 'INT', length: 4 }
]
wsRow:data:=> [
[ 1714013737536n, 12.3, 221, 0.31, 'California.SanFrancisco', 3 ]
]
执行带有 reqId 的 SQL
reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:
- 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
- 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
- 故障诊断:当一个请求失败时,可以通过查看与该请求关联的 reqId 来找出问题发生的位置
如果用户不设置reqId,连接器也会内部随机生成一个,但是还是建议用户设置,可以更好的跟用户请求关联起来。
async function sqlWithReqid(wsSql) {
let insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
let wsRows = null;
let wsSql = null;
try {
wsSql = await createConnect();
taosResult = await wsSql.exec(insertQuery, 1);
wsRows = await wsSql.query('select * from meters', 2);
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let result = wsRows.getData();
console.log('queryRes.Scan().then=>', result);
}
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
}
}
通过参数绑定写入数据
TDengine 的 NodeJs 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 ? 来代表待绑定的参数。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
注意:
- 预处理语句中指定数据库与子表名称不要使用
db.?
,应直接使用?
,然后在 setTableName 中指定数据库,如:stmt.setTableName("db.t1")
。
示例代码:
const taos = require("@tdengine/websocket");
let db = 'power';
let stable = 'meters';
let tags = ['California.SanFrancisco', 3];
let values = [
[1706786044994, 1706786044995, 1706786044996],
[10.2, 10.3, 10.4],
[292, 293, 294],
[0.32, 0.33, 0.34],
];
async function prepare() {
let dsn = 'ws://localhost:6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb(db)
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(`CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`);
await wsSql.exec(`CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`);
return wsSql
}
(async () => {
let stmt = null;
let connector = null;
try {
connector = await prepare();
stmt = await connector.stmtInit();
await stmt.prepare(`INSERT INTO ? USING ${db}.${stable} (location, groupId) TAGS (?, ?) VALUES (?, ?, ?, ?)`);
await stmt.setTableName('d1001');
let tagParams = stmt.newStmtParam();
tagParams.setVarchar([tags[0]]);
tagParams.setInt([tags[1]]);
await stmt.setTags(tagParams);
let bindParams = stmt.newStmtParam();
bindParams.setTimestamp(values[0]);
bindParams.setFloat(values[1]);
bindParams.setInt(values[2]);
bindParams.setFloat(values[3]);
await stmt.bind(bindParams);
await stmt.batch();
await stmt.exec();
console.log(stmt.getLastAffected());
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (stmt) {
await stmt.close();
}
if (connector) {
await connector.close();
}
taos.destroy();
}
})();
用于设定 TAG/VALUES 数据列的取值的方法总共有:
setBoolean(params: any[]): void;
setTinyInt(params: any[]): void;
setUTinyInt(params: any[]): void;
setSmallInt(params: any[]): void;
setUSmallInt(params: any[]): void;
setInt(params: any[]): void;
setUInt(params: any[]): void;
setBigint(params: any[]): void;
setUBigint(params: any[]): void;
setFloat(params: any[]): void;
setDouble(params: any[]): void;
setVarchar(params: any[]): void;
setBinary(params: any[]): void;
setNchar(params: any[]): void;
setJson(params: any[]): void;
setVarBinary(params: any[]): void;
setGeometry(params: any[]): void;
setTimestamp(params: any[]): void;
注意:JSON 类型仅在 tag 中支持。
无模式写入
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见无模式写入。
const taos = require("@tdengine/websocket");
let influxdbData = ["meters,location=California.LosAngeles,groupId=2 current=11.8,voltage=221,phase=0.28 1648432611249",
"meters,location=California.LosAngeles,groupId=2 current=13.4,voltage=223,phase=0.29 1648432611250",
"meters,location=California.LosAngeles,groupId=3 current=10.8,voltage=223,phase=0.29 1648432611249"];
let jsonData = ["{\"metric\": \"meter_current\",\"timestamp\": 1626846402,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846403,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1002\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846404,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1003\"}}"]
let telnetData = ["meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
"meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
"meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3"];
async function createConnect() {
let dsn = 'ws://localhost:6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
await wsSql.exec('USE power');
return wsSql;
}
async function test() {
let wsSql = null;
let wsRows = null;
let ttl = 0;
try {
wsSql = await createConnect()
await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
taos.destroy();
}
}
test()
执行带有 reqId 的无模式写入
此 reqId 可用于请求链路追踪。
await wsSchemaless.schemalessInsert([influxdbData], SchemalessProto.InfluxDBLineProtocol, Precision.NANO_SECONDS, ttl, reqId);
await wsSchemaless.schemalessInsert([telnetData], SchemalessProto.OpenTSDBTelnetLineProtocol, Precision.NANO_SECONDS, ttl, reqId);
await wsSchemaless.schemalessInsert([jsonData], SchemalessProto.OpenTSDBJsonFormatProtocol, Precision.NANO_SECONDS, ttl, reqId);
数据订阅
TDengine NodeJs 连接器支持订阅功能,应用 API 如下:
创建 Topic
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
创建 Consumer
async function createConsumer() {
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, "gId"],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.CLIENT_ID, 'test_tmq_client'],
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
return await taos.tmqConnect(configMap);
}
参数说明
- taos.TMQConstants.CONNECT_USER: 用户名。
- taos.TMQConstants.CONNECT_PASS: 密码。
- taos.TMQConstants.GROUP_ID: 所在的 group。
- taos.TMQConstants.CLIENT_ID: client id。
- taos.TMQConstants.WS_URL: taosAdapter 的url地址。
- taos.TMQConstants.AUTO_OFFSET_RESET: 来确定消费位置为最新数据(latest)还是包含旧数据(earliest)。
- taos.TMQConstants.ENABLE_AUTO_COMMIT: 是否允许自动提交。
- taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS: 自动提交间隔。
- taos.TMQConstants.CONNECT_MESSAGE_TIMEOUT: 数据传输超时参数,单位 ms,默认为 10000 ms。
其他参数请参考:Consumer 参数列表, 注意 TDengine 服务端自3.2.0.0版本开始消息订阅中的 auto.offset.reset 默认值发生变化。
订阅消费数据
async function subscribe(consumer) {
await consumer.subscribe(topics);
for (let i = 0; i < 5; i++) {
let res = await consumer.poll(500);
for (let [key, value] of res) {
console.log(key, value);
}
if (res.size == 0) {
break;
}
await consumer.commit();
}
}
指定订阅 Offset
let assignment = await consumer.assignment();
console.log(assignment);
assignment = await consumer.seekToBeginning(assignment);
for(let i in assignment) {
console.log("seek after:", assignment[i])
}
关闭订阅
// 取消订阅
consumer.unsubscribe();
// 关闭消费
consumer.close()
// 释放连接器资源
taos.destroy();
详情请参考:数据订阅
完整示例
const taos = require("@tdengine/websocket");
const db = 'power';
const stable = 'meters';
const topics = ['power_meters_topic'];
// ANCHOR: create_consumer
async function createConsumer() {
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, "gId"],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.CLIENT_ID, 'test_tmq_client'],
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
return await taos.tmqConnect(configMap);
}
// ANCHOR_END: create_consumer
async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb('power')
const createDB = `CREATE DATABASE IF NOT EXISTS POWER ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(createDB);
await wsSql.exec(createStable);
// ANCHOR: create_topic
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
// ANCHOR_END: create_topic
for (let i = 0; i < 10; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
}
wsSql.Close();
}
// ANCHOR: subscribe
async function subscribe(consumer) {
await consumer.subscribe(topics);
for (let i = 0; i < 5; i++) {
let res = await consumer.poll(500);
for (let [key, value] of res) {
console.log(key, value);
}
if (res.size == 0) {
break;
}
await consumer.commit();
}
}
// ANCHOR_END: subscribe
async function test() {
let consumer = null;
try {
await prepare();
let consumer = await createConsumer()
await subscribe(consumer)
// ANCHOR: assignment
let assignment = await consumer.assignment();
console.log(assignment);
assignment = await consumer.seekToBeginning(assignment);
for(let i in assignment) {
console.log("seek after:", assignment[i])
}
// ANCHOR_END: assignment
await consumer.unsubscribe();
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (consumer) {
await consumer.close();
}
taos.destroy();
}
}
test()
更多示例程序
示例程序 | 示例程序描述 |
---|---|
sql_example | 基本的使用如如建立连接,执行 SQL 等操作。 |
stmt_example | 绑定参数插入的示例。 |
line_example | 行协议写入示例。 |
telnet_line_example | OpenTSDB Telnet 行协议写入示例。 |
json_line_example | OpenTSDB JSON 行协议写入示例。 |
tmq_example | 订阅的使用示例。 |
使用限制
- Node.js 连接器(
@tdengine/websocket
)支持 Node.js 14 以上版本,低于 14 的版本可能存在包兼容性的问题。 - 目前只支持 WebSocket 连接,需要提前启动 taosAdapter
- 使用连接器结束后,需要调用 taos.connectorDestroy(); 释放连接器资源。
常见问题
-
"Unable to establish connection" 或 "Unable to resolve FQDN"
原因:一般都是因为配置 FQDN 不正确。 可以参考如何彻底搞懂 TDengine 的 FQDN 。