与 Pandas 集成
Pandas 是 Python 编程语言中最为流行的数据处理和分析库,自 2008 年由 Wes McKinney 创建以来,已成为数据科学领域不可或缺的核心工具。它专门为解决现实世界中的数据分析任务而设计,使得在 Python 中处理结构化数据变得异常简单。无论是处理商业报表、科学研究数据,还是进行金融分析,Pandas 都能提供专业的解决方案。通过简洁的 API 和丰富的功能,Pandas 极大地降低了数据处理的技术门槛,让使用者能够更专注于数据本身的价值挖掘,而非陷入繁琐的技术细节之中。
通过 TDengine TSDB 的 Python 连接器,Pandas 可支持 TDengine TSDB 数据源并提供数据展现、分析等功能。
前置条件
准备以下环境:
- TDengine TSDB 3.3.7.0 或以上版本已安装(企业及社区版均可)。
- SQLAlchemy v2.0.0 或以上版本已安装,安装参考 官方文档。
- pandas v2.1.0 或以上版本已安装,安装参考 官方文档。
- Python 连接器 taospy 2.8.6 或以上版本已 安装。
配置数据源
Pandas 使用 SQLAlchemy 连接到 TDengine TSDB 数据源,连接 URL 格式为:
taos://[username]:[password]@[<host1>:<port1>]/[database_name]
建立连接
from datetime import datetime
import pandas
from sqlalchemy import create_engine, text
from sqlalchemy.types import Integer, Float, TIMESTAMP, String
def connect():
"""Create a connection to TDengine using SQLAlchemy"""
engine = create_engine(f"taos://root:taosdata@localhost:6030?timezone=Asia/Shanghai")
conn = engine.connect()
print("Connected to TDengine successfully.")
return conn
数据交互
下面介绍了如何通过调用 Pandas 接口,并结合 SQLAlchemy 与 TDengine TSDB 数据库进行写入和查询操作。 关于 Pandas 接口的详细说明请参照 Pandas Api。
数据类型映射
TDengine TSDB 目前支持时间戳、数字、字符、布尔类型,与 sqlalchemy.types 对应类型转换如下:
| Sqlalchemy Types | TDengine TSDB DataType |
|---|---|
| sqltypes.Boolean | BOOL |
| sqltypes.TIMESTAMP | TIMESTAMP |
| sqltypes.Integer | INT |
| sqltypes.Integer | INT UNSIGNED |
| sqltypes.BigInteger | BIGINT |
| sqltypes.BigInteger | BIGINT UNSIGNED |
| sqltypes.FLOAT | FLOAT |
| sqltypes.FLOAT | DOUBLE |
| sqltypes.SmallInteger | TINYINT |
| sqltypes.SmallInteger | TINYINT UNSIGNED |
| sqltypes.SmallInteger | SMALLINT |
| sqltypes.SmallInteger | SMALLINT UNSIGNED |
| sqltypes.String | BINARY |
| sqltypes.String | VARCHAR |
| sqltypes.BINARY | VARBINARY |
| sqltypes.Unicode | NCHAR |
| sqltypes.JSON | JSON |
| sqltypes.BLOB | BLOB |
| sqltypes.BINARY | GEOMETRY |
数据写入
使用 Pandas 的 to_sql 方式写入数据:
def pandas_to_sql_example(conn):
"""Test writing data to TDengine using pandas DataFrame.to_sql() method and verify the results"""
try:
conn.execute(text("CREATE DATABASE IF NOT EXISTS power"))
conn.execute(text(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"))
conn.execute(text("USE power"))
data = {
"ts": [1729653691000, "2024-09-19 10:00:00", datetime(2024, 9, 20, 10, 11, 12, 456)],
"current": [11.5, 12.3, 13.7],
"voltage": [220, 230, 240],
"phase": [1.0, 1.1, 1.2],
"location": ["california.losangeles", "california.sandiego", "california.sanfrancisco"],
"groupid": [2, 2, 3],
"tbname": ["california", "sandiego", "sanfrancisco"]
}
df = pandas.DataFrame(data)
rows_affected = df.to_sql("meters", conn, if_exists="append", index=False,
dtype={
"ts": TIMESTAMP,
"current": Float,
"voltage": Integer,
"phase": Float,
"location": String,
"groupid": Integer,
})
assert rows_affected == 3, f"Expected to insert 3 rows, affected {rows_affected} rows"
except Exception as err:
print(f"Failed to insert data into power.meters, ErrMessage:{err}")
raise err
数据查询
使用 Pandas 的 read_sql 方式进行查询:
def pandas_read_sql_example(conn):
"""Test reading data from TDengine using pandas read_sql() method"""
try:
sql = text("SELECT * FROM power.meters WHERE current > :current AND phase > :phase")
sql_df = pandas.read_sql(
sql=sql,
con=conn,
params={"current": 10, "phase": 1}
)
print(sql_df.head(3))
print("Read data from TDengine successfully.")
except Exception as err:
print(f"Failed to read data from power.meters, ErrMessage:{err}")
raise err
使用 Pandas 的 read_sql_table 方式读取表数据:
def pandas_read_sql_table_example(conn):
"""Test reading data from TDengine using pandas read_sql_table() method"""
try:
table_df = pandas.read_sql_table(
table_name='meters',
con=conn,
index_col='ts',
parse_dates=['ts'],
chunksize=1000, # optional, read data rows in chunks
columns=[
'ts',
'current',
'voltage',
'phase',
'location',
'groupid'
],
)
total_rows = 0
for i, chunk in enumerate(table_df, start=1):
print(f"Processing chunk {i}")
for index, row in chunk.iterrows():
total_rows += 1
print(f"no: {total_rows}")
print(f"ts: {index}")
print(f"current: {row['current']}")
print(f"voltage: {row['voltage']}")
print(f"phase: {row['phase']}")
print(f"location: {row['location']}")
print(f"groupid: {row['groupid']}")
print("Read data from TDengine successfully using read_sql_table.")
except Exception as err:
print(f"Failed to read data from power.meters using read_sql_table, ErrMessage:{err}")
raise err







