跳到主要内容

TDengine Go Connector

driver-go 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言 database/sql 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。

Go 版本兼容性

支持 Go 1.14 及以上版本。

支持的平台

  • 原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。
  • WebSocket/REST 连接支持所有能运行 Go 的平台。

版本历史

driver-go 版本主要变化TDengine 版本
v3.5.8修复空指针异常-
v3.5.7taosWS 和 taosRestful 支持传入 request id-
v3.5.6提升 websocket 查询和写入性能3.3.2.0 及更高版本
v3.5.5restful 支持跳过 ssl 证书检查-
v3.5.4兼容 TDengine 3.3.0.0 tmq raw data-
v3.5.3重构 taosWS-
v3.5.2websocket 压缩和优化消息订阅性能3.2.3.0 及更高版本
v3.5.1原生 stmt 查询和 geometry 类型支持3.2.1.0 及更高版本
v3.5.0获取消费进度及按照指定进度开始消费3.0.5.0 及更高版本
v3.3.1基于 websocket 的 schemaless 协议写入3.0.4.1 及更高版本
v3.1.0提供贴近 kafka 的订阅 api-
v3.0.4新增 request id 相关接口3.0.2.2 及更高版本
v3.0.3基于 websocket 的 statement 写入-
v3.0.2基于 websocket 的数据查询和写入3.0.1.5 及更高版本
v3.0.1基于 websocket 的消息订阅-
v3.0.0适配 TDengine 3.0 查询和写入3.0.0.0 及更高版本

异常和错误码

如果是 TDengine 错误可以通过以下方式获取错误码和错误信息。

// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
tError, is := err.(*errors.TaosError)
if is {
fmt.Println("errorCode:", int(tError.Code))
fmt.Println("errorMessage:", tError.ErrStr)
} else {
fmt.Println(err.Error())
}
}

TDengine 其他功能模块的报错,请参考 错误码

数据类型映射

TDengine DataTypeGo Type
TIMESTAMPtime.Time
TINYINTint8
SMALLINTint16
INTint32
BIGINTint64
TINYINT UNSIGNEDuint8
SMALLINT UNSIGNEDuint16
INT UNSIGNEDuint32
BIGINT UNSIGNEDuint64
FLOATfloat32
DOUBLEfloat64
BOOLbool
BINARYstring
NCHARstring
JSON[]byte
GEOMETRY[]byte
VARBINARY[]byte

注意:JSON 类型仅在 tag 中支持。 GEOMETRY类型是 little endian 字节序的二进制数据,符合 WKB 规范。详细信息请参考 数据类型 WKB规范请参考Well-Known Binary (WKB)

示例程序汇总

示例程序源码请参考:示例程序

常见问题

  1. database/sql 中 stmt(参数绑定)相关接口崩溃

    REST 不支持参数绑定相关接口,建议使用db.Execdb.Query

  2. 使用 use db 语句后执行其他语句报错 [0x217] Database not specified or available

    在 REST 接口中 SQL 语句的执行无上下文关联,使用 use db 语句不会生效,解决办法见上方使用限制章节。

  3. 使用 taosSql 不报错使用 taosRestful 报错 [0x217] Database not specified or available

    因为 REST 接口无状态,使用 use db 语句不会生效,解决办法见上方使用限制章节。

  4. readBufferSize 参数调大后无明显效果

    readBufferSize 调大后会减少获取结果时 syscall 的调用。如果查询结果的数据量不大,修改该参数不会带来明显提升,如果该参数修改过大,瓶颈会在解析 JSON 数据。如果需要优化查询速度,需要根据实际情况调整该值来达到查询效果最优。

  5. disableCompression 参数设置为 false 时查询效率降低

    disableCompression 参数设置为 false 时查询结果会使用 gzip 压缩后传输,拿到数据后要先进行 gzip 解压。

  6. go get 命令无法获取包,或者获取包超时

