跳到主要内容

TDengine Python 连接器

taospy 是 TDengine 的官方 Python 连接器。taospy 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。 taospy 对 REST 接口进行了封装, 还提供了符合 Python 数据访问规范(PEP 249) 的编程接口。这使得 taospy 和很多第三方工具集成变得简单,比如 SQLAlchemypandas

Python 连接器的源码托管在 GitHub

连接方式

taospy主要提供三种形式的连接器。TDengine Cloud 支持 REST 连接和 WebSocket 连接两种方式

  • 原生连接,对应 taospy 包的 taos 模块。通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 实例,支持数据写入、查询、数据订阅、schemaless 接口和参数绑定接口等功能。
  • REST 连接,对应 taospy 包的 taosrest 模块。通过 taosAdapter 提供的 HTTP 接口连接 TDengine 实例,不支持 schemaless 和数据订阅等特性。
  • WebSocket 连接,对应 taos-ws-py 包,可以选装。通过 taosAdapter 提供的 WebSocket 接口连接 TDengine 实例,WebSocket 连接实现的功能集合和原生连接有少量不同。
IMPORTANT
  1. 使用客户端驱动提供的原生接口直接与服务端建立的连接,下文中称为“原生连接”.

  2. 使用 taosAdapter 提供的 REST 接口或 WebSocket 接口与服务端建立的连接的方式下文中称为“REST 连接”或“WebSocket 连接”。

关于如何建立连接的详细介绍请参考:开发指南-建立连接-Python

示例程序

下面以智能电表为例,展示如何使用 Python 连接器在名为 power 的数据库中,创建一个名为 meters 的超级表(STABLE),插入并查询数据。meters 表结构包含时间戳、电流、电压、相位等列,以及分组 ID 和位置作为标签。

IMPORTANT
  1. 在执行下面样例代码的之前,您必须先在 TDengine Cloud - 数据浏览器 页面创建一个名为 power 的数据库
  2. 如何在代码中建立和 TDengine Cloud 的连接,请参考 开发指南-建立连接

使用 TaosRestConnection 类

import taosrest
import os

url = os.environ["TDENGINE_CLOUD_URL"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]

conn = taosrest.connect(url=url, token=token)
# test the connection by getting version info
print("server version:", conn.server_info)
affected_row = conn.execute("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
print("affected_row", affected_row) # 0
affected_row = conn.execute("""INSERT INTO
power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:20.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:39:19.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:41:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:42:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:48:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:55:06.500', 11.50000, 221, 0.35000)
""")
print("affected_row", affected_row) # 8

result = conn.query("SELECT ts, current FROM power.meters LIMIT 2")

print("metadata of each column:\n", result.fields)
# [{'name': 'ts', 'type': 'TIMESTAMP', 'bytes': 8}, {'name': 'current', 'type': 'FLOAT', 'bytes': 4}]

print("total rows:", result.rows) # 2

# Iterate over result.
for row in result:
print(row)
# output:
# [datetime.datetime(2018, 10, 3, 14, 38, 5), 10.3]
# [datetime.datetime(2018, 10, 3, 14, 38, 15), 12.6]

# Or get all rows as a list
print(result.data)
# [[datetime.datetime(2018, 10, 3, 14, 38, 5), 10.3], [datetime.datetime(2018, 10, 3, 14, 38, 15), 12.6]]

查看源码

使用 TaosRestCursor 类

TaosRestCursor 类是对 PEP249 Cursor 接口的实现。

import taosrest
import os

url = os.environ["TDENGINE_CLOUD_URL"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]

conn = taosrest.connect(url=url, token=token)
# test the connection by getting version info
print("server version:", conn.server_info)
from taosrest import TaosRestCursor
# create STable
cursor: TaosRestCursor = conn.cursor()
cursor.execute("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")

# insert data
cursor.execute("""INSERT INTO
power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:20.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:39:19.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:41:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:42:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:48:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:55:06.500', 11.50000, 221, 0.35000)
""")
print("inserted row count:", cursor.rowcount)

# query data
cursor.execute("SELECT * FROM power.meters LIMIT 3")
# get total rows
print("queried row count:", cursor.rowcount)
# get column names from cursor
column_names = [meta[0] for meta in cursor.description]
# get rows
data: list[tuple] = cursor.fetchall()
print(column_names)
for row in data:
print(row)

# output:
# inserted row count: 8
# queried row count: 3
# ['ts', 'current', 'voltage', 'phase', 'location', 'groupid']
# [datetime.datetime(2018, 10, 3, 14, 38, 5), 10.3, 219, 0.31, 'California.SanFrancisco', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 15), 12.6, 218, 0.33, 'California.SanFrancisco', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 20, 800000), 12.3, 221, 0.31, 'California.SanFrancisco', 2]

查看源码

  • cursor.execute : 用来执行任意 SQL 语句。
  • cursor.rowcount: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。
  • cursor.description : 返回字段的描述信息。关于描述信息的具体格式请参考TaosRestCursor

使用 RestClient 类

RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。

import os
from taosrest import RestClient

url = os.environ["TDENGINE_CLOUD_URL"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]

client = RestClient(url, token)
res: dict = client.sql("SELECT ts, current FROM power.meters LIMIT 1")
print(res)

# output:
# {'status': 'succ', 'head': ['ts', 'current'], 'column_meta': [['ts', 9, 8], ['current', 6, 4]], 'data': [[datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.3]], 'rows': 1}

查看源码

对于 sql() 方法更详细的介绍, 请参考 RestClient

其它说明

异常处理

所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:

import taosrest
import os

url = os.environ["TDENGINE_CLOUD_URL"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]

try:
conn = taosrest.connect(url=url, token=token)
conn.execute("CREATE TABLE 123") # wrong sql
except taosrest.Error as e:
print(e)
print("exception class: ", e.__class__.__name__)
print("error number:", e.errno)
print("error message:", e.msg)
except BaseException as other:
print("exception occur")
print(other)

# output:
# [0x2600]: syntax error near "123"
# exception class: ConnectError
# error number: 9728
# error message: syntax error near "123"

查看源码

关于纳秒 (nanosecond)

由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。

  1. https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
  2. https://www.python.org/dev/peps/pep-0564/

重要更新

Release Notes

API 参考

更多关于 Python 连接器的详细介绍请参考连接器-Python

常见问题

欢迎提问或报告问题