Skip to main content

TDengine Node.js Connector

@tdengine/websocket 是 TDengine 的官方 Node.js 语言连接器。Node.js 开发人员可以通过它开发存取 TDengine 数据库的的应用软件。

Node.js 连接器源码托管在 GitHub

连接方式

Node.js 连接器目前仅支持 Websocket 连接器, 其通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例。

连接方式的详细介绍请参考:连接器建立连接的方式

支持的平台

支持 Node.js 14及以上版本。

版本历史

Node.js 连接器 版本主要变化TDengine 版本
3.1.0新版本发布,支持 WebSocket 连接3.2.0.0 及更高版本

支持的功能特性

  • 连接管理
  • SQL写入
  • SQL查询
  • 参数绑定
  • 数据订阅
  • 无模式写入

处理异常

在调用连接器 api 报错后,通过 try catch 可以获取到错误的信息和错误码。

错误说明:Node.js 连接器错误码在 100 到 110 之间,之外的错误为 TDengine 其他功能模块的报错。

具体的连接器错误码请参考:

Error CodeDescriptionSuggested Actions
100invalid variables参数不合法,请检查相应接口规范,调整参数类型及大小。
101invalid urlurl 错误,请检查 url 是否填写正确。
102received server data but did not find a callback for processing接收到服务端数据但没有找到上层回调
103invalid message type接收到的消息类型无法识别,请检查服务端是否正常。
104connection creation failed连接创建失败,请检查网络是否正常。
105websocket request timeout请求超时
106authentication fail认证失败,请检查用户名,密码是否正确。
107unknown sql type in tdengine请检查 TDengine 支持的 Data Type 类型。
108connection has been closed连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。
109fetch block data parse fail获取到的查询数据,解析失败
110websocket connection has reached its maximum limitWebsocket 连接达到上限

类型映射

下表为 TDengine DataType 和 Node.js DataType 之间的映射关系

TDengine DataTypeNode.js DataType
TIMESTAMPbigint
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTbigint
TINYINT UNSIGNEDnumber
SMALLINT UNSIGNEDnumber
INT UNSIGNEDnumber
BIGINT UNSIGNEDbigint
FLOATnumber
DOUBLEnumber
BOOLboolean
BINARYstring
NCHARstring
JSONstring
VARBINARYArrayBuffer
GEOMETRYArrayBuffer

注意:JSON 类型仅在 tag 中支持。

安装步骤

安装前准备

使用 npm 安装 Node.js 连接器

npm install @tdengine/websocket

安装验证

验证方法:

  • 新建安装验证目录,例如:~/tdengine-test,下载 GitHub 上 nodejsChecker.js 源代码到本地。

  • 在命令行中执行以下命令。

  npm init -y
npm install @tdengine/websocket
node nodejsChecker.js
  • 执行以上步骤后,在命令行会输出 nodeChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。

建立连接

安装并引用 @tdengine/websocket 包。

注意

  • 链接器使用结束后,需要调用 taos.destroy() 释放连接器资源
const taos = require("@tdengine/websocket");

//数据库操作......

taos.destroy();
WSConfig配置Websocket参数如下:
getToken(): string | undefined | null;
setToken(token: string): void;
getUser(): string | undefined | null;
setUser(user: string): void;
getPwd(): string | undefined | null;
setPwd(pws: string): void;
getDb(): string | undefined | null;
setDb(db: string): void;
getUrl(): string;
setUrl(url: string): void;
setTimeOut(ms: number): void;
getTimeOut(): number | undefined | null;
async function createConnect() {
let dsn = 'ws://localhost:6041';
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
return await taos.sqlConnect(conf);
}

查看源码

使用示例

创建数据库和表

async function createDbAndTable(wsSql) {
let wsSql = null;
try {
wsSql = await createConnect();
await wsSql.exec('CREATE DATABASE IF NOT EXISTS POWER ' +
'KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');

await wsSql.exec('USE power');

await wsSql.exec('CREATE STABLE IF NOT EXISTS meters ' +
'(_ts timestamp, current float, voltage int, phase float) ' +
'TAGS (location binary(64), groupId int);');

taosResult = await wsSql.exec('describe meters');
console.log(taosResult);
} catch (err) {

console.error(err.code, err.message);
} finally {
if (wsSql) {
await wsSql.close();
}
}

}

查看源码

注意:如果不使用 USE power 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。

插入数据

async function insertData(wsSql) {
let wsSql = null;
try {
wsSql = await createConnect();
let insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
taosResult = await wsSql.exec(insertQuery);
console.log(taosResult);
} catch (err) {
console.error(err.code, err.message);
} finally {
if (wsSql) {
await wsSql.close();
}
}
}

查看源码

NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW + 1s 代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。

查询数据

async function queryData() {
let wsRows = null;
let wsSql = null;
try {
wsSql = await createConnect();
wsRows = await wsSql.query('select * from meters');
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let result = wsRows.getData();
console.log('queryRes.Scan().then=>', result);
}
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
}
}

