Skip to main content

TDengine Python Connector

taospy 是 TDengine 的官方 Python 连接器。taospy 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。

Python 连接器的源码托管在 GitHub

连接方式

taospy主要提供三种形式的连接器。一般我们推荐使用 Websocket 连接

  • 原生连接,对应 taospy 包的 taos 模块。通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 实例,支持数据写入、查询、数据订阅、schemaless 接口和参数绑定接口等功能。
  • REST 连接,对应 taospy 包的 taosrest 模块。通过 taosAdapter 提供的 HTTP 接口连接 TDengine 实例,不支持 schemaless 和数据订阅等特性。
  • Websocket 连接,对应 taos-ws-py 包,可以选装。通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例,WebSocket 连接实现的功能集合和原生连接有少量不同。

连接方式的详细介绍请参考:连接器建立连接的方式

除了对原生接口和 REST 接口的封装,taospy 还提供了符合 Python 数据访问规范(PEP 249) 的编程接口。这使得 taospy 和很多第三方工具集成变得简单,比如 SQLAlchemypandas

使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口或 WebSocket 接口与服务端建立的连接的方式下文中称为“REST 连接”或“WebSocket 连接”。

支持的平台

  • 原生连接支持的平台和 TDengine 客户端支持的平台一致。
  • REST 连接支持所有能运行 Python 的平台。

支持的功能

  • 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
  • REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。

历史版本

无论使用什么版本的 TDengine 都建议使用最新版本的 taospy

