TDengine Python Connector
taospy
是 TDengine 的官方 Python 连接器。taospy
提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。
Python 连接器的源码托管在 GitHub。
连接方式
taospy
主要提供三种形式的连接器。一般我们推荐使用 Websocket 连接。
- 原生连接,对应
taospy
包的taos
模块。通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 实例,支持数据写入、查询、数据订阅、schemaless 接 口和参数绑定接口等功能。 - REST 连接,对应
taospy
包的taosrest
模块。通过 taosAdapter 提供的 HTTP 接口连接 TDengine 实例,不支持 schemaless 和数据订阅等特性。 - Websocket 连接,对应
taos-ws-py
包,可以选装。通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例,WebSocket 连接实现的功能集合和原生连接有少量不同。
连接方式的详细介绍请参考:连接方式
除了对原生接口和 REST 接口的封装,taospy
还提供了符合 Python 数据访问规范(PEP 249) 的编程接口。这使得 taospy
和很多第三方工具集成变得简单,比如 SQLAlchemy 和 pandas。
使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口或 WebSocket 接口与服务端建立的连接的方式下文中称为“REST 连接”或“WebSocket 连接”。
支持的平台
- 原生连接支持的平台和 TDengine 客户端支持的平台一致。
- REST 连接支持所有能运行 Python 的平台。
支持的功能
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。
历史版本
无论使用什么版本的 TDengine 都建议使用最新版本的 taospy
。
Python Connector 版本 | 主要变化 |
---|---|
2.7.15 | 新增 VARBINARY 和 GEOMETRY 类型支持 |
2.7.14 | 修复已知问题 |
2.7.13 | 新增 tmq 同步提交 offset 接口 |
2.7.12 | 1. 新增 varbinary 类型支持(STMT暂不支持 varbinary ) 2. query 性能提升(感谢贡献者hadrianl) |
2.7.9 | 数据订阅支持获取消费进度和重置消费进度 |
2.7.8 | 新增 execute_many |
Python Websocket Connector 版本 | 主要变化 |
---|---|
0.3.2 | 优化 Websocket sql 查询和插入性能,修改 readme 和 文档,修复已知问题 |
0.2.9 | 已知问题修复 |
0.2.5 | 1. 数据订阅支持获取消费进度和重置消费进度 2. 支持 schemaless 3. 支持 STMT |
0.2.4 | 数据订阅新增取消订阅方法 |
处理异常
Python 连接器可能会产生 4 种异常:
- Python 连接器本身的异常
- 原生连接方式的异常
- websocket 连接方式异常
- 数据订阅异常
- TDengine 其他功能模块的报错,请参考 错误码
Error Type | Description | Suggested Actions |
---|---|---|
InterfaceError | taosc 版本太低,不支持所使用的接口 | 请检查 TDengine 客户端版本 |
ConnectionError | 数据库链接错误 | 请检查 TDengine 服务端状态和连接参数 |
DatabaseError | 数据库错误 | 请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版 |
OperationalError | 操作错误 | API 使用错误 ,请检查代码 |
ProgrammingError | 接口调用错误 | 请检查提交的数据是否正确 |
StatementError | stmt 相关异常 | 请检查绑定参数与 sql 是否匹配 |
ResultError | 操作数据错误 | 请检查操作的数据与数据库中的数据类型是否匹配 |
SchemalessError | schemaless 相关异常 | 请检查数据格式及对应的协议类型是否正确 |
TmqError | tmq 相关异常 | 请检查 Topic 及 consumer 配置是否正确 |
Python 中通常通过 try-expect 处理异常,异常处理相关请参考 Python 错误和异常文档。
TDengine 其他功能模块的报错,请参考 错误码
Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
import taos
try:
conn = taos.connect()
conn.execute("CREATE TABLE 123") # wrong sql
except taos.Error as e:
print(e)
print("exception class: ", e.__class__.__name__)
print("error number:", e.errno)
print("error message:", e.msg)
except BaseException as other:
print("exception occur")
print(other)
# output:
# [0x0216]: syntax error near 'Incomplete SQL statement'
# exception class: ProgrammingError
# error number: -2147483114
# error message: syntax error near 'Incomplete SQL statement'
数据类型映射
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:
TDengine DataType | Python DataType |
---|---|
TIMESTAMP | datetime |
INT | int |
BIGINT | int |
FLOAT | float |
DOUBLE | int |
SMALLINT | int |
TINYINT | int |
BOOL | bool |
BINARY | str |
NCHAR | str |
JSON | str |
GEOMETRY | bytearray |
VARBINARY | bytearray |
示例程序汇总
示例程序链接 | 示例程序内容 |
---|---|
bind_multi.py | 参数绑定, 一次绑定多行 |
bind_row.py | 参数绑定,一次绑定一行 |
insert_lines.py | InfluxDB 行协议写入 |
json_tag.py | 使用 JSON 类型的标签 |
tmq_consumer.py | tmq 订阅 |
native_all_type_query.py | 支持全部类型示例 |
native_all_type_stmt.py | 参数绑定支持全部类型示例 |
示例程序源码请参考:
关于纳秒 (nanosecond)
由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。
- https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
- https://www.python.org/dev/peps/pep-0564/
常见问题
欢迎提问或报告问题。
API 参考
WebSocket 连接
URL 规范
[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------------|---|-----------|-----------|------|------|------------|-----------------------|
| protocol | | username | password | host | port | database | params |
- protocol: 使用 websocket 协议建立连接。例如
ws://localhost:6041
- username/password: 数据库的用户名和密码。
- host/port: 主机地址和端口号。例如
localhost:6041
- database: 数据库名称。
- params: 其他参数。 例如token。
建立连接
fn connect(dsn: Option<&str>, args: Option<&PyDict>) -> PyResult<Connection>
- 接口说明:建立 taosAdapter 连接。
- 参数说明:
dsn
: 类型Option<&str>
可选,数据源名称(DSN),用于指定要连接的数据库的位置和认证信息。args
: 类型Option<&PyDict>
可选,以 Python 字典的形式提供, 可用于设置user
: 数据库的用户名password
: 数据库的密码。host
: 主机地址port
: 端口号database
: 数据库名称
- 返回值:连接对象。
- 异常:操作失败抛出
ConnectionError
异常。
fn cursor(&self) -> PyResult<Cursor>
- 接口说明:创建一个新的数据库游标对象,用于执行SQL命令和查询。
- 返回值:数据库游标对象。
- 异常:操作失败抛出
ConnectionError
异常。
执行SQL
fn execute(&self, sql: &str) -> PyResult<i32>
- 接口说明:执行 sql 语句。
- 参数说明:
sql
:待执行的 sql 语句。
- 返回值:影响的条数。
- 异常:操作失败抛出
QueryError
异常。
fn execute_with_req_id(&self, sql: &str, req_id: u64) -> PyResult<i32>
- 接口说明:执行带有 req_id 的 sql 语句。
- 参数说明:
sql
:待执行的 sql 语句。reqId
: 用于问题追踪。
- 返回值:影响的条数。
- 异常:操作失败抛出
QueryError
异常。
fn query(&self, sql: &str) -> PyResult<TaosResult>
- 接口说明:查询数据。
- 参数说明:
sql
:待执行的 sql 语句。
- 返回值:
TaosResult
数据集对象。 - 异常:操作失败抛出
QueryError
异常。
fn query_with_req_id(&self, sql: &str, req_id: u64) -> PyResult<TaosResult>
- 接口说明:查询带有 req_id 的 sql 语句。
- 参数说明:
sql
:待执行的 sql 语句。reqId
: 用于问题追踪。
- 返回值:
TaosResult
数据集对象。 - 异常:操作失败抛出
QueryError
异常。
数据集
TaosResult 对象可以通过循环遍历获取查询到的数据。
fn fields(&self) -> Vec<TaosField>
- 接口说明:获取查询数据的字段信息, 包括:名称,类型及字段长度。
- 返回值:
Vec<TaosField>
字段信息数组。
fn field_count(&self) -> i32
- 接口说明:获取查询到的记录条数。
- 返回值:
i32
查询到的记录条数。
无模式写入
fn schemaless_insert(&self, lines: Vec<String>, protocol: PySchemalessProtocol, precision: PySchemalessPrecision, ttl: i32, req_id: u64) -> PyResult<()>
- 接口说明:无模式写入。
- 参数说明:
lines
:待写入的数据数组,无模式具体的数据格式可参考Schemaless 写 入
。protocol
: 协议类型PySchemalessProtocol::Line
: InfluxDB 行协议(Line Protocol)。PySchemalessProtocol::Telnet
:OpenTSDB 文本行协议。PySchemalessProtocol::Json
: JSON 协议格式
precision
: 时间精度PySchemalessPrecision::Hour
: 小时PySchemalessPrecision::Minute
:分钟PySchemalessPrecision::Second
秒PySchemalessPrecision::Millisecond
:毫秒PySchemalessPrecision::Microsecond
:微秒PySchemalessPrecision::Nanosecond
: 纳秒
ttl
:表过期时间,单位天。reqId
: 用于问题追踪。
- 异常:操作失败抛出
DataError
或OperationalError
异常。
参数绑定
fn statement(&self) -> PyResult<TaosStmt>
- 接口说明:使用 连接 对象创建 stmt 对象。
- 返回值:stmt 对象。
- 异常:操作失败抛出
ConnectionError
异常。
fn prepare(&mut self, sql: &str) -> PyResult<()>
- 接口说明:绑定预编译 sql 语句。
- 参数说明:
sql
: 预编译的 SQL 语句。
- 异常:操作失败抛出
ProgrammingError
异常。
fn set_tbname(&mut self, table_name: &str) -> PyResult<()>
- 接口说明:设置将要写入数据的表名。
- 参数说明:
tableName
: 表名,如果需要指定数据库, 例如:db_name.table_name
即可。
- 异常:操作失败抛出
ProgrammingError
异常。
fn set_tags(&mut self, tags: Vec<PyTagView>) -> PyResult<()>
- 接口说明:设置表 Tags 数据, 用于自动建表。
- 参数说明:
paramsArray
: Tags 数据。
- 异常:操作失败抛出
ProgrammingError
异常。
fn bind_param(&mut self, params: Vec<PyColumnView>) -> PyResult<()>
- 接口说明:绑定数据。
- 参数说明:
paramsArray
: 绑定数据。
- 异常:操作失败抛出
ProgrammingError
异常。
fn add_batch(&mut self) -> PyResult<()>
- 接口说明:提交绑定数据。
- 异常:操作失败抛出
ProgrammingError
异常。
fn execute(&mut self) -> PyResult<usize>
- 接口说明:执行将绑定的数据全部写入。
- 返回值:写入条数。
- 异常:操作失败抛出
QueryError
异常。
fn affect_rows(&mut self) -> PyResult<usize>
- 接口说明: 获取写入条数。
- 返回值:写入条数。
fn close(&self) -> PyResult<()>
- 接口说明: 关闭 stmt 对象。
数据订阅
-
创建消费者支持属性列表:
- host:主机地址。
- port:端口号。
- group.id:所在的 group。
- client.id:客户端id。
- td.connect.user: 数据库用户名。
- td.connect.pass: 数据库密码。
- td.connect.token:数据库的连接token。
- auto.offset.reset:来确定消费位置为最新数据(latest)还是包含旧数据(earliest)。
- enable.auto.commit:是否允许自动提交。
- auto.commit.interval.ms:自动提交间隔
-
fn Consumer(conf: Option<&PyDict>, dsn: Option<&str>) -> PyResult<Self>
- 接口说明 消费者构造函数。
conf
: 类型Option<&PyDict>
可选,以 Python 字典的形式提供, 具体配置参见属性列表。dsn
: 类型Option<&str>
可选,数据源名称(DSN),用于指定要连接的数据库的位置和认证信息。
- 返回值:Consumer 消费者对象。
- 异常:操作失败抛出
ConsumerException
异常。
- 接口说明 消费者构造函数。
-
fn subscribe(&mut self, topics: &PyList) -> PyResult<()>
- 接口说明 订阅一组主题。
- 参数说明:
topics
: 订阅的主题列表。
- 异常:操作失败抛出
ConsumerException
异常。
-
fn unsubscribe(&mut self)
- 接口说明 取消订阅。
- 异常:操作失败抛出
ConsumerException
异常。
-
fn poll(&mut self, timeout: Option<f64>) -> PyResult<Option<Message>>
- 接口说明 轮询消息。
- 参数说明:
timeoutMs
: 表示轮询的超时时间,单位毫秒。
- 返回值:
Message
每个主题对应的数据。 - 异常:操作失败抛出
ConsumerException
异常。
-
fn commit(&mut self, message: &mut Message) -> PyResult<()>
- 接口说明 提交当前处理的消息的偏移量。
- 参数说明:
message
: 类型Message
, 当前处理的消息的偏移量。
- 异常:操作失败抛出
ConsumerException
异常。
-
fn assignment(&mut self) -> PyResult<Option<Vec<TopicAssignment>>>
- 接口说明:获取消费者当前分配的指定的分区或所有分区。
- 返回值:返回值类型为
Vec<TopicAssignment>
,即消费者当前分配的所有分区。 - 异常:操作失败抛出 ConsumerException 异常。
-
fn seek(&mut self, topic: &str, vg_id: i32, offset: i64) -> PyResult<()>
- 接口说明:将给定分区的偏移量设置到指定的位置。
- 参数说明:
topic
: 订阅的主题。vg_id
: vgroupid。offset
:需要设置的偏移量。
- 异常:操作失败抛出 ConsumerException 异常。
-
fn committed(&mut self, topic: &str, vg_id: i32) -> PyResult<i64>
- 接口说明:获取订阅主题的vgroupid分区最后提交的偏移量。
- 参数说明:
topic
: 订阅的主题。vg_id
: vgroupid。
- 返回值:
i64
,分区最后提交的偏移量。 - 异常:操作失败抛出 ConsumerException 异常。
-
fn position(&mut self, topic: &str, vg_id: i32) -> PyResult<i64>
- 接口说明:获取给定分区当前的偏移量。
- 参数说明:
topic
: 订阅的主题。vg_id
: vgroupid。
- 返回值:
i64
,分区最后提交的偏移量。 - 异常:操作失败抛出 ConsumerException 异常。
-
fn close(&mut self)
- 接口说明:关闭 tmq 连接。
- 异常:操作失败抛出 ConsumerException 异常。
Native 连接
建立连接
def connect(*args, **kwargs):
- 接口说明:建立 taosAdapter 连接。
- 参数说明:
kwargs
: 以 Python 字典的形式提供, 可用于设置user
: 数据库的用户名password
: 数据库的密码。host
: 主机地址port
: 端口号database
: 数据库名称timezone
: 时区
- 返回值:
TaosConnection
连接对象。 - 异常:操作失败抛出
AttributeError
或ConnectionError
异常。
def cursor(self)
- 接口说明:创建一个新的数据库游标对象,用于执行SQL命令和查询。
- 返回值:数据库游标对象。
执行SQL
def execute(self, operation, req_id: Optional[int] = None)
- 接口说明:执行 sql 语句。
- 参数说明:
operation
:待执行的 sql 语句。reqId
: 用于问题追踪。
- 返回值:影响的条数。
- 异常:操作失败抛出
ProgrammingError
异常。
def query(self, sql: str, req_id: Optional[int] = None) -> TaosResult
- 接口说明:查询数据。
- 参数说明:
sql
:待执行的 sql 语句。reqId
: 用于问题追踪。
- 返回值:
TaosResult
数据集对象。 - 异常:操作失败抛出
ProgrammingError
异常。
数据集
TaosResult 对象可以通过循环遍历获取查询到的数据。
def fields(&self)
- 接口说明:获取查询数据的字段信息, 包括:名称,类型及字段长度。
- 返回值:
TaosFields
字段信息 list。
def field_count(&self)
- 接口说明:获取查询到的记录条数。
- 返回值:查询到的记录条数。
def fetch_all_into_dict(self)
- 接口说明:将所有的记录转换为字典。
- 返回值:返回字典列表。
无模式写入
def schemaless_insert(&self, lines: List[str], protocol: SmlProtocol, precision: SmlPrecision, req_id: Optional[int] = None, ttl: Optional[int] = None) -> int:
- 接口说明:无模式写入。
- 参数说明:
lines
:待写入的数据数组,无模式具体的数据格式可参考Schemaless 写入
。protocol
: 协议类型SmlProtocol.LINE_PROTOCOL
: InfluxDB 行协议(Line Protocol)。SmlProtocol.TELNET_PROTOCOL
:OpenTSDB 文本行协议。SmlProtocol.JSON_PROTOCOL
: JSON 协议格式
precision
: 时间精度SmlPrecision.Hour
: 小时SmlPrecision.Minute
:分钟SmlPrecision.Second
秒SmlPrecision.Millisecond