查看源码

查询到的数据

wsRow:meta:=> [
{ name: 'ts', type: 'TIMESTAMP', length: 8 },
{ name: 'current', type: 'FLOAT', length: 4 },
{ name: 'voltage', type: 'INT', length: 4 },
{ name: 'phase', type: 'FLOAT', length: 4 },
{ name: 'location', type: 'VARCHAR', length: 64},
{ name: 'groupid', type: 'INT', length: 4 }
]
wsRow:data:=> [
[ 1714013737536n, 12.3, 221, 0.31, 'California.SanFrancisco', 3 ]
]

执行带有 reqId 的 SQL

reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:

  • 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
  • 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
  • 故障诊断:当一个请求失败时,可以通过查看与该请求关联的 reqId 来找出问题发生的位置

如果用户不设置reqId,连接器也会内部随机生成一个,但是还是建议用户设置,可以更好的跟用户请求关联起来。

async function sqlWithReqid(wsSql) {
let insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";

let wsRows = null;
let wsSql = null;
try {
wsSql = await createConnect();
taosResult = await wsSql.exec(insertQuery, 1);
wsRows = await wsSql.query('select * from meters', 2);
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let result = wsRows.getData();
console.log('queryRes.Scan().then=>', result);
}
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
}
}

查看源码

通过参数绑定写入数据

TDengine 的 NodeJs 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 ? 来代表待绑定的参数。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。

注意

  • 预处理语句中指定数据库与子表名称不要使用 db.?,应直接使用 ?,然后在 setTableName 中指定数据库,如:stmt.setTableName("db.t1")

示例代码:

const taos = require("@tdengine/websocket");

let db = 'power';
let stable = 'meters';
let tags = ['California.SanFrancisco', 3];
let values = [
[1706786044994, 1706786044995, 1706786044996],
[10.2, 10.3, 10.4],
[292, 293, 294],
[0.32, 0.33, 0.34],
];

async function prepare() {
let dsn = 'ws://localhost:6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb(db)
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(`CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`);
await wsSql.exec(`CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`);
return wsSql
}

(async () => {
let stmt = null;
let connector = null;
try {
connector = await prepare();
stmt = await connector.stmtInit();
await stmt.prepare(`INSERT INTO ? USING ${db}.${stable} (location, groupId) TAGS (?, ?) VALUES (?, ?, ?, ?)`);
await stmt.setTableName('d1001');
let tagParams = stmt.newStmtParam();
tagParams.setVarchar([tags[0]]);
tagParams.setInt([tags[1]]);
await stmt.setTags(tagParams);

let bindParams = stmt.newStmtParam();
bindParams.setTimestamp(values[0]);
bindParams.setFloat(values[1]);
bindParams.setInt(values[2]);
bindParams.setFloat(values[3]);
await stmt.bind(bindParams);
await stmt.batch();
await stmt.exec();
console.log(stmt.getLastAffected());
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (stmt) {
await stmt.close();
}
if (connector) {
await connector.close();
}
taos.destroy();
}
})();

查看源码

用于设定 TAG/VALUES 数据列的取值的方法总共有:

setBoolean(params: any[]): void;
setTinyInt(params: any[]): void;
setUTinyInt(params: any[]): void;
setSmallInt(params: any[]): void;
setUSmallInt(params: any[]): void;
setInt(params: any[]): void;
setUInt(params: any[]): void;
setBigint(params: any[]): void;
setUBigint(params: any[]): void;
setFloat(params: any[]): void;
setDouble(params: any[]): void;
setVarchar(params: any[]): void;
setBinary(params: any[]): void;
setNchar(params: any[]): void;
setJson(params: any[]): void;
setVarBinary(params: any[]): void;
setGeometry(params: any[]): void;
setTimestamp(params: any[]): void;

注意:JSON 类型仅在 tag 中支持。

无模式写入

TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见无模式写入

const taos = require("@tdengine/websocket");

let influxdbData = ["meters,location=California.LosAngeles,groupId=2 current=11.8,voltage=221,phase=0.28 1648432611249",
"meters,location=California.LosAngeles,groupId=2 current=13.4,voltage=223,phase=0.29 1648432611250",
"meters,location=California.LosAngeles,groupId=3 current=10.8,voltage=223,phase=0.29 1648432611249"];

let jsonData = ["{\"metric\": \"meter_current\",\"timestamp\": 1626846402,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846403,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1002\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846404,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1003\"}}"]

let telnetData = ["meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
"meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
"meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3"];

async function createConnect() {
let dsn = 'ws://localhost:6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
await wsSql.exec('USE power');
return wsSql;
}

async function test() {
let wsSql = null;
let wsRows = null;
let ttl = 0;
try {
wsSql = await createConnect()
await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
taos.destroy();
}
}
test()

查看源码

执行带有 reqId 的无模式写入

此 reqId 可用于请求链路追踪。

