Skip to main content

TDengine Python 连接器

taospy 是 TDengine 的官方 Python 连接器。taospy 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。taospy 对 REST 接口都进行了封装。taospy 还提供了符合 Python 数据访问规范(PEP 249) 的编程接口。这使得 taospy 和很多第三方工具集成变得简单,比如 SQLAlchemypandas

Python 连接器的源码托管在 GitHub

安装

准备

  1. 安装 Python。新近版本 taospy 包要求 Python 3.6+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 Python BeginnersGuide 安装。
  2. 安装 pip。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 pip documentation 安装。

使用 pip 安装

pip3 install -U taospy[ws]

使用 conda 安装

conda install -c conda-forge taospy taospyws

安装验证

只需验证是否能成功导入 taosrest 模块。可在 Python 交互式 Shell 中输入:

import taosrest

建立连接

import taosrest
import os

url = os.environ["TDENGINE_CLOUD_URL"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]

conn = taosrest.connect(url=url, token=token)
# test the connection by getting version info
print("server version:", conn.server_info)

查看源码

connect() 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:

  • url: TDengine Cloud 的URL。
  • token: TDengine Cloud 的令牌.
  • timeout: HTTP 请求超时时间。单位为秒。默认为 socket._GLOBAL_DEFAULT_TIMEOUT。 一般无需配置。

示例程序

使用 TaosRestConnection 类

affected_row = conn.execute("DROP DATABASE IF EXISTS power")
print("affected_row", affected_row) # 0
affected_row = conn.execute("CREATE DATABASE power")
print("affected_row", affected_row) # 0
conn.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
print("affected_row", affected_row) # 0
affected_row = conn.execute("""INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)
""")
print("affected_row", affected_row) # 8

result = conn.query("SELECT ts, current FROM power.meters LIMIT 2")

print("metadata of each column:\n", result.fields) # [{'name': 'ts', 'type': 9, 'bytes': 8}, {'name': 'current', 'type': 6, 'bytes': 4}]

print("total rows:", result.rows) # 2

# Iterate over result.
for row in result:
print(row)
# output:
# [datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone.utc), 10.3]
# [datetime.datetime(2018, 10, 3, 14, 38, 15, tzinfo=datetime.timezone.utc), 12.6]

# Or get all rows as a list
print(result.data) # [[datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone.utc), 10.3], [datetime.datetime(2018, 10, 3, 14, 38, 15, tzinfo=datetime.timezone.utc), 12.6]]

查看源码

使用 TaosRestCursor 类

TaosRestCursor 类是对 PEP249 Cursor 接口的实现。

from taosrest import TaosRestCursor
# create STable
cursor: TaosRestCursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS power")
cursor.execute("CREATE DATABASE power")
cursor.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")

# insert data
cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""")
print("inserted row count:", cursor.rowcount)

# query data
cursor.execute("SELECT * FROM power.meters LIMIT 3")
# get total rows
print("queried row count:", cursor.rowcount)
# get column names from cursor
column_names = [meta[0] for meta in cursor.description]
# get rows
data: list[tuple] = cursor.fetchall()
print(column_names)
for row in data:
print(row)

# output:
# inserted row count: 8
# queried row count: 3
# ['ts', 'current', 'voltage', 'phase', 'location', 'groupid']
# [datetime.datetime(2018, 10, 3, 14, 38, 5, 500000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 11.8, 221, 0.28, 'california.losangeles', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 16, 600000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 13.4, 223, 0.29, 'california.losangeles', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.8, 223, 0.29, 'california.losangeles', 3]

查看源码

  • cursor.execute : 用来执行任意 SQL 语句。
  • cursor.rowcount: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。
  • cursor.description : 返回字段的描述信息。关于描述信息的具体格式请参考TaosRestCursor
RestClient 类的使用

RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。

import os
from taosrest import RestClient

url = os.environ["TDENGINE_CLOUD_URL"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]

client = RestClient(url, token)
res: dict = client.sql("SELECT ts, current FROM power.meters LIMIT 1")
print(res)

# output:
# {'status': 'succ', 'head': ['ts', 'current'], 'column_meta': [['ts', 9, 8], ['current', 6, 4]], 'data': [[datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.3]], 'rows': 1}

查看源码

对于 sql() 方法更详细的介绍, 请参考 RestClient

其它说明

异常处理

所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:

import taos

try:
conn = taos.connect()
conn.execute("CREATE TABLE 123") # wrong sql
except taos.Error as e:
print(e)
print("exception class: ", e.__class__.__name__)
print("error number:", e.errno)
print("error message:", e.msg)
except BaseException as other:
print("exception occur")
print(other)

# output:
# [0x0216]: syntax error near 'Incomplete SQL statement'
# exception class: ProgrammingError
# error number: -2147483114
# error message: syntax error near 'Incomplete SQL statement'

查看源码

关于纳秒 (nanosecond)

由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。

  1. https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
  2. https://www.python.org/dev/peps/pep-0564/

重要更新

Release Notes

API 参考

常见问题

欢迎提问或报告问题