TDengine TSDB C# Connector
TDengine.Connector 是 TDengine TSDB 提供的 C# 语言连接器。C# 开发人员可以通过它开发存取 TDengine TSDB 集群数据的 C# 应用软件。
.NET 版本兼容性
- .NET Framework 4.6 及以上版本。
- .NET 5.0 及以上版本。
支持的平台
- 原生连接支持的平台和 TDengine TSDB 客户端驱动支持的平台一致。
- WebSocket 连接支持所有能运行 .NET 运行时的平台。
版本历史
| Connector 版本 | 主要变化 | TDengine TSDB 版本 |
|---|---|---|
| 3.1.9 | stmt 对外接口不变,内部重构为 stmt2 实现。 | - |
| 3.1.8 | 支持连接级别时区设置,严格校验 stmt 绑定类型与数据库类型。 | - |
| 3.1.7 | 支持 IPv6 连接,支持 DECIMAL 类型。 | 3.3.6.0 及更高版本 |
| 3.1.6 | 优化 WebSocket 连接接收消息处理。 | - |
| 3.1.5 | 修复 WebSocket 协议编码中文时长度错误。 | - |
| 3.1.4 | 提升 WebSocket 查询和写入性能。 | 3.3.2.0 及更高版本 |
| 3.1.3 | 支持 WebSocket 自动重连。 | - |
| 3.1.2 | 修复 schemaless 资源释放。 | - |
| 3.1.1 | 支持 varbinary 和 geometry 类型。 | - |
| 3.1.0 | WebSocket 使用原生实现。 | 3.2.1.0 及更高版本 |
处理异常
TDengine.Connector 会抛出异常,应用程序需要处理异常。taosc 异常类型 TDengineError,包含错误码和错误信息,应用程序可以根据错误码和错误信息进行处理。
错误码信息请参考 错误码
数据类型映射
| TDengine TSDB DataType | C# Type |
|---|---|
| TIMESTAMP | DateTime |
| TINYINT | sbyte |
| SMALLINT | short |
| INT | int |
| BIGINT | long |
| TINYINT UNSIGNED | byte |
| SMALLINT UNSIGNED | ushort |
| INT UNSIGNED | uint |
| BIGINT UNSIGNED | ulong |
| FLOAT | float |
| DOUBLE | double |
| BOOL | bool |
| BINARY | byte[] |
| NCHAR | string |
| JSON | byte[] |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
| DECIMAL | decimal |
注意:
- JSON 类型仅在 tag 中支持。
- GEOMETRY 类型是 little endian 字节序的二进制数据,符合 WKB 规范。详细信息请参考 数据类型。 WKB 规范请参考 Well-Known Binary (WKB)。
- DECIMAL 类型在 C# 中使用
decimal类型表示,支持高精度的十进制数值。因为 C# 的decimal类型与 TDengine TSDB 的 DECIMAL 类型在精度和范围上有所不同, C#decimal精度最大 29 位,TDengine TSDB DECIMAL 类型精度最大 38 位,所以在使用时需要注意:- 未超出 C# decimal 类型范围时可以使用
GetDecimal或GetValue获取。 - 当超出 C# decimal 类型范围时
GetDecimal和GetValue方法会抛出OverflowException,此时可以使用GetString方法获取字符串表示形式。
- 未超出 C# decimal 类型范围时可以使用
示例程序汇总
示例程序源码请参考:示例程序
API 参考
ADO.NET 驱动
TDengine.Data.Client 接口实现了 ADO.NET 驱动,支持连接 TDengine TSDB 数据库,进行数据操作。
参数规范
ConnectionStringBuilder 使用 key-value 对方式设置连接参数,key 为参数名,value 为参数值,不同参数之间使用分号 ; 分割。
例如:
"protocol=WebSocket;host=127.0.0.1;port=6041;useSSL=false"
原生连接
例如:"host=127.0.0.1;port=6030;username=root;password=taosdata;protocol=Native;db=test"
支持的参数如下:
host:TDengine TSDB 运行实例的地址。port:TDengine TSDB 运行实例的端口。username:连接的用户名。password:连接的密码。protocol:连接的协议,可选值为 Native 或 WebSocket,默认为 Native。db:连接的数据库。timezone:查询结果集解析时间类型使用的时区,默认为本地时区,详情见时区说明。connectionTimezone:连接级别的时区设置(3.1.8 及以上版本支持),仅支持 .NET 6 及以上版本,只支持 IANA 时区格式,与timezone不可同时设置,详情见时区说明。
WebSocket 连接
例如:"protocol=WebSocket;host=127.0.0.1;port=6041;useSSL=false;enableCompression=true;autoReconnect=true;reconnectIntervalMs=10;reconnectRetryCount=5"
支持的参数如下:
host:TDengine TSDB 运行实例的地址。port:TDengine TSDB 运行实例的端口。username:连接的用户名。password:连接的密码。protocol:连接的协议,可选值为 Native 或 WebSocket,默认为 Native。db:连接的数据库。timezone:查询结果集解析时间类型使用的时区,默认为本地时区,详情见时区说明。connectionTimezone:连接级别的时区设置(3.1.8 及以上版本支持),仅支持 .NET 6 及以上版本,只支持 IANA 时区格式,与timezone不可同时设置,详情见时区说明。connTimeout:连接超时时间,默认为 1 分钟。readTimeout:读取超时时间,默认为 5 分钟。writeTimeout:发送超时时间,默认为 10 秒。token:连接 TDengine TSDB cloud 的 token。useSSL:是否使用 SSL 连接,默认为 false。enableCompression:是否启用 WebSocket 压缩,默认为 false。autoReconnect:是否自动重连,默认为 false。reconnectRetryCount:重连次数,默认为 3。reconnectIntervalMs:重连间隔毫秒时间,默认为 2000。
时区说明
连接级别时区设置
从 3.1.8 版本开始,C# 驱动支持连接级别的时区设置。可以通过设置 connectionTimezone 参数来指定连接级别的时区。
特性说明
- 仅支持 .NET 6.0 及以上版本
- 仅支持 IANA 时区格式,例如:
America/New_York - 不支持 Windows 平台上使用原生连接
- 当 TDengine server 或 taosAdapter 部署在 Windows 上时不可使用
- 与
timezone参数不可同时设置
功能影响
此参数会影响所有通过该连接执行的查询和写入操作的时间类型解析,以及查询结果集的时间类型解析。
示例
TDengine 所在服务器为 UTC 时区,设置 connectionTimezone=America/New_York:
执行写入 SQL insert into db.tb values('2025-08-08 14:00:00',1)
- 写入的时间为纽约时区的
2025-08-08 14:00:00 - 通过 taos shell 查询会显示为
2025-08-08 18:00:00.000(UTC 时间)
设置 connectionTimezone=America/New_York 的连接执行查询 SQL select * from db.tb,则查询结果集中的时间类型会被解析为纽约时区的时间。
| 方法 | 返回值类型 | 示例输出 | 备注 |
|---|---|---|---|
GetValue | DateTime | 2025-08-08 14:00:00.000 | 表示纽约时区时间 |
GetDateTime | DateTime | 2025-08-08 14:00:00.000 | 与 GetValue 行为相同 |
GetDateTimeOffset | DateTimeOffset | 2025-08-08 14:00:00.000(yyyy-MM-dd HH:mm:ss.fff)或 2025-08-08 14:00:00.000-04:00(yyyy-MM-dd HH:mm:ss.fffK) | 3.1.8 新增方法 |
GetInt64 | long | 1754676000000(毫秒级) | 时间精度与数据库定义相同(3.1.8 支持通过 GetInt64 返回时间戳) |
查询结果集解析时区设置
timezone 参数设置解析查询结果集的时间类型使用的时区,内部使用 TimeZoneInfo.FindSystemTimeZoneById 方法获取时区信息:
- 在 Windows 系统上,
FindSystemTimeZoneById尝试匹配注册表HKEY_LOCAL_MACHINE\Software\Microsoft\Windows NT\CurrentVersion\Time Zones分支的子项名称。 - 在 Linux 和 macOS 上,使用 ICU 库中提供的时区信息。
以纽约时区为例:Windows 上使用 Eastern Standard Time,Linux 和 macOS 上使用 America/New_York。
.Net 6.0 及以上版本,可以统一使用 IANA 时区格式,例如:America/New_York。
参数绑定注意事项
- 由于 DateTime 不包含偏移信息此时获取到的 DateTime 的 Kind 属性为
Unspecified使用ToUniversalTime()方法会当做本地时区来获取 utc 时间,此时获取到的 utc 时间是错误的,因此禁止所有 Kind 属性为Unspecified的 DateTime 用于参数绑定。 - 3.1.8 版本开始绑定支持使用 DateTimeOffset 类型来写入 TDengine
TIMESTAMP类型,使用UtcTicks获取 tick 后转化为时间戳进行写入。 - 3.1.8 版本开始绑定支持使用 long 类型来写入 TDengine
TIMESTAMP类型,时间精度需要与数据库相同。
接口说明
ConnectionStringBuilder 类提供了连接配置字符串的解析功能。
public ConnectionStringBuilder(string connectionString)- 接口说明:ConnectionStringBuilder 构造函数。
- 参数说明:
connectionString:连接配置字符串。
连接功能
C# 驱动支持创建 ADO.NET 连接,返回支持 ADO.NET 标准的 DbConnection 接口的对象,还提供了 ITDengineClient 接口,扩充了一些无模式写入接口。
标准接口
ADO.NET 连接支持的标准接口如下:
-
public TDengineConnection(string connectionString)- 接口说明:TDengineConnection 构造函数。
- 参数说明:
connectionString:连接配置字符串。
- 异常:格式错误抛出
ArgumentException异常。
-
public void ChangeDatabase(string databaseName)- 接口说明:切换数据库。
- 参数说明:
databaseName:数据库名。
- 异常:执行失败抛出
TDengineError异常。
-
public void Close()- 接口说明:关闭连接。
-
public void Open()- 接口说明:打开连接。
- 异常:打开失败抛出
TDengineError异常,WebSocket 连接可能存在网络异常须注意处理。
-
public string ServerVersion- 接口说明:返回服务器版本。
- 返回值:服务器版本字符串。
- 异常:执行失败抛出
TDengineError异常。
-
public string DataSource- 接口说明:返回数据源。
- 返回值:创建连接 host 配置。
-
public string Database- 接口说明:返回连接数据库。
- 返回值:创建连接 db 配置。
-
public TDengineCommand(TDengineConnection connection)- 接口说明:TDengineCommand 构造函数。
- 参数说明:
connection:TDengineConnection 对象。
- 异常:执行失败抛出
TDengineError异常。
-
public void Prepare()- 接口说明:检查连接和命令文本,并准备命令执行。
- 异常:未执行 open 或未设置 CommandText 抛出
InvalidOperationException异常。
-
public string CommandText- 接口说明:获取或设置命令文本。
- 返回值:命令文本。
-
public new virtual TDengineParameterCollection Parameters- 接口说明:获取参数集合。
- 返回值:TDengineParameterCollection 对象。
无模式写入
-
public static ITDengineClient Open(ConnectionStringBuilder builder)- 接口说明:打开连接。
- 参数说明:
builder:连接配置。
- 返回值:ITDengineClient 接口。
- 异常:打开失败抛出
TDengineError异常,WebSocket 连接可能存在网络异常须注意处理。
-
void SchemalessInsert(string[] lines, TDengineSchemalessProtocol protocol,TDengineSchemalessPrecision precision, int ttl, long reqId)- 接口说明:无模式写入。
- 参数说明:
lines:数据行数组。protocol:数据协议,支持协议:TSDB_SML_LINE_PROTOCOL = 1TSDB_SML_TELNET_PROTOCOL = 2TSDB_SML_JSON_PROTOCOL = 3。precision:时间精度,支持配置:TSDB_SML_TIMESTAMP_NOT_CONFIGURED = 0TSDB_SML_TIMESTAMP_HOURS = 1TSDB_SML_TIMESTAMP_MINUTES = 2TSDB_SML_TIMESTAMP_SECONDS = 3TSDB_SML_TIMESTAMP_MILLI_SECONDS = 4TSDB_SML_TIMESTAMP_MICRO_SECONDS = 5TSDB_SML_TIMESTAMP_NANO_SECONDS = 6。ttl:数据过期时间,0 表示不配置。reqId:请求 ID。
- 异常:执行失败抛出
TDengineError异常。
执行 SQL
C# 驱动提供了符合 ADO.NET 标准的 DbCommand 接口,支持以下功能:
- 执行 SQL 语句:执行静态 SQL 语句,并返回其生成的结果对象。
- 查询执行:可以执行返回数据集的查询(
SELECT语句)。 - 更新执行:可以执行影响行数的 SQL 语句,如
INSERT、UPDATE、DELETE等。 - 获取结果:可以获取查询执行后返回的结果集(
ResultSet对象),并遍历查询返回的数据。 - 获取更新计数:对于非查询 SQL 语句,可以获取执行后影响的行数。
- 关闭资源:提供了关闭的方法,以释放数据库资源。
另外 C# 驱动还提供了用于请求链路跟踪的扩展接口。
标准接口
-
public int ExecuteNonQuery()- 接口说明:执行 SQL 语句,返回受影响的行数。
- 返回值:受影响的行数。
- 异常:执行失败抛出
TDengineError异常。
-
public object ExecuteScalar()- 接口说明:执行查询,并返回查询结果的第一行第一列。
- 返回值:查询结果的第一行第一列。
- 异常:执行失败抛出
TDengineError异常。
-
public DbDataReader ExecuteReader()- 接口说明:执行查询,并返回查询结果的数据读取器。
- 返回值:查询结果的数据读取器。
- 异常:执行失败抛出
TDengineError异常。
-
public void Dispose();- 接口说明:释放资源。
扩展接口
扩展接口主要用于请求链路跟踪。
-
IRows Query(string query, long reqId)- 接口说明:执行查询,返回查询结果。
- 参数说明:
query:查询语句。reqId:请求 ID。
- 返回值:查询结果。
- 异常:执行失败抛出
TDengineError异常。
-
long Exec(string query, long reqId)- 接口说明:执行 SQL 语句。
- 参数说明:
query:SQL 语句。reqId:请求 ID。
- 返回值:受影响的行数。
- 异常:执行失败抛出
TDengineError异常。
结果获取
C# 驱动提供了符合 ADO.NET 标准的 DbDataReader 接口,提供了用于读取结果集中元数据和数据的方法。
结果集
DbDataReader 接口提供了以下方法获取结果集:
-
public bool GetBoolean(int ordinal)- 接口说明:获取指定列的布尔值。
- 参数说明:
ordinal:列索引。
- 返回值:布尔值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public byte GetByte(int ordinal)- 接口说明:获取指定列的字节值。
- 参数说明:
ordinal:列索引。
- 返回值:字节值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length)- 接口说明:获取指定列的字节值。
- 参数说明:
ordinal:列索引。dataOffset:数据偏移量。buffer:缓冲区。bufferOffset:缓冲区偏移量。length:长度。
- 返回值:字节值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public char GetChar(int ordinal)- 接口说明:获取指定列的字符值。
- 参数说明:
ordinal:列索引。
- 返回值:字符值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length)- 接口说明:获取指定列的字符值。
- 参数说明:
ordinal:列索引。dataOffset:数据偏移量。buffer:缓冲区。bufferOffset:缓冲区偏移量。length:长度。
- 返回值:字符值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public DateTime GetDateTime(int ordinal)- 接口说明:获取指定列的日期时间值。
- 参数说明:
ordinal:列索引。
- 返回值:日期时间值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public decimal GetDecimal(int ordinal)- 接口说明:获取指定列的十进制数值。
- 参数说明:
ordinal:列索引。- 返回值:十进制数值。
- 异常:
- 类型不对应抛出
InvalidCastException异常。 - 超出范围抛出
OverflowException异常。
- 类型不对应抛出
-
public double GetDouble(int ordinal)- 接口说明:获取指定列的双精度浮点数值。
- 参数说明:
ordinal:列索引。
- 返回值:双精度浮点数值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public float GetFloat(int ordinal)- 接口说明:获取指定列的单精度浮点数值。
- 参数说明:
ordinal:列索引。
- 返回值:单精度浮点数值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public short GetInt16(int ordinal)- 接口说明:获取指定列的 16 位整数值。
- 参数说明:
ordinal:列索引。
- 返回值:16 位整数值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public int GetInt32(int ordinal)- 接口说明:获取指定列的 32 位整数值。
- 参数说明:
ordinal:列索引。
- 返回值:32 位整数值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public long GetInt64(int ordinal)- 接口说明:获取指定列的 64 位整数值。
- 参数说明:
ordinal:列索引。
- 返回值:64 位整数值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public string GetString(int ordinal)- 接口说明:获取指定列的字符串值。
- 参数说明:
ordinal:列索引。
- 返回值:字符串值。
- 异常:类型不对应抛出
InvalidCastException异常。
-
public object GetValue(int ordinal)- 接口说明:获取指定列的值。
- 参数说明:
ordinal:列索引。
- 返回值:结果对象。
-
public int GetValues(object[] values)- 接口说明:获取所有列的值。
- 参数说明:
values:值数组。
- 返回值:值数量。
-
public bool IsDBNull(int ordinal)- 接口说明:判断指定列是否为 NULL。
- 参数说明:
ordinal:列索引。
- 返回值:是否为 NULL。
-
public int RecordsAffected- 接口说明:获取受影响的行数。
- 返回值:受影响的行数。
-
public bool HasRows- 接口说明:结果是否有行数据。
- 返回值:结果是否有行数据。
-
public bool Read()- 接口说明:读取下一行。
- 返回值:是否读取成功。
-
public IEnumerator GetEnumerator()- 接口说明:获取枚举器。
- 返回值:枚举器。
-
public void Close()- 接口说明:关闭结果集。
结果集元数据
DbDataReader 接口提供了以下方法获取结果集元数据:
-
public DataTable GetSchemaTable()- 接口说明:获取结果集元数据。
- 返回值:结果集元数据。
-
public string GetDataTypeName(int ordinal)- 接口说明:获取指定列的数据类型名称。
- 参数说明:
ordinal:列索引。
- 返回值:数据类型名称。
-
public Type GetFieldType(int ordinal)- 接口说明:获取指定列的数据类型。
- 参数说明:
ordinal:列索引。
- 返回值:数据类型。
-
public string GetName(int ordinal)- 接口说明:获取指定列的名称。
- 参数说明:
ordinal:列索引。
- 返回值:列名称。
-
public int GetFieldSize(int ordinal)- 接口说明:获取指定列的大小。
- 参数说明:
ordinal:列索引。
- 返回值:列大小。
-
public int GetOrdinal(string name)- 接口说明:获取指定列的索引。
- 参数说明:
name:列名称。
- 返回值:列索引。
-
public int FieldCount- 接口说明:获取列数。
- 返回值:列数。
参数绑定
TDengineCommand 类支持参数绑定。
标准接口
TDengineCommand 类继承了 DbCommand 接口,支持以下功能:
-
public string CommandText- 接口说明:获取或设置命令文本,支持参数绑定。
- 返回值:命令文本。
-
public new virtual TDengineParameterCollection Parameters- 接口说明:获取参数集合。
- 返回值:
TDengineParameterCollection对象。
参数元数据
TDengineParameterCollection 继承了 DbParameterCollection 接口,支持以下功能:
-
public int Add(object value)- 接口说明:添加参数。
- 参数说明:
value:参数值。
- 返回值:参数索引。
-
public void Clear()- 接口说明:清空参数。
-
public bool Contains(object value)- 接口说明:是否包含参数。
- 参数说明:
value:参数值。
- 返回值:是否包含参数。
-
public int IndexOf(object value)- 接口说明:获取参数索引。
- 参数说明:
value:参数值。
- 返回值:参数索引。
-
public void Insert(int index, object value)- 接口说明:插入参数。
- 参数说明:
index:索引。value:参数值。
-
public void Remove(object value)- 接口说明:移除参数。
- 参数说明:
value:参数值。
-
public void RemoveAt(int index)- 接口说明:移除参数。
- 参数说明:
index:索引。
-
public void RemoveAt(string parameterName)- 接口说明:移除参数。
- 参数说明:
parameterName:参数名。
-
public int Count- 接口说明:获取参数数量。
- 返回值:参数数量。
-
public int IndexOf(string parameterName)- 接口说明:获取参数索引。
- 参数说明:
parameterName:参数名。
- 返回值:参数索引。
-
public bool Contains(string value)- 接口说明:是否包含参数。
- 参数说明:
value:参数名。
- 返回值:是否包含参数。
-
public void CopyTo(Array array, int index)- 接口说明:复制参数。
- 参数说明:
array:目标数组。index:索引。
-
public IEnumerator GetEnumerator()- 接口说明:获取枚举器。
- 返回值:枚举器。
-
public void AddRange(Array values)- 接口说明:添加参数。
- 参数说明:
values:参数数组。
TDengineParameter 继承了 DbParameter 接口,支持以下功能:
-
public TDengineParameter(string name, object value)- 接口说明:TDengineParameter 构造函数。
- 参数说明:
name:参数名,需要以 @ 开头,如 @0、@1、@2 等。value:参数值,需要 C# 列类型与 TDengine TSDB 列类型一一对应。
-
public string ParameterName- 接口说明:获取或设置参数名。
- 返回值:参数名。
-
public object Value- 接口说明:获取或设置参数值。
- 返回值:参数值。
扩展接口
ITDengineClient 接口提供了扩展的参数绑定接口。
IStmt StmtInit(long reqId)- 接口说明:初始化 statement 对象。
- 参数说明:
reqId:请求 ID。
- 返回值:实现 IStmt 接口的对象。
- 异常:执行失败抛出
TDengineError异常。
IStmt 接口提供了扩展的参数绑定接口。
-
void Prepare(string query)- 接口说明:准备 statement。
- 参数说明:
query:查询语句。
- 异常:执行失败抛出
TDengineError异常。
-
bool IsInsert()- 接口说明:判断是否为插入语句。
- 返回值:是否为插入语句。
- 异常:执行失败抛出
TDengineError异常。
-
void SetTableName(string tableName)- 接口说明:设置表名。
- 参数说明:
tableName:表名。
- 异常:执行失败抛出
TDengineError异常。
-
void SetTags(object[] tags)- 接口说明:设置标签。
- 参数说明:
tags:标签数组。
- 异常:执行失败抛出
TDengineError异常。
-
TaosFieldE[] GetTagFields()- 接口说明:获取标签属性。
- 返回值:标签属性数组。
- 异常:执行失败抛出
TDengineError异常。
-
TaosFieldE[] GetColFields()- 接口说明:获取列属性。
- 返回值:列属性数组。
- 异常:执行失败抛出
TDengineError异常。
-
void BindRow(object[] row)- 接口说明:绑定行。
- 参数说明:
row:行数据数组。
- 异常:执行失败抛出
TDengineError异常。
-
void BindColumn( TaosFieldE[] fields,params Array[] arrays)- 接口说明:绑定全部列。
- 参数说明:
fields:字段属性数组。arrays:多列数据数组。
- 异常:执行失败抛出
TDengineError异常。
-
void AddBatch()- 接口说明:添加批处理。
- 异常:执行失败抛出
TDengineError异常。
-
void Exec()- 接口说明:执行参数绑定。
- 异常:执行失败抛出
TDengineError异常。
-
long Affected()- 接口说明:获取受影响的行数。
- 返回值:受影响的行数。
- 异常:执行失败抛出
TDengineError异常。
-
IRows Result()- 接口说明:获取结果。
- 返回值:结果对象。
- 异常:执行失败抛出
TDengineError异常。
数据订阅
ConsumerBuilder 类提供了消费者构建相关接口,ConsumeResult 类提供了消费结果相关接口,TopicPartitionOffset 类提供了分区偏移量相关接口。ReferenceDeserializer 和 DictionaryDeserializer 提供了反序列化的支持。
消费者
public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)- 接口说明:ConsumerBuilder 构造函数。
- 参数说明:
config:消费配置。
创建消费者支持属性列表:
useSSL:是否使用 SSL 连接,默认为 false。token:连接 TDengine TSDB cloud 的 token。ws.message.enableCompression:是否启用 WebSocket 压缩,默认为 false。ws.autoReconnect:是否自动重连,默认为 false。ws.reconnect.retry.count:重连次数,默认为 3。ws.reconnect.interval.ms:重连间隔毫秒时间,默认为 2000。connectionTimezone:连接级别的时区设置(3.1.8 及以上版本支持,目前只有解析结果集使用的时区功能),仅支持 .NET 6 及以上版本,只支持 IANA 时区格式,详情见时区说明。
其他参数请参考:Consumer 参数列表,注意 TDengine TSDB 服务端自 3.2.0.0 版本开始消息订阅中的 auto.offset.reset 默认值发生变化。
public IConsumer<TValue> Build()- 接口说明:构建消费者。
- 返回值:消费者对象。
IConsumer 接口提供了消费者相关 API:
-
ConsumeResult<TValue> Consume(int millisecondsTimeout)- 接口说明:消费消息。
- 参数说明:
millisecondsTimeout:毫秒超时时间。
- 返回值:消费结果。
- 异常:执行失败抛出
TDengineError异常。
-
List<TopicPartition> Assignment { get; }- 接口说明:获取分配信息。
- 返回值:分配信息。
- 异常:执行失败抛出
TDengineError异常。
-
List<string> Subscription()- 接口说明:获取订阅的主题。
- 返回值:主题列表。
- 异常:执行失败抛出
TDengineError异常。
-
void Subscribe(IEnumerable<string> topic)- 接口说明:订阅主题列表。
- 参数说明:
topic:主题列表。
- 异常:执行失败抛出
TDengineError异常。
-
void Subscribe(string topic)- 接口说明:订阅单个主题。
- 参数说明:
topic:主题。
- 异常:执行失败抛出
TDengineError异常。
-
void Unsubscribe()- 接口说明:取消订阅。
- 异常:执行失败抛出
TDengineError异常。
-
void Commit(ConsumeResult<TValue> consumerResult)- 接口说明:提交消费结果。
- 参数说明:
consumerResult:消费结果。
- 异常:执行失败抛出
TDengineError异常。
-
List<TopicPartitionOffset> Commit()- 接口说明:提交全部消费结果。
- 返回值:分区偏移量。
- 异常:执行失败抛出
TDengineError异常。
-
void Commit(IEnumerable<TopicPartitionOffset> offsets)- 接口说明:提交消费结果。
- 参数说明:
offsets:分区偏移量。
- 异常:执行失败抛出
TDengineError异常。
-
void Seek(TopicPartitionOffset tpo)- 接口说明:跳转到分区偏移量。
- 参数说明:
tpo:分区偏移量。
- 异常:执行失败抛出
TDengineError异常。
-
List<TopicPartitionOffset> Committed(TimeSpan timeout)- 接口说明:获取分区偏移量。
- 参数说明:
timeout:超时时间 (未使用)。
- 返回值:分区偏移量。
- 异常:执行失败抛出
TDengineError异常。
-
List<TopicPartitionOffset> Committed(IEnumerable<TopicPartition> partitions, TimeSpan timeout)- 接口说明:获取指定分区偏移量。
- 参数说明:
partitions:分区列表。timeout:超时时间 (未使用)。
- 返回值:分区偏移量。
- 异常:执行失败抛出
TDengineError异常。
-
Offset Position(TopicPartition partition)- 接口说明:获取消费位置。
- 参数说明:
partition:分区。
- 返回值:偏移量。
- 异常:执行失败抛出
TDengineError异常。
-
void Close()- 接口说明:关闭消费者。
消费记录
ConsumeResult 类提供了消费结果相关接口:
public List<TmqMessage<TValue>> Message- 接口说明:获取消息列表。
- 返回值:消息列表。
TmqMessage 类提供了消息具体内容:
public class TmqMessage<TValue>
{
public string TableName { get; set; }
public TValue Value { get; set; }
}
TableName:表名Value:消息内容
分区信息
从 ConsumeResult 获取 TopicPartitionOffset:
public TopicPartitionOffset TopicPartitionOffset
TopicPartitionOffset 类提供了获取分区信息的接口:
-
public string Topic { get; }- 接口说明:获取主题。
- 返回值:主题。
-
public Partition Partition { get; }- 接口说明:获取分区。
- 返回值:分区。
-
public Offset Offset { get; }- 接口说明:获取偏移量。
- 返回值:偏移量。
-
public TopicPartition TopicPartition- 接口说明:获取主题分区。
- 返回值:主题分区。
-
public string ToString()- 接口说明:转换为字符串。
- 返回值:字符串信息。
偏移量元数据
Offset 类提供了偏移量相关接口:
public long Value- 接口说明:获取偏移量值。
- 返回值:偏移量值。
反序列化
C# 驱动提供了两个反序列化类:ReferenceDeserializer 和 DictionaryDeserializer。它们都实现了 IDeserializer 接口。
ReferenceDeserializer 用来将消费到的一条记录反序列化为一个对象,需要保证对象类的属性名与消费到的数据的列名能够对应,且类型能够匹配。
DictionaryDeserializer 则会将消费到的一行数据反序列化为一个 Dictionary<string, object> 对象,其 key 为列名,值为对象。
ReferenceDeserializer 和 DictionaryDeserializer 的接口不会被用户直接调用,请参考使用样例。







