跳到主要内容

TDengine TSDB Node.js Connector

@tdengine/websocket 是 TDengine TSDB 的官方 Node.js 语言连接器。Node.js 开发人员可以通过它开发存取 TDengine TSDB 数据库的应用软件。 Node.js 连接器源码托管在 GitHub

Node.js 版本兼容性

支持 Node.js 14 及以上版本。

支持的平台

支持所有能运行 Node.js 的平台。

版本历史

Node.js 连接器 版本主要变化TDengine TSDB 版本
3.3.0支持负载均衡和故障转移-
3.2.31. 支持令牌身份认证
2. 支持上报连接器版本信息
-
3.2.2修复 Windows 系统上的时区处理问题-
3.2.1修复 SQL 查询结果排序问题-
3.2.0优化 stmt 参数绑定,提升写入效率-
3.1.9修复 WebSocket 连接的时区设置问题-
3.1.8修复当网络异常时,连接池返回的连接不可用-
3.1.7修复云服务 TMQ 连接参数问题-
3.1.61. 检测连接器是否支持数据库的版本
2. 连接器支持新增订阅参数
-
3.1.5密码支持特殊字符-
3.1.4修改 readme-
3.1.3升级了 es5-ext 版本,解决低版本的漏洞-
3.1.2对数据协议和解析进行了优化,性能得到大幅提升-
3.1.1优化了数据传输性能3.3.2.0 及更高版本
3.1.0新版本发布,支持 WebSocket 连接3.2.0.0 及更高版本

处理异常

在调用连接器 api 报错后,通过 try catch 可以获取到错误的信息和错误码。
错误码信息请参考 错误码

数据类型映射

下表为 TDengine TSDB DataType 和 Node.js DataType 之间的映射关系

TDengine TSDB DataTypeNode.js DataType
TIMESTAMPbigint
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTbigint
TINYINT UNSIGNEDnumber
SMALLINT UNSIGNEDnumber
INT UNSIGNEDnumber
BIGINT UNSIGNEDbigint
FLOATnumber
DOUBLEnumber
BOOLboolean
BINARYstring
NCHARstring
JSONstring
VARBINARYArrayBuffer
GEOMETRYArrayBuffer

注意:JSON 类型仅在 tag 中支持。

更多示例程序

示例程序示例程序描述
sql_example基本的使用如如建立连接,执行 SQL 等操作。
stmt_example绑定参数插入的示例。
line_example行协议写入示例。
tmq_example订阅的使用示例。
all_type_query支持全部类型示例。
all_type_stmt参数绑定支持全部类型示例。

使用限制

  • Node.js 连接器(@tdengine/websocket)支持 Node.js 14 以上版本,低于 14 的版本可能存在包兼容性的问题。
  • 目前只支持 WebSocket 连接,需要提前启动 taosAdapter。
  • 使用连接器结束后,需要调用 taos.connectorDestroy(); 释放连接器资源。
  • 如果要设置 SQL 字符串中时间字符串的时区,需要设置 taosadapter 所在机器的 taosc 时区配置。
  • 在解析结果集时,JavaScript 不支持 int64 类型,因此无法直接进行时区转换。若用户有需求,可引入第三方库以提供支持。

常见问题

  1. "Unable to establish connection" 或 "Unable to resolve FQDN"

    原因:一般都是因为配置 FQDN 不正确。可以参考如何彻底搞懂 TDengine TSDB 的 FQDN

API 参考

Node.js 连接器(@tdengine/websocket), 其通过 taosAdapter 提供的 WebSocket 接口连接 TDengine TSDB 实例。

URL 规范

