TDengine Node.js Connector
@tdengine/websocket
是 TDengine 的官方 Node.js 语言连接器。Node.js 开发人员可以通过它开发存取 TDengine 数据库的应用软件。
Node.js 连接器源码托管在 GitHub。
Node.js 版本兼容性
支持 Node.js 14 及以上版本。
支持的平台
支持所有能运行 Node.js 的平台。
版本历史
Node.js 连接器 版本 | 主要变化 | TDengine 版本 |
---|---|---|
3.1.2 | 对数据协议和解析进行了优化,性能得到大幅提升 | - |
3.1.1 | 优化了数据传输性能 | 3.3.2.0 及更高版本 |
3.1.0 | 新版本发布,支持 WebSocket 连接 | 3.2.0.0 及更高版本 |
处理异常
在调用连接器 api 报错后,通过 try catch 可以获取到错误的信息和错误码。
错误说明:Node.js 连接器错误码在 100 到 110 之间,之外的错误为 TDengine 其他功能模块的报错。
具体的连接器错误码请参考:
Error Code | Description | Suggested Actions |
---|---|---|
100 | invalid variables | 参数不合法,请检查相应接口规范,调整参数类型及大小。 |
101 | invalid url | url 错误,请检查 url 是否填写正确。 |
102 | received server data but did not find a callback for processing | 接收到服务端数据但没有找到上层回调 |
103 | invalid message type | 接收到的消息类型无法识别,请检查服务端是否正常。 |
104 | connection creation failed | 连接创建失败,请检查网络是否正常。 |
105 | websocket request timeout | 请求超时 |
106 | authentication fail | 认证失败,请检查用户名,密码是否正确。 |
107 | unknown sql type in tdengine | 请检查 TDengine 支持的 Data Type 类型。 |
108 | connection has been closed | 连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。 |
109 | fetch block data parse fail | 获取到的查询数据,解析失败 |
110 | websocket connection has reached its maximum limit | WebSocket 连接达到上限 |
- TDengine Node.js Connector Error Code
- TDengine 其他功能模块的报错,请参考 错误码
数据类型映射
下表为 TDengine DataType 和 Node.js DataType 之间的映射关系
TDengine DataType | Node.js DataType |
---|---|
TIMESTAMP | bigint |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | bigint |
TINYINT UNSIGNED | number |
SMALLINT UNSIGNED | number |
INT UNSIGNED | number |
BIGINT UNSIGNED | bigint |
FLOAT | number |
DOUBLE | number |
BOOL | boolean |
BINARY | string |
NCHAR | string |
JSON | string |
VARBINARY | ArrayBuffer |
GEOMETRY | ArrayBuffer |
注意:JSON 类型仅在 tag 中支持。
更多示例程序
示例程序 | 示例程序描述 |
---|---|
sql_example | 基本的使用如如建立连接,执行 SQL 等操作。 |
stmt_example | 绑定参数插入的示例。 |
line_example | 行协议写入示例。 |
tmq_example | 订阅的使用示例。 |
all_type_query | 支持全部类型示例。 |
all_type_stmt | 参数绑定支持全部类型示例。 |
使用限制
- Node.js 连接器(
@tdengine/websocket
)支持 Node.js 14 以上版本,低于 14 的版本可能存在包兼容性的问题。 - 目前只支持 WebSocket 连接,需要提前启动 taosAdapter
- 使用连接器结束后,需要调用 taos.connectorDestroy(); 释放连接器资源。
常见问题
-
"Unable to establish connection" 或 "Unable to resolve FQDN"
原因:一般都是因为配置 FQDN 不正确。 可以参考如何彻底搞懂 TDengine 的 FQDN 。
API 参考
Node.js 连接器(@tdengine/websocket
), 其通过 taosAdapter 提供的 WebSocket 接口连接 TDengine 实例。
URL 规范
[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------------|---|-----------|-----------|------|------|------------|-----------------------|
| protocol | | username | password | host | port | database | params |
-
protocol: 使用 websocket 协议建立连接。例如
ws://localhost:6041
-
username/password: 数据库的用户名和密码。
-
host/port: 主机地址和端口号。例如
localhost:6041
-
database: 数据库名称。
-
params: 其他参数。 例如token。
-
完整 URL 示例:
ws://root:taosdata@localhost:6041
WSConfig
除了通过指定的 URL 获取连接,还可以使用 WSConfig 指定建立连接时的参数。
try {
let url = 'ws://127.0.0.1:6041'
let conf = WsSql.NewConfig(url)
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb('db')
conf.setTimeOut(500)
let wsSql = await WsSql.open(conf);
} catch (e) {
console.error(e);
}
WSConfig 中的配置如下:
- setUrl(url string) 设置 taosAdapter 连接地址 url,详见上文 URL 规范。
- setUser(user: string) 设置数据库用户名。
- setPwd(pws:string) 设置数据库密码。
- setDb(db: string) 设置数据库名称。
- setTimeOut(ms : number) 设置连接超时,单位毫秒。
- setToken(token: string) 设置 taosAdapter 认证token。
连接功能
static async open(wsConfig:WSConfig):Promise<WsSql>
- 接口说明:建立 taosAdapter 连接。
- 参数说明:
wsConfig
:连接配置,详见上文 WSConfig 章节。
- 返回值:连接对象。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
destroyed()
- 接口说明:释放销毁资源。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
获取 taosc 版本号
async version(): Promise<string>
- 接口说明:获取 taosc 客户端版本。
- 返回值:taosc 客户端版本号。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
执行 SQL
async exec(sql: string, reqId?: number): Promise<TaosResult>
- 接口说明:执行 sql 语句。
- 参数说明:
sql
:待执行的 sql 语句。reqId
: 请求 id 非必填,用于问题追踪。
- 返回值:执行结果
TaosResult {
affectRows: number, 影响的条数
timing: number, 执行时长
totalTime: number, 响应总时长
} - 异常:连接失败抛出
TDWebSocketClientError
异常。
async query(sql: string, reqId?:number): Promise<WSRows>
- 接口说明:查询数据。
- 参数说明:
sql
:待执行的查询 sql 语句。reqId
: 请求 id 非必填,用于问题追踪。
- 返回值:WSRows 数据集对象。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
数据集
getMeta():Array<TDengineMeta> | null
- 接口说明:获取查询结果的列的数量、类型和长度。
- 返回值:TDengineMeta 数据对象数组。
export interface TDengineMeta {
name: string,
type: string,
length: number,
}async next(): Promise<boolean>
- 接口说明:将游标从当前位置向后移动一行。用于遍历查询结果集。
- 返回值:如果新的当前行有效,则返回 true;如果结果集中没有更多行,则返回 false。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
getData(): Array<any>
- 接口说明:返回查询的一行数据。
- 返回值:返回查询的一行数据,此接口需要搭配 next 接口一起使用。
async close():Promise<void>
- 接口说明:数据读取完成后,释放结果集。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
无模式写入
async schemalessInsert(lines: Array<string>, protocol: SchemalessProto, precision: Precision, ttl: number, reqId?: number): Promise<void>
- 接口说明:无模式写入。
- 参数说明:
lines
:待写入的数据数组,无模式具体的数据格式可参考Schemaless 写入
。protocol
: 协议类型SchemalessProto.InfluxDBLineProtocol
:InfluxDB 行协议(Line Protocol)。SchemalessProto.OpenTSDBTelnetLineProtocol
:OpenTSDB 文本行协议。SchemalessProto.OpenTSDBJsonFormatProtocol
:JSON 协议格式。
precision
: 时间精度Precision.HOURS
: 小时Precision.MINUTES
:分钟Precision.SECONDS
:秒Precision.MILLI_SECONDS
: 毫秒Precision.MICRO_SECONDS
:微秒Precision.NANO_SECONDS
: 纳秒
ttl
:表过期时间,单位天。reqId
: 用于问题追踪,可选。
- 异常:连接失败抛出
TaosResultError
异常。
参数绑定
async stmtInit(reqId?:number): Promise<WsStmt>
- 接口说明 使用 WsSql 对象创建 stmt 对象。
- 参数说明:
reqId
: 请求 id 非必填,用于问题追踪。
- 返回值:stmt 对象。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
async prepare(sql: string): Promise<void>
- 接口说明 绑定预编译 sql 语句。
- 参数说明:
sql
: 预编译的 SQL 语句。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
async setTableName(tableName: string): Promise<void>
- 接口说明 设置将要写入数据的表名。
- 参数说明:
tableName
: 表名,如果需要指定数据库, 例如:db_name.table_name
即可。
- 异常:连接失败抛出
TDWebSocketClientError
异常。 通过 StmtBindParams 对象设置绑定数据。
setBoolean(params :any[])
- 接口说明 设置布尔值。
- 参数 说明:
params
: 布尔类型列表。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
- 下面接口除了要设置的值类型不同外,其余同 setBoolean:
setTinyInt(params :any[])
setUTinyInt(params :any[])
setSmallInt(params :any[])
setUSmallInt(params :any[])
setInt(params :any[])
setUInt(params :any[])
setBigint(params :any[])
setUBigint(params :any[])
setFloat(params :any[])
setDouble(params :any[])
setVarchar(params :any[])
setBinary(params :any[])
setNchar(params :any[])
setJson(params :any[])
setVarBinary(params :any[])
setGeometry(params :any[])
setTimestamp(params :any[])
async setTags(paramsArray:StmtBindParams): Promise<void>
- 接口说明 设置表 Tags 数据,用于自动建表。
- 参数说明:
paramsArray
: Tags 数据。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
async bind(paramsArray:StmtBindParams): Promise<void>
- 接口说明 绑定数据。
- 参数说明:
paramsArray
: 绑定数据。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
async batch(): Promise<void>
- 接口说明 提交绑定数据。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
async exec(): Promise<void>
- 接口说明 执行将绑定的数据全部写入。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
getLastAffected()
- 接口说明 获取写入条数。
- 返回值:写入条数。
async close(): Promise<void>
- 接口说明 关闭 stmt 对象。
- 异常:连接失败抛出
TDWebSocketClientError
异常。
数据订阅
- 创建消费者支持属性列表:
- taos.TMQConstants.CONNECT_USER: 用户名。
- taos.TMQConstants.CONNECT_PASS: 密码。
- taos.TMQConstants.GROUP_ID: 所在的 group。
- taos.TMQConstants.CLIENT_ID: 客户端id。
- taos.TMQConstants.WS_URL: taosAdapter 的url地址。
- taos.TMQConstants.AUTO_OFFSET_RESET: 来确定消费位置为最新数据(latest)还是包含旧数据(earliest)。
- taos.TMQConstants.ENABLE_AUTO_COMMIT: 是否允许自动提交。
- taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS: 自动提交间隔。
- taos.TMQConstants.CONNECT_MESSAGE_TIMEOUT: 数据传输超时参数,单位 ms,默认为 10000 ms。
static async newConsumer(wsConfig:Map<string, any>):Promise<WsConsumer>
- 接口说明 消费者构造函数。
- 参数说明:
wsConfig
: 创建消费者属性配置。
- 返回值:WsConsumer 消费者对象。
- 异常:如果在执行过程中出现异常,抛出
TDWebSocketClientError
错误。
async subscribe(topics: Array<string>, reqId?:number): Promise<void>
- 接口说明 订阅一组主题。
- 参数说明:
topics
: 订阅的主题列表。reqId
: 请求 id 非必填,用于问题追踪。
- 异常:失败抛出
TDWebSocketClientError
异常。
async unsubscribe(reqId?:number): Promise<void>
- 接口说明 取消订阅。
- 参数说明:
reqId
: 请求 id 非必填,用于问题追踪。
- 异常:失败抛出
TDWebSocketClientError
异常。
async poll(timeoutMs: number, reqId?:number):Promise<Map<string, TaosResult>>
- 接口说明 轮询消息。
- 参数说明:
timeoutMs
: 表示轮询的超时时间,单位毫秒。reqId
: 请求 id 非必填,用于问题追踪。
- 返回值:
Map<string, TaosResult>
每个主题对应的数据。 - 异常:失败抛出
TDWebSocketClientError
异常。
async subscription(reqId?:number):Promise<Array<string>>
- 接口说明 获取当前订阅的所有主题。
- 参数说明:
reqId
: 请求 id 非必填,用于问题追踪。
- 返回值:
Array<string>
主题列表。 - 异常:失败抛出
TDWebSocketClientError
异常。
async commit(reqId?:number):Promise<Array<TopicPartition>>
- 接口说明 提交当前处理的消息的偏移量。
- 参数说明:
reqId
: 请求 id 非必填,用于问题追踪。
- 返回值:
Array<TopicPartition>
每个主题的消费进度。 - 异常:失败抛出
TDWebSocketClientError
异常。
async committed(partitions:Array<TopicPartition>, reqId?:number):Promise<Array<TopicPartition>>
- 接口说明:获取一组分区最后提交的偏移量。
- 参数说明:
partitions
:一个Array<TopicPartition>
类型的参数,表示要查询的分区集合。reqId
: 请求 id 非必填,用于问题追踪。
- 返回值:
Array<TopicPartition>
,即一组分区最后提交的偏移量。 - 异常:如果在获取提交的偏移量过程中发生错误,将抛出
TDWebSocketClientError
异常。
async seek(partition:TopicPartition, reqId?:number):Promise<void>
- 接口说明:将给定分区的偏移量设置到指定的位置。
- 参数说明:
partition
:一个TopicPartition
类型的参数,表示要操作的分区和要设置的偏移量。reqId
: 请求 id 非必填,用于问题追踪。
- 异常:如果在设置偏移量过程中发生错误,将抛出
TDWebSocketClientError
异常。
async positions(partitions:Array<TopicPartition>, reqId?:number):Promise<Array<TopicPartition>>
- 接口说明:获取给定分区当前的偏移量。
- 参数说明:
partitions
:一个TopicPartition
类型的参数,表示要查询的分区。reqId
: 请求 id 非必填,用于问题追踪。
- 返回值:
Array<TopicPartition>
,即一组分区最后提交的偏移量。 - 异常:如果在获取偏移量过程中发生错误,将抛出
TDWebSocketClientError
异常。
async seekToBeginning(partitions:Array<TopicPartition>):Promise<void>
- 接口说明:将一组分区的偏移量设置到最早的偏移量。
- 参数说明:
partitions
:一个Array<TopicPartition>
类型的参数,表示要操作的分区集合。
- 异常:如果在设置偏移量过程中发生错误,将抛出
TDWebSocketClientError
异常。
async seekToEnd(partitions:Array<TopicPartition>):Promise<void>
- 接口说明:将一组分区的偏移量设置到最新的偏移量。
- 参数说明:
partitions
:一个Array<TopicPartition>
类型的参数,表示要操作的分区集合。
- 异常:如果在设置偏移量过程中发生错误,将抛出
TDWebSocketClientError
异常。
async assignment(topics?:string[]):Promise<Array<TopicPartition>>
- 接口说明:获取消费者当前分配的指定的分区或所有分区。
- 参数说明:
topics
:需要获取的分区(非必填),不 填表示获取全部的分区
- 返回值:返回值类型为
Array<TopicPartition>
,即消费者当前分配的所有分区。 - 异常:如果在获取分配的分区过程中发生错误,将抛出
TDWebSocketClientError
异常。
async close():Promise<void>
- 接口说明:关闭 tmq 连接。
- 异常:操作失败抛出
TDWebSocketClientError
异常。