设置 Go 代理 go env -w GOPROXY=https://goproxy.cn,direct

API 参考

database/sql 驱动

driver-go 实现了 Go 的 database/sql/driver 接口,可以直接使用 Go 的 database/sql 包。提供了三个驱动:github.com/taosdata/driver-go/v3/taosSqlgithub.com/taosdata/driver-go/v3/taosRestfulgithub.com/taosdata/driver-go/v3/taosWS 分别对应 原生连接REST 连接WebSocket 连接

DSN 规范

数据源名称具有通用格式,例如 PEAR DB,但没有类型前缀(方括号表示可选):

[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...&paramN=valueN]

完整形式的 DSN:

username:password@protocol(address)/dbname?param=value
原生连接

导入驱动:

import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)

使用 taosSql 作为 driverName 并且使用一个正确的 DSN 作为 dataSourceName 如下:

var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)

支持的 DSN 参数:

  • cfg 指定 taos.cfg 目录
  • cgoThread 指定 cgo 同时执行的数量,默认为系统核数
  • cgoAsyncHandlerPoolSize 指定异步函数的 handle 大小,默认为 10000
Rest 连接

导入驱动:

import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)

使用 taosRestful 作为 driverName 并且使用一个正确的 DSN 作为 dataSourceName 如下:

var taosUri = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosUri)

支持的 DSN 参数:

  • disableCompression 是否接受压缩数据,默认为 true 不接受压缩数据,如果传输数据使用 gzip 压缩设置为 false。
  • readBufferSize 读取数据的缓存区大小默认为 4K(4096),当查询结果数据量多时可以适当调大该值。
  • token 连接云服务时使用的 token。
  • skipVerify 是否跳过证书验证,默认为 false 不跳过证书验证,如果连接的是不安全的服务设置为 true。
WebSocket 连接

导入驱动:

import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosWS"
)

使用 taosWS 作为 driverName 并且使用一个正确的 DSN 作为 dataSourceName 如下:

var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)

支持的 DSN 参数:

  • enableCompression 是否发送压缩数据,默认为 false 不发送压缩数据,如果传输数据使用压缩设置为 true。
  • readTimeout 读取数据的超时时间,默认为 5m。
  • writeTimeout 写入数据的超时时间,默认为 10s。
备注
  • 与原生连接方式不同,REST 接口是无状态的。在使用 REST 连接时,需要在 SQL 中指定表、超级表的数据库名称。
  • 如果在 DSN 中指定了 dbname,那么,REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。

连接功能

Go 驱动支持创建连接,返回支持 sql/driver 标准的 Connector 接口的对象,还提供了 af 包,扩充了一些无模式写入接口。

标准接口

database/sql 包中创建连接的接口

  • func Open(driverName, dataSourceName string) (*DB, error)
    • 接口说明:(database/sql)连接数据库
    • 参数说明
      • driverName:驱动名称。
      • dataSourceName:连接参数 DSN。
    • 返回值:连接对象,错误信息。

扩展接口

af 包中创建连接的接口

  • func Open(host, user, pass, db string, port int) (*Connector, error)
    • 接口说明:连接数据库。
    • 参数说明
      • host:主机地址。
      • user:用户名。
      • pass:密码。
      • db:数据库名称。
      • port:端口号。
    • 返回值:连接对象,错误信息。

无模式写入

af 包中使用原生连接进行无模式写入的接口。

  • func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error

    • 接口说明:无模式写入 influxDB 格式数据。
    • 参数说明
      • lines:写入的数据。
      • precision:时间精度。
    • 返回值:错误信息。
  • func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error

    • 接口说明:无模式写入 OpenTSDB JSON 格式数据。
    • 参数说明
      • payload:写入的数据。
    • 返回值:错误信息。
  • func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error

    • 接口说明:无模式写入 OpenTSDB Telnet 格式数据。
    • 参数说明
      • lines:写入的数据。
    • 返回值:错误信息。