await wsSchemaless.schemalessInsert([influxdbData], SchemalessProto.InfluxDBLineProtocol, Precision.NANO_SECONDS, ttl, reqId);
await wsSchemaless.schemalessInsert([telnetData], SchemalessProto.OpenTSDBTelnetLineProtocol, Precision.NANO_SECONDS, ttl, reqId);
await wsSchemaless.schemalessInsert([jsonData], SchemalessProto.OpenTSDBJsonFormatProtocol, Precision.NANO_SECONDS, ttl, reqId);

数据订阅

TDengine NodeJs 连接器支持订阅功能,应用 API 如下:

创建 Topic

let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);

查看源码

创建 Consumer

async function createConsumer() {
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, "gId"],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.CLIENT_ID, 'test_tmq_client'],
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
return await taos.tmqConnect(configMap);
}

查看源码

参数说明

  • taos.TMQConstants.CONNECT_USER: 用户名。
  • taos.TMQConstants.CONNECT_PASS: 密码。
  • taos.TMQConstants.GROUP_ID: 所在的 group。
  • taos.TMQConstants.CLIENT_ID: client id。
  • taos.TMQConstants.WS_URL: taosAdapter 的url地址。
  • taos.TMQConstants.AUTO_OFFSET_RESET: 来确定消费位置为最新数据(latest)还是包含旧数据(earliest)。
  • taos.TMQConstants.ENABLE_AUTO_COMMIT: 是否允许自动提交。
  • taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS: 自动提交间隔。
  • taos.TMQConstants.CONNECT_MESSAGE_TIMEOUT: 数据传输超时参数,单位 ms,默认为 10000 ms。

其他参数请参考:Consumer 参数列表, 注意 TDengine 服务端自3.2.0.0版本开始消息订阅中的 auto.offset.reset 默认值发生变化。

订阅消费数据

async function subscribe(consumer) {
await consumer.subscribe(topics);
for (let i = 0; i < 5; i++) {
let res = await consumer.poll(500);
for (let [key, value] of res) {
console.log(key, value);
}
if (res.size == 0) {
break;
}
await consumer.commit();
}
}

查看源码

指定订阅 Offset

        let assignment = await consumer.assignment();
console.log(assignment);

assignment = await consumer.seekToBeginning(assignment);
for(let i in assignment) {
console.log("seek after:", assignment[i])
}

查看源码

关闭订阅

// 取消订阅
consumer.unsubscribe();
// 关闭消费
consumer.close()
// 释放连接器资源
taos.destroy();

详情请参考:数据订阅

完整示例

const taos = require("@tdengine/websocket");

const db = 'power';
const stable = 'meters';
const topics = ['power_meters_topic'];

// ANCHOR: create_consumer
async function createConsumer() {
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, "gId"],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.CLIENT_ID, 'test_tmq_client'],
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
return await taos.tmqConnect(configMap);
}
// ANCHOR_END: create_consumer

async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb('power')
const createDB = `CREATE DATABASE IF NOT EXISTS POWER ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;

let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(createDB);
await wsSql.exec(createStable);

// ANCHOR: create_topic
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
// ANCHOR_END: create_topic

for (let i = 0; i < 10; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
}
wsSql.Close();
}

// ANCHOR: subscribe
async function subscribe(consumer) {
await consumer.subscribe(topics);
for (let i = 0; i < 5; i++) {
let res = await consumer.poll(500);
for (let [key, value] of res) {
console.log(key, value);
}
if (res.size == 0) {
break;
}
await consumer.commit();
}
}
// ANCHOR_END: subscribe

async function test() {
let consumer = null;
try {
await prepare();
let consumer = await createConsumer()
await subscribe(consumer)
// ANCHOR: assignment
let assignment = await consumer.assignment();
console.log(assignment);

assignment = await consumer.seekToBeginning(assignment);
for(let i in assignment) {
console.log("seek after:", assignment[i])
}
// ANCHOR_END: assignment
await consumer.unsubscribe();
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (consumer) {
await consumer.close();
}
taos.destroy();
}
}

test()

查看源码

更多示例程序

示例程序示例程序描述
sql_example基本的使用如如建立连接,执行 SQL 等操作。
stmt_example绑定参数插入的示例。
line_example行协议写入示例。
telnet_line_exampleOpenTSDB Telnet 行协议写入示例。
json_line_exampleOpenTSDB JSON 行协议写入示例。
tmq_example订阅的使用示例。

使用限制

  • Node.js 连接器(@tdengine/websocket)支持 Node.js 14 以上版本,低于 14 的版本可能存在包兼容性的问题。
  • 目前只支持 WebSocket 连接,需要提前启动 taosAdapter
  • 使用连接器结束后,需要调用 taos.connectorDestroy(); 释放连接器资源。

常见问题

  1. "Unable to establish connection" 或 "Unable to resolve FQDN"

    原因:一般都是因为配置 FQDN 不正确。 可以参考如何彻底搞懂 TDengine 的 FQDN