TDengine Go Connector
driver-go
是 TDengine 的官方 Go 语言连接器,实现了 Go 语言 database/sql 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。
Go 版本兼容性
支持 Go 1.14 及以上版本。
支持的平台
- 原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。
- WebSocket/REST 连接支持所有能运行 Go 的平台。
版本历史
driver-go 版本 | 主要变化 | TDengine 版本 |
---|---|---|
v3.5.8 | 修复空指针异常 | - |
v3.5.7 | taosWS 和 taosRestful 支持传入 request id | - |
v3.5.6 | 提升 websocket 查询和写入性能 | 3.3.2.0 及更高版本 |
v3.5.5 | restful 支持跳过 ssl 证书检查 | - |
v3.5.4 | 兼容 TDengine 3.3.0.0 tmq raw data | - |
v3.5.3 | 重构 taosWS | - |
v3.5.2 | websocket 压缩和优化消息订阅性能 | 3.2.3.0 及更高版本 |
v3.5.1 | 原生 stmt 查询和 geometry 类型支持 | 3.2.1.0 及更高版本 |
v3.5.0 | 获取消费进度及按照指定进度开始消费 | 3.0.5.0 及更高版本 |
v3.3.1 | 基于 websocket 的 schemaless 协议写入 | 3.0.4.1 及更高版本 |
v3.1.0 | 提供贴近 kafka 的订阅 api | - |
v3.0.4 | 新增 request id 相关接口 | 3.0.2.2 及更高版本 |
v3.0.3 | 基于 websocket 的 statement 写入 | - |
v3.0.2 | 基于 websocket 的数据查询和写入 | 3.0.1.5 及更高版本 |
v3.0.1 | 基于 websocket 的消息订阅 | - |
v3.0.0 | 适配 TDengine 3.0 查询和写入 | 3.0.0.0 及更高版本 |
异常和错误码
如果是 TDengine 错误可以通过以下方式获取错误码和错误信息。
// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
tError, is := err.(*errors.TaosError)
if is {
fmt.Println("errorCode:", int(tError.Code))
fmt.Println("errorMessage:", tError.ErrStr)
} else {
fmt.Println(err.Error())
}
}
TDengine 其他功能模块的报错,请参考 错误码
数据类型映射
TDengine DataType | Go Type |
---|---|
TIMESTAMP | time.Time |
TINYINT | int8 |
SMALLINT | int16 |
INT | int32 |
BIGINT | int64 |
TINYINT UNSIGNED | uint8 |
SMALLINT UNSIGNED | uint16 |
INT UNSIGNED | uint32 |
BIGINT UNSIGNED | uint64 |
FLOAT | float32 |
DOUBLE | float64 |
BOOL | bool |
BINARY | string |
NCHAR | string |
JSON | []byte |
GEOMETRY | []byte |
VARBINARY | []byte |
注意:JSON 类型仅在 tag 中支持。 GEOMETRY类型是 little endian 字节序的二进制数据,符合 WKB 规范。详细信息请参考 数据类型 WKB规范请参考Well-Known Binary (WKB)
示例程序汇总
示例程序源码请参考:示例程序
常见问题
-
database/sql 中 stmt(参数绑定)相关接口崩溃
REST 不支持参数绑定相关接口,建议使用
db.Exec
和db.Query
。 -
使用
use db
语句后执行其他语句报错[0x217] Database not specified or available
在 REST 接口中 SQL 语句的执行无上下文关联,使用
use db
语句不会生效,解决办法见上方使用限制章节。 -
使用 taosSql 不报错使用 taosRestful 报错
[0x217] Database not specified or available
因为 REST 接口无状态,使用
use db
语句不会生效,解决办法见上方使用限制章节。 -
readBufferSize
参数调大后无明显效果readBufferSize
调大后会减少获取结果时syscall
的调用。如果查询结果的数据量不大,修改该参数不会带来明显提升,如果该参数修改过大,瓶颈会在解析 JSON 数据。如果需要优化查询速度,需要根据实际情况调整该值来达到查询效果最优。 -
disableCompression
参数设置为false
时查询效率降低当
disableCompression
参数设置为false
时查询结果会使用gzip
压缩后传输,拿到数据后要先进行gzip
解压。 -
go get
命令无法获取包,或者获取包超时
设置 Go 代理 go env -w GOPROXY=https://goproxy.cn,direct
。
API 参考
database/sql 驱动
driver-go
实现了 Go 的 database/sql/driver
接口,可以直接使用 Go 的 database/sql
包。提供了三个驱动:github.com/taosdata/driver-go/v3/taosSql
、github.com/taosdata/driver-go/v3/taosRestful
和 github.com/taosdata/driver-go/v3/taosWS
分别对应 原生连接
、REST 连接
和 WebSocket 连接
。
DSN 规范
数据源名称具有通用格式,例如 PEAR DB,但没有类型前缀(方括号表示可选):
[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...¶mN=valueN]
完整形式的 DSN:
username:password@protocol(address)/dbname?param=value
原生连接
导入驱动:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
使用 taosSql
作为 driverName
并且使用一个正确的 DSN 作为 dataSourceName
如下:
var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)
支持的 DSN 参数:
cfg
指定 taos.cfg 目录cgoThread
指定 cgo 同时执行的数量,默认为系统核数cgoAsyncHandlerPoolSize
指定异步函数的 handle 大小,默认为 10000
Rest 连接
导入驱动:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
使用 taosRestful
作为 driverName
并且使用一个正确的 DSN 作为 dataSourceName
如下:
var taosUri = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosUri)
支持的 DSN 参数:
disableCompression
是否接受压缩数据,默认为 true 不接受压缩数据,如果传输数据使用 gzip 压缩设置为 false。readBufferSize
读取数据的缓存区大小默认为 4K(4096),当查询结果数据量多时可以适当调大该值。token
连接云服务时使用的 token。skipVerify
是否跳过证书验证,默认为 false 不跳过证书验证,如果连接的是不安全的服务设置为 true。
WebSocket 连接
导入驱动:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosWS"
)
使用 taosWS
作为 driverName
并且使用一个正确的 DSN 作为 dataSourceName
如下:
var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)
支持的 DSN 参数:
enableCompression
是否发送压缩数据,默认为 false 不发送压缩数据,如果传输数据使用压缩设置为 true。readTimeout
读取数据的超时时间,默认为 5m。writeTimeout
写入数据的超时时间,默认为 10s。
- 与原生连接方式不同,REST 接口是无状态的。在使用 REST 连接时,需要在 SQL 中指定表、超级表的数据库名称。
- 如果在 DSN 中指定了 dbname,那么,REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。
连接功能
Go 驱动支持创建连接,返回支持 sql/driver
标准的 Connector
接口的对象,还提供了 af
包,扩充了一些无模式写入接口。
标准接口
database/sql
包中创建连接的接口
func Open(driverName, dataSourceName string) (*DB, error)
- 接口说明:(
database/sql
)连接数据库 - 参数说明:
driverName
:驱动名称。dataSourceName
:连接参数 DSN。
- 返回值:连接对象,错误信息。
- 接口说明:(
扩展接口
af
包中创建连接的接口
func Open(host, user, pass, db string, port int) (*Connector, error)
- 接口说明:连接数据库。
- 参数说明:
host
:主机地址。user
:用户名。pass
:密码。db
:数据库名称。port
:端口号。
- 返回值:连接对象,错误信息。
无模式写入
af
包中使用原生连接进行无模式写入的接口。
-
func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error
- 接口说明:无模式写入 influxDB 格式数 据。
- 参数说明:
lines
:写入的数据。precision
:时间精度。
- 返回值:错误信息。
-
func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error
- 接口说明:无模式写入 OpenTSDB JSON 格式数据。
- 参数说明:
payload
:写入的数据。
- 返回值:错误信息。
-
func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error
- 接口说明:无模式写入 OpenTSDB Telnet 格式数据。
- 参数说明:
lines
:写入的数据。
- 返回值:错误信息。
ws/schemaless
包中使用 WebSocket 无模式写入的接口
func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
- 接口说明:无模式写入数据。
- 参数说明:
lines
:写入的数据。protocol
:写入的数据协议支持的协议InfluxDBLineProtocol = 1
OpenTSDBTelnetLineProtocol = 2
OpenTSDBJsonFormatProtocol = 3
。precision
:时间精度。ttl
:数据过期时间,0 表示不过期。reqID
:请求 ID。
- 返回值:错误信息。
执行 SQL
Go 驱动提供了符合 database/sql
标准的接口,支持以下功能:
- 执行 SQL 语句:执行静态 SQL 语句,并返回其生成的结果对象。
- 查询执行:可以执行返回数据集的查询(
SELECT
语句)。 - 更新执行:可以执行影响行数的 SQL 语句,如
INSERT
、UPDATE
、DELETE
等。 - 获取结果:可以获取查询执行后返回的结果集,并遍历查询返回的数据。
- 获取更新计数:对于非查询 SQL 语句,可以获取执行后影响的行数。
- 关闭资源:释放数据库资源。
标准接口
-
func (db *DB) Close() error
- 接口说明:关闭连接。
- 返回值:错误信息。
-
func (db *DB) Exec(query string, args ...any) (Result, error)
- 接口说明:执行查询但不返回任何行。
- 参数说明:
query
:要执行的命令。args
:命令参数。
- 返回值:Result 对象(只有影响行数),错误信息。
-
func (db *DB) Query(query string, args ...any) (*Rows, error)
- 接口说明:执行查询并返回行的结果。
- 参数说明:
query
:要执行的命令。args
:命令参数。
- 返回值:Rows 对象,错误信息。
-
func (db *DB) QueryRow(query string, args ...any) *Row
- 接口说明:执行查询并返回一行结果。
- 参数说明:
query
:要执行的命令。args
:命令参数。
- 返回值:Row 对象。
扩展接口
-
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)
- 接口说明:执行查询但不返回任何行。
- 参数说明:
ctx
:上下文,使用 Value 传递请求 id 进行链路追踪,key 为taos_req_id
value 为 int64 类型值。query
:要执行的命令。args
:命令参数。
- 返回值:结果 Result 对象(只有影响行数),错误信息。
-
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)
- 接口说明:执行查询并返回行结果。
- 参数说明:
ctx
:上下文,使用 Value 传递请求 id 进行链路追踪,key 为taos_req_id
value 为 int64 类型值。query
:要执行的命令。args
:命令参数。
- 返回值:结果集 Rows 对象,错误信息。
-
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row
- 接口说明:执行查询并返回一行结果,错误信息会在扫描 Row 时延迟返回。
- 参数说明:
ctx
:上下文,使用 Value 传递请求 id 进行链路追踪,key 为taos_req_id
value 为 int64 类型值。query
:要执行的命令。args
:命令参数。
- 返回值:单行结果 Row 对象。
结果获取
Go 驱动支持获取查询结果集,以及对应的结果集元数据,提供了用于读取结果集中元数据和数据的方法。
结果集
通过 Rows
对象获取查询结果集,提供了以下方法:
-
func (rs *Rows) Next() bool
- 接口说明:准备下一行数据。
- 返回值:是否有下一行数据。
-
func (rs *Rows) Columns() ([]string, error)
- 接口说明:返回列名。
- 返回值:列名,错误信息。
-
func (rs *Rows) Scan(dest ...any) error
- 接口说明:将当前行的列值复制到 dest 指向的值中。
- 参数说明:
dest
:目标值。
- 返回值:错误信息。
-
func (rs *Rows) Close() error
- 接口说明:关闭行。
- 返回值:错误信息。
-
func (r *Row) Scan(dest ...any) error
- 接口说明:将当前行的列值复制到 dest 指向的值中。
- 参数说明:
dest
:目标值。
- 返回值:错误信息。
通过 Result
对象获取更新结果集,提供了以下方法:
func (dr driverResult) RowsAffected() (int64, error)
- 接口说明:返回受影响的行数。
- 返回值:受影响的行数,错误信息。
结果集元数据
通过 Rows
对象获取查询结果集元数据,提供了以下方法:
-
func (rs *Rows) ColumnTypes() ([]*ColumnType, error)
- 接口说明:返回列类型。
- 返回值:列类型,错误信息。
-
func (ci *ColumnType) Name() string
- 接口说明:返回列名。
- 返回值:列名。
-
func (ci *ColumnType) Length() (length int64, ok bool)
- 接口说明:返回列长度。
- 返回值:列长度,是否有长度。
-
func (ci *ColumnType) ScanType() reflect.Type
- 接口说明:返回列类型对应的 Go 类型。
- 返回值:列类型。
-
func (ci *ColumnType) DatabaseTypeName() string
- 接口说明:返回列类型数据库名称。
- 返回值:列类型名称。
参数绑定
Prepare 允许使用预编译的 SQL 语句,可以提高性能并提供参数化查询的能力,从而增加安全性。
标准接口
使用 sql/driver
的 Conn
接口中的 Prepare
方法准备一个与此连接绑定的准备好的语句,返回 Stmt
对象,使用。
-
Prepare(query string) (Stmt, error)
- 接口说明:准备返回一个与此连接绑定的准备好的语句(statement)。
- 参数说明:
query
:要进行参数绑定的语句。
- 返回值:Stmt 对象,错误信息。
-
func (s *Stmt) Exec(args ...any) (Result, error)
- 接口说明:使用给定的参数执行准备好的语句并返回总结该语句效果的结果(只可以绑定列值,不支持绑定表名和 tag)。
- 参数说明:
args
:命令参数,Go 原始类型会自动转换数据库类型,类型不匹配可能会丢精度,建议使用与数据库相同的类型,时间类型使用 int64 或RFC3339Nano
格式化后的字符串。
- 返回值:结果 Result 对象(只有影响行数),错误信息。
-
func (s *Stmt) Query(args ...any) (*Rows, error)
- 接口说明:使用给定的参数执行准备好的语句并返回行的结果。
- 参数说明:
args
:命令参数,Go 原始类型会自动转换数据库类型,类型不匹配可能会丢精度,建议使用与数据库相同的类型,时间类型使用 int64 或RFC3339Nano
格式化后的字符串。
- 返回值:结果集 Rows 对象,错误信息。
-
func (s *Stmt) Close() error
- 接口说明:关闭语句。
- 返回值:错误信息。
扩展接口
af
包中提供了使用原生连接进行参数绑定的更多接口
-
func (conn *Connector) Stmt() *Stmt
- 接口说明:返回一个与此连接绑定的 Stmt 对象。
- 返回值:Stmt 对象。
-
func (s *Stmt) Prepare(sql string) error
- 接口说明:准备一个 sql 。
- 参数说明:
sql
:要进行参数绑定的语句。
- 返回值:错误信息。
-
func (s *Stmt) NumParams() (int, error)
- 接口说明:返回参数数量。
- 返回值:参数数量,错误信息。
-
func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param)
- 接口说明:设置表名和 tag。
- 参数说明:
tableName
:表名。tags
:tag。
- 返回值:错误信息。
-
func (s *Stmt) SetTableName(tableName string) error
- 接口说明:设置表名。
- 参数说明:
tableName
:表名。
- 返回值:错误信息。
-
func (s *Stmt) BindRow(row *param.Param) error
- 接口说明:绑定行。
- 参数说明:
row
:行数据。
- 返回值:错误信息。
-
func (s *Stmt) GetAffectedRows() int
- 接口说明:获取受影响的行数。
- 返回值:受影响的行数。
-
func (s *Stmt) AddBatch() error
- 接口说明:添加批处理。
- 返回值:错误信息。
-
func (s *Stmt) Execute() error
- 接口说明:执行批处理。
- 返回值:错误信息。
-
func (s *Stmt) UseResult() (driver.Rows, error)
- 接口说明:使用结果。
- 返回值:结果集 Rows 对象,错误信息。
-
func (s *Stmt) Close() error
- 接口说明:关闭语句。
- 返回值:错误信息。
ws/stmt
包提供了通过 WebSocket 进行参数绑定的接口
-
func (c *Connector) Init() (*Stmt, error)
- 接口说明:初始化。
- 返回值:Stmt 对象,错误信息。
-
func (s *Stmt) Prepare(sql string) error
- 接口说明:准备一个 sql 。
- 参数说明:
sql
:要进行参数绑定的语句。
- 返回值:错误信息。
-
func (s *Stmt) SetTableName(name string) error
- 接口说明:设置表名。
- 参数说明:
name
:表名。
- 返回值:错误信息。
-
func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType)
- 接口说明:设置 tag。
- 参数说明:
tags
:tag。bindType
:类型信息。
- 返回值:错误信息。
-
func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error
- 接口说明:绑定参数。
- 参数说明:
params
:参数。bindType
:类型信息。
- 返回值:错误信息。
-
func (s *Stmt) AddBatch() error
- 接口说明:添加批处理。
- 返回值:错误信息。
-
func (s *Stmt) Exec() error
- 接口说明:执行批处理。
- 返回值:错误信息。
-
func (s *Stmt) GetAffectedRows() int
- 接口说明:获取受影响的行数。
- 返回值:受影响的行数。
-
func (s *Stmt) UseResult() (*Rows, error)
- 接口说明:使用结果。
- 返回值:Rows 对象,错误信息。
-
func (s *Stmt) Close() error
- 接口说明:关闭语句。
- 返回值:错误信息。
Rows 行结果参考 sql/driver
包中的 Rows
接口,提供以下接口
-
func (rs *Rows) Columns() []string
- 接口说明:返回列名。
- 返回值:列名。
-
func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string
- 接口说明:返回列类型数据库名称。
- 参数说明:
i
:列索引。
- 返回值:列类型名称。
-
func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool)
- 接口说明:返回列长度。
- 参数说明:
i
:列索引。
- 返回值:列长度,是否有长度。
-
func (rs *Rows) ColumnTypeScanType(i int) reflect.Type
- 接口说明:返回列类型对应的 Go 类型。
- 参数说明:
i
:列索引。
- 返回值:列类型。
-
func (rs *Rows) Next(dest []driver.Value) error
- 接口说明:准备下一行数据,并赋值给目标。
- 参数说明:
dest
:目标值。
- 返回值:错误信息。
-
func (rs *Rows) Close() error
- 接口说明:关闭行。
- 返回值:错误信息。
common/param
包中提供了参数绑定数据结构
以下是按照偏移设置参数的接口:
-
func NewParam(size int) *Param
- 接口说明:创建一个参数绑定数据结构。
- 参数说明:
size
:参数数量。
- 返回值:Param 对象。
-
func (p *Param) SetBool(offset int, value bool)
- 接口说明:设置布尔值。
- 参数说明:
offset
:偏移量(列或标签)。value
:布尔值。
-
func (p *Param) SetNull(offset int)
- 接口说明:设置空值。
- 参数说明:
offset
:偏移量(列或标签)。
-
func (p *Param) SetTinyint(offset int, value int)
- 接口说明:设置 Tinyint 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Tinyint 值。
-
func (p *Param) SetSmallint(offset int, value int)
- 接口说明:设置 Smallint 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Smallint 值。
-
func (p *Param) SetInt(offset int, value int)
- 接口说明:设置 Int 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Int 值。
-
func (p *Param) SetBigint(offset int, value int)
- 接口说明:设置 Bigint 值。
- 参数说明:
offset
:偏移量(列 或标签)。value
:Bigint 值。
-
func (p *Param) SetUTinyint(offset int, value uint)
- 接口说明:设置 UTinyint 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:UTinyint 值。
-
func (p *Param) SetUSmallint(offset int, value uint)
- 接口说明:设置 USmallint 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:USmallint 值。
-
func (p *Param) SetUInt(offset int, value uint)
- 接口说明:设置 UInt 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:UInt 值。
-
func (p *Param) SetUBigint(offset int, value uint)
- 接口说明:设置 UBigint 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:UBigint 值。
-
func (p *Param) SetFloat(offset int, value float32)
- 接口说明:设置 Float 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Float 值。
-
func (p *Param) SetDouble(offset int, value float64)
- 接口说明:设置 Double 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Double 值。
-
func (p *Param) SetBinary(offset int, value []byte)
- 接口说明:设置 Binary 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Binary 值。
-
func (p *Param) SetVarBinary(offset int, value []byte)
- 接口说明:设置 VarBinary 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:VarBinary 值。
-
func (p *Param) SetNchar(offset int, value string)
- 接口说明:设置 Nchar 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Nchar 值。
-
func (p *Param) SetTimestamp(offset int, value time.Time, precision int)
- 接口说明:设置 Timestamp 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Timestamp 值。precision
:时间精度。
-
func (p *Param) SetJson(offset int, value []byte)
- 接口说明:设置 Json 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Json 值。
-
func (p *Param) SetGeometry(offset int, value []byte)
- 接口说明:设置 Geometry 值。
- 参数说明:
offset
:偏移量(列或标签)。value
:Geometry 值。
以下是链式调用设置参数的接口:
func (p *Param) AddBool(value bool) *Param
- 接口说明:添加布尔值。
- 参数说明:
value
:布尔值。
- 返回值:Param 对象。
其他类型与布尔值类似,具体接口如下:
- AddNull
- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry
以下是设置列类型信息的接口:
-
func NewColumnType(size int) *ColumnType
- 接口说明:创建一个列类型信息数据结构。
- 参数说明:
size
:列数量。
- 返回值:ColumnType 对象。
-
func (c *ColumnType) AddBool() *ColumnType
- 接口说明:添加布尔类型。
- 返回值:ColumnType 对象。
其他类型与布尔类型类似,具体接口如下:
- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry
数据订阅
Go 驱动支持数据订阅功能,提供了基于原生连接和 WebSocket 连接的数据订阅接口。原生实现在 af/tmq
包中,WebSocket 实现在 ws/tmq
包中。
消费者
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
- 接口说明:创建一个消费者。
- 参数说明:
conf
:配置信息。
- 返回值:Consumer 对象,错误信息。
配置信息定义为:
type ConfigValue interface{}
type ConfigMap map[string]ConfigValue
创建消费者支持属性列表:
ws.url
:WebSocket 连接地址。ws.message.channelLen
:WebSocket 消息通道缓存长度,默认 0。ws.message.timeout
:WebSocket 消息超时时间,默认 5m。ws.message.writeWait
:WebSocket 写入消息超时时间,默认 10s。ws.message.enableCompression
:WebSocket 是否启用压缩,默认 false。ws.autoReconnect
:WebSocket 是否自动重连,默认 false。ws.reconnectIntervalMs
:WebSocket 重连间隔时间毫秒,默认 2000。ws.reconnectRetryCount
:WebSocket 重连重试次数,默认 3。
其他参数请参考:Consumer 参数列表, 注意TDengine服务端自 3.2.0.0 版本开始消息订阅中的 auto.offset.reset 默认值发生变化。
-
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
- 接口说明:订阅主题。
- 参数说明:
topic
:主题。rebalanceCb
:平衡回调(未使用)。
- 返回值:错误信息。
-
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
- 接口说明:订阅主题列表。
- 参数说明:
topics
:主题列表。rebalanceCb
:平衡回调(未使用)。
- 返回值:错误信息。
-
func (c *Consumer) Unsubscribe() error
- 接口说明:取消订阅。
- 返回值:错误信息。
-
func (c *Consumer) Poll(timeoutMs int) tmq.Event
- 接口说明:轮询事件。
- 参数说明:
timeoutMs
:超时时间。
- 返回值:事件。
-
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
- 接口说明:提交偏移量。
- 返回值:TopicPartition 列表,错误信息。
-
func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)
- 接口说明:获取分配信息。
- 返回值:TopicPartition 列表,错误信息。
-
func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error
- 接口说明:跳转到偏移量。
- 参数说明:
partition
:分区和偏移信息。ignoredTimeoutMs
:超时时间(未使用)。
- 返回值:错误信息。
-
func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error)
- 接口说明:获取提交的偏移量。
- 参数说明:
partitions
:分区列表。timeoutMs
:超时时间。
- 返回值:TopicPartition 列表,错误信息。
-
func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error)
- 接口说明:提交偏移量。
- 参数说明:
offsets
:偏移量列表。
- 返回值:TopicPartition 列表,错误信息。
-
func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error)
- 接口说明:获取当前偏移量。
- 参数说明:
partitions
:分区列表。
- 返回值:TopicPartition 列表,错误信息。
-
func (c *Consumer) Close() error
- 接口说明:关闭消费者。
- 返回值:错误信息。
消费记录
当 Poll
返回 tmq.Event
事件时,可以通过判断 tmq.Event
的类型获取消费记录或错误信息。当类型为 *tmq.DataMessage
时,可以获取消费记录。
-
func (m *DataMessage) Topic() string
- 接口说明:获取主题。
- 返回值:主题。
-
func (m *DataMessage) DBName() string
- 接口说明:获取数据库名称。
- 返回值:数据库名称。
-
func (m *DataMessage) Offset() Offset
- 接口说明:获取偏移量。
- 返回值:偏移量。
-
func (m *DataMessage) Value() interface{}
- 接口说明:获取值,具体值为
[]*tmq.data
。 - 返回值:消费到的值。
- 接口说明:获取值,具体值为
tmq.data 结构如下:
type Data struct {
TableName string
Data [][]driver.Value
}
- TableName 为表名
- Data 为数据,每个元素为一行数据,每行数据为一个数组,数组元素为列值。
当 Poll 返回类型为 tmq.Error
时,可以使用 func (e Error) Error() string
获取错误信息。
分区信息
当消费到数据类型为 *tmq.DataMessage
时,可以从 TopicPartition
属性中获取分区信息。
type TopicPartition struct {
Topic *string
Partition int32
Offset Offset
Metadata *string
Error error
}
Topic
:主题。Partition
:分区。Offset
:偏移量。Metadata
:元数据(未使用)。Error
:错误信息。
可以使用 func (p TopicPartition) String() string
获取分区信息。
偏移量元数据
从 TopicPartition
中获取的偏移量信息,可以通过 Offset
属性获取偏移量元数据。当偏移量为 -2147467247
时表示未设置偏移量。
反序列化
当消费到数据类型为 *tmq.DataMessage
时,可以使用 func (m *DataMessage) Value() interface{}
获取数据,数据类型为 []*tmq.data
。