ws/schemaless 包中使用 WebSocket 无模式写入的接口

  • func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
    • 接口说明:无模式写入数据。
    • 参数说明
      • lines:写入的数据。
      • protocol:写入的数据协议支持的协议 InfluxDBLineProtocol = 1 OpenTSDBTelnetLineProtocol = 2 OpenTSDBJsonFormatProtocol = 3
      • precision:时间精度。
      • ttl:数据过期时间,0 表示不过期。
      • reqID:请求 ID。
    • 返回值:错误信息。

执行 SQL

Go 驱动提供了符合 database/sql 标准的接口,支持以下功能:

  1. 执行 SQL 语句:执行静态 SQL 语句,并返回其生成的结果对象。
  2. 查询执行:可以执行返回数据集的查询(SELECT 语句)。
  3. 更新执行:可以执行影响行数的 SQL 语句,如 INSERTUPDATEDELETE 等。
  4. 获取结果:可以获取查询执行后返回的结果集,并遍历查询返回的数据。
  5. 获取更新计数:对于非查询 SQL 语句,可以获取执行后影响的行数。
  6. 关闭资源:释放数据库资源。

标准接口

  • func (db *DB) Close() error

    • 接口说明:关闭连接。
    • 返回值:错误信息。
  • func (db *DB) Exec(query string, args ...any) (Result, error)

    • 接口说明:执行查询但不返回任何行。
    • 参数说明
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:Result 对象(只有影响行数),错误信息。
  • func (db *DB) Query(query string, args ...any) (*Rows, error)

    • 接口说明:执行查询并返回行的结果。
    • 参数说明
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:Rows 对象,错误信息。
  • func (db *DB) QueryRow(query string, args ...any) *Row

    • 接口说明:执行查询并返回一行结果。
    • 参数说明
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:Row 对象。

扩展接口

  • func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)

    • 接口说明:执行查询但不返回任何行。
    • 参数说明
      • ctx:上下文,使用 Value 传递请求 id 进行链路追踪,key 为 taos_req_id value 为 int64 类型值。
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:结果 Result 对象(只有影响行数),错误信息。
  • func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)

    • 接口说明:执行查询并返回行结果。
    • 参数说明
      • ctx:上下文,使用 Value 传递请求 id 进行链路追踪,key 为 taos_req_id value 为 int64 类型值。
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:结果集 Rows 对象,错误信息。
  • func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row

    • 接口说明:执行查询并返回一行结果,错误信息会在扫描 Row 时延迟返回。
    • 参数说明
      • ctx:上下文,使用 Value 传递请求 id 进行链路追踪,key 为 taos_req_id value 为 int64 类型值。
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:单行结果 Row 对象。

结果获取

Go 驱动支持获取查询结果集,以及对应的结果集元数据,提供了用于读取结果集中元数据和数据的方法。

结果集

通过 Rows 对象获取查询结果集,提供了以下方法:

  • func (rs *Rows) Next() bool

    • 接口说明:准备下一行数据。
    • 返回值:是否有下一行数据。
  • func (rs *Rows) Columns() ([]string, error)

    • 接口说明:返回列名。
    • 返回值:列名,错误信息。
  • func (rs *Rows) Scan(dest ...any) error

    • 接口说明:将当前行的列值复制到 dest 指向的值中。
    • 参数说明
      • dest:目标值。
    • 返回值:错误信息。
  • func (rs *Rows) Close() error

    • 接口说明:关闭行。
    • 返回值:错误信息。
  • func (r *Row) Scan(dest ...any) error

    • 接口说明:将当前行的列值复制到 dest 指向的值中。
    • 参数说明
      • dest:目标值。
    • 返回值:错误信息。

通过 Result 对象获取更新结果集,提供了以下方法:

  • func (dr driverResult) RowsAffected() (int64, error)
    • 接口说明:返回受影响的行数。
    • 返回值:受影响的行数,错误信息。