Python Connector 版本主要变化
2.7.121. 新增 varbinary 类型支持(STMT暂不支持 varbinary )
2. query 性能提升(感谢贡献者hadrianl
2.7.9数据订阅支持获取消费进度和重置消费进度
2.7.8新增 execute_many
Python Websocket Connector 版本主要变化
0.2.9已知问题修复
0.2.51. 数据订阅支持获取消费进度和重置消费进度
2. 支持 schemaless
3. 支持 STMT
0.2.4数据订阅新增取消订阅方法

处理异常

Python 连接器可能会产生 4 种异常:

  • Python 连接器本身的异常
  • 原生连接方式的异常
  • websocket 连接方式异常
  • 数据订阅异常
  • TDengine 其他功能模块的异常
Error TypeDescriptionSuggested Actions
InterfaceErrortaosc 版本太低,不支持所使用的接口请检查 TDengine 客户端版本
ConnectionError数据库链接错误请检查 TDengine 服务端状态和连接参数
DatabaseError数据库错误请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版
OperationalError操作错误API 使用错误,请检查代码
ProgrammingError
StatementErrorstmt 相关异常
ResultError
SchemalessErrorschemaless 相关异常
TmqErrortmq 相关异常

Python 中通常通过 try-expect 处理异常,异常处理相关请参考 Python 错误和异常文档

Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:

import taos

try:
conn = taos.connect()
conn.execute("CREATE TABLE 123") # wrong sql
except taos.Error as e:
print(e)
print("exception class: ", e.__class__.__name__)
print("error number:", e.errno)
print("error message:", e.msg)
except BaseException as other:
print("exception occur")
print(other)

# output:
# [0x0216]: syntax error near 'Incomplete SQL statement'
# exception class: ProgrammingError
# error number: -2147483114
# error message: syntax error near 'Incomplete SQL statement'

查看源码

TDengine DataType 和 Python DataType

TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:

TDengine DataTypePython DataType
TIMESTAMPdatetime
INTint
BIGINTint
FLOATfloat
DOUBLEint
SMALLINTint
TINYINTint
BOOLbool
BINARYstr
NCHARstr
JSONstr

安装步骤

安装前准备

  1. 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 Python BeginnersGuide 安装。
  2. 安装 pip。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 pip documentation 安装。
  3. 如果使用原生连接,还需安装客户端驱动。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。

使用 pip 安装

卸载旧版本

如果以前安装过旧版本的 Python 连接器, 请提前卸载。

pip3 uninstall taos taospy
note

较早的 TDengine 客户端软件包含了 Python 连接器。如果从客户端软件的安装目录安装了 Python 连接器,那么对应的 Python 包名是 taos。 所以上述卸载命令包含了 taos, 不存在也没关系。

安装 taospy

安装最新版本

pip3 install taospy

也可以指定某个特定版本安装。

pip3 install taospy==2.3.0

安装 taos-ws-py(可选)

taos-ws-py 包提供了通过 WebSocket 连接 TDengine 的能力,可选安装 taos-ws-py 以获得 WebSocket 连接 TDengine 的能力。

和 taospy 同时安装
pip3 install taospy[ws]
单独安装
pip3 install taos-ws-py

安装验证

对于 REST 连接,只需验证是否能成功导入 taosrest 模块。可在 Python 交互式 Shell 中输入:

import taosrest
tip

如果系统上有多个版本的 Python,则可能有多个 pip 命令。要确保使用的 pip 命令路径是正确的。上面我们用 pip3 命令安装,排除了使用 Python 2.x 版本对应的 pip 的可能性。但是如果系统上有多个 Python 3.x 版本,仍需检查安装路径是否正确。最简单的验证方式是,在命令再次输入 pip3 install taospy, 就会打印出 taospy 的具体安装位置,比如在 Windows 上:

C:\> pip3 install taospy
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Requirement already satisfied: taospy in c:\users\username\appdata\local\programs\python\python310\lib\site-packages (2.3.0)

建立连接

连通性测试

在用连接器建立连接之前,建议先测试本地 TDengine CLI 到 TDengine 集群的连通性。

对于 REST 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试:

curl -u root:taosdata http://<FQDN>:<PORT>/rest/sql -d "select server_version()"

上面的 FQDN 为运行 taosAdapter 的机器的 FQDN, PORT 为 taosAdapter 配置的监听端口, 默认为 6041。 如果测试成功,会输出服务器版本信息,比如:

{
"code": 0,
"column_meta": [
[
"server_version()",
"VARCHAR",
7
]
],
"data": [
[
"3.0.0.0"
]
],
"rows": 1
}

指定 Host 和 Properties 获取连接

以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。

from taosrest import connect, TaosRestConnection, TaosRestCursor

conn = connect(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)

查看源码

connect() 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:

  • url: taosAdapter REST 服务的 URL。默认是 <http://localhost:6041>。
  • user: TDengine 用户名。默认是 root。
  • password: TDengine 用户密码。默认是 taosdata。
  • timeout: HTTP 请求超时时间。单位为秒。默认为 socket._GLOBAL_DEFAULT_TIMEOUT。 一般无需配置。

配置参数的优先级

如果配置参数在参数和客户端配置文件中有重复,则参数的优先级由高到低分别如下:

  1. 连接参数
  2. 使用原生连接时,TDengine 客户端驱动的配置文件 taos.cfg

使用示例

创建数据库和表

conn = taosrest.connect(url="http://localhost:6041")
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("USE test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")

插入数据

conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")

::: now 为系统内部函数,默认为客户端所在计算机当前时间。 now + 1s 代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。 :::

基本使用

RestClient 类的使用

RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。

RestClient 的使用
from taosrest import RestClient

client = RestClient("http://localhost:6041", user="root", password="taosdata")
res: dict = client.sql("SELECT ts, current FROM power.meters LIMIT 1")
print(res)

# output:
# {'status': 'succ', 'head': ['ts', 'current'], 'column_meta': [['ts', 9, 8], ['current', 6, 4]], 'data': [[datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.3]], 'rows': 1}


查看源码

对于 sql() 方法更详细的介绍, 请参考 RestClient

TaosRestCursor 类的使用

TaosRestCursor 类是对 PEP249 Cursor 接口的实现。

TaosRestCursor 的使用
# create STable
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS power")
cursor.execute("CREATE DATABASE power keep 36500 ")
cursor.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")

# insert data
cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""")
print("inserted row count:", cursor.rowcount)

# query data
cursor.execute("SELECT * FROM power.meters LIMIT 3")
# get total rows
print("queried row count:", cursor.rowcount)
# get column names from cursor
column_names = [meta[0] for meta in cursor.description]
# get rows
data = cursor.fetchall()
print(column_names)
for row in data:
print(row)
# close cursor
cursor.close()

# output:
# inserted row count: 8
# queried row count: 3
# ['ts', 'current', 'voltage', 'phase', 'location', 'groupid']
# [datetime.datetime(2018, 10, 3, 14, 38, 5, 500000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 11.8, 221, 0.28, 'california.losangeles', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 16, 600000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 13.4, 223, 0.29, 'california.losangeles', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.8, 223, 0.29, 'california.losangeles', 3]

查看源码

  • cursor.execute : 用来执行任意 SQL 语句。
  • cursor.rowcount: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。
  • cursor.description : 返回字段的描述信息。关于描述信息的具体格式请参考TaosRestCursor
note

TaosRestCursor 的最佳实践是,查询开始时创建 cursor,用完之后就关闭,请避免复用同一个 cursor 多次执行。

查询数据

RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。

from taosrest import RestClient

client = RestClient("http://localhost:6041", user="root", password="taosdata")
res: dict = client.sql("SELECT ts, current FROM power.meters LIMIT 1")
print(res)

# output:
# {'status': 'succ', 'head': ['ts', 'current'], 'column_meta': [['ts', 9, 8], ['current', 6, 4]], 'data': [[datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.3]], 'rows': 1}


查看源码

对于 sql() 方法更详细的介绍, 请参考 RestClient

执行带有 reqId 的 SQL

reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:

  • 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
  • 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
  • 故障诊断:当一个请求失败时,可以通过查看与该请求关联的 reqId 来找出问题发生的位置

如果用户不设置reqId,连接器也会内部随机生成一个,但是还是建议用户设置,可以更好的跟用户请求关联起来。

类似上文介绍的使用方法,增加 req_id 参数。

RestClient 类的使用

RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。

RestClient 的使用
from taosrest import RestClient

client = RestClient("http://localhost:6041", user="root", password="taosdata")
res: dict = client.sql("SELECT ts, current FROM power.meters LIMIT 1", req_id=1)
print(res)

# output:
# {'status': 'succ', 'head': ['ts', 'current'], 'column_meta': [['ts', 9, 8], ['current', 6, 4]], 'data': [[datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.3]], 'rows': 1}


查看源码

对于 sql() 方法更详细的介绍, 请参考 RestClient

TaosRestCursor 类的使用

TaosRestCursor 类是对 PEP249 Cursor 接口的实现。

TaosRestCursor 的使用
# create STable
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS power", req_id=1)
cursor.execute("CREATE DATABASE power keep 36500", req_id=2)
cursor.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)", req_id=3)

# insert data
cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""", req_id=4)
print("inserted row count:", cursor.rowcount)

