TDengine TSDB Python Connector
taospy 是 TDengine TSDB 数据库面向 Python 语言提供的官方连接器,连接器对外提供对数据库写入、查询、订阅等多种访问接口。
安装连接器命令如下:
# 原生连接
pip3 install taospy
# WebSocket 连接,可选装
pip3 install taos-ws-py
连接器代码对外开源,源码托管在 Github taos-connector-python 仓库。
连接方式
taospy提供两种连接方式,我们推荐使用 WebSocket 连接。
- 原生连接,Python 连接器加载 TDengine TSDB 客户端驱动程序 (libtaos.so/taos.dll),直接连接 TDengine TSDB 实例,特点性能高,速度快。功能上支持数据写入、查询、数据订阅、schemaless 接口和参数绑定接口等功能。对应
taospy包的taos模块。 - WebSocket 连接,Python 连接器通过
taosAdapter提供的 WebSocket 接口连接 TDengine TSDB 实例,特点是兼具前两种连接的优势,即性能高又依赖小。功能上 WebSocket 连接实现功能集合和原生连接有少量不同。对应taos-ws-py包,可以选装。
连接方式的详细介绍请参考:连接方式
除了对原生接口的封装,taospy 还提供了符合 Python 数据访问规范 (PEP 249) 的编程接口。这使得 taospy 和很多第三方工具集成变得简单,比如 SQLAlchemy 和 pandas。
使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 WebSocket 接口与服务端建立的连接的方式下文中称为“WebSocket 连接”。
- 如果需要在 taos-ws-py 中使用 SQLAlchemy 需要安装 taospy 2.8.5 以上版本。
- 如果有性能方面的要求建议采用 WebSocket 连接方式,原因是:
- 由于 GIL (全局解释器锁) 的限制,多线程无法利用多核优势,实际仍是串行执行。在原生连接方式下,Python 的数据转换及解析操作会受到 GIL 制约从而影响效率;而采用 WebSocket 方式时,涉及 IO 操作时 (如网络请求、文件读写) 会释放 GIL,使其他线程能获取锁并执行,在 IO 密集场景下可显著提升效率。
- 接口调用方面,原生连接方式需要进行大量 C 与 Python 之间的数据类型转换;而 WebSocket 方式仅需完成相应的接口数据转换,数据的处理和解析工作均由 WebSocket 连接器 (Rust) 和 taosAdapter (Go) 执行,能有效突破 Python 的性能瓶颈。
Python 版本兼容性
支持 Python 3.0 及以上版本。
支持的平台
- 原生连接支持的平台和 TDengine TSDB 客户端驱动支持的平台一致。
- WebSocket 连接支持所有能运行 Python 的平台。
历史版本
Python Connector 历史版本(建议使用最新版本的 taospy):
| Python Connector 版本 | 主要变化 | TDengine TSDB 版本 |
|---|---|---|
| 2.8.6 | 支持 pandas 的 read_sql_table,to_sql 和 read_sql 接口 | - |
| 2.8.5 | 支持 taos-ws-py 的 sqlalchemy 功能 | - |
| 2.8.4 | 支持 DBUtils 连接池 | - |
| 2.8.3 | 支持 BLOB 数据类型 | - |
| 2.8.2 | 连接参数设置支持跨平台 | - |
| 2.8.1 | 增加 connect 属性设置函数 | - |
| 2.8.0 | 移除 Apache Superset 连接驱动 | - |
| 2.7.23 | 支持 DECIMAL 数据类型 | - |
| 2.7.22 | 支持 Python 3.12 及以上版本 | - |
| 2.7.21 | Native 支持 STMT2 写入 | - |
| 2.7.19 | 支持 Apache Superset 连接 TDengine TSDB Cloud 数据源 | - |
| 2.7.18 | 支持 Apache Superset 产品连接本地 TDengine TSDB 数据源 | - |
| 2.7.16 | 新增订阅配置 (session.timeout.ms, max.poll.interval.ms) | - |
| 2.7.15 | 新增 VARBINARY 和 GEOMETRY 类型支持 | - |
| 2.7.14 | 修复已知问题 | - |
| 2.7.13 | 新增 tmq 同步提交 offset 接口 | - |
| 2.7.12 | 1. 新增 varbinary 类型支持(STMT 暂不支持 varbinary) 2. query 性能提升(感谢贡献者hadrianl) | 3.1.1.2 及更高版本 |
| 2.7.9 | 数据订阅支持获取消费进度和重置消费进度 | 3.0.2.6 及更高版本 |
| 2.7.8 | 新增 execute_many | 3.0.0.0 及更高版本 |
WebSocket Connector 历史版本:
| WebSocket Connector 版本 | 主要变化 | TDengine TSDB 版本 |
|---|---|---|
| 0.6.3 | 支持配置 WebSocket 连接的响应超时时间(不含数据订阅) | - |
| 0.6.2 | 修复内存泄漏 | - |
| 0.6.1 | 1. 支持 BLOB 数据类型 2. 支持时区 3. 支持负载均衡 | - |
| 0.5.3 | 支持 ipv6 地址格式 | - |
| 0.5.2 | 升级 rust 连接器版本,修复了云服务连接的 token 问题 | - |
| 0.5.1 | 支持 stmt2 参数绑定写入和查询 | - |
| 0.4.0 | 支持订阅新增参数动态扩展 | - |
| 0.3.9 | 修复 fetchmany 自定义行数时获取不完全的问题 | - |
| 0.3.8 | 支持 SuperSet 连接到 TDengine TSDB 云服务实例 | - |
| 0.3.5 | 修复 crypto provider 中的问题 | - |
| 0.3.4 | 支持 VARBINARY 和 GEOMETRY 数据类型 | 3.3.0.0 及更高版本 |
| 0.3.2 | 优化 WebSocket sql 查询和插入性能,修改 readme 和 文档,修复已知问题 | 3.2.3.0 及更高版本 |
| 0.2.9 | 已知问题修复 | - |
| 0.2.5 | 1. 数据订阅支持获取消费进度和重置消费进度 2. 支持 schemaless 3. 支持 STMT | - |
| 0.2.4 | 数据订阅新增取消订阅方法 | 3.0.5.0 及更高版本 |
处理异常
Python 连接器可能会产生 4 种异常:
- Python 连接器本身的异常
- 原生连接方式的异常
- websocket 连接方式异常
- 数据订阅异常
- TDengine TSDB 其他功能模块的报错,请参考 错误码
| Error Type | Description | Suggested Actions |
|---|---|---|
| InterfaceError | taosc 版本太低,不支持所使用的接口 | 请检查 TDengine TSDB 客户端版本 |
| ConnectionError | 数据库链接错误 | 请检查 TDengine TSDB 服务端状态和连接参数 |
| DatabaseError | 数据库错误 | 请检查 TDengine TSDB 服务端版本,并将 Python 连接器升级到最新版 |
| OperationalError | 操作错误 | API 使用错误,请检查代码 |
| ProgrammingError | 接口调用错误 | 请检查提交的数据是否正确 |
| StatementError | stmt 相关异常 | 请检查绑定参数与 sql 是否匹配 |
| ResultError | 操作数据错误 | 请检查操作的数据与数据库中的数据类型是否匹配 |
| SchemalessError | schemaless 相关异常 | 请检查数据格式及对应的协议类型是否正确 |
| TmqError | tmq 相关异常 | 请检查 Topic 及 consumer 配置是否正确 |
Python 中通常通过 try-expect 处理异常,异常处理相关请参考 Python 错误和异常文档。
TDengine TSDB 其他功能模块的报错,请参考 错误码
Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
import taos
try:
conn = taos.connect()
conn.execute("CREATE TABLE 123") # wrong sql
except taos.Error as e:
print(e)
print("exception class: ", e.__class__.__name__)
print("error number:", e.errno)
print("error message:", e.msg)
except BaseException as other:
print("exception occur")
print(other)
# output:
# [0x0216]: syntax error near 'Incomplete SQL statement'
# exception class: ProgrammingError
# error number: -2147483114
# error message: syntax error near 'Incomplete SQL statement'
数据类型映射
TDengine TSDB 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:
| TDengine TSDB DataType | Python DataType |
|---|---|
| TIMESTAMP | datetime |
| INT | int |
| BIGINT | int |
| FLOAT | float |
| DOUBLE | int |
| SMALLINT | int |
| TINYINT | int |
| BOOL | bool |
| BINARY | str |
| NCHAR | str |
| JSON | str |
| GEOMETRY | bytearray |
| VARBINARY | bytearray |
| DECIMAL | Decimal |
| BLOB | bytearray |
示例程序汇总
| 示例程序链接 | 示例程序内容 |
|---|---|
| bind_multi.py | 参数绑定,一次绑定多行 |
| bind_row.py | 参数绑定,一次绑定一行 |
| insert_lines.py | InfluxDB 行协议写入 |
| json_tag.py | 使用 JSON 类型的标签 |
| tmq_consumer.py | tmq 订阅 |
| native_all_type_query.py | 支持全部类型示例 |
| native_all_type_stmt.py | 参数绑定 stmt 全部类型示例 |
| test_stmt2.py | 参数绑定 stmt2 写入示例 |
示例程序源码请参考:
关于纳秒 (nanosecond)
由于目前 Python 对 nanosecond 支持的不完善 (见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。
- https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
- https://www.python.org/dev/peps/pep-0564/
常见问题
欢迎提问或报告问题。
API 参考
WebSocket 连接
URL 规范
[+<protocol>]://[<username>:<password>@][<host1>:<port1>[,...<hostN>:<portN>]][/<database>][?<key1>=<value1>[&...<keyN>=<valueN>]]
|-----------|---|----------|-----------|-------------------------------------|------------|--------------------------------------|
| protocol | | username | password | addresses | database | params |
- protocol:指定使用的协议,例如
ws://localhost:6041表示通过 WebSocket 协议建立连接。- ws:通过 WebSocket 协议建立连接。
- wss:通过 WebSocket 协议并启用 SSL/TLS 加密建立连接。
- username/password:数据库的用户名和密码。
- addresses:指定创建连接的服务器地址,多个地址间用英文逗号分隔。对于 IPv6 地址,必须使用中括号括起来(如
[::1]或[2001:db8:1234:5678::1]),以避免端口号解析冲突。- 示例:
ws://host1:6041,host2:6041或ws://(等同于ws://localhost:6041)。
- 示例:
- database:数据库名称。
- params:
token:用于 TDengine TSDB 云服务的身份验证。timezone:时区,IANA 格式(如Asia/Shanghai),默认为本地时区。compression:是否启用数据压缩,默认为false。conn_retries:连接失败时的最大重试次数,默认为 5。retry_backoff_ms:连接失败时的初始等待时间(毫秒),默认为 200。此值会随着连续失败而指数增长,直到达到最大等待时间。retry_backoff_max_ms:连接失败时的最大等待时间(毫秒),默认为 2000。read_timeout:WebSocket 连接的响应超时时间(秒),不含数据订阅,默认为 300(5 分钟)。
建立连接
fn connect(dsn: Option<&str>, args: Option<&PyDict>) -> PyResult<Connection>- 接口说明:建立 taosAdapter 连接。
- 参数说明:
dsn:可选参数,类型为Option<&str>。数据源名称(DSN),用于指定数据库的连接信息,包括协议、用户名、密码、主机、端口、数据库名及参数等。args:可选参数,类型为Option<&PyDict>。以 Python 字典形式提供,字典中的值均为字符串类型。可用于配置以下选项:user:数据库的用户名password:数据库的密码host:主机地址port:端口号database:数据库名称timezone:时区,IANA 格式(如Asia/Shanghai),默认为本地时区。compression:是否启用数据压缩,默认为false。conn_retries:连接失败时的最大重试次数,默认为 5。retry_backoff_ms:连接失败时的初始等待时间(毫秒),默认为 200。此值会随着连续失败而指数增长,直到达到最大等待时间。retry_backoff_max_ms:连接失败时的最大等待时间(毫秒),默认为 2000。read_timeout:WebSocket 连接的响应超时时间(秒),不含数据订阅,默认为 300(5 分钟)。
- 返回值:连接对象。
- 异常:操作失败抛出
ConnectionError异常。
fn cursor(&self) -> PyResult<Cursor>- 接口说明:创建一个新的数据库游标对象,用于执行 SQL 命令和查询。
- 返回值:数据库游标对象。
- 异常:操作失败抛出
ConnectionError异常。
执行 SQL
fn execute(&self, sql: &str) -> PyResult<i32>- 接口说明:执行 sql 语句。
- 参数说明:
sql:待执行的 sql 语句。
- 返回值:影响的条数。
- 异常:操作失败抛出
QueryError异常。
fn execute_with_req_id(&self, sql: &str, req_id: u64) -> PyResult<i32>- 接口说明:执行带有 req_id 的 sql 语句。
- 参数说明:
sql:待执行的 sql 语句。reqId:用于问题追踪。
- 返回值:影响的条数。
- 异常:操作失败抛出
QueryError异常。
fn query(&self, sql: &str) -> PyResult<TaosResult>- 接口说明:查询数据。
- 参数说明:
sql:待执行的 sql 语句。
- 返回值:
TaosResult数据集对象。 - 异常:操作失败抛出
QueryError异常。
fn query_with_req_id(&self, sql: &str, req_id: u64) -> PyResult<TaosResult>- 接口说明:查询带有 req_id 的 sql 语句。
- 参数说明:
sql:待执行的 sql 语句。reqId:用于问题追踪。
- 返回值:
TaosResult数据集对象。 - 异常:操作失败抛出
QueryError异常。
数据集
TaosResult 对象可以通过循环遍历获取查询到的数据。
fn fields(&self) -> Vec<TaosField>- 接口说明:获取查询数据的字段信息,包括:名称,类型及字段长度。
- 返回值:
Vec<TaosField>字段信息数组。
fn field_count(&self) -> i32- 接口说明:获取查询到的记录条数。
- 返回值:
i32查询到的记录条数。
无模式写入
fn schemaless_insert(&self, lines: Vec<String>, protocol: PySchemalessProtocol, precision: PySchemalessPrecision, ttl: i32, req_id: u64) -> PyResult<()>- 接口说明:无模式写入。
- 参数说明:
lines:待写入的数据数组,无模式具体的数据格式可参考Schemaless 写入。protocol:协议类型PySchemalessProtocol::Line:InfluxDB 行协议(Line Protocol)。PySchemalessProtocol::Telnet:OpenTSDB 文本行协议。PySchemalessProtocol::Json:JSON 协议格式
precision:时间精度PySchemalessPrecision::Hour:小时PySchemalessPrecision::Minute:分钟PySchemalessPrecision::Second秒PySchemalessPrecision::Millisecond:毫秒PySchemalessPrecision::Microsecond:微秒PySchemalessPrecision::Nanosecond:纳秒
ttl:表过期时间,单位天。reqId:用于问题追踪。
- 异常:操作失败抛出
DataError或OperationalError异常。
参数绑定
从 0.5.1 版本开始,提供 stmt2 绑定参数的接口,用于实现高效写入。
fn stmt2_statement(&self) -> PyResult<TaosStmt2>- 接口说明:使用 连接 对象创建 stmt2 对象。
- 返回值:stmt2 对象。
- 异常:操作失败抛出
ConnectionError异常。
fn prepare(&mut self, sql: &str) -> PyResult<()>- 接口说明:绑定预编译 sql 语句。
- 参数说明:
sql:预编译的 SQL 语句。
- 异常:操作失败抛出
ProgrammingError异常。
fn bind(&mut self, params: Vec<PyStmt2BindParam>) -> PyResult<()>- 接口说明:绑定数据。
- 参数说明:
params:绑定数据。
- 异常:操作失败抛出
ProgrammingError异常。
fn execute(&mut self) -> PyResult<usize>- 接口说明:执行将绑定的数据全部写入。
- 返回值:写入条数。
- 异常:操作失败抛出
QueryError异常。
fn result_set(&mut self) -> PyResult<TaosResult>- 接口说明:获取绑定查询结果集。
- 返回值:查询结果集。
- 异常:操作失败抛出
QueryError异常。
fn affect_rows(&mut self) -> PyResult<usize>- 接口说明:获取写入条数。
- 返回值:写入条数。
fn close(&self) -> PyResult<()>- 接口说明:关闭 stmt2 对象。
标准的 Stmt 绑定参数的接口。
fn statement(&self) -> PyResult<TaosStmt>- 接口说明:使用 连接 对象创建 stmt 对象。
- 返回值:stmt 对象。
- 异常:操作失败抛出
ConnectionError异常。
fn prepare(&mut self, sql: &str) -> PyResult<()>- 接口说明:绑定预编译 sql 语句。
- 参数说明:
sql:预编译的 SQL 语句。
- 异常:操作失败抛出
ProgrammingError异常。
fn set_tbname(&mut self, table_name: &str) -> PyResult<()>- 接口说明:设置将要写入数据的表名。
- 参数说明:
tableName:表名,如果需要指定数据库,例如:db_name.table_name即可。
- 异常:操作失败抛出
ProgrammingError异常。
fn set_tags(&mut self, tags: Vec<PyTagView>) -> PyResult<()>- 接口说明:设置表 Tags 数据,用于自动建表。
- 参数说明:
paramsArray:Tags 数据。
- 异常:操作失败抛出
ProgrammingError异常。
fn bind_param(&mut self, params: Vec<PyColumnView>) -> PyResult<()>- 接口说明:绑定数据。
- 参数说明:
paramsArray:绑定数据。
- 异常:操作失败抛出
ProgrammingError异常。
fn add_batch(&mut self) -> PyResult<()>- 接口说明:提交绑定数据。
- 异常:操作失败抛出
ProgrammingError异常。
fn execute(&mut self) -> PyResult<usize>- 接口说明:执行将绑定的数据全部写入。
- 返回值:写入条数。
- 异常:操作失败抛出
QueryError异常。
fn affect_rows(&mut self) -> PyResult<usize>- 接口说明:获取写入条数。
- 返回值:写入条数。
fn close(&self) -> PyResult<()>- 接口说明:关闭 stmt 对象。
数据订阅
-
创建消费者支持属性列表:
- host:主机地址。
- port:端口号。
- group.id:所在的 group。
- client.id:客户端 id。
- td.connect.user:数据库用户名。
- td.connect.pass:数据库密码。
- td.connect.token:数据库的连接 token。
- auto.offset.reset:来确定消费位置为最新数据(latest)还是包含旧数据(earliest)。
- enable.auto.commit:是否允许自动提交。
- auto.commit.interval.ms:自动提交间隔
-
fn Consumer(conf: Option<&PyDict>, dsn: Option<&str>) -> PyResult<Self>- 接口说明 消费者构造函数。
conf:类型Option<&PyDict>可选,以 Python 字典的形式提供,具体配置参见属性列表。dsn:类型Option<&str>可选,数据源名称(DSN),用于指定要连接的数据库的位置和认证信息。
- 返回值:Consumer 消费者对象。
- 异常:操作失败抛出
ConsumerException异常。
- 接口说明 消费者构造函数。
-
fn subscribe(&mut self, topics: &PyList) -> PyResult<()>- 接口说明 订阅一组主题。
- 参数说明:
topics:订阅的主题列表。
- 异常:操作失败抛出
ConsumerException异常。
-
fn unsubscribe(&mut self)- 接口说明 取消订阅。
- 异常:操作失败抛出
ConsumerException异常。
-
fn poll(&mut self, timeout: Option<f64>) -> PyResult<Option<Message>>- 接口说明 轮询消息。
- 参数说明:
timeoutMs:表示轮询的超时时间,单位毫秒。
- 返回值:
Message每个主题对应的数据。 - 异常:操作失败抛出
ConsumerException异常。
-
fn commit(&mut self, message: &mut Message) -> PyResult<()>- 接口说明 提交当前处理的消息的偏移量。
- 参数说明:
message:类型Message, 当前处理的消息的偏移量。
- 异常:操作失败抛出
ConsumerException异常。
-
fn assignment(&mut self) -> PyResult<Option<Vec<TopicAssignment>>>- 接口说明:获取消费者当前分配的指定的分区或所有分区。
- 返回值:返回值类型为
Vec<TopicAssignment>,即消费者当前分配的所有分区。 - 异常:操作失败抛出 ConsumerException 异常。
-
fn seek(&mut self, topic: &str, vg_id: i32, offset: i64) -> PyResult<()>- 接口说明:将给定分区的偏移量设置到指定的位置。
- 参数说明:
topic:订阅的主题。vg_id:vgroupid。offset:需要设置的偏移量。
- 异常:操作失败抛出 ConsumerException 异常。
-
fn committed(&mut self, topic: &str, vg_id: i32) -> PyResult<i64>- 接口说明:获取订阅主题的 vgroupid 分区最后提交的偏移量。
- 参数说明:
topic:订阅的主题。vg_id:vgroupid。
- 返回值:
i64,分区最后提交的偏移量。 - 异常:操作失败抛出 ConsumerException 异常。
-
fn position(&mut self, topic: &str, vg_id: i32) -> PyResult<i64>- 接口说明:获取给定分区当前的偏移量。
- 参数说明:
topic:订阅的主题。vg_id:vgroupid。
- 返回值:
i64,分区最后提交的偏移量。 - 异常:操作失败抛出 ConsumerException 异常。
-
fn close(&mut self)- 接口说明:关闭 tmq 连接。
- 异常:操作失败抛出 ConsumerException 异常。
Native 连接
建立连接
def connect(*args, **kwargs):- 接口说明:建立 taosAdapter 连接。
- 参数说明:
kwargs:以 Python 字典的形式提供,可用于设置user:数据库的用户名password:数据库的密码。host:主机地址port:端口号database:数据库名称timezone:时区
- 返回值:
TaosConnection连接对象。 - 异常:操作失败抛出
AttributeError或ConnectionError异常。
def cursor(self)- 接口说明:创建一个新的数据库游标对象,用于执行 SQL 命令和查询。
- 返回值:数据库游标对象。
执行 SQL
def execute(self, operation, req_id: Optional[int] = None)- 接口说明:执行 sql 语句。
- 参数说明:
operation:待执行的 sql 语句。reqId:用于问题追踪。
- 返回值:影响的条数。
- 异常:操作失败抛出
ProgrammingError异常。
def query(self, sql: str, req_id: Optional[int] = None) -> TaosResult- 接口说明:查询数据。
- 参数说明:
sql:待执行的 sql 语句。reqId:用于问题追踪。
- 返回值:
TaosResult数据集对象。 - 异常:操作失败抛出
ProgrammingError异常。
数据集
TaosResult 对象可以通过循环遍历获取查询到的数据。
def fields(&self)- 接口说明:获取查询数据的字段信息,包括:名称,类型及字段长度。
- 返回值:
TaosFields字段信息 list。
def field_count(&self)- 接口说明:获取查询到的记录条数。
- 返回值:查询到的记录条数。
def fetch_all_into_dict(self)- 接口说明:将所有的记录转换为字典。
- 返回值:返回字典列表。
无模式写入
def schemaless_insert(&self, lines: List[str], protocol: SmlProtocol, precision: SmlPrecision, req_id: Optional[int] = None, ttl: Optional[int] = None) -> int:- 接口说明:无模式写入。
- 参数说明:
lines:待写入的数据数组,无模式具体的数据格式可参考Schemaless 写入。protocol:协议类型SmlProtocol.LINE_PROTOCOL:InfluxDB 行协议(Line Protocol)。SmlProtocol.TELNET_PROTOCOL:OpenTSDB 文本行协议。SmlProtocol.JSON_PROTOCOL:JSON 协议格式
precision:时间精度SmlPrecision.Hour:小时SmlPrecision.Minute:分钟SmlPrecision.Second秒SmlPrecision.Millisecond:毫秒SmlPrecision.Microsecond:微秒SmlPrecision.Nanosecond:纳秒
ttl:表过期时间,单位天。reqId:用于问题追踪。
- 返回值:影响的条数。
- 异常:操作失败抛出
SchemalessError异常。
参数绑定
def statement2(self, sql=None, option=None)- 接口说明:使用连接对象创建 stmt2 对象
- 参数说明
sql:绑定的 SQL 语句,如果不为空会调用prepare函数option传入 TaosStmt2Option 类实例选项
- 返回值:stmt2 对象。
- 异常:操作失败抛出
ConnectionError异常。
def prepare(self, sql)- 接口说明:绑定预编译 sql 语句
- 参数说明:
sql:绑定的 SQL 语句
- 异常:操作失败抛出
StatementError异常。
def bind_param(self, tbnames, tags, datas)- 接口说明:以独立数组方式绑定数据
- 参数说明:
tbnames:绑定表名数组,数据类型为 listtags:绑定 tag 列值数组,数据类型为 listdatas:绑定普通列值数组,数据类型为 list
- 异常:操作失败抛出
StatementError异常
def bind_param_with_tables(self, tables)- 接口说明:以独立表方式绑定数据,独立表是以表为组织单位,每张表中有表名,TAG 值及普通列数值属性
- 参数说明:
tables:BindTable独立表对象数组
- 异常:操作失败抛出
StatementError异常。
def execute(self) -> int:- 接口说明:执行将绑定数据全部写入
- 返回值:影响行数
- 异常:操作失败抛出
QueryError异常。
def result(self)- 接口说明:获取参数绑定查询结果集
- 返回值:返回 TaosResult 对象
def close(self)- 接口说明:关闭 stmt2 对象
数据订阅
- 创建消费者支持属性列表:
- td.connect.ip:主机地址。
- td.connect.port:端口号。
- group.id:所在的 group。
- client.id:客户端 id。
- td.connect.user:数据库用户名。
- td.connect.pass:数据库密码。
- td.connect.token:数据库的连接 token。
- auto.offset.reset:来确定消费位置为最新数据(latest)还是包含旧数据(earliest)。
- enable.auto.commit:是否允许自动提交。
- auto.commit.interval.ms:自动提交间隔
def Consumer(configs)- 接口说明 消费者构造函数。
configs:Python 字典的形式提供,具体配置参见属性列表。
- 返回值:Consumer 消费者对象。
- 异常:操作失败抛出
TmqError异常。
- 接口说明 消费者构造函数。
def subscribe(self, topics)- 接口说明 订阅一组主题。
- 参数说明:
topics:订阅的主题列表。
- 异常:操作失败抛出
TmqError异常。
def unsubscribe(self)- 接口说明 取消订阅。
- 异常:操作失败抛出
TmqError异常。
def poll(self, timeout: float = 1.0)- 接口说明 轮询消息。
- 参数说明:
timeout:表示轮询的超时时间,单位毫秒。
- 返回值:
Message每个主题对应的数据。 - 异常:操作失败抛出
TmqError异常。
def commit(self, message: Message = None, offsets: [TopicPartition] = None)- 接口说明 提交当前处理的消息的偏移量。
- 参数说明:
message:类型Message, 当前处理的消息的偏移量。offsets:类型[TopicPartition], 提交一批消息的偏移量。
- 异常:操作失败抛出
TmqError异常。
def assignment(self)- 接口说明:获取消费者当前分配的指定的分区或所有分区。
- 返回值:返回值类型为
[TopicPartition],即消费者当前分配的所有分区。 - 异常:操作失败抛出 TmqError 异常。
def seek(self, partition)- 接口说明:将给定分区的偏移量设置到指定的位置。
- 参数说明:
partition:需要设置的偏移量。topic:订阅的主题partition:分区offset:偏移量
- 异常:操作失败抛出
TmqError异常。
def committed(self, partitions)- 接口说明:获取订阅主题的分区最后提交的偏移量。
- 参数说明:
partition:需要设置的偏移量。topic:订阅的主题partition:分区
- 返回值:
partition,分区最后提交的偏移量。 - 异常:操作失败抛出
TmqError异常。
def position(self, partitions)- 接口说明:获取给定分区当前的偏移量。
- 参数说明:
partition:需要设置的偏移量。topic:订阅的主题partition:分区
- 返回值:
partition,分区最后提交的偏移量。 - 异常:操作失败抛出 TmqError 异常。
def close(self)- 接口说明:关闭 tmq 连接。
- 异常:操作失败抛出 TmqError 异常。