结果集元数据

通过 Rows 对象获取查询结果集元数据,提供了以下方法:

  • func (rs *Rows) ColumnTypes() ([]*ColumnType, error)

    • 接口说明:返回列类型。
    • 返回值:列类型,错误信息。
  • func (ci *ColumnType) Name() string

    • 接口说明:返回列名。
    • 返回值:列名。
  • func (ci *ColumnType) Length() (length int64, ok bool)

    • 接口说明:返回列长度。
    • 返回值:列长度,是否有长度。
  • func (ci *ColumnType) ScanType() reflect.Type

    • 接口说明:返回列类型对应的 Go 类型。
    • 返回值:列类型。
  • func (ci *ColumnType) DatabaseTypeName() string

    • 接口说明:返回列类型数据库名称。
    • 返回值:列类型名称。

参数绑定

Prepare 允许使用预编译的 SQL 语句,可以提高性能并提供参数化查询的能力,从而增加安全性。

标准接口

使用 sql/driverConn 接口中的 Prepare 方法准备一个与此连接绑定的准备好的语句,返回 Stmt 对象,使用。

  • Prepare(query string) (Stmt, error)

    • 接口说明:准备返回一个与此连接绑定的准备好的语句(statement)。
    • 参数说明
      • query:要进行参数绑定的语句。
    • 返回值:Stmt 对象,错误信息。
  • func (s *Stmt) Exec(args ...any) (Result, error)

    • 接口说明:使用给定的参数执行准备好的语句并返回总结该语句效果的结果(只可以绑定列值,不支持绑定表名和 tag)。
    • 参数说明
      • args:命令参数,Go 原始类型会自动转换数据库类型,类型不匹配可能会丢精度,建议使用与数据库相同的类型,时间类型使用 int64 或 RFC3339Nano 格式化后的字符串。
    • 返回值:结果 Result 对象(只有影响行数),错误信息。
  • func (s *Stmt) Query(args ...any) (*Rows, error)

    • 接口说明:使用给定的参数执行准备好的语句并返回行的结果。
    • 参数说明
      • args:命令参数,Go 原始类型会自动转换数据库类型,类型不匹配可能会丢精度,建议使用与数据库相同的类型,时间类型使用 int64 或 RFC3339Nano 格式化后的字符串。
    • 返回值:结果集 Rows 对象,错误信息。
  • func (s *Stmt) Close() error

    • 接口说明:关闭语句。
    • 返回值:错误信息。

扩展接口

af 包中提供了使用原生连接进行参数绑定的更多接口

  • func (conn *Connector) Stmt() *Stmt

    • 接口说明:返回一个与此连接绑定的 Stmt 对象。
    • 返回值:Stmt 对象。
  • func (s *Stmt) Prepare(sql string) error

    • 接口说明:准备一个 sql 。
    • 参数说明
      • sql:要进行参数绑定的语句。
    • 返回值:错误信息。
  • func (s *Stmt) NumParams() (int, error)

    • 接口说明:返回参数数量。
    • 返回值:参数数量,错误信息。
  • func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param)

    • 接口说明:设置表名和 tag。
    • 参数说明
      • tableName:表名。
      • tags:tag。
    • 返回值:错误信息。
  • func (s *Stmt) SetTableName(tableName string) error

    • 接口说明:设置表名。
    • 参数说明
      • tableName:表名。
    • 返回值:错误信息。
  • func (s *Stmt) BindRow(row *param.Param) error

    • 接口说明:绑定行。
    • 参数说明
      • row:行数据。
    • 返回值:错误信息。
  • func (s *Stmt) GetAffectedRows() int

    • 接口说明:获取受影响的行数。
    • 返回值:受影响的行数。
  • func (s *Stmt) AddBatch() error

    • 接口说明:添加批处理。
    • 返回值:错误信息。
  • func (s *Stmt) Execute() error

    • 接口说明:执行批处理。
    • 返回值:错误信息。
  • func (s *Stmt) UseResult() (driver.Rows, error)

    • 接口说明:使用结果。
    • 返回值:结果集 Rows 对象,错误信息。
  • func (s *Stmt) Close() error

    • 接口说明:关闭语句。
    • 返回值:错误信息。

