跳到主要内容

TDengine Node.js Connector

@tdengine/websocket 是 TDengine 的官方 Node.js 语言连接器。Node.js 开发人员可以通过它开发存取 TDengine 数据库的应用软件。 Node.js 连接器源码托管在 GitHub

Node.js 版本兼容性

支持 Node.js 14 及以上版本。

支持的平台

支持所有能运行 Node.js 的平台。

版本历史

Node.js 连接器 版本主要变化TDengine 版本
3.1.2对数据协议和解析进行了优化,性能得到大幅提升-
3.1.1优化了数据传输性能3.3.2.0 及更高版本
3.1.0新版本发布,支持 WebSocket 连接3.2.0.0 及更高版本

处理异常

在调用连接器 api 报错后,通过 try catch 可以获取到错误的信息和错误码。

错误说明:Node.js 连接器错误码在 100 到 110 之间,之外的错误为 TDengine 其他功能模块的报错。

具体的连接器错误码请参考:

Error CodeDescriptionSuggested Actions
100invalid variables参数不合法,请检查相应接口规范,调整参数类型及大小。
101invalid urlurl 错误,请检查 url 是否填写正确。
102received server data but did not find a callback for processing接收到服务端数据但没有找到上层回调
103invalid message type接收到的消息类型无法识别,请检查服务端是否正常。
104connection creation failed连接创建失败,请检查网络是否正常。
105websocket request timeout请求超时
106authentication fail认证失败,请检查用户名,密码是否正确。
107unknown sql type in tdengine请检查 TDengine 支持的 Data Type 类型。
108connection has been closed连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。
109fetch block data parse fail获取到的查询数据,解析失败
110websocket connection has reached its maximum limitWebSocket 连接达到上限

数据类型映射

下表为 TDengine DataType 和 Node.js DataType 之间的映射关系

TDengine DataTypeNode.js DataType
TIMESTAMPbigint
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTbigint
TINYINT UNSIGNEDnumber
SMALLINT UNSIGNEDnumber
INT UNSIGNEDnumber
BIGINT UNSIGNEDbigint
FLOATnumber
DOUBLEnumber
BOOLboolean
BINARYstring
NCHARstring
JSONstring
VARBINARYArrayBuffer
GEOMETRYArrayBuffer

注意:JSON 类型仅在 tag 中支持。

更多示例程序

示例程序示例程序描述
sql_example基本的使用如如建立连接,执行 SQL 等操作。
stmt_example绑定参数插入的示例。
line_example行协议写入示例。
tmq_example订阅的使用示例。
all_type_query支持全部类型示例。
all_type_stmt参数绑定支持全部类型示例。

使用限制

  • Node.js 连接器(@tdengine/websocket)支持 Node.js 14 以上版本,低于 14 的版本可能存在包兼容性的问题。
  • 目前只支持 WebSocket 连接,需要提前启动 taosAdapter
  • 使用连接器结束后,需要调用 taos.connectorDestroy(); 释放连接器资源。

常见问题

  1. "Unable to establish connection" 或 "Unable to resolve FQDN"

    原因:一般都是因为配置 FQDN 不正确。 可以参考如何彻底搞懂 TDengine 的 FQDN

API 参考

Node.js 连接器(@tdengine/websocket), 其通过 taosAdapter 提供的 WebSocket 接口连接 TDengine 实例。

URL 规范

