TDengine Node.js Connector
@tdengine/websocket
是 TDengine 的官方 Node.js 语言连接器。Node.js 开发人员可以通过它开发存取 TDengine 数据库的的应用软件。
Node.js 连接器源码托管在 GitHub。
连接方式
Node.js 连接器目前仅支持 Websocket 连接器, 其通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例。
连接方式的详细介绍请参考:连接器建立连接的方式
支持的平台
支持 Node.js 14及以上版本。
版本历史
Node.js 连接器 版本 | 主要变化 | TDengine 版本 |
---|---|---|
3.1.0 | 新版本发布,支持 WebSocket 连接 | 3.2.0.0 及更高版本 |
支持的功能特性
- 连接管理
- SQL写入
- SQL查询
- 参数绑定
- 数据订阅
- 无模式写入
处理异常
在调用连接器 api 报错后,通过 try catch 可以获取到错误的信息和错误码。
错误说明:Node.js 连接器错误码在 100 到 110 之间,之外的错误为 TDengine 其他功能模块的报错。
具体的连接器错误码请参考:
Error Code | Description | Suggested Actions |
---|---|---|
100 | invalid variables | 参数不合法,请检查相应接口规范,调整参数类型及大小。 |
101 | invalid url | url 错误,请检查 url 是否填写正确。 |
102 | received server data but did not find a callback for processing | 接收到服务端数据但没有找到上层回调 |
103 | invalid message type | 接收到的消息类型无法识别,请检查服务端是否正常。 |
104 | connection creation failed | 连接创建失败,请检查网络是否正常。 |
105 | websocket request timeout | 请求超时 |
106 | authentication fail | 认证失败,请检查用户名,密码是否正确。 |
107 | unknown sql type in tdengine | 请检查 TDengine 支持的 Data Type 类型。 |
108 | connection has been closed | 连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。 |
109 | fetch block data parse fail | 获取到的查询数据,解析失败 |
110 | websocket connection has reached its maximum limit | Websocket 连接达到上限 |
类型映射
下表为 TDengine DataType 和 Node.js DataType 之间的映射关系
TDengine DataType | Node.js DataType |
---|---|
TIMESTAMP | bigint |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | bigint |
TINYINT UNSIGNED | number |
SMALLINT UNSIGNED | number |
INT UNSIGNED | number |
BIGINT UNSIGNED | bigint |
FLOAT | number |
DOUBLE | number |
BOOL | boolean |
BINARY | string |
NCHAR | string |
JSON | string |
VARBINARY | ArrayBuffer |
GEOMETRY | ArrayBuffer |
注意:JSON 类型仅在 tag 中支持。
安装步骤
安装前准备
- 安装 Node.js 开发环境, 使用14以上版本。下载链接: https://nodejs.org/en/download/
使用 npm 安装 Node.js 连接器
npm install @tdengine/websocket
安装验证
验证方法:
-
新建安装验证目录,例如:
~/tdengine-test
,下载 GitHub 上 nodejsChecker.js 源代码到本地。 -
在命令行中执行以下命令。
npm init -y
npm install @tdengine/websocket
node nodejsChecker.js
- 执行以上步骤后,在命令行会输出 nodeChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。
建立连接
安装并引用 @tdengine/websocket
包。
注意:
- 链接器使用结束后,需要调用 taos.destroy() 释放连接器资源
const taos = require("@tdengine/websocket");
//数据库操作......
taos.destroy();
WSConfig配置Websocket参数如下:
getToken(): string | undefined | null;
setToken(token: string): void;
getUser(): string | undefined | null;
setUser(user: string): void;
getPwd(): string | undefined | null;
setPwd(pws: string): void;
getDb(): string | undefined | null;
setDb(db: string): void;
getUrl(): string;
setUrl(url: string): void;
setTimeOut(ms: number): void;
getTimeOut(): number | undefined | null;
async function createConnect() {
let dsn = 'ws://localhost:6041';
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
return await taos.sqlConnect(conf);
}
使用示例
创建数据库和表
async function createDbAndTable(wsSql) {
let wsSql = null;
try {
wsSql = await createConnect();
await wsSql.exec('CREATE DATABASE IF NOT EXISTS POWER ' +
'KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
await wsSql.exec('USE power');
await wsSql.exec('CREATE STABLE IF NOT EXISTS meters ' +
'(_ts timestamp, current float, voltage int, phase float) ' +
'TAGS (location binary(64), groupId int);');
taosResult = await wsSql.exec('describe meters');
console.log(taosResult);
} catch (err) {
console.error(err.code, err.message);
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
注意:如果不使用
USE power
指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。
插入数据
async function insertData(wsSql) {
let wsSql = null;
try {
wsSql = await createConnect();
let insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
taosResult = await wsSql.exec(insertQuery);
console.log(taosResult);
} catch (err) {
console.error(err.code, err.message);
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
NOW 为系统内部函数,默认为客户端所在计算机当前时间。
NOW + 1s
代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。
查询数据
async function queryData() {
let wsRows = null;
let wsSql = null;
try {
wsSql = await createConnect();
wsRows = await wsSql.query('select * from meters');
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let result = wsRows.getData();
console.log('queryRes.Scan().then=>', result);
}
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
}
}
查询到的数据
wsRow:meta:=> [
{ name: 'ts', type: 'TIMESTAMP', length: 8 },
{ name: 'current', type: 'FLOAT', length: 4 },
{ name: 'voltage', type: 'INT', length: 4 },
{ name: 'phase', type: 'FLOAT', length: 4 },
{ name: 'location', type: 'VARCHAR', length: 64},
{ name: 'groupid', type: 'INT', length: 4 }
]
wsRow:data:=> [
[ 1714013737536n, 12.3, 221, 0.31, 'California.SanFrancisco', 3 ]
]
执行带有 reqId 的 SQL
reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:
- 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
- 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
- 故障诊断:当一个请求失败时,可以通过查看与该请求关联的 reqId 来找出问题发生的位置
如果用户不设置reqId,连接器也会内部随机生成一个,但是还是建议用户设置,可以更好的跟用户请求关联起来。
async function sqlWithReqid(wsSql) {
let insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
let wsRows = null;
let wsSql = null;
try {
wsSql = await createConnect();
taosResult = await wsSql.exec(insertQuery, 1);
wsRows = await wsSql.query('select * from meters', 2);
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let result = wsRows.getData();
console.log('queryRes.Scan().then=>', result);
}
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
}
}