查询数据
主要查询功能
TDengine 采用 SQL 作为查询语言。应用程序可以通过 REST API 或连接器发送 SQL 语句,用户还可以通过 TDengine 命令行工具 taos 手动执行 SQL 即席查询(Ad-Hoc Query)。TDengine 支持如下查询功能:
- 单列、多列数据查询
- 标签和数值的多种过滤条件:>, <, =, <>, like 等
- 聚合结果的分组(Group by)、排序(Order by)、约束输出(Limit/Offset)
- 时间窗口(Interval)、会话窗口(Session)和状态窗口(State_window)等窗口切分聚合查询
- 数值列及聚合结果的四则运算
- 时间戳对齐的连接查询(Join Query: 隐式连接)操作
- 多种聚合/计算函数: count, max, min, avg, sum, twa, stddev, leastsquares, top, bottom, first, last, percentile, apercentile, last_row, spread, diff 等
例如:在命令行工具 taos 中,从表 d1001 中查询出 voltage > 215 的记录,按时间降序排列,仅仅输出 2 条。
select * from test.d101 where voltage > 215 order by ts desc limit 2;
ts | current | voltage | phase |
======================================================================================
2018-10-03 14:38:16.800 | 12.30000 | 221 | 0.31000 |
2018-10-03 14:38:15.000 | 12.60000 | 218 | 0.33000 |
Query OK, 2 row(s) in set (0.001100s)
为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。
具体的查询语法请看 TDengine SQL 的数据查询 章节。
多表聚合查 询
物联网场景中,往往同一个类型的数据采集点有多个。TDengine 采用超级表(STable)的概念来描述某一个类型的数据采集点,一张普通的表来描述一个具体的数据采集点。同时 TDengine 使用标签来描述数据采集点的静态属性,一个具体的数据采集点有具体的标签值。通过指定标签的过滤条件,TDengine 提供了一高效的方法将超级表(某一类型的数据采集点)所属的子表进行聚合查询。对普通表的聚合函数以及绝大部分操作都适用于超级表,语法完全一样。
示例一
在 TDengine CLI ,查找加利福尼亚州所有智能电表采集的电压平均值,并按照 location 分组。
SELECT AVG(voltage), location FROM test.meters GROUP BY location;
location | avg(voltage) |
=======================================================
California.PaloAlto | 109.507000000 |
California.Sunnyvale | 109.507000000 |
California.MountainView | 109.507000000 |
California.SanFrancisco | 109.507000000 |
California.SanJose | 109.507000000 |
California.SanDiego | 109.507000000 |
California.SantaClara | 109.507000000 |
California.Cupertino | 109.507000000 |
California.Campbell | 109.507000000 |
California.LosAngles | 109.507000000 |
Query OK, 10 row(s) in set
示例二
在 TDengine CLI taos
, 查找 groupId 为 2 的所有智能电表过去24小时的记录条数和电流的最大值。
SELECT count(*), max(current) FROM test.meters where groupId = 2 and ts > now - 24h;
count(*) | max(current) |
==================================
5 | 13.4 |
Query OK, 1 row(s) in set (0.002136s)
在 TDengine SQL 的数据查询 一章,查询类操作都会注明是否支持超级表。
降采样查询、插值
物联网场景里,经常需要通过降采样(down sampling)将采集的数据按时间段进行聚合。TDengine 提供了一个简便的关键词 interval 让按照时间窗口的查询操作变得极为简单。比如,将智能电表 d1001 采集的电流值每 10 秒钟求和
taos> SELECT _wstart, sum(current) FROM test.d1001 INTERVAL(10s);
_wstart | sum(current) |
======================================================
2018-10-03 14:38:00.000 | 10.300000191 |
2018-10-03 14:38:10.000 | 24.900000572 |
Query OK, 2 rows in database (0.003139s)
降采样操作也适用于超级表,比如:将加利福尼亚州所有智能电表采集的电流值每秒钟求和
SELECT _wstart, SUM(current) FROM test.meters where location like "California%" INTERVAL(1s);
_wstart | sum(current) |
======================================================
2018-10-03 14:38:04.000 | 10.199999809 |
2018-10-03 14:38:05.000 | 23.699999809 |
2018-10-03 14:38:06.000 | 11.500000000 |
2018-10-03 14:38:15.000 | 12.600000381 |
2018-10-03 14:38:16.000 | 34.400000572 |
Query OK, 5 rows in database (0.007413s)
降采样操作也支持时间偏移,比如:将所有智能电表采集的电流值每秒钟求和,但要求每个时间窗口从 500 毫秒开始
SELECT _wstart, SUM(current) FROM test.meters INTERVAL(1s, 500a);
_wstart | sum(current) |
======================================================
2018-10-03 14:38:03.500 | 10.199999809 |
2018-10-03 14:38:04.500 | 10.300000191 |
2018-10-03 14:38:05.500 | 13.399999619 |
2018-10-03 14:38:06.500 | 11.500000000 |
2018-10-03 14:38:14.500 | 12.600000381 |
2018-10-03 14:38:16.500 | 34.400000572 |
Query OK, 6 rows in database (0.005515s)
物联网场景里,每个数据采集点采集数据的时间是难同步的,但很多分析算法(比如 FFT)需要把采集的数据严格按照时间等间隔的对齐,在很多系统里,需要应用自己写程序来 处理,但使用 TDengine 的降采样操作就轻松解决。
如果一个时间间隔里,没有采集的数据,TDengine 还提供插值计算的功能。
语法规则细节请见 TDengine SQL 的按时间窗口切分聚合 章节。
连接器样例
- 在执行下面样例代码的之前,您必须先在 TDengine Cloud - 数据浏览器 页面创建一个名为 power 的数据库,并插入数据
- 如何在代码中建立和 TDengine Cloud 的连接,请参考 开发指南-建立连接。
- Python
- Java
- Go
- Rust
- Node.js
- C#
在这个例子里面,我们使用 query
方法去执行 SQL,然后获取 result
对象。
result = conn.query("SELECT ts, current FROM power.meters LIMIT 2")
从 result
对象里面获取列的元数据,包括列名,列类型和列的长度。
print(result.fields)
# output: [{'name': 'ts', 'type': 'TIMESTAMP', 'bytes': 8}, {'name': 'current', 'type': 'FLOAT', 'bytes': 4}]
从 result
获得总行数:
print(result.rows)
# output: 2
在每一行上面迭代:
for row in result:
print(row)
# output:
# [datetime.datetime(2018, 10, 3, 14, 38, 5), 10.3]
# [datetime.datetime(2018, 10, 3, 14, 38, 10), 10.3]
在这个例子中,我们使用 Statement
对象的 executeQuery
方法并获取 ResultSet
对象。
ResultSet result = stmt.executeQuery("SELECT ts, current FROM power.meters LIMIT 2");
从结果里面得到列的元数据:
// print column names
ResultSetMetaData meta = result.getMetaData();
System.out.println(meta.getColumnLabel(1) + "\t" + meta.getColumnLabel(2));
// output: ts current
在结果上面迭代打印每一行数据:
while(result.next()) {
System.out.println(result.getTimestamp(1) + "\t" + result.getFloat(2));
}
// output:
//2018-10-03 14:38:05.0 10.3
//2018-10-03 14:38:15.0 12.6
在这个例子中,我们使用 Query
方法执行 SQL 并获取了一个 sql.Rows
对象。
rows, err := taos.Query("SELECT ts, current FROM power.meters LIMIT 2")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
}
defer rows.Close()
从结果行里面获取列名:
// print column names
colNames, _ := rows.Columns()
fmt.Println(colNames)
在每一行上面迭代并打印每一行数据:
for rows.Next() {
var r struct {
ts time.Time
current float32
}
err := rows.Scan(&r.ts, &r.current)
if err != nil {
fmt.Println("scan error:\n", err)
return
}
fmt.Println(r.ts, r.current)
}
// 2018-10-03 14:38:05 +0000 UTC 10.3
// 2018-10-03 14:38:15 +0000 UTC 12.6
在这个例子里面,我们使用查询方法来执行 SQL ,然后获取到 result 对象。
let mut result = taos.query("SELECT * FROM power.meters limit 5").await?;
从结果里面获取列的元数据:
let fields = result.fields();
for column in fields {
println!("name: {}, type: {:?} , bytes: {}", column.name(), column.ty(), column.bytes());
}
// output
// name: ts, type: Timestamp , bytes: 8
// name: current, type: Float , bytes: 4
// name: voltage, type: Int , bytes: 4
// name: phase, type: Float , bytes: 4
// name: location, type: VarChar , bytes: 64
// name: groupid, type: Int , bytes: 4
获取前5行数据并输出每一行数据:
let rows = result.rows();
rows.try_for_each(|row| async {
println!("{}", row.into_value_iter().join(","));
Ok(())
}).await?;
// output
// 2018-10-03T14:39:05+08:00,12.3,219,0.31,California.SanFrancisco,2
// 2018-10-03T14:39:15+08:00,12.6,218,0.33,California.SanFrancisco,2
// 2018-10-03T14:39:16.800+08:00,12.3,221,0.31,California.SanFrancisco,2
// 2018-10-03T14:39:16.650+08:00,23.4,218,0.25,California.SanFrancisco,3
const taos = require('@tdengine/websocket');
var url = process.env.TDENGINE_CLOUD_URL;
async function queryData() {
let conn = null;
try {
let conf = new taos.WSConfig(url);
conn = await taos.sqlConnect(conf);
let res = await conn.query('show databases');
while (await res.next()) {
let row = res.getData();
console.log(row[0]);
}
} catch (err) {
throw err;
} finally {
if (conn) {
await conn.close();
}
}
}
queryData();
在这个例子里面,我们使用查询方法来执行 SQL ,然后获取到 result 对象。
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="3.0.*" GeneratePathProperty="true" />
</ItemGroup>
<Target Name="copyDLLDependency" BeforeTargets="BeforeBuild">
<ItemGroup>
<DepDLLFiles Include="$(PkgTDengine_Connector)\runtimes\**\*.*" />
</ItemGroup>
<Copy SourceFiles="@(DepDLLFiles)" DestinationFolder="$(OutDir)" />
</Target>
</Project>
string selectTable = "select * from test.meters";
res = LibTaosWS.WSQueryTimeout(conn, selectTable, 5000);
ValidQueryExecution(res);
// get meta info of the retrieved data as List
List<TDengineMeta> metas = LibTaosWS.WSGetFields(res);
Console.WriteLine(metas.Count);
// get data of the retrieved data as List.
List<object> dataSet = LibTaosWS.WSGetData(res);
Console.WriteLine(dataSet.Count);
// Free the query result every time when used up it.
LibTaosWS.WSFreeResult(res);
void ValidQueryExecution(IntPtr res)
{
int code = LibTaosWS.WSErrorNo(res);
if (code != 0)
{
throw new Exception($"execute SQL failed: reason: {LibTaosWS.WSErrorStr(res)}, code:{code}");
}
}