ws/stmt 包提供了通过 WebSocket 进行参数绑定的接口

  • func (c *Connector) Init() (*Stmt, error)

    • 接口说明:初始化。
    • 返回值:Stmt 对象,错误信息。
  • func (s *Stmt) Prepare(sql string) error

    • 接口说明:准备一个 sql 。
    • 参数说明
      • sql:要进行参数绑定的语句。
    • 返回值:错误信息。
  • func (s *Stmt) SetTableName(name string) error

    • 接口说明:设置表名。
    • 参数说明
      • name:表名。
    • 返回值:错误信息。
  • func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType)

    • 接口说明:设置 tag。
    • 参数说明
      • tags:tag。
      • bindType:类型信息。
    • 返回值:错误信息。
  • func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error

    • 接口说明:绑定参数。
    • 参数说明
      • params:参数。
      • bindType:类型信息。
    • 返回值:错误信息。
  • func (s *Stmt) AddBatch() error

    • 接口说明:添加批处理。
    • 返回值:错误信息。
  • func (s *Stmt) Exec() error

    • 接口说明:执行批处理。
    • 返回值:错误信息。
  • func (s *Stmt) GetAffectedRows() int

    • 接口说明:获取受影响的行数。
    • 返回值:受影响的行数。
  • func (s *Stmt) UseResult() (*Rows, error)

    • 接口说明:使用结果。
    • 返回值:Rows 对象,错误信息。
  • func (s *Stmt) Close() error

    • 接口说明:关闭语句。
    • 返回值:错误信息。

Rows 行结果参考 sql/driver 包中的 Rows 接口,提供以下接口

  • func (rs *Rows) Columns() []string

    • 接口说明:返回列名。
    • 返回值:列名。
  • func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string

    • 接口说明:返回列类型数据库名称。
    • 参数说明
      • i:列索引。
    • 返回值:列类型名称。
  • func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool)

    • 接口说明:返回列长度。
    • 参数说明
      • i:列索引。
    • 返回值:列长度,是否有长度。
  • func (rs *Rows) ColumnTypeScanType(i int) reflect.Type

    • 接口说明:返回列类型对应的 Go 类型。
    • 参数说明
      • i:列索引。
    • 返回值:列类型。
  • func (rs *Rows) Next(dest []driver.Value) error

    • 接口说明:准备下一行数据,并赋值给目标。
    • 参数说明
      • dest:目标值。
    • 返回值:错误信息。
  • func (rs *Rows) Close() error

    • 接口说明:关闭行。
    • 返回值:错误信息。

common/param 包中提供了参数绑定数据结构

