流计算
在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存,而且经常还需要使用原始的时序数据通过计算 生成新的时序数据。在传统的时序数据解决方案中,常常需要部署 Kafka、Flink 等流处理系统,而流处理系统的复杂性,带来了高昂的开发与运维成本。
TDengine 的流计算引擎提供了实时处理写入的数据流的能力,使用 SQL 定义实时流变换,当数据被写入流的源表后,数据会被以定义的方式自动处理,并根据定义的触发模式向目的表推送结果。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟。
流计算可以包含数据过滤,标量函数计算(含 UDF),以及窗口聚合(支持滑动窗口、会话窗口与状态窗口),能够以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition,不同的 partition 将写入到目的超级表的不同子表。
TDengine 的流计算能够支持分布在多个节点中的超级表聚合,能够处理乱序数据的写入。它提供 watermark 机制以度量容忍数据乱序的程度,并提供了 ignore expired 配置项以决定乱序数据的处理策略 —— 丢弃或者重新计算。
下面详细介绍流计算使用的具体方法。
创建流计算
语法如下:
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name
[(field1_name, ...)] [TAGS (column_definition [, column_definition] ...)]
SUBTABLE(expression) AS subquery
stream_options: {
TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
WATERMARK time
IGNORE EXPIRED [0|1]
DELETE_MARK time
FILL_HISTORY [0|1]
IGNORE UPDATE [0|1]
}
column_definition:
col_name col_type [COMMENT 'string_value']
其中 subquery 是 select 普通查询语法的子集。
subquery: SELECT select_list
from_clause
[WHERE condition]
[PARTITION BY tag_list]
[window_clause]
window_cluse: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
| COUNT_WINDOW(count_val[, sliding_val])
}
subquery 支持会话窗口、状态窗口与滑动窗口。其中,会话窗口与状态窗口搭配超级表时必须与 partition by tbname 一起使用。
-
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间间隔超过 tol_val,则自动开启下一个窗口。
-
EVENT_WINDOW 是事件窗口,根据开始条件和结束条件来划定窗口。当 start_trigger_condition 满足时则窗口开始,直到 end_trigger_condition 满足时窗口关闭。 start_trigger_condition 和 end_trigger_condition 可以是任意 TDengine 支持的条件表达式,且可以包含不同的列。
-
COUNT_WINDOW 是计数窗口,按固定的数据行数来划分窗口。 count_val 是常量,是正整数,必须大于等于 2,小于 2147483648。 count_val 表示每个 COUNT_WINDOW 包含的最大数据行数,总数据行数不能整除 count_val 时,最后一个窗口的行数会小于 count_val 。 sliding_val 是常量,表示窗口滑动的数量,类似于 INTERVAL 的 SLIDING 。
窗口的定义与时序数据窗口查询中的定义完全相同,具体可参考 TDengine 窗口函数部分。
如下 SQL 将创建一个流计算,执行后 TDengine 会自动创建名为avg_vol 的超级表,此流计算以 1min 为时间窗口、30s 为前向增量统计这些智能电表的平均电压,并将来自 meters 的数据的计算结果写入 avg_vol,不同分区的数据会分别创建子表并写入不同子表。
CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstart, count(*), avg(voltage) FROM power.meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
本节涉及的相关参数的说明如下。
- stb_name 是保存计算结果的超级表的表名,如果该超级表不存在,则会自动创建;如果已存在,则检查列的 schema 信息。详见 6.3.8 节。
- tags 子句定义了流计算中创建标签的规则。通过 tags 字段可以为每个分区对应的子表生成自定义的标签值。