[+<protocol>]://[<username>:<password>@][<host1>:<port1>[,...<hostN>:<portN>]][/<database>][?<p1>=<v1>[&=<v2>]]
|-----------|---|-----------|-----------|------------------------------------|------------|-----------------------|
| protocol | | username | password | addresses | database | params |
  • protocol:使用 WebSocket 协议建立连接。例如 ws://localhost:6041

  • username/password:数据库用户名和密码。

  • addresses:地址列表,支持一个或多个 host:port,多个地址之间使用英文逗号分隔(例如 localhost:6041,localhost:6042)。@tdengine/websocket 同时支持 IPv4 和 IPv6 两种地址格式。对于 IPv6 地址,必须使用中括号包裹(例如 [::1][2001:db8:1234:5678::1]),以避免与端口号解析冲突。

  • database:数据库名称。

  • params

    • token:用于 TDengine TSDB 云服务身份认证。
    • bearer_token:用于 TDengine TSDB 身份认证,优先级高于用户名/密码。
    • retries:连接失败时的最大重试次数,默认为 5。
    • retry_backoff_ms:连接失败时的初始等待时间(毫秒),默认为 200。该值会随连续失败按指数增长,直到达到最大等待时间。
    • retry_backoff_max_ms:连接失败时的最大等待时间(毫秒),默认为 2000。
  • 完整 DSN 示例:

// IPV4:
ws://root:taosdata@localhost:6041

// IPV6:
ws://root:taosdata@[::1]:6041

// 多地址(可用于负载均衡和故障转移场景):
ws://root:taosdata@localhost:6041,localhost:6042,localhost:6043

WSConfig

除了通过指定的 URL 获取连接,还可以使用 WSConfig 指定建立连接时的参数。

const taos = require("@tdengine/websocket");

async function createConnect() {
try {
let url = 'ws://127.0.0.1:6041'
let conf = new taos.WSConfig(url)
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb('db')
conf.setTimeOut(500)
let wsSql = await taos.sqlConnect(conf)
} catch (e) {
console.error(e);
}
}

WSConfig 中的配置如下:

  • setUrl(url string) 设置 taosAdapter 连接地址 url,详见上文 URL 规范。
  • setUser(user: string) 设置数据库用户名。
  • setPwd(pws:string) 设置数据库密码。
  • setDb(db: string) 设置数据库名称。
  • setTimeOut(ms : number) 设置连接超时,单位毫秒。
  • setToken(token: string) 设置云服务认证 token。
  • setBearerToken(token: string) 设置 TDengine TSDB 认证 token,认证优先级高于用户名密码。

连接功能

  • static async open(wsConfig:WSConfig):Promise<WsSql>
    • 接口说明:建立 taosAdapter 连接。
    • 参数说明
      • wsConfig:连接配置,详见上文 WSConfig 章节。
    • 返回值:连接对象。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • destroyed()
    • 接口说明:释放销毁资源。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

获取 taosc 版本号

  • async version(): Promise<string>
    • 接口说明:获取 taosc 客户端版本。
    • 返回值:taosc 客户端版本号。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

执行 SQL

  • async exec(sql: string, reqId?: number): Promise<TaosResult>
    • 接口说明:执行 sql 语句。

    • 参数说明

      • sql:待执行的 sql 语句。
      • reqId:请求 id 非必填,用于问题追踪。
    • 返回值:执行结果

      TaosResult {
      affectRows: number, 影响的条数
      timing: number, 执行时长
      totalTime: number, 响应总时长
      }
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

  • async query(sql: string, reqId?:number): Promise<WSRows>
    • 接口说明:查询数据。
    • 参数说明
      • sql:待执行的查询 sql 语句。
      • reqId:请求 id 非必填,用于问题追踪。
    • 返回值:WSRows 数据集对象。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

数据集

  • getMeta():Array<TDengineMeta> | null

    • 接口说明:获取查询结果的列的数量、类型和长度。

    • 返回值:TDengineMeta 数据对象数组。

      export interface TDengineMeta {
      name: string,
      type: string,
      length: number,
      }
  • async next(): Promise<boolean>

    • 接口说明:将游标从当前位置向后移动一行。用于遍历查询结果集。
    • 返回值:如果新的当前行有效,则返回 true;如果结果集中没有更多行,则返回 false。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • getData(): Array<any>

    • 接口说明:返回查询的一行数据。
    • 返回值:返回查询的一行数据,此接口需要搭配 next 接口一起使用。
  • async close():Promise<void>

    • 接口说明:数据读取完成后,释放结果集。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

