TDengine Python Connector
taospy
是 TDengine 的官方 Python 连接器。taospy
提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。
Python 连接器的源码托管在 GitHub。
连接方式
taospy
主要提供三种形式的连接器。一般我们推荐使用 Websocket 连接。
- 原生连接,对应
taospy
包的taos
模块。通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 实例,支持数据写入、查询、数据订阅、schemaless 接口和参数绑定接口等功能。 - REST 连接,对应
taospy
包的taosrest
模块。通过 taosAdapter 提供的 HTTP 接口连接 TDengine 实例,不支持 schemaless 和数据订阅等特性。 - Websocket 连接,对应
taos-ws-py
包,可以选装。通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例,WebSocket 连接实现的功能集合和原生连接有少量不同。
连接方式的详细介绍请参考:连接器建立连接的方式
除了对原生接口和 REST 接口的封装,taospy
还提供了符合 Python 数据访问规范(PEP 249) 的编程接口。这使得 taospy
和很多第三方工具集成变得简单,比如 SQLAlchemy 和 pandas。
使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口或 WebSocket 接口与服务端建立的连接的方式下文中称为“REST 连接”或“WebSocket 连接”。
支持的平台
- 原生连接支持的平台和 TDengine 客户端支持的平台一致。
- REST 连接支持所有能运行 Python 的平台。
支持的功能
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。
历史版本
无论使用什么版本的 TDengine 都建议使用最新版本的 taospy
。
Python Connector 版本 | 主要变化 |
---|---|
2.7.12 | 1. 新增 varbinary 类型支持(STMT暂不支持 varbinary ) 2. query 性能提升(感谢贡献者hadrianl) |
2.7.9 | 数据订阅支持获取消费进度和重置消费进度 |
2.7.8 | 新增 execute_many |
Python Websocket Connector 版本 | 主要变化 |
---|---|
0.2.9 | 已知问题修复 |
0.2.5 | 1. 数据订阅支持获取消费进度和重置消费进度 2. 支持 schemaless 3. 支持 STMT |
0.2.4 | 数据订阅新增取消订阅方法 |
处理异常
Python 连接器可能会产生 4 种异常:
- Python 连接器本身的异常
- 原生连接方式的异常
- websocket 连接方式异常
- 数据订阅异常
- TDengine 其他功能模块的异常
Error Type | Description | Suggested Actions |
---|---|---|
InterfaceError | taosc 版本太低,不支持所使用的接口 | 请检查 TDengine 客户端版本 |
ConnectionError | 数据库链接错误 | 请检查 TDengine 服务端状态和连接参数 |
DatabaseError | 数据库错误 | 请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版 |
OperationalError | 操作错误 | API 使用错误,请检查代码 |
ProgrammingError | ||
StatementError | stmt 相关异常 | |
ResultError | ||
SchemalessError | schemaless 相关异常 | |
TmqError | tmq 相关异常 |
Python 中通常通过 try-expect 处理异常,异常处理相关请参考 Python 错误和异常文档。
Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
import taos
try:
conn = taos.connect()
conn.execute("CREATE TABLE 123") # wrong sql
except taos.Error as e:
print(e)
print("exception class: ", e.__class__.__name__)
print("error number:", e.errno)
print("error message:", e.msg)
except BaseException as other:
print("exception occur")
print(other)
# output:
# [0x0216]: syntax error near 'Incomplete SQL statement'
# exception class: ProgrammingError
# error number: -2147483114
# error message: syntax error near 'Incomplete SQL statement'
TDengine DataType 和 Python DataType
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:
TDengine DataType | Python DataType |
---|---|
TIMESTAMP | datetime |
INT | int |
BIGINT | int |
FLOAT | float |
DOUBLE | int |
SMALLINT | int |
TINYINT | int |
BOOL | bool |
BINARY | str |
NCHAR | str |
JSON | str |
安装步骤
安装前准备
- 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 Python BeginnersGuide 安装。
- 安装 pip。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 pip documentation 安装。
- 如果使用原生连接,还需安装客户端驱动。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。
使用 pip 安装
卸载旧版本
如果以前安装过旧版本的 Python 连接器, 请提前卸载。
pip3 uninstall taos taospy
较早的 TDengine 客户端软件包含了 Python 连接器。如果从客户端软件的安装目录安装了 Python 连接器,那么对应的 Python 包名是 taos
。 所以上述卸载命令包含了 taos
, 不存在也没关系。
安装 taospy
- 从 PyPI 安装
- 从 GitHub 安装
安装最新版本
pip3 install taospy
也可以指定某个特定版本安装。
pip3 install taospy==2.3.0
pip3 install git+https://github.com/taosdata/taos-connector-python.git
安装 taos-ws-py
(可选)
taos-ws-py 包提供了通过 WebSocket 连接 TDengine 的能力,可选安装 taos-ws-py 以获得 WebSocket 连接 TDengine 的能力。
和 taospy 同时安装
pip3 install taospy[ws]
单独安装
pip3 install taos-ws-py
安装验证
- 原生连接
- REST 连接
- WebSocket 连接
对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 taos
模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入:
import taos
对于 REST 连接,只需验证是否能成功导入 taosrest
模块。可在 Python 交互式 Shell 中输入:
import taosrest
对于 WebSocket 连接,只需验证是否能成功导入 taosws
模块。可在 Python 交互式 Shell 中输入:
import taosws
如果系统上有多个版本的 Python,则可能有多个 pip
命令。要确保使用的 pip
命令路径是正确的。上面我们用 pip3
命令安装,排除了使用 Python 2.x 版本对应的 pip
的可能性。但是如果系统上有多个 Python 3.x 版本,仍需检查安装路径是否正确。最简单的验证方式是,在命令再次输入 pip3 install taospy
, 就会打印出 taospy
的具体安装位置,比如在 Windows 上:
C:\> pip3 install taospy
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Requirement already satisfied: taospy in c:\users\username\appdata\local\programs\python\python310\lib\site-packages (2.3.0)
建立连接
连通性测试
在用连接器建立连接之前,建议先测试本地 TDengine CLI 到 TDengine 集群的连通性。
- 原生连接
- REST 连接
- WebSocket 连接
请确保 TDengine 集群已经启动, 且集群中机器的 FQDN (如果启动的是单机版,FQDN 默认为 hostname)在本机能够解析, 可用 ping
命令进行测试:
ping <FQDN>
然后测试用 TDengine CLI 能否正常连接集群:
taos -h <FQDN> -p <PORT>
上面的 FQDN 可以为集群中任意一个 dnode 的 FQDN, PORT 为这个 dnode 对应的 serverPort。
对于 REST 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试:
curl -u root:taosdata http://<FQDN>:<PORT>/rest/sql -d "select server_version()"
上面的 FQDN 为运行 taosAdapter 的机器的 FQDN, PORT 为 taosAdapter 配置的监听端口, 默认为 6041。 如果测试成功,会输出服务器版本信息,比如:
{
"code": 0,
"column_meta": [
[
"server_version()",
"VARCHAR",
7
]
],
"data": [
[
"3.0.0.0"
]
],
"rows": 1
}
对于 WebSocket 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试:
curl -i -N -d "show databases" -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -H "Connection: Upgrade" -H "Upgrade: websocket" -H "Host: <FQDN>:<PORT>" -H "Origin: http://<FQDN>:<PORT>" http://<FQDN>:<PORT>/rest/sql
上面的 FQDN 为运行 taosAdapter 的机器的 FQDN, PORT 为 taosAdapter 配置的监听端口, 默认为 6041。 如果测试成功,会输出服务器版本信息,比如:
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Tue, 21 Mar 2023 09:29:17 GMT
Transfer-Encoding: chunked
{"status":"succ","head":["server_version()"],"column_meta":[["server_version()",8,8]],"data":[["2.6.0.27"]],"rows":1}
指定 Host 和 Properties 获取连接
以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。
- 原生连接
- REST 连接
- WebSocket 连接
import taos
conn = taos.connect(
host="localhost",
user="root",
password="taosdata",
database="test",
port=6030,
config="/etc/taos", # for windows the default value is C:\TDengine\cfg
timezone="Asia/Shanghai",
) # default your host's timezone
server_version = conn.server_info
print("server_version", server_version)
client_version = conn.client_info
print("client_version", client_version) # 3.0.0.0
conn.close()
# possible output:
# 3.0.0.0
# 3.0.0.0
connect
函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:
host
: 要连接的节点的 FQDN。 没有默认值。如果不同提供此参数,则会连接客户端配置文件中的 firstEP。user
:TDengine 用户名。 默认值是 root。password
: TDengine 用户密码。 默认值是 taosdata。port
: 要连接的数据节点的起始端 口,即 serverPort 配置。默认值是 6030。只有在提供了 host 参数的时候,这个参数才生效。config
: 客户端配置文件路径。 在 Windows 系统上默认是C:\TDengine\cfg
。 在 Linux/macOS 系统上默认是/etc/taos/
。timezone
: 查询结果中 TIMESTAMP 类型的数据,转换为 python 的 datetime 对象时使用的时区。默认为本地时区。
config
和 timezone
都是进程级别的配置。建议一个进程建立的所有连接都使用相同的参数值。否则可能产生无法预知的错误。
connect
函数返回 taos.TaosConnection
实例。 在客户端多线程的场景下,推荐每个线程申请一个独立的连接实例,而不建议多线程共享一个连接。
from taosrest import connect, TaosRestConnection, TaosRestCursor
conn = connect(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)
connect()
函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:
url
: taosAdapter REST 服务的 URL。默认是 <http://localhost:6041>。user
: TDengine 用户名。默认是 root。password
: TDengine 用户密码。默认是 taosdata。timeout
: HTTP 请求超时时间。单位为秒。默认 为socket._GLOBAL_DEFAULT_TIMEOUT
。 一般无需配置。
import taosws
conn = taosws.connect("taosws://root:taosdata@localhost:6041")
connect()
函数参数为连接 url,协议为 taosws
或 ws
配置参数的优先级
如果配置参数在参数和客户端配置文件中有重复,则参数的优先级由高到低分别如下:
- 连接参数
- 使用原生连接时,TDengine 客户端驱动的配置文件 taos.cfg
使用示例
创建数据库和表
- 原生连接
- REST 连接
- WebSocket 连接
import taos
conn = taos.connect(
host="localhost",
user="root",
password="taosdata",
port=6030,
)
db = "power"
conn.execute(f"DROP DATABASE IF EXISTS {db}")
conn.execute(f"CREATE DATABASE {db}")
# change database. same as execute "USE db"
conn.select_db(db)
# create super table
conn.execute(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
)
# create table
conn.execute("CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')")
conn.close()
import taosrest
conn = taosrest.connect(url="http://localhost:6041")
db = "power"
conn.execute(f"DROP DATABASE IF EXISTS {db}")
conn.execute(f"CREATE DATABASE {db}")
# create super table
conn.execute(
f"CREATE TABLE `{db}`.`meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
)
# create table
conn.execute(f"CREATE TABLE `{db}`.`d0` USING `{db}`.`meters` TAGS(0, 'Los Angles')")
conn.close()
import taosws
dsn = "taosws://root:taosdata@localhost:6041"
conn = taosws.connect(dsn)
db = "power"
conn.execute(f"DROP DATABASE IF EXISTS {db}")
conn.execute(f"CREATE DATABASE {db}")
# change database.
conn.execute(f"USE {db}")
# create super table
conn.execute(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
)
# create table
conn.execute("CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')")
conn.close()
插入数据
- 原生连接
- REST 连接
- WebSocket 连接
# insert data
sql = """
INSERT INTO
power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000)
('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3)
VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2)
VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3)
VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)
"""
inserted = conn.execute(sql)
assert inserted == 8
# rest insert data
sql = """
INSERT INTO
power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000)
('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3)
VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2)
VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3)
VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)
"""
inserted = conn.execute(sql)
assert inserted == 8
# ws insert data
sql = """
INSERT INTO
power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000)
('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3)
VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2)
VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3)
VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)
"""
inserted = conn.execute(sql)
assert inserted == 8
NOW 为系统内部函数,默认为客户端所在计算机当前时间。
NOW + 1s
代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。
查询数据
- 原生连接
- REST 连接
- WebSocket 连接
TaosConnection
类的 query
方法可以用来查询数据,返回 TaosResult
类型的结果数据。
# Execute a sql and get its result set. It's useful for SELECT statement
result = conn.query("SELECT * from meters")
# Get fields from result
fields = result.fields
for field in fields:
print(field)
"""
output:
{name: ts, type: 9, bytes: 8}
{name: current, type: 6, bytes: 4}
{name: voltage, type: 4, bytes: 4}
{name: phase, type: 6, bytes: 4}
{name: location, type: 8, bytes: 64}
{name: groupid, type: 4, bytes: 4}
"""
# Get data from result as list of tuple
data = result.fetch_all()
for row in data:
print(row)
"""
output:
(datetime.datetime(2018, 10, 3, 14, 38, 16, 650000), 10.300000190734863, 218, 0.25, 'California.SanFrancisco', 3)
...
"""
查询结果只能获取一次。比如上面的示例中 fetch_all()
和 fetch_all_into_dict()
只能用一个。重复获取得到的结果为空列表。
RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。
client = taosrest.RestClient("http://localhost:6041")
result = client.sql(f"SELECT * from {db}.meters")
print(result)
"""
output:
{'code': 0, 'column_meta': [['ts', 'TIMESTAMP', 8], ['current', 'FLOAT', 4], ['voltage', 'INT', 4], ['phase', 'FLOAT', 4], ['location', 'VARCHAR', 64], ['groupid', 'INT', 4]], 'data': [[datetime.datetime(2018, 10, 3, 14, 38, 5), 10.3, 219, 0.31, 'California.SanFrancisco', 2], [datetime.datetime(2018, 10, 3, 14, 38, 15), 12.6, 218, 0.33, 'California.SanFrancisco', 2], [datetime.datetime(2018, 10, 3, 14, 38, 16, 800000), 12.3, 221, 0.31, 'California.SanFrancisco', 2], [datetime.datetime(2018, 10, 3, 14, 38, 16, 650000), 10.3, 218, 0.25, 'California.SanFrancisco', 3], [datetime.datetime(2018, 10, 3, 14, 38, 5, 500000), 11.8, 221, 0.28, 'California.LosAngeles', 2], [datetime.datetime(2018, 10, 3, 14, 38, 16, 600000), 13.4, 223, 0.29, 'California.LosAngeles', 2], [datetime.datetime(2018, 10, 3, 14, 38, 5), 10.8, 223, 0.29, 'California.LosAngeles', 3], [datetime.datetime(2018, 10, 3, 14, 38, 6, 500000), 11.5, 221, 0.35, 'California.LosAngeles', 3]], 'rows': 8}
"""
对于 sql()
方法更详细的介绍, 请参考 RestClient。
TaosConnection
类的 query
方法可以用来查询数据,返回 TaosResult
类型的结果数据。
# Execute a sql and get its result set. It's useful for SELECT statement
result = conn.query("SELECT * from meters")
# Get fields from result
fields = result.fields
for field in fields:
print(field)
"""
output:
{name: ts, type: TIMESTAMP, bytes: 8}
{name: current, type: FLOAT, bytes: 4}
{name: voltage, type: INT, bytes: 4}
{name: phase, type: FLOAT, bytes: 4}
{name: location, type: BINARY, bytes: 64}
{name: groupid, type: INT, bytes: 4}
"""
# Get rows from result
for row in result:
print(row)
"""
output:
('2018-10-03 14:38:05 +08:00', 10.300000190734863, 219, 0.3100000023841858, 'California.SanFrancisco', 2)
...
"""
执行带有 reqId 的 SQL
reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:
- 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
- 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
- 故障诊断:当一个请求失败时,可以通过查看与该请求关联的 reqId 来找出问题发生的位置
如果用户不设置reqId,连接器也会内部随机生成一个,但是还是建议用户设置,可以更好的跟用户请求关联起来。
- 原生连接
- REST 连接
- WebSocket 连接
通过参数绑定写入数据
TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 ?
来代表待绑定的参数。
- 原生连接
- WebSocket 连接
sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
stmt = conn.statement(sql)
tbname = "power.d1001"
tags = taos.new_bind_params(2)
tags[0].binary(["California.SanFrancisco"])
tags[1].int([2])
stmt.set_tbname_tags(tbname, tags)
params = taos.new_bind_params(4)
params[0].timestamp((1626861392589, 1626861392591, 1626861392592))
params[1].float((10.3, 12.6, 12.3))
params[2].int([194, 200, 201])
params[3].float([0.31, 0.33, 0.31])
stmt.bind_param_batch(params)
stmt.execute()
stmt.close()
sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
stmt = conn.statement()
stmt.prepare(sql)
tbname = "power.d1001"
tags = [
taosws.varchar_to_tag("California.SanFrancisco"),
taosws.int_to_tag(2),
]
stmt.set_tbname_tags(tbname, tags)
stmt.bind_param(
[
taosws.millis_timestamps_to_column(
[1626861392589, 1626861392591, 1626861392592]
),
taosws.floats_to_column([10.3, 12.6, 12.3]),
taosws.ints_to_column([194, 200, 201]),
taosws.floats_to_column([0.31, 0.33, 0.31]),
]
)
stmt.add_batch()
rows = stmt.execute()
assert rows == 3
stmt.close()
无模式写入
连接器支持无模式写入功能。
- 原生连接
- WebSocket 连接
import taos
conn = taos.connect(
host="localhost",
user="root",
password="taosdata",
port=6030,
)
db = "power"
conn.execute(f"DROP DATABASE IF EXISTS {db}")
conn.execute(f"CREATE DATABASE {db}")
# change database. same as execute "USE db"
conn.select_db(db)
lineDemo = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000"
]
telnetDemo = ["stb0_0 1707095283260 4 host=host0 interface=eth0"]
jsonDemo = [
'{"metric": "meter_current","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
conn.schemaless_insert(
lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
)
conn.schemaless_insert(
telnetDemo, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.MICRO_SECONDS
)
conn.schemaless_insert(
jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
)
conn.close()
import taosws
dsn = "taosws://root:taosdata@localhost:6041"
conn = taosws.connect(dsn)
db = "power"
conn.execute(f"DROP DATABASE IF EXISTS {db}")
conn.execute(f"CREATE DATABASE {db}")
# change database.
conn = taosws.connect(f"{dsn}/{db}")
lineDemo = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000"
]
telnetDemo = ["stb0_0 1707095283260 4 host=host0 interface=eth0"]
jsonDemo = [
'{"metric": "meter_current","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
conn.schemaless_insert(
lines=lineDemo,
protocol=taosws.PySchemalessProtocol.Line,
precision=taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=1,
)
conn.schemaless_insert(
lines=telnetDemo,
protocol=taosws.PySchemalessProtocol.Telnet,
precision=taosws.PySchemalessPrecision.Microsecond,
ttl=1,
req_id=2,
)
conn.schemaless_insert(
lines=jsonDemo,
protocol=taosws.PySchemalessProtocol.Json,
precision=taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=3,
)
conn.close()
执行带有 reqId 的无模式写入
连接器的 schemaless_insert
和 schemaless_insert_raw
方法支持 req_id
可选参数,此 req_id
可用于请求链路追踪。
conn.schemaless_insert(
lines=lineDemo,
protocol=taos.SmlProtocol.LINE_PROTOCOL,
precision=taos.SmlPrecision.NANO_SECONDS,
req_id=1,
)
数据订阅
连接器支持数据订阅功能,数据订阅功能请参考 数据订阅文档。
创建 Topic
# create topic
conn.execute(
f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
)
创建 Consumer
from taos.tmq import Consumer
consumer = Consumer(
{
"group.id": "1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
订阅 topics
consumer.subscribe([topic])
消费数据
while True:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
获取消费进度
Consumer API 的 assignment
方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)
Consumer API 的 seek
方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。
while True:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
关闭订阅
消费结束后,应当取消订阅,并关闭 Consumer。
consumer.unsubscribe()
consumer.close()
完整示例
import taos
conn = taos.connect(
host="localhost",
user="root",
password="taosdata",
port=6030,
)
db = "power"
topic = "topic_meters"
conn.execute(f"DROP TOPIC IF EXISTS {topic}")
conn.execute(f"DROP DATABASE IF EXISTS {db}")
conn.execute(f"CREATE DATABASE {db}")
# change database. same as execute "USE db"
conn.select_db(db)
# create super table
conn.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
)
# ANCHOR: create_topic
# create topic
conn.execute(
f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
)
# ANCHOR_END: create_topic
# ANCHOR: create_consumer
from taos.tmq import Consumer
consumer = Consumer(
{
"group.id": "1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
# ANCHOR_END: create_consumer
# ANCHOR: subscribe
consumer.subscribe([topic])
# ANCHOR_END: subscribe
try:
# ANCHOR: consume
while True:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
# ANCHOR_END: consume
# ANCHOR: assignment
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)
# ANCHOR_END: assignment
# ANCHOR: seek
offset = taos.tmq.TopicPartition(
topic=topic,
partition=assignment.partition,
offset=0,
)
consumer.seek(offset)
# ANCHOR_END: seek
finally:
# ANCHOR: unsubscribe
consumer.unsubscribe()
consumer.close()
# ANCHOR_END: unsubscribe
conn.close()
更多示例程序
示例程序链接 | 示例程序内容 |
---|---|
bind_multi.py | 参数绑定, 一次绑定多行 |
bind_row.py | 参数绑定,一次绑定一行 |
insert_lines.py | InfluxDB 行协议写入 |
json_tag.py | 使用 JSON 类型的标签 |
tmq_consumer.py | tmq 订阅 |
其它说明
关于纳秒 (nanosecond)
由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。
- https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
- https://www.python.org/dev/peps/pep-0564/
重要更新
API 参考
常见问题
欢迎提问或报告问题。