TDengine C# Connector
TDengine.Connector
是 TDengine 提供的 C# 语言连接器。C# 开发人员可以通过它开发存取 TDengine 集群数据的 C# 应用软件。
连接方式
TDengine.Connector
提供两种形式的连接器
- 原生连接,通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 实例,支持数据写入、查询、数据订阅、schemaless 接口和参数绑定接口等功能。
- Websocket 连接,通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例,WebSocket 连接实现的功能集合和原生连接有少量不同。(自 v3.0.1 起)
连接方式的详细介绍请参考:连接器建立连接的方式
兼容性
TDengine.Connector
3.1.0 版本进行了完整的重构,不再兼容 3.0.2 及以前版本。3.0.2 文档请参考 nugetTDengine.Connector
3.x 不兼容 TDengine 2.x,如果在运行 TDengine 2.x 版本的环境下需要使用 C# 连接器请使用 TDengine.Connector 的 1.x 版本。
支持的平台
支持的平台和 TDengine 客户端驱动支持的平台一致。
TDengine 不再支持 32 位 Windows 平台。
版本支持
Connector version | TDengine version |
---|---|
3.1.0 | 3.2.1.0/3.1.1.18 |
处理异常
TDengine.Connector
会抛出异常,应用程序需要处理异常。taosc 异常类型 TDengineError
,包含错误码和错误信息,应用程序可以根据错误码和错误信息进行处理。
TDengine DataType 和 C# DataType
TDengine DataType | C# Type |
---|---|
TIMESTAMP | DateTime |
TINYINT | sbyte |
SMALLINT | short |
INT | int |
BIGINT | long |
TINYINT UNSIGNED | byte |
SMALLINT UNSIGNED | ushort |
INT UNSIGNED | uint |
BIGINT UNSIGNED | ulong |
FLOAT | float |
DOUBLE | double |
BOOL | bool |
BINARY | byte[] |
NCHAR | string (utf-8编码) |
JSON | byte[] |
JSON 类型仅在 tag 中支持。
安装步骤
安装前准备
安装连接器
可以在当前 .NET 项目的路径下,通过 dotnet CLI 添加 Nuget package TDengine.Connector
到当前项目。
dotnet add package TDengine.Connector
也可以修改当前项目的 .csproj
文件,添加如下 ItemGroup。
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="3.1.*" />
</ItemGroup>
建立连接
- 原生连接
- WebSocket 连接
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
Console.WriteLine("connected");
}
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
Console.WriteLine("connected");
}
ConnectionStringBuilder 支持的参数如下:
- protocol: 连接协议,可选值为 Native 或 WebSocket,默认为 Native
- host: TDengine 或 taosadapter 运行实例的地址
- port: TDengine 或 taosadapter 运行实例的端口
- 当 protocol 为 WebSocket 时 useSSL 为 false 时,port 默认为 6041
- 当 protocol 为 WebSocket 时 useSSL 为 true 时,port 默认为 443
- useSSL: 是否使用 SSL 连接,仅当 protocol 为 WebSocket 时有效,默认为 false
- token: 连接 TDengine cloud 的 token,仅当 protocol 为 WebSocket 时有效
- username: 连接 TDengine 的用户名
- password: 连接 TDengine 的密码
- db: 连接 TDengine 的数据库
- timezone: 解析时间结果的时区,默认为
TimeZoneInfo.Local
,使用TimeZoneInfo.FindSystemTimeZoneById
方法解析字符串为TimeZoneInfo
对象。 - connTimeout: WebSocket 连接超时时间,仅当 protocol 为 WebSocket 时有效,默认为 1 分钟,使用
TimeSpan.Parse
方法解析字符串为TimeSpan
对象。 - readTimeout: WebSocket 读超时时间,仅当 protocol 为 WebSocket 时有效,默认为 5 分钟,使用
TimeSpan.Parse
方法解析字符串为TimeSpan
对象。 - writeTimeout: WebSocket 写超时时间,仅当 protocol 为 WebSocket 时有效,默认为 10 秒,使用
TimeSpan.Parse
方法解析字符串为TimeSpan
对象。
指定 URL 和 Properties 获取连接
C# 连接器不支持此功能
配置参数的优先级
C# 连接器不支持此功能
使用示例
创建数据库和表
- 原生连接
- WebSocket 连接
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
插入数据
- 原生连接
- WebSocket 连接
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
string insertQuery =
"INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"('2023-10-03 14:38:05.000', 10.30000, 219, 0.31000) " +
"('2023-10-03 14:38:15.000', 12.60000, 218, 0.33000) " +
"('2023-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"('2023-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
"power.d1003 USING power.meters TAGS(2,'California.LosAngeles') " +
"VALUES " +
"('2023-10-03 14:38:05.500', 11.80000, 221, 0.28000) " +
"('2023-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
"power.d1004 USING power.meters TAGS(3,'California.LosAngeles') " +
"VALUES " +
"('2023-10-03 14:38:05.000', 10.80000, 223, 0.29000) " +
"('2023-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
client.Exec(insertQuery);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
string insertQuery =
"INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"('2023-10-03 14:38:05.000', 10.30000, 219, 0.31000) " +
"('2023-10-03 14:38:15.000', 12.60000, 218, 0.33000) " +
"('2023-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"('2023-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
"power.d1003 USING power.meters TAGS(2,'California.LosAngeles') " +
"VALUES " +
"('2023-10-03 14:38:05.500', 11.80000, 221, 0.28000) " +
"('2023-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
"power.d1004 USING power.meters TAGS(3,'California.LosAngeles') " +
"VALUES " +
"('2023-10-03 14:38:05.000', 10.80000, 223, 0.29000) " +
"('2023-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
client.Exec(insertQuery);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
查询数据
- 原生连接
- WebSocket 连接
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("use power");
string query = "SELECT * FROM meters";
using (var rows = client.Query(query))
{
while (rows.Read())
{
Console.WriteLine($"{((DateTime)rows.GetValue(0)):yyyy-MM-dd HH:mm:ss.fff}, {rows.GetValue(1)}, {rows.GetValue(2)}, {rows.GetValue(3)}, {rows.GetValue(4)}, {Encoding.UTF8.GetString((byte[])rows.GetValue(5))}");
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("use power");
string query = "SELECT * FROM meters";
using (var rows = client.Query(query))
{
while (rows.Read())
{
Console.WriteLine($"{((DateTime)rows.GetValue(0)):yyyy-MM-dd HH:mm:ss.fff}, {rows.GetValue(1)}, {rows.GetValue(2)}, {rows.GetValue(3)}, {rows.GetValue(4)}, {Encoding.UTF8.GetString((byte[])rows.GetValue(5))}");
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
执行带有 reqId 的 SQL
reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:
- 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
- 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
- 故障诊断:当一个请求失败时,可 以通过查看与该请求关联的 reqId 来找出问题发生的位置
如果用户不设置reqId,连接器也会内部随机生成一个,但是还是建议用户设置,可以更好的跟用户请求关联起来。
- 原生连接
- WebSocket 连接
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeQueryWithReqID
{
internal abstract class QueryWithReqID
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec($"create database if not exists test_db",ReqId.GetReqId());
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSQueryWithReqID
{
internal abstract class QueryWithReqID
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec($"create database if not exists test_db",ReqId.GetReqId());
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
通过参数绑定写入数据
- 原生连接
- WebSocket 连接
using System;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeStmt
{
internal abstract class NativeStmt
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
using (var stmt = client.StmtInit())
{
stmt.Prepare(
"Insert into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(?,?,?,?)");
var ts = new DateTime(2023, 10, 03, 14, 38, 05, 000);
stmt.BindRow(new object[] { ts, (float)10.30000, (int)219, (float)0.31000 });
stmt.AddBatch();
stmt.Exec();
var affected = stmt.Affected();
Console.WriteLine($"affected rows: {affected}");
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
}
using System;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSStmt
{
internal abstract class WSStmt
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
using (var stmt = client.StmtInit())
{
stmt.Prepare(
"Insert into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(?,?,?,?)");
var ts = new DateTime(2023, 10, 03, 14, 38, 05, 000);
stmt.BindRow(new object[] { ts, (float)10.30000, (int)219, (float)0.31000 });
stmt.AddBatch();
stmt.Exec();
var affected = stmt.Affected();
Console.WriteLine($"affected rows: {affected}");
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
}
注意:使用 BindRow 需要注意原始 C# 列类型与 TDengine 列类型的需要一一对应,具体对应关系请参考 TDengine DataType 和 C# DataType。
无模式写入
- 原生连接
- WebSocket 连接
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeSchemaless
{
internal class Program
{
public static void Main(string[] args)
{
var builder =
new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
client.Exec("create database sml");
client.Exec("use sml");
var influxDBData =
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000";
client.SchemalessInsert(new string[] { influxDBData },
TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NANO_SECONDS, 0, ReqId.GetReqId());
var telnetData = "stb0_0 1626006833 4 host=host0 interface=eth0";
client.SchemalessInsert(new string[] { telnetData },
TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
var jsonData =
"{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
client.SchemalessInsert(new string[] { jsonData }, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
}
}
}
}
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSSchemaless
{
internal class Program
{
public static void Main(string[] args)
{
var builder =
new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
client.Exec("create database sml");
client.Exec("use sml");
var influxDBData =
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000";
client.SchemalessInsert(new string[] { influxDBData },
TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NANO_SECONDS, 0, ReqId.GetReqId());
var telnetData = "stb0_0 1626006833 4 host=host0 interface=eth0";
client.SchemalessInsert(new string[] { telnetData },
TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
var jsonData =
"{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
client.SchemalessInsert(new string[] { jsonData }, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
}
}
}
}
执行带有 reqId 的无模式写入
public void SchemalessInsert(string[] lines, TDengineSchemalessProtocol protocol,
TDengineSchemalessPrecision precision,
int ttl, long reqId)
数据订阅
创建 Topic
- 原生连接
- WebSocket 连接
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeSubscription
{
internal class Program
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC topic_meters as SELECT * from power.meters");
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSSubscription
{
internal class Program
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC topic_meters as SELECT * from power.meters");
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
创建 Consumer
- 原生连接
- WebSocket 连接
var cfg = new Dictionary<string, string>()
{
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", "127.0.0.1" },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
{ "td.connect.port", "6030" },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", "localhost" },
{ "td.connect.port", "6041" },
{ "useSSL", "false" },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
consumer 支持的配置参数如下:
-
td.connect.type: 连接类型,可选值为 Native 或 WebSocket,默认为 Native
-
td.connect.ip: TDengine 或 taosadapter 运行实例的地址
-
td.connect.port: TDengine 或 taosadapter 运行实例的端口
- 当 td.connect.type 为 WebSocket 且 useSSL 为 false 时,td.connect.port 默认为 6041
- 当 td.connect.type 为 WebSocket 且 useSSL 为 true 时,td.connect.port 默认为 443
-
useSSL: 是否使用 SSL 连接,仅当 td.connect.type 为 WebSocket 时有效,默认为 false
-
token: 连接 TDengine cloud 的 token,仅当 td.connect.type 为 WebSocket 时有效
-
td.connect.user: 连接 TDengine 的用户名
-
td.connect.pass: 连接 TDengine 的密码
-
group.id: 消费者组 ID
-
client.id: 消费者 ID
-
enable.auto.commit: 是否自动提交 offset,默认为 true
-
auto.commit.interval.ms: 自动提交 offset 的间隔时间,默认为 5000 毫秒
-
auto.offset.reset: 当 offset 不存在时,从哪里开始消费,可选值为 earliest 或 latest,默认为 latest
-
msg.with.table.name: 消息是否包含表名
支持订阅结果集 Dictionary<string, object>
key 为列名,value 为列值。
如果使用 object 接收列值,需要注意:
- 原始 C# 列类型与 TDengine 列类型的需要一一对应,具体对应关系请参考 TDengine DataType 和 C# DataType。
- 列名与 class 属性名一致,并可读写。
- 明确设置 value 解析器
ConsumerBuilder.SetValueDeserializer(new ReferenceDeserializer<T>());
样例如下
结果 class
class Result
{
public DateTime ts { get; set; }
public float current { get; set; }
public int voltage { get; set; }
public float phase { get; set; }
}
设置解析器
var tmqBuilder = new ConsumerBuilder<Result>(cfg);
tmqBuilder.SetValueDeserializer(new ReferenceDeserializer<Result>());
var consumer = tmqBuilder.Build();
也可实现自定义解析器,实现 IDeserializer<T>
接口并通过ConsumerBuilder.SetValueDeserializer
方法传入。
public interface IDeserializer<T>
{
T Deserialize(ITMQRows data, bool isNull, SerializationContext context);
}
订阅消费数据
consumer.Subscribe(new List<string>() { "topic_meters" });
while (true)
{
using (var cr = consumer.Consume(500))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
Console.WriteLine(
$"message {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
指定订阅 Offset
consumer.Assignment.ForEach(a =>
{
Console.WriteLine($"{a}, seek to 0");
consumer.Seek(new TopicPartitionOffset(a.Topic, a.Partition, 0));
Thread.Sleep(TimeSpan.FromSeconds(1));
});
提交 Offset
public void Commit(ConsumeResult<TValue> consumerResult)
public List<TopicPartitionOffset> Commit()
public void Commit(IEnumerable<TopicPartitionOffset> offsets)
关闭订阅
consumer.Unsubscribe();
consumer.Close();
完整示例
- 原生连接
- WebSocket 连接
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
namespace NativeSubscription
{
internal class Program
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("CREATE DATABASE power");
client.Exec("USE power");
client.Exec(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC topic_meters as SELECT * from power.meters");
var cfg = new Dictionary<string, string>()
{
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", "127.0.0.1" },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
{ "td.connect.port", "6030" },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
consumer.Subscribe(new List<string>() { "topic_meters" });
Task.Run(InsertData);
while (true)
{
using (var cr = consumer.Consume(500))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
Console.WriteLine(
$"message {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
consumer.Commit();
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
static void InsertData()
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec("INSERT into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
}
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
namespace WSSubscription
{
internal class Program
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("CREATE DATABASE power");
client.Exec("USE power");
client.Exec(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC topic_meters as SELECT * from power.meters");
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", "localhost" },
{ "td.connect.port", "6041" },
{ "useSSL", "false" },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
consumer.Subscribe(new List<string>() { "topic_meters" });
Task.Run(InsertData);
while (true)
{
using (var cr = consumer.Consume(500))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
Console.WriteLine(
$"message {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
consumer.Commit();
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
static void InsertData()
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec("INSERT into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
}
}
ADO.NET
C# 连接器支持 ADO.NET 接口,可以通过 ADO.NET 接口连接 TDengine 运行实例,进行数据写入、查询等操作。
- 原生连接
- WebSocket 连接
using System;
using TDengine.Data.Client;
namespace NativeADO
{
internal class Program
{
public static void Main(string[] args)
{
const string connectionString = "host=localhost;port=6030;username=root;password=taosdata";
using (var connection = new TDengineConnection(connectionString))
{
try
{
connection.Open();
using (var command = new TDengineCommand(connection))
{
command.CommandText = "create database power";
command.ExecuteNonQuery();
connection.ChangeDatabase("power");
command.CommandText =
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))";
command.ExecuteNonQuery();
command.CommandText = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(?,?,?,?)";
var parameters = command.Parameters;
parameters.Add(new TDengineParameter("@0", new DateTime(2023,10,03,14,38,05,000)));
parameters.Add(new TDengineParameter("@1", (float)10.30000));
parameters.Add(new TDengineParameter("@2", (int)219));
parameters.Add(new TDengineParameter("@3", (float)0.31000));
command.ExecuteNonQuery();
command.Parameters.Clear();
command.CommandText = "SELECT * FROM meters";
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine(
$"{((DateTime) reader.GetValue(0)):yyyy-MM-dd HH:mm:ss.fff}, {reader.GetValue(1)}, {reader.GetValue(2)}, {reader.GetValue(3)}, {reader.GetValue(4)}, {System.Text.Encoding.UTF8.GetString((byte[]) reader.GetValue(5))}");
}
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
}
using System;
using TDengine.Data.Client;
namespace WSADO
{
internal class Program
{
public static void Main(string[] args)
{
const string connectionString = "protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata";
using (var connection = new TDengineConnection(connectionString))
{
try
{
connection.Open();
using (var command = new TDengineCommand(connection))
{
command.CommandText = "create database power";
command.ExecuteNonQuery();
connection.ChangeDatabase("power");
command.CommandText =
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))";
command.ExecuteNonQuery();
command.CommandText = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(?,?,?,?)";
var parameters = command.Parameters;
parameters.Add(new TDengineParameter("@0", new DateTime(2023,10,03,14,38,05,000)));
parameters.Add(new TDengineParameter("@1", (float)10.30000));
parameters.Add(new TDengineParameter("@2", (int)219));
parameters.Add(new TDengineParameter("@3", (float)0.31000));
command.ExecuteNonQuery();
command.Parameters.Clear();
command.CommandText = "SELECT * FROM meters";
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine(
$"{((DateTime) reader.GetValue(0)):yyyy-MM-dd HH:mm:ss.fff}, {reader.GetValue(1)}, {reader.GetValue(2)}, {reader.GetValue(3)}, {reader.GetValue(4)}, {System.Text.Encoding.UTF8.GetString((byte[]) reader.GetValue(5))}");
}
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
}
- 连接参数与建立连接中的连接参数一致。
- TDengineParameter 的 name 需要以 @ 开头,如 @0、@1、@2 等,value 需要 C# 列类型与 TDengine 列类型一一对应,具体对应关系请参考 TDengine DataType 和 C# DataType。