无模式写入

  • async schemalessInsert(lines: Array<string>, protocol: SchemalessProto, precision: Precision, ttl: number, reqId?: number): Promise<void>
    • 接口说明:无模式写入。
    • 参数说明
      • lines:待写入的数据数组,无模式具体的数据格式可参考 Schemaless 写入
      • protocol:协议类型
        • SchemalessProto.InfluxDBLineProtocol:InfluxDB 行协议(Line Protocol)。
        • SchemalessProto.OpenTSDBTelnetLineProtocol:OpenTSDB 文本行协议。
        • SchemalessProto.OpenTSDBJsonFormatProtocol:JSON 协议格式。
      • precision:时间精度
        • Precision.HOURS:小时
        • Precision.MINUTES:分钟
        • Precision.SECONDS:秒
        • Precision.MILLI_SECONDS:毫秒
        • Precision.MICRO_SECONDS:微秒
        • Precision.NANO_SECONDS:纳秒
      • ttl:表过期时间,单位天。
      • reqId:用于问题追踪,可选。
    • 异常:连接失败抛出 TaosResultError 异常。

参数绑定

  • async stmtInit(reqId?:number): Promise<WsStmt>
    • 接口说明 使用 WsSql 对象创建 stmt 对象。
    • 参数说明
      • reqId:请求 id 非必填,用于问题追踪。
    • 返回值:stmt 对象。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async prepare(sql: string): Promise<void>
    • 接口说明 绑定预编译 sql 语句。
    • 参数说明
      • sql:预编译的 SQL 语句。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async setTableName(tableName: string): Promise<void>
    • 接口说明 设置将要写入数据的表名。
    • 参数说明
      • tableName:表名,如果需要指定数据库,例如:db_name.table_name 即可。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。 通过 StmtBindParams 对象设置绑定数据。
  • setBoolean(params :any[])
    • 接口说明 设置布尔值。
    • 参数说明
      • params:布尔类型列表。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • 下面接口除了要设置的值类型不同外,其余同 setBoolean:
    • setTinyInt(params :any[])
    • setUTinyInt(params :any[])
    • setSmallInt(params :any[])
    • setUSmallInt(params :any[])
    • setInt(params :any[])
    • setUInt(params :any[])
    • setBigint(params :any[])
    • setUBigint(params :any[])
    • setFloat(params :any[])
    • setDouble(params :any[])
    • setVarchar(params :any[])
    • setBinary(params :any[])
    • setNchar(params :any[])
    • setJson(params :any[])
    • setVarBinary(params :any[])
    • setGeometry(params :any[])
    • setTimestamp(params :any[])
  • async setTags(paramsArray:StmtBindParams): Promise<void>
    • 接口说明 设置表 Tags 数据,用于自动建表。
    • 参数说明
      • paramsArray:Tags 数据。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async bind(paramsArray:StmtBindParams): Promise<void>
    • 接口说明 绑定数据。
    • 参数说明
      • paramsArray:绑定数据。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async batch(): Promise<void>
    • 接口说明 提交绑定数据。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async exec(): Promise<void>
    • 接口说明 执行将绑定的数据全部写入。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • getLastAffected()
    • 接口说明 获取写入条数。
    • 返回值:写入条数。
  • async close(): Promise<void>
    • 接口说明 关闭 stmt 对象。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