[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------------|---|-----------|-----------|------|------|------------|-----------------------|
| protocol | | username | password | host | port | database | params |
  • protocol: 使用 websocket 协议建立连接。例如ws://localhost:6041

  • username/password: 数据库的用户名和密码。

  • host/port: 主机地址和端口号。例如localhost:6041

  • database: 数据库名称。

  • params: 其他参数。 例如token。

  • 完整 URL 示例:

    ws://root:taosdata@localhost:6041

WSConfig

除了通过指定的 URL 获取连接,还可以使用 WSConfig 指定建立连接时的参数。

try {
let url = 'ws://127.0.0.1:6041'
let conf = WsSql.NewConfig(url)
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb('db')
conf.setTimeOut(500)
let wsSql = await WsSql.open(conf);
} catch (e) {
console.error(e);
}

WSConfig 中的配置如下:

  • setUrl(url string) 设置 taosAdapter 连接地址 url,详见上文 URL 规范。
  • setUser(user: string) 设置数据库用户名。
  • setPwd(pws:string) 设置数据库密码。
  • setDb(db: string) 设置数据库名称。
  • setTimeOut(ms : number) 设置连接超时,单位毫秒。
  • setToken(token: string) 设置 taosAdapter 认证token。

连接功能

  • static async open(wsConfig:WSConfig):Promise<WsSql>
    • 接口说明:建立 taosAdapter 连接。
    • 参数说明
      • wsConfig:连接配置,详见上文 WSConfig 章节。
    • 返回值:连接对象。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • destroyed()
    • 接口说明:释放销毁资源。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

获取 taosc 版本号

  • async version(): Promise<string>
    • 接口说明:获取 taosc 客户端版本。
    • 返回值:taosc 客户端版本号。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

执行 SQL

  • async exec(sql: string, reqId?: number): Promise<TaosResult>
    • 接口说明:执行 sql 语句。
    • 参数说明
      • sql:待执行的 sql 语句。
      • reqId: 请求 id 非必填,用于问题追踪。
    • 返回值:执行结果
      TaosResult {
      affectRows: number, 影响的条数
      timing: number, 执行时长
      totalTime: number, 响应总时长
      }
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async query(sql: string, reqId?:number): Promise<WSRows>
    • 接口说明:查询数据。
    • 参数说明
      • sql:待执行的查询 sql 语句。
      • reqId: 请求 id 非必填,用于问题追踪。
    • 返回值:WSRows 数据集对象。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

数据集

  • getMeta():Array<TDengineMeta> | null
    • 接口说明:获取查询结果的列的数量、类型和长度。
    • 返回值:TDengineMeta 数据对象数组。
    export interface TDengineMeta {
    name: string,
    type: string,
    length: number,
    }
  • async next(): Promise<boolean>
    • 接口说明:将游标从当前位置向后移动一行。用于遍历查询结果集。
    • 返回值:如果新的当前行有效,则返回 true;如果结果集中没有更多行,则返回 false。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • getData(): Array<any>
    • 接口说明:返回查询的一行数据。
    • 返回值:返回查询的一行数据,此接口需要搭配 next 接口一起使用。
  • async close():Promise<void>
    • 接口说明:数据读取完成后,释放结果集。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

无模式写入

  • async schemalessInsert(lines: Array<string>, protocol: SchemalessProto, precision: Precision, ttl: number, reqId?: number): Promise<void>
    • 接口说明:无模式写入。
    • 参数说明
      • lines:待写入的数据数组,无模式具体的数据格式可参考 Schemaless 写入
      • protocol: 协议类型
        • SchemalessProto.InfluxDBLineProtocol:InfluxDB 行协议(Line Protocol)。
        • SchemalessProto.OpenTSDBTelnetLineProtocol:OpenTSDB 文本行协议。
        • SchemalessProto.OpenTSDBJsonFormatProtocol:JSON 协议格式。
      • precision: 时间精度
        • Precision.HOURS: 小时
        • Precision.MINUTES:分钟
        • Precision.SECONDS:秒
        • Precision.MILLI_SECONDS:毫秒
        • Precision.MICRO_SECONDS:微秒
        • Precision.NANO_SECONDS: 纳秒
      • ttl:表过期时间,单位天。
      • reqId: 用于问题追踪,可选。
    • 异常:连接失败抛出 TaosResultError 异常。

参数绑定

  • async stmtInit(reqId?:number): Promise<WsStmt>
    • 接口说明 使用 WsSql 对象创建 stmt 对象。
    • 参数说明
      • reqId: 请求 id 非必填,用于问题追踪。
    • 返回值:stmt 对象。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async prepare(sql: string): Promise<void>
    • 接口说明 绑定预编译 sql 语句。
    • 参数说明
      • sql: 预编译的 SQL 语句。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async setTableName(tableName: string): Promise<void>
    • 接口说明 设置将要写入数据的表名。
    • 参数说明
      • tableName: 表名,如果需要指定数据库, 例如: db_name.table_name 即可。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。 通过 StmtBindParams 对象设置绑定数据。
  • setBoolean(params :any[])
    • 接口说明 设置布尔值。
    • 参数说明
      • params: 布尔类型列表。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • 下面接口除了要设置的值类型不同外,其余同 setBoolean:
    • setTinyInt(params :any[])
    • setUTinyInt(params :any[])
    • setSmallInt(params :any[])
    • setUSmallInt(params :any[])
    • setInt(params :any[])
    • setUInt(params :any[])
    • setBigint(params :any[])
    • setUBigint(params :any[])
    • setFloat(params :any[])
    • setDouble(params :any[])
    • setVarchar(params :any[])
    • setBinary(params :any[])
    • setNchar(params :any[])
    • setJson(params :any[])
    • setVarBinary(params :any[])
    • setGeometry(params :any[])
    • setTimestamp(params :any[])
  • async setTags(paramsArray:StmtBindParams): Promise<void>
    • 接口说明 设置表 Tags 数据,用于自动建表。
    • 参数说明
      • paramsArray: Tags 数据。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async bind(paramsArray:StmtBindParams): Promise<void>
    • 接口说明 绑定数据。
    • 参数说明
      • paramsArray: 绑定数据。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async batch(): Promise<void>
    • 接口说明 提交绑定数据。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • async exec(): Promise<void>
    • 接口说明 执行将绑定的数据全部写入。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。
  • getLastAffected()
    • 接口说明 获取写入条数。
    • 返回值:写入条数。
  • async close(): Promise<void>
    • 接口说明 关闭 stmt 对象。
    • 异常:连接失败抛出 TDWebSocketClientError 异常。

数据订阅

  • 创建消费者支持属性列表
    • taos.TMQConstants.CONNECT_USER: 用户名。
    • taos.TMQConstants.CONNECT_PASS: 密码。
    • taos.TMQConstants.GROUP_ID: 所在的 group。
    • taos.TMQConstants.CLIENT_ID: 客户端id。
    • taos.TMQConstants.WS_URL: taosAdapter 的url地址。
    • taos.TMQConstants.AUTO_OFFSET_RESET: 来确定消费位置为最新数据(latest)还是包含旧数据(earliest)。
    • taos.TMQConstants.ENABLE_AUTO_COMMIT: 是否允许自动提交。
    • taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS: 自动提交间隔。
    • taos.TMQConstants.CONNECT_MESSAGE_TIMEOUT: 数据传输超时参数,单位 ms,默认为 10000 ms。
  • static async newConsumer(wsConfig:Map<string, any>):Promise<WsConsumer>
    • 接口说明 消费者构造函数。
    • 参数说明
      • wsConfig: 创建消费者属性配置。
    • 返回值:WsConsumer 消费者对象。
    • 异常:如果在执行过程中出现异常,抛出 TDWebSocketClientError 错误。
  • async subscribe(topics: Array<string>, reqId?:number): Promise<void>
    • 接口说明 订阅一组主题。
    • 参数说明
      • topics: 订阅的主题列表。
      • reqId: 请求 id 非必填,用于问题追踪。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async unsubscribe(reqId?:number): Promise<void>
    • 接口说明 取消订阅。
    • 参数说明
      • reqId: 请求 id 非必填,用于问题追踪。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async poll(timeoutMs: number, reqId?:number):Promise<Map<string, TaosResult>>
    • 接口说明 轮询消息。
    • 参数说明
      • timeoutMs: 表示轮询的超时时间,单位毫秒。
      • reqId: 请求 id 非必填,用于问题追踪。
    • 返回值Map<string, TaosResult> 每个主题对应的数据。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async subscription(reqId?:number):Promise<Array<string>>
    • 接口说明 获取当前订阅的所有主题。
    • 参数说明
      • reqId: 请求 id 非必填,用于问题追踪。
    • 返回值Array<string> 主题列表。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async commit(reqId?:number):Promise<Array<TopicPartition>>
    • 接口说明 提交当前处理的消息的偏移量。
    • 参数说明
      • reqId: 请求 id 非必填,用于问题追踪。
    • 返回值Array<TopicPartition> 每个主题的消费进度。
    • 异常:失败抛出 TDWebSocketClientError 异常。
  • async committed(partitions:Array<TopicPartition>, reqId?:number):Promise<Array<TopicPartition>>
    • 接口说明:获取一组分区最后提交的偏移量。
    • 参数说明
      • partitions:一个 Array<TopicPartition> 类型的参数,表示要查询的分区集合。
      • reqId: 请求 id 非必填,用于问题追踪。
    • 返回值Array<TopicPartition>,即一组分区最后提交的偏移量。
    • 异常:如果在获取提交的偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async seek(partition:TopicPartition, reqId?:number):Promise<void>
    • 接口说明:将给定分区的偏移量设置到指定的位置。
    • 参数说明
      • partition:一个 TopicPartition 类型的参数,表示要操作的分区和要设置的偏移量。
      • reqId: 请求 id 非必填,用于问题追踪。
    • 异常:如果在设置偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async positions(partitions:Array<TopicPartition>, reqId?:number):Promise<Array<TopicPartition>>
    • 接口说明:获取给定分区当前的偏移量。
    • 参数说明
      • partitions:一个 TopicPartition 类型的参数,表示要查询的分区。
      • reqId: 请求 id 非必填,用于问题追踪。
    • 返回值Array<TopicPartition>,即一组分区最后提交的偏移量。
    • 异常:如果在获取偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async seekToBeginning(partitions:Array<TopicPartition>):Promise<void>
    • 接口说明:将一组分区的偏移量设置到最早的偏移量。
    • 参数说明
      • partitions:一个 Array<TopicPartition> 类型的参数,表示要操作的分区集合。
    • 异常:如果在设置偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async seekToEnd(partitions:Array<TopicPartition>):Promise<void>
    • 接口说明:将一组分区的偏移量设置到最新的偏移量。
    • 参数说明
      • partitions:一个 Array<TopicPartition> 类型的参数,表示要操作的分区集合。
    • 异常:如果在设置偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async assignment(topics?:string[]):Promise<Array<TopicPartition>>
    • 接口说明:获取消费者当前分配的指定的分区或所有分区。
    • 参数说明
      • topics:需要获取的分区(非必填),不填表示获取全部的分区
    • 返回值:返回值类型为 Array<TopicPartition>,即消费者当前分配的所有分区。
    • 异常:如果在获取分配的分区过程中发生错误,将抛出 TDWebSocketClientError 异常。
  • async close():Promise<void>
    • 接口说明:关闭 tmq 连接。
    • 异常:操作失败抛出 TDWebSocketClientError 异常。