TDengine Python Connector
taospy
是 TDengine 的官方 Python 连接器。taospy
提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。taospy
对 TDengine 的原生接口和 REST 接口都进行了封装, 分别对应 taospy
包的 taos
模块 和 taosrest
模块。
除了对原生接口和 REST 接口的封装,taospy
还提供了符合 Python 数据访问规范(PEP 249) 的编程接口。这使得 taospy
和很多第三方工具集成变得简单,比如 SQLAlchemy 和 pandas。
使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口与服务端建立的连接的方式下文中称为“REST 连接”。
Python 连接器的源码托管在 GitHub。
支持的平台
- 原生连接支持的平台和 TDengine 客户端支持的平台一致。
- REST 连接支持所有能运行 Python 的平台。
版本选择
无论使用什么版本的 TDengine 都建议使用最新版本的 taospy
。
支持的功能
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。
安装
准备
- 安装 Python。建议使用 Python >= 3.6。如果系统上还没有 Python 可参考 Python BeginnersGuide 安装。
- 安装 pip。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 pip docuemntation 安装。
- 如果使用原生连接,还需安装客户端驱动。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。
使用 pip 安装
卸载旧版本
如果以前安装过旧版本的 Python 连接器, 请提前卸载。
pip3 uninstall taos taospy
较早的 TDengine 客户端软件包含了 Python 连接器。如果从客户端软件的安装目录安装了 Python 连接器,那么对应的 Python 包名是 taos
。 所以上述卸载命令包含了 taos
, 不存在也没关系。
安装 taospy
- 从 PyPI 安装
- 从 GitHub 安装
安装最新版本
pip3 install taospy
也可以指定某个特定版本安装。
pip3 install taospy==2.3.0
pip3 install git+https://github.com/taosdata/taos-connector-python.git
安装验证
- 原生连接
- REST 连接
对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 taos
模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入:
import taos
对于 REST 连接,只需验证是否能成功导入 taosrest
模块。可在 Python 交互式 Shell 中输入:
import taosrest
如果系统上有多个版本的 Python,则可能有多个 pip
命令。要确保使用的 pip
命令路径是正确的。上面我们用 pip3
命令安装,排除了使用 Python 2.x 版本对应的 pip
的可能性。但是如果系统上有多个 Python 3.x 版本,仍需检查安装路径是否正确。最简单的验证方式是,在命令再次输入 pip3 install taospy
, 就会打印出 taospy
的具体安装位置,比如在 Windows 上:
C:\> pip3 install taospy
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Requirement already satisfied: taospy in c:\users\username\appdata\local\programs\python\python310\lib\site-packages (2.3.0)
建立连接
连通性测试
在用连接器建立连接之前,建议先测试本地 TDengine CLI 到 TDengine 集群的连通性。
- 原生连接
- REST 连接
请确保 TDengine 集群已经启动, 且集群中机器的 FQDN (如果启动的是单机版,FQDN 默认为 hostname)在本机能够解析, 可用 ping
命令进行测试:
ping <FQDN>
然后测试用 TDengine CLI 能否正常连接集群:
taos -h <FQDN> -p <PORT>
上面的 FQDN 可以为集群中任意一个 dnode 的 FQDN, PORT 为这个 dnode 对应的 serverPort。
对于 REST 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试:
curl -u root:taosdata http://<FQDN>:<PORT>/rest/sql -d "select server_version()"
上面的 FQDN 为运行 taosAdapter 的机器的 FQDN, PORT 为 taosAdapter 配置的监听端口, 默认为 6041。 如果测试成功,会输出服务器版本信息,比如:
{
"status": "succ",
"head": ["server_version()"],
"column_meta": [["server_version()", 8, 8]],
"data": [["2.4.0.16"]],
"rows": 1
}
使用连接器建立连接
以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。
- 原生连接
- REST 连接
import taos
conn: taos.TaosConnection = taos.connect(host="localhost",
user="root",
password="taosdata",
database="test",
port=6030,
config="/etc/taos", # for windows the default value is C:\TDengine\cfg
timezone="Asia/Shanghai") # default your host's timezone
server_version = conn.server_info
print("server_version", server_version)
client_version = conn.client_info
print("client_version", client_version) # 2.4.0.16
conn.close()
# possible output:
# 2.4.0.16
# 2.4.0.16
connect
函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:
host
: 要连接的节点的 FQDN。 没有默认值。如果不同提供此参数,则会连接客户端配置文件中的 firstEP。user
:TDengine 用户名。 默认值是 root。password
: TDengine 用户密码。 默认值是 taosdata。port
: 要连接的数据节点的起始端口,即 serverPort 配置。默认值是 6030。只有在提供了 host 参数的时候,这个参数才生效。config
: 客户端配置文件路径。 在 Windows 系统上默认是C:\TDengine\cfg
。 在 Linux 系统上默认是/etc/taos/
。timezone
: 查询结果中 TIMESTAMP 类型的数据,转换为 python 的 datetime 对象时使用的时区。默认为本地时区。
config
和 timezone
都是进程级别的配置。建议一个进程建立的所有连接都使用相同的参数值。否则可能产生无法预知的错误。
connect
函数返回 taos.TaosConnection
实例。 在客户端多线程的场景下,推荐每个线程申请一个独立的连接实例,而不建议多线程共享一个连接。
from taosrest import connect, TaosRestConnection, TaosRestCursor
conn: TaosRestConnection = connect(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)
connect()
函数的所 有参数都是可选的关键字参数。下面是连接参数的具体说明:
url
: taosAdapter REST 服务的 URL。默认是 <http://localhost:6041>。user
: TDenigne 用户名。默认是 root。password
: TDengine 用户密码。默认是 taosdata。timeout
: HTTP 请求超时时间。单位为秒。默认为socket._GLOBAL_DEFAULT_TIMEOUT
。 一般无需配置。
示例程序
基本使用
- 原生连接
- REST 连接
TaosConnection 类的使用
TaosConnection
类既包含对 PEP249 Connection 接口的实现(如:cursor
方法和 close
方法),也包含很多扩展功能(如: execute
、 query
、schemaless_insert
和 subscribe
方法。
conn = taos.connect()
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
# change database. same as execute "USE db"
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
affected_row: int = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m 24.4)")
print("affected_row", affected_row)
# output:
# affected_row 3
# Execute a sql and get its result set. It's useful for SELECT statement
result: taos.TaosResult = conn.query("SELECT * from weather")
# Get fields from result
fields: taos.field.TaosFields = result.fields
for field in fields:
print(field) # {name: ts, type: 9, bytes: 8}
# output:
# {name: ts, type: 9, bytes: 8}
# {name: temperature, type: 6, bytes: 4}
# {name: location, type: 4, bytes: 4}
# Get data from result as list of tuple
data = result.fetch_all()
print(data)
# output:
# [(datetime.datetime(2022, 4, 27, 9, 4, 25, 367000), 23.5, 1), (datetime.datetime(2022, 4, 27, 9, 5, 25, 367000), 23.5, 1), (datetime.datetime(2022, 4, 27, 9, 6, 25, 367000), 24.399999618530273, 1)]
# Or get data from result as a list of dict
# map_data = result.fetch_all_into_dict()
# print(map_data)
# output:
# [{'ts': datetime.datetime(2022, 4, 27, 9, 1, 15, 343000), 'temperature': 23.5, 'location': 1}, {'ts': datetime.datetime(2022, 4, 27, 9, 2, 15, 343000), 'temperature': 23.5, 'location': 1}, {'ts': datetime.datetime(2022, 4, 27, 9, 3, 15, 343000), 'temperature': 24.399999618530273, 'location': 1}]
查询结果只能获取一次。比如上面的示例中 fetch_all()
和 fetch_all_into_dict()
只能用一个。重复获取得到的结果为空列表。
TaosResult 类的使用
上面 TaosConnection
类的使用示例中,我们已经展示了两种获取查询结果的方法: fetch_all()
和 fetch_all_into_dict()
。除此之外 TaosResult
还提供了按行迭代(rows_iter
)或按数据块迭代(blocks_iter
)结果集的方法。在查询数据量较大的场景,使用这两个方法会更高效。
import taos
conn = taos.connect()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
# prepare data
for i in range(2000):
location = str(i % 10)
tb = "t" + location
conn.execute(f"INSERT INTO {tb} USING weather TAGS({location}) VALUES (now+{i}a, 23.5) (now+{i + 1}a, 23.5)")
result: taos.TaosResult = conn.query("SELECT * FROM weather")
block_index = 0
blocks: taos.TaosBlocks = result.blocks_iter()
for rows, length in blocks:
print("block ", block_index, " length", length)
print("first row in this block:", rows[0])
block_index += 1
conn.close()
# possible output:
# block 0 length 1200
# first row in this block: (datetime.datetime(2022, 4, 27, 15, 14, 52, 46000), 23.5, 0)
# block 1 length 1200
# first row in this block: (datetime.datetime(2022, 4, 27, 15, 14, 52, 76000), 23.5, 3)
# block 2 length 1200
# first row in this block: (datetime.datetime(2022, 4, 27, 15, 14, 52, 99000), 23.5, 6)
# block 3 length 400
# first row in this block: (datetime.datetime(2022, 4, 27, 15, 14, 52, 122000), 23.5, 9)
TaosCursor 类的使用
TaosConnection
类和 TaosResult
类已经实现了原生接口的所有功能。如果你对 PEP249 规范中的接口比较熟悉也可以使用 TaosCursor
类提供的方法。
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS test")
cursor.execute("CREATE DATABASE test")
cursor.execute("USE test")
cursor.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
for i in range(1000):
location = str(i % 10)
tb = "t" + location
cursor.execute(f"INSERT INTO {tb} USING weather TAGS({location}) VALUES (now+{i}a, 23.5) (now+{i + 1}a, 23.5)")
cursor.execute("SELECT count(*) FROM weather")
data = cursor.fetchall()
print("count:", data[0][0])
cursor.execute("SELECT tbname, * FROM weather LIMIT 2")
col_names = [meta[0] for meta in cursor.description]
print(col_names)
rows = cursor.fetchall()
print(rows)
cursor.close()
conn.close()
# output:
# count: 2000
# ['tbname', 'ts', 'temperature', 'location']
# row_count: -1
# [('t0', datetime.datetime(2022, 4, 27, 14, 54, 24, 392000), 23.5, 0), ('t0', datetime.datetime(2022, 4, 27, 14, 54, 24, 393000), 23.5, 0)]
TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。
TaosRestCursor 类的使用
TaosRestCursor
类是对 PEP249 Cursor 接口的实现。
# create STable
cursor: TaosRestCursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS power")
cursor.execute("CREATE DATABASE power")
cursor.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
# insert data
cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""")
print("inserted row count:", cursor.rowcount)
# query data
cursor.execute("SELECT * FROM power.meters LIMIT 3")
# get total rows
print("queried row count:", cursor.rowcount)
# get column names from cursor
column_names = [meta[0] for meta in cursor.description]
# get rows
data: list[tuple] = cursor.fetchall()
print(column_names)
for row in data:
print(row)
# output:
# inserted row count: 8
# queried row count: 3
# ['ts', 'current', 'voltage', 'phase', 'location', 'groupid']
# [datetime.datetime(2018, 10, 3, 14, 38, 5, 500000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 11.8, 221, 0.28, 'california.losangeles', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 16, 600000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 13.4, 223, 0.29, 'california.losangeles', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.8, 223, 0.29, 'california.losangeles', 3]
cursor.execute
: 用来执行任意 SQL 语句。cursor.rowcount
: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。cursor.description
: 返回字段的描述信息。关于描述信息的具体格式请参考TaosRestCursor。
RestClient 类的使用
RestClient
类是对于 REST API 的直接封装。它只包含一个 sql()
方法用于执行任意 SQL 语句, 并返回执行结果。
from taosrest import RestClient
client = RestClient("http://localhost:6041", user="root", password="taosdata")
res: dict = client.sql("SELECT ts, current FROM power.meters LIMIT 1")
print(res)
# output:
# {'status': 'succ', 'head': ['ts', 'current'], 'column_meta': [['ts', 9, 8], ['current', 6, 4]], 'data': [[datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.3]], 'rows': 1}
对于 sql()
方法更详细的介绍, 请参考 RestClient。
与 pandas 一起使用
- 原生连接
- REST 连接
- 原生连接 + SQLAlchemy
- REST 连接 + SQLAlchemy
import taos
import pandas
conn = taos.connect()
df: pandas.DataFrame = pandas.read_sql("SELECT * FROM meters", conn)
# print index
print(df.index)
# print data type of element in ts column
print(type(df.ts[0]))
print(df.head(3))
# output:
# RangeIndex(start=0, stop=8, step=1)
# <class 'pandas._libs.tslibs.timestamps.Timestamp'>
# ts current ... location groupid
# 0 2018-10-03 14:38:05.500 11.8 ... california.losangeles 2
# 1 2018-10-03 14:38:16.600 13.4 ... california.losangeles 2
# 2 2018-10-03 14:38:05.000 10.8 ... california.losangeles 3
import taosrest
import pandas
conn = taosrest.connect()
df: pandas.DataFrame = pandas.read_sql("SELECT * FROM power.meters", conn)
# print index
print(df.index)
# print data type of element in ts column
print(type(df.ts[0]))
print(df.head(3))
# output:
# RangeIndex(start=0, stop=8, step=1)
# <class 'pandas._libs.tslibs.timestamps.Timestamp'>
# ts current ... location groupid
# 0 2018-10-03 06:38:05.500000+00:00 11.8 ... california.losangeles 2
# 1 2018-10-03 06:38:16.600000+00:00 13.4 ... california.losangeles 2
# 2 2018-10-03 06:38:05+00:00 10.8 ... california.losangeles 3
import pandas
from sqlalchemy import create_engine
engine = create_engine("taos://root:taosdata@localhost:6030/power")
df: pandas.DataFrame = pandas.read_sql("SELECT * FROM power.meters", engine)
# print index
print(df.index)
# print data type of element in ts column
print(type(df.ts[0]))
print(df.head(3))
# output:
# RangeIndex(start=0, stop=8, step=1)
# <class 'pandas._libs.tslibs.timestamps.Timestamp'>
# ts current ... location groupid
# 0 2018-10-03 14:38:05.500 11.8 ... california.losangeles 2
# 1 2018-10-03 14:38:16.600 13.4 ... california.losangeles 2
# 2 2018-10-03 14:38:05.000 10.8 ... california.losangeles 3
import pandas
from sqlalchemy import create_engine
engine = create_engine("taosrest://root:taosdata@localhost:6041")
df: pandas.DataFrame = pandas.read_sql("SELECT * FROM power.meters", engine)
# print index
print(df.index)
# print data type of element in ts column
print(type(df.ts[0]))
print(df.head(3))
# output:
# RangeIndex(start=0, stop=8, step=1)
# <class 'pandas._libs.tslibs.timestamps.Timestamp'>
# ts current ... location groupid
# 0 2018-10-03 06:38:05.500000+00:00 11.8 ... california.losangeles 2
# 1 2018-10-03 06:38:16.600000+00:00 13.4 ... california.losangeles 2
# 2 2018-10-03 06:38:05+00:00 10.8 ... california.losangeles 3
其它示例程序
示例程序链接 | 示例程序内容 |
---|---|
bind_multi.py | 参数绑定, 一次绑定多行 |
bind_row.py | 参数绑定,一次绑定一行 |
insert_lines.py | InfluxDB 行协议写入 |
json_tag.py | 使用 JSON 类型的标签 |
subscribe-async.py | 异步订阅 |
subscribe-sync.py | 同步订阅 |
其它说明
异常处理
所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
import taos
try:
conn = taos.connect()
conn.execute("CREATE TABLE 123") # wrong sql
except taos.Error as e:
print(e)
print("exception class: ", e.__class__.__name__)
print("error number:", e.errno)
print("error message:", e.msg)
except BaseException as other:
print("exception occur")
print(other)
# output:
# [0x0216]: syntax error near 'Incomplete SQL statement'
# exception class: ProgrammingError
# error number: -2147483114
# error message: syntax error near 'Incomplete SQL statement'
关于纳秒 (nanosecond)
由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。
- https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
- https://www.python.org/dev/peps/pep-0564/
常见问题
欢迎提问或报告问题。