Skip to main content

查询数据

主要查询功能

TDengine 采用 SQL 作为查询语言。应用程序可以通过 REST API 或连接器发送 SQL 语句,用户还可以通过 TDengine 命令行工具 taos 手动执行 SQL 即席查询(Ad-Hoc Query)。TDengine 支持如下查询功能:

  • 单列、多列数据查询
  • 标签和数值的多种过滤条件:>, <, =, <>, like 等
  • 聚合结果的分组(Group by)、排序(Order by)、约束输出(Limit/Offset)
  • 时间窗口(Interval)、会话窗口(Session)和状态窗口(State_window)等窗口切分聚合查询
  • 数值列及聚合结果的四则运算
  • 时间戳对齐的连接查询(Join Query: 隐式连接)操作
  • 多种聚合/计算函数: count, max, min, avg, sum, twa, stddev, leastsquares, top, bottom, first, last, percentile, apercentile, last_row, spread, diff 等

例如:在命令行工具 taos 中,从表 d1001 中查询出 voltage > 215 的记录,按时间降序排列,仅仅输出 2 条。

taos> select * from d1001 where voltage > 215 order by ts desc limit 2;
ts | current | voltage | phase |
======================================================================================
2018-10-03 14:38:16.800 | 12.30000 | 221 | 0.31000 |
2018-10-03 14:38:15.000 | 12.60000 | 218 | 0.33000 |
Query OK, 2 row(s) in set (0.001100s)

为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。

具体的查询语法请看 TDengine SQL 的数据查询 章节。

多表聚合查询

物联网场景中,往往同一个类型的数据采集点有多个。TDengine 采用超级表(STable)的概念来描述某一个类型的数据采集点,一张普通的表来描述一个具体的数据采集点。同时 TDengine 使用标签来描述数据采集点的静态属性,一个具体的数据采集点有具体的标签值。通过指定标签的过滤条件,TDengine 提供了一高效的方法将超级表(某一类型的数据采集点)所属的子表进行聚合查询。对普通表的聚合函数以及绝大部分操作都适用于超级表,语法完全一样。

示例一

在 TDengine CLI,查找加利福尼亚州所有智能电表采集的电压平均值,并按照 location 分组。

taos> SELECT AVG(voltage), location FROM meters GROUP BY location;
avg(voltage) | location |
===============================================================================================
219.200000000 | California.SanFrancisco |
221.666666667 | California.LosAngeles |
Query OK, 2 rows in database (0.005995s)

示例二

在 TDengine CLI, 查找 groupId 为 2 的所有智能电表的记录条数,电流的最大值。

taos> SELECT count(*), max(current) FROM meters where groupId = 2;
cunt(*) | max(current) |
==================================
5 | 13.4 |
Query OK, 1 row(s) in set (0.002136s)

TDengine SQL 的数据查询 一章,查询类操作都会注明是否支持超级表。

降采样查询、插值

物联网场景里,经常需要通过降采样(down sampling)将采集的数据按时间段进行聚合。TDengine 提供了一个简便的关键词 interval 让按照时间窗口的查询操作变得极为简单。比如,将智能电表 d1001 采集的电流值每 10 秒钟求和

taos> SELECT _wstart, sum(current) FROM d1001 INTERVAL(10s);
_wstart | sum(current) |
======================================================
2018-10-03 14:38:00.000 | 10.300000191 |
2018-10-03 14:38:10.000 | 24.900000572 |
Query OK, 2 rows in database (0.003139s)

降采样操作也适用于超级表,比如:将加利福尼亚州所有智能电表采集的电流值每秒钟求和

taos> SELECT _wstart, SUM(current) FROM meters where location like "California%" INTERVAL(1s);
_wstart | sum(current) |
======================================================
2018-10-03 14:38:04.000 | 10.199999809 |
2018-10-03 14:38:05.000 | 23.699999809 |
2018-10-03 14:38:06.000 | 11.500000000 |
2018-10-03 14:38:15.000 | 12.600000381 |
2018-10-03 14:38:16.000 | 34.400000572 |
Query OK, 5 rows in database (0.007413s)

降采样操作也支持时间偏移,比如:将所有智能电表采集的电流值每秒钟求和,但要求每个时间窗口从 500 毫秒开始