以下是按照偏移设置参数的接口:

  • func NewParam(size int) *Param

    • 接口说明:创建一个参数绑定数据结构。
    • 参数说明
      • size:参数数量。
    • 返回值:Param 对象。
  • func (p *Param) SetBool(offset int, value bool)

    • 接口说明:设置布尔值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:布尔值。
  • func (p *Param) SetNull(offset int)

    • 接口说明:设置空值。
    • 参数说明
      • offset:偏移量(列或标签)。
  • func (p *Param) SetTinyint(offset int, value int)

    • 接口说明:设置 Tinyint 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Tinyint 值。
  • func (p *Param) SetSmallint(offset int, value int)

    • 接口说明:设置 Smallint 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Smallint 值。
  • func (p *Param) SetInt(offset int, value int)

    • 接口说明:设置 Int 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Int 值。
  • func (p *Param) SetBigint(offset int, value int)

    • 接口说明:设置 Bigint 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Bigint 值。
  • func (p *Param) SetUTinyint(offset int, value uint)

    • 接口说明:设置 UTinyint 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:UTinyint 值。
  • func (p *Param) SetUSmallint(offset int, value uint)

    • 接口说明:设置 USmallint 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:USmallint 值。
  • func (p *Param) SetUInt(offset int, value uint)

    • 接口说明:设置 UInt 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:UInt 值。
  • func (p *Param) SetUBigint(offset int, value uint)

    • 接口说明:设置 UBigint 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:UBigint 值。
  • func (p *Param) SetFloat(offset int, value float32)

    • 接口说明:设置 Float 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Float 值。
  • func (p *Param) SetDouble(offset int, value float64)

    • 接口说明:设置 Double 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Double 值。
  • func (p *Param) SetBinary(offset int, value []byte)

    • 接口说明:设置 Binary 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Binary 值。
  • func (p *Param) SetVarBinary(offset int, value []byte)

    • 接口说明:设置 VarBinary 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:VarBinary 值。
  • func (p *Param) SetNchar(offset int, value string)

    • 接口说明:设置 Nchar 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Nchar 值。
  • func (p *Param) SetTimestamp(offset int, value time.Time, precision int)

    • 接口说明:设置 Timestamp 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Timestamp 值。
      • precision:时间精度。
  • func (p *Param) SetJson(offset int, value []byte)

    • 接口说明:设置 Json 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Json 值。
  • func (p *Param) SetGeometry(offset int, value []byte)

    • 接口说明:设置 Geometry 值。
    • 参数说明
      • offset:偏移量(列或标签)。
      • value:Geometry 值。

以下是链式调用设置参数的接口:

  • func (p *Param) AddBool(value bool) *Param
    • 接口说明:添加布尔值。
    • 参数说明
      • value:布尔值。
    • 返回值:Param 对象。

其他类型与布尔值类似,具体接口如下:

  • AddNull
  • AddTinyint
  • AddSmallint
  • AddInt
  • AddBigint
  • AddUTinyint
  • AddUSmallint
  • AddUInt
  • AddUBigint
  • AddFloat
  • AddDouble
  • AddBinary
  • AddVarBinary
  • AddNchar
  • AddTimestamp
  • AddJson
  • AddGeometry

以下是设置列类型信息的接口:

  • func NewColumnType(size int) *ColumnType

    • 接口说明:创建一个列类型信息数据结构。
    • 参数说明
      • size:列数量。
    • 返回值:ColumnType 对象。
  • func (c *ColumnType) AddBool() *ColumnType

    • 接口说明:添加布尔类型。
    • 返回值:ColumnType 对象。

其他类型与布尔类型类似,具体接口如下:

  • AddTinyint
  • AddSmallint
  • AddInt
  • AddBigint
  • AddUTinyint
  • AddUSmallint
  • AddUInt
  • AddUBigint
  • AddFloat
  • AddDouble
  • AddBinary
  • AddVarBinary
  • AddNchar
  • AddTimestamp
  • AddJson
  • AddGeometry

数据订阅

Go 驱动支持数据订阅功能,提供了基于原生连接和 WebSocket 连接的数据订阅接口。原生实现在 af/tmq 包中,WebSocket 实现在 ws/tmq 包中。

消费者

  • func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
    • 接口说明:创建一个消费者。
    • 参数说明
      • conf:配置信息。
    • 返回值:Consumer 对象,错误信息。

配置信息定义为:

type ConfigValue interface{}
type ConfigMap map[string]ConfigValue