# query data
cursor.execute("SELECT * FROM power.meters LIMIT 3", req_id=5)
# get total rows
print("queried row count:", cursor.rowcount)
# get column names from cursor
column_names = [meta[0] for meta in cursor.description]
# get rows
data = cursor.fetchall()
print(column_names)
for row in data:
print(row)
# close cursor
cursor.close()

# output:
# inserted row count: 8
# queried row count: 3
# ['ts', 'current', 'voltage', 'phase', 'location', 'groupid']
# [datetime.datetime(2018, 10, 3, 14, 38, 5, 500000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 11.8, 221, 0.28, 'california.losangeles', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 16, 600000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 13.4, 223, 0.29, 'california.losangeles', 2]
# [datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.8, 223, 0.29, 'california.losangeles', 3]

查看源码

  • cursor.execute : 用来执行任意 SQL 语句。
  • cursor.rowcount: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。
  • cursor.description : 返回字段的描述信息。关于描述信息的具体格式请参考TaosRestCursor

与 pandas 一起使用

import pandas
from sqlalchemy import create_engine, text

engine = create_engine("taosrest://root:taosdata@localhost:6041")
conn = engine.connect()
df: pandas.DataFrame = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
conn.close()

# print index
print(df.index)
# print data type of element in ts column
print(type(df.ts[0]))
print(df.head(3))

# output:
# RangeIndex(start=0, stop=8, step=1)
# <class 'pandas._libs.tslibs.timestamps.Timestamp'>
# ts current ... location groupid
# 0 2018-10-03 06:38:05.500000+00:00 11.8 ... california.losangeles 2
# 1 2018-10-03 06:38:16.600000+00:00 13.4 ... california.losangeles 2
# 2 2018-10-03 06:38:05+00:00 10.8 ... california.losangeles 3

查看源码

通过参数绑定写入数据

TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 ? 来代表待绑定的参数。

创建 stmt

Python 连接器的 Connection 提供了 statement 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 ? 来代表绑定的参数。

import taos

conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
参数绑定