数据订阅

  • 创建消费者支持属性列表
    • taos.TMQConstants.CONNECT_USER:用户名。
    • taos.TMQConstants.CONNECT_PASS:密码。
    • taos.TMQConstants.GROUP_ID:所在的 group。
    • taos.TMQConstants.CLIENT_ID:客户端 id。
    • taos.TMQConstants.WS_URL:taosAdapter 的 url 地址。
    • taos.TMQConstants.AUTO_OFFSET_RESET:来确定消费位置为最新数据(latest)还是包含旧数据(earliest)。
    • taos.TMQConstants.ENABLE_AUTO_COMMIT:是否允许自动提交。
    • taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS:自动提交间隔。
    • taos.TMQConstants.CONNECT_MESSAGE_TIMEOUT:数据传输超时参数,单位 ms,默认为 10000 ms。
  • static async newConsumer(wsConfig:Map<string, any>):Promise<WsConsumer>
    • 接口说明 消费者构造函数。
    • 参数说明
      • wsConfig:创建消费者属性配置。
    • 返回值:WsConsumer 消费者对象。
    • 异常:如果在执行过程中出现异常,抛出 TDWebSocketClientError 错误。
  • async subscribe(topics: Array<string>, reqId?:number): Promise<void>
    • 接口说明 订阅一组主题。
    • 参数说明
      • topics:订阅的主题列表。
      • reqId:请求 id 非必填,用于问题追踪。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async unsubscribe(reqId?:number): Promise<void>
    • 接口说明 取消订阅。
    • 参数说明
      • reqId:请求 id 非必填,用于问题追踪。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async poll(timeoutMs: number, reqId?:number):Promise<Map<string, TaosResult>>
    • 接口说明 轮询消息。
    • 参数说明
      • timeoutMs:表示轮询的超时时间,单位毫秒。
      • reqId:请求 id 非必填,用于问题追踪。
    • 返回值Map<string, TaosResult> 每个主题对应的数据。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async subscription(reqId?:number):Promise<Array<string>>
    • 接口说明 获取当前订阅的所有主题。
    • 参数说明
      • reqId:请求 id 非必填,用于问题追踪。
    • 返回值Array<string> 主题列表。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async commit(reqId?:number):Promise<Array<TopicPartition>>
    • 接口说明 提交当前处理的消息的偏移量。
    • 参数说明
      • reqId:请求 id 非必填,用于问题追踪。
    • 返回值Array<TopicPartition> 每个主题的消费进度。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async committed(partitions:Array<TopicPartition>, reqId?:number):Promise<Array<TopicPartition>>
    • 接口说明:获取一组分区最后提交的偏移量。
    • 参数说明
      • partitions:一个 Array<TopicPartition> 类型的参数,表示要查询的分区集合。
      • reqId:请求 id 非必填,用于问题追踪。
    • 返回值Array<TopicPartition>,即一组分区最后提交的偏移量。
    • 异常:如果在获取提交的偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async seek(partition:TopicPartition, reqId?:number):Promise<void>
    • 接口说明:将给定分区的偏移量设置到指定的位置。
    • 参数说明
      • partition:一个 TopicPartition 类型的参数,表示要操作的分区和要设置的偏移量。
      • reqId:请求 id 非必填,用于问题追踪。
    • 异常:如果在设置偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async positions(partitions:Array<TopicPartition>, reqId?:number):Promise<Array<TopicPartition>>
    • 接口说明:获取给定分区当前的偏移量。
    • 参数说明
      • partitions:一个 TopicPartition 类型的参数,表示要查询的分区。
      • reqId:请求 id 非必填,用于问题追踪。
    • 返回值Array<TopicPartition>,即一组分区最后提交的偏移量。
    • 异常:如果在获取偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async seekToBeginning(partitions:Array<TopicPartition>):Promise<void>
    • 接口说明:将一组分区的偏移量设置到最早的偏移量。
    • 参数说明
      • partitions:一个 Array<TopicPartition> 类型的参数,表示要操作的分区集合。
    • 异常:如果在设置偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async seekToEnd(partitions:Array<TopicPartition>):Promise<void>
    • 接口说明:将一组分区的偏移量设置到最新的偏移量。
    • 参数说明
      • partitions:一个 Array<TopicPartition> 类型的参数,表示要操作的分区集合。
    • 异常:如果在设置偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async assignment(topics?:string[]):Promise<Array<TopicPartition>>
    • 接口说明:获取消费者当前分配的指定的分区或所有分区。
    • 参数说明
      • topics:需要获取的分区(非必填),不填表示获取全部的分区
    • 返回值:返回值类型为 Array<TopicPartition>,即消费者当前分配的所有分区。
    • 异常:如果在获取分配的分区过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async close():Promise<void>
    • 接口说明:关闭 tmq 连接。
    • 异常:操作失败抛出 TDWebSocketClientError 异常。