创建消费者支持属性列表:

  • ws.url:WebSocket 连接地址。
  • ws.message.channelLen:WebSocket 消息通道缓存长度,默认 0。
  • ws.message.timeout:WebSocket 消息超时时间,默认 5m。
  • ws.message.writeWait:WebSocket 写入消息超时时间,默认 10s。
  • ws.message.enableCompression:WebSocket 是否启用压缩,默认 false。
  • ws.autoReconnect:WebSocket 是否自动重连,默认 false。
  • ws.reconnectIntervalMs:WebSocket 重连间隔时间毫秒,默认 2000。
  • ws.reconnectRetryCount:WebSocket 重连重试次数,默认 3。

其他参数请参考:Consumer 参数列表, 注意TDengine服务端自 3.2.0.0 版本开始消息订阅中的 auto.offset.reset 默认值发生变化。

  • func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

    • 接口说明:订阅主题。
    • 参数说明
      • topic:主题。
      • rebalanceCb:平衡回调(未使用)。
    • 返回值:错误信息。
  • func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error

    • 接口说明:订阅主题列表。
    • 参数说明
      • topics:主题列表。
      • rebalanceCb:平衡回调(未使用)。
    • 返回值:错误信息。
  • func (c *Consumer) Unsubscribe() error

    • 接口说明:取消订阅。
    • 返回值:错误信息。
  • func (c *Consumer) Poll(timeoutMs int) tmq.Event

    • 接口说明:轮询事件。
    • 参数说明
      • timeoutMs:超时时间。
    • 返回值:事件。
  • func (c *Consumer) Commit() ([]tmq.TopicPartition, error)

    • 接口说明:提交偏移量。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)

    • 接口说明:获取分配信息。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error

    • 接口说明:跳转到偏移量。
    • 参数说明
      • partition:分区和偏移信息。
      • ignoredTimeoutMs:超时时间(未使用)。
    • 返回值:错误信息。
  • func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error)

    • 接口说明:获取提交的偏移量。
    • 参数说明
      • partitions:分区列表。
      • timeoutMs:超时时间。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error)

    • 接口说明:提交偏移量。
    • 参数说明
      • offsets:偏移量列表。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error)

    • 接口说明:获取当前偏移量。
    • 参数说明
      • partitions:分区列表。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) Close() error

    • 接口说明:关闭消费者。
    • 返回值:错误信息。

消费记录

Poll 返回 tmq.Event 事件时,可以通过判断 tmq.Event 的类型获取消费记录或错误信息。当类型为 *tmq.DataMessage 时,可以获取消费记录。

  • func (m *DataMessage) Topic() string

    • 接口说明:获取主题。
    • 返回值:主题。
  • func (m *DataMessage) DBName() string

    • 接口说明:获取数据库名称。
    • 返回值:数据库名称。
  • func (m *DataMessage) Offset() Offset

    • 接口说明:获取偏移量。
    • 返回值:偏移量。
  • func (m *DataMessage) Value() interface{}

    • 接口说明:获取值,具体值为 []*tmq.data
    • 返回值:消费到的值。

tmq.data 结构如下:

type Data struct {
TableName string
Data [][]driver.Value
}
  • TableName 为表名
  • Data 为数据,每个元素为一行数据,每行数据为一个数组,数组元素为列值。

当 Poll 返回类型为 tmq.Error 时,可以使用 func (e Error) Error() string 获取错误信息。

分区信息

当消费到数据类型为 *tmq.DataMessage 时,可以从 TopicPartition 属性中获取分区信息。

type TopicPartition struct {
Topic *string
Partition int32
Offset Offset
Metadata *string
Error error
}
  • Topic:主题。
  • Partition:分区。
  • Offset:偏移量。
  • Metadata:元数据(未使用)。
  • Error:错误信息。

可以使用 func (p TopicPartition) String() string 获取分区信息。

偏移量元数据

TopicPartition 中获取的偏移量信息,可以通过 Offset 属性获取偏移量元数据。当偏移量为 -2147467247 时表示未设置偏移量。

反序列化

当消费到数据类型为 *tmq.DataMessage 时,可以使用 func (m *DataMessage) Value() interface{} 获取数据,数据类型为 []*tmq.data

附录