taos> SELECT _wstart, SUM(current) FROM meters INTERVAL(1s, 500a);
_wstart | sum(current) |
======================================================
2018-10-03 14:38:03.500 | 10.199999809 |
2018-10-03 14:38:04.500 | 10.300000191 |
2018-10-03 14:38:05.500 | 13.399999619 |
2018-10-03 14:38:06.500 | 11.500000000 |
2018-10-03 14:38:14.500 | 12.600000381 |
2018-10-03 14:38:16.500 | 34.400000572 |
Query OK, 6 rows in database (0.005515s)

物联网场景里,每个数据采集点采集数据的时间是难同步的,但很多分析算法(比如 FFT)需要把采集的数据严格按照时间等间隔的对齐,在很多系统里,需要应用自己写程序来处理,但使用 TDengine 的降采样操作就轻松解决。

如果一个时间间隔里,没有采集的数据,TDengine 还提供插值计算的功能。

语法规则细节请见 TDengine SQL 的按时间窗口切分聚合 章节。

示例代码

查询数据

SQL 写入 一章,我们创建了 power 数据库,并向 meters 表写入了一些数据,以下示例代码展示如何查询这个表的数据。

package com.taos.example;

import java.sql.*;

public class RestQueryExample {
private static Connection getConnection() throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://localhost:6041/power?user=root&password=taosdata";
return DriverManager.getConnection(jdbcUrl);
}

private static void printRow(ResultSet rs) throws SQLException {
ResultSetMetaData meta = rs.getMetaData();
for (int i = 1; i <= meta.getColumnCount(); i++) {
String value = rs.getString(i);
System.out.print(value);
System.out.print("\t");
}
System.out.println();
}

private static void printColName(ResultSet rs) throws SQLException {
ResultSetMetaData meta = rs.getMetaData();
for (int i = 1; i <= meta.getColumnCount(); i++) {
String colLabel = meta.getColumnLabel(i);
System.out.print(colLabel);
System.out.print("\t");
}
System.out.println();
}

private static void processResult(ResultSet rs) throws SQLException {
printColName(rs);
while (rs.next()) {
printRow(rs);
}
}

private static void queryData() throws SQLException {
try (Connection conn = getConnection()) {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("SELECT AVG(voltage) FROM meters GROUP BY location");
processResult(rs);
}
}
}

public static void main(String[] args) throws SQLException {
queryData();
}
}

// possible output:
// avg(voltage) location
// 222.0 California.LosAngeles
// 219.0 California.SanFrancisco

查看源码

note
  1. 无论是使用 REST 连接还是原生连接的连接器,以上示例代码都能正常工作。
  2. 唯一需要注意的是:由于 REST 接口无状态, 不能使用 use db 语句来切换数据库。除了在 REST 参数中指定数据库以外也可以在 SQL 语句中使用 <db_name>.<table_name> 来指定数据库。

异步查询

除同步查询 API 之外,TDengine 还提供性能更高的异步调用 API 处理数据插入、查询操作。在软硬件环境相同的情况下,异步 API 处理数据插入的速度比同步 API 快 2-4 倍。异步 API 采用非阻塞式的调用方式,在系统真正完成某个具体数据库操作前,立即返回。调用的线程可以去处理其他工作,从而可以提升整个应用的性能。异步 API 在网络延迟严重的情况下,优点尤为突出。

需要注意的是,只有使用原生连接的连接器,才能使用异步查询功能。

import time
from ctypes import *

from taos import *


def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)

if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None

for row in result.rows_iter(num_of_rows):
print(row)
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)


def query_callback(p_param, p_result, code):
if p_result is None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)


class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]

def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)


def test_query(conn):
counter = Counter(count=0)
conn.query_a("select ts, current, voltage from power.meters", query_callback, byref(counter))

while not counter.done:
print(counter)
time.sleep(1)
print(counter)
conn.close()


if __name__ == "__main__":
test_query(connect())

# possible output:
# { count: 0, done: False }
# fetched 8 rows
# 1538548685000 10.300000 219
# 1538548695000 12.600000 218
# 1538548696800 12.300000 221
# 1538548696650 10.300000 218
# 1538548685500 11.800000 221
# 1538548696600 13.400000 223
# 1538548685500 10.800000 223
# 1538548686500 11.500000 221
# fetched 0 rows
# fetching completed
# { count: 8, done: True }

查看源码

note

这个示例程序,目前在 Windows 系统上还无法运行