调用 new_multi_binds 函数创建 params 列表,用于参数绑定。

params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])

调用 stmt 的 bind_param 以单行的方式设置 values 或 bind_param_batch 以多行的方式设置 values 方法绑定参数。

stmt.bind_param_batch(params)
执行 sql

调用 stmt 的 execute 方法执行 sql

stmt.execute()
关闭 stmt

最后需要关闭 stmt。

stmt.close()
示例代码
#!

import taosws

import taos

db_name = 'test_ws_stmt'


def before():
taos_conn = taos.connect()
taos_conn.execute("drop database if exists %s" % db_name)
taos_conn.execute("create database %s keep 36500" % db_name)
taos_conn.select_db(db_name)
taos_conn.execute("create table t1 (ts timestamp, a int, b float, c varchar(10))")
taos_conn.execute(
"create table stb1 (ts timestamp, a int, b float, c varchar(10)) tags (t1 int, t2 binary(10))")
taos_conn.close()


def stmt_insert():
before()

conn = taosws.connect('taosws://root:taosdata@localhost:6041/%s' % db_name)

while True:
try:
stmt = conn.statement()
stmt.prepare("insert into t1 values (?, ?, ?, ?)")

stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])

stmt.add_batch()
rows = stmt.execute()
print(rows)
stmt.close()
except Exception as e:
if 'Retry needed' in e.args[0]: # deal with [0x0125] Retry needed
continue
else:
raise e

break


def stmt_insert_into_stable():
before()

conn = taosws.connect("taosws://root:taosdata@localhost:6041/%s" % db_name)

while True:
try:
stmt = conn.statement()
stmt.prepare("insert into ? using stb1 tags (?, ?) values (?, ?, ?, ?)")
stmt.set_tbname('stb1_1')
stmt.set_tags([
taosws.int_to_tag(1),
taosws.varchar_to_tag('aaa'),
])
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])

stmt.add_batch()
rows = stmt.execute()
print(rows)
stmt.close()
except Exception as e:
if 'Retry needed' in e.args[0]: # deal with [0x0125] Retry needed
continue
else:
raise e

break

查看源码

无模式写入

连接器支持无模式写入功能。

简单写入
import taos

conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us' keep 36500" % dbname)
conn.select_db(dbname)

lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000',
]
conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
print("inserted")

conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)

result = conn.query("show tables")
for row in result:
print(row)

conn.execute("drop database if exists %s" % dbname)

查看源码

带有 ttl 参数的写入
import taos
from taos import SmlProtocol, SmlPrecision

conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us' keep 36500" % dbname)
conn.select_db(dbname)

lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000',
]
conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, ttl=1000)
print("inserted")

conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, ttl=1000)

result = conn.query("show tables")
for row in result:
print(row)

conn.execute("drop database if exists %s" % dbname)

查看源码

带有 req_id 参数的写入
import taos
from taos import SmlProtocol, SmlPrecision

conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us' keep 36500" % dbname)
conn.select_db(dbname)

lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000',
]
conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=1)
print("inserted")

conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=2)

result = conn.query("show tables")
for row in result:
print(row)

conn.execute("drop database if exists %s" % dbname)

查看源码

执行带有 reqId 的无模式写入

连接器的 schemaless_insertschemaless_insert_raw 方法支持 req_id 可选参数,此 req_Id 可用于请求链路追踪。

import taos
from taos import SmlProtocol, SmlPrecision

conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us' keep 36500" % dbname)
conn.select_db(dbname)

lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000',
]
conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=1)
print("inserted")

conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=2)

result = conn.query("show tables")
for row in result:
print(row)

conn.execute("drop database if exists %s" % dbname)

查看源码

import taos
from taos import utils
from taos import TaosConnection
from taos.cinterface import *
from taos.error import OperationalError, SchemalessError

conn = taos.connect()
dbname = "taos_schemaless_insert"
try:
conn.execute("drop database if exists %s" % dbname)

if taos.IS_V3:
conn.execute("create database if not exists %s schemaless 1 precision 'ns' keep 36500" % dbname)
else:
conn.execute("create database if not exists %s update 2 precision 'ns' keep 36500" % dbname)

conn.select_db(dbname)

lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000
st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000
stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000'''

ttl = 1000
req_id = utils.gen_req_id()
res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id)
print("affected rows: ", res)
assert (res == 3)

lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000'''
ttl = 1000
req_id = utils.gen_req_id()
res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id)
print("affected rows: ", res)
assert (res == 1)

result = conn.query("select * from st")
dict2 = result.fetch_all_into_dict()
print(dict2)
print(result.row_count)

all = result.rows_iter()
for row in all:
print(row)
result.close()
assert (result.row_count == 2)

# error test
lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000'''
try:
ttl = 1000
req_id = utils.gen_req_id()
res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id)
print(res)
# assert(False)
except SchemalessError as err:
print('**** error: ', err)
# assert (err.msg == 'Invalid data format')

result = conn.query("select * from st")
print(result.row_count)
all = result.rows_iter()
for row in all:
print(row)
result.close()

conn.execute("drop database if exists %s" % dbname)
conn.close()
except InterfaceError as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
print(err)
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
print(err)
raise err

查看源码

数据订阅

连接器支持数据订阅功能,数据订阅功能请参考 数据订阅文档

创建 Topic

创建 Topic 相关请参考 数据订阅文档

创建 Consumer

Consumer 提供了 Python 连接器订阅 TMQ 数据的 API。创建 Consumer 语法为 consumer = Consumer(configs),参数定义请参考 数据订阅文档

from taos.tmq import Consumer

consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})

订阅 topics

Consumer API 的 subscribe 方法用于订阅 topics,consumer 支持同时订阅多个 topic。

consumer.subscribe(['topic1', 'topic2'])

消费数据

Consumer API 的 poll 方法用于消费数据,poll 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),poll 方法在超时之前返回一条 Message 类型的数据或超时返回 None。消费者必须通过 Message 的 error() 方法校验返回数据的 error 信息。

while True:
message = consumer.poll(1)
if not message:
continue
err = message.error()
if err is not None:
raise err
val = message.value()

for block in val:
print(block.fetchall())

获取消费进度

Consumer API 的 assignment 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。

assignments = consumer.assignment()

Consumer API 的 seek 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。

tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)

关闭订阅

消费结束后,应当取消订阅,并关闭 Consumer。

consumer.unsubscribe()
consumer.close()

完整示例

from taos.tmq import Consumer
import taos


def init_tmq_env(db, topic):
conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
conn.execute("create database if not exists {} wal_retention_period 3600 keep 36500".format(db))
conn.select_db(db)
conn.execute(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')")
conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')")
conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")


def cleanup(db, topic):
conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))


if __name__ == '__main__':
init_tmq_env("tmq_test", "tmq_test_topic") # init env
consumer = Consumer(
{
"group.id": "tg2",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
consumer.subscribe(["tmq_test_topic"])

try:
while True:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()

for block in val:
print(block.fetchall())
finally:
consumer.unsubscribe()
consumer.close()
cleanup("tmq_test", "tmq_test_topic")

查看源码

import taos
from taos.tmq import Consumer
import taosws


def prepare():
conn = taos.connect()
conn.execute("drop topic if exists tmq_assignment_demo_topic")
conn.execute("drop database if exists tmq_assignment_demo_db")
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600 keep 36500")
conn.select_db("tmq_assignment_demo_db")
conn.execute(
"create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute(
"create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')")


def taos_get_assignment_and_seek_demo():
prepare()
consumer = Consumer(
{
"group.id": "0",
}
)
consumer.subscribe(["tmq_assignment_demo_topic"])

# get topic assignment
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)

# poll
consumer.poll(1)
consumer.poll(1)

# get topic assignment again
after_pool_assignments = consumer.assignment()
for assignment in after_pool_assignments:
print(assignment)

# seek to the beginning
for assignment in assignments:
consumer.seek(assignment)

# now the assignment should be the same as before poll
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)


if __name__ == '__main__':
taos_get_assignment_and_seek_demo()

查看源码

更多示例程序

示例程序链接示例程序内容
bind_multi.py参数绑定, 一次绑定多行
bind_row.py参数绑定,一次绑定一行
insert_lines.pyInfluxDB 行协议写入
json_tag.py使用 JSON 类型的标签
tmq_consumer.pytmq 订阅

其它说明

关于纳秒 (nanosecond)

由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。

  1. https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
  2. https://www.python.org/dev/peps/pep-0564/

重要更新

Release Notes

API 参考

常见问题

欢迎提问或报告问题