流式计算
与传统的流计算相比,TDengine 的流计算进行了功能和边界上的扩展。传统定义的流计算是一种以低延迟、持续性和事件时间驱动为核心,处理无界数据流的实时计算范式。TDengine 的流计算采用的是触发与计算分离的策略,处理的依然是持续的无界的数据流,但是进行了以下几个方面的扩展:
- 处理对象的扩展:传统流计算的事件驱动对象与计算对象往往是统一的,根据同一份数据产生事件和计算。TDengine 的流计算支持触发(事件驱动)与计算的分离,也就意味着触发对象可以与计算对象进行分离。触发表与计算的数据源表可以不相同,甚至可以不需要触发表,处理的数据集合无论是列、时间范围都可以不相同。
- 触发方式的扩展:除了数据写入触发方式外,TDengine 的流计算支持更多触发方式的扩展。通过支持窗口触发,用户可以灵活的定义和使用各种方式的窗口来产生触发事件,可以选择在开窗、关窗以及开关窗同时进行触发。除了与触发表关联的事件时间驱动外,还支持与事件时间无关的驱动,即定时触发。在事件触发之前,还支持对触发数据进行预先过滤处理,只有符合条件的数据才会进入触发判断。
- 计算的扩展:既可以对触发表进行计算,也可以对其他库、表进行计算。计算类型不受限制,支持任何查询语句。计算结果的应用可根据需要进行选择,支持发送通知、写入输出表,也可以两者同时使用。
TDengine 的流计算引擎还提供了其他使用上的便利。针对结果延迟的不同需求,支持用户在结果时效性与资源负载之间进行平衡。针对非正常顺序写入场景的不同需求,支持用户灵活选择适合的处理方式与策略。
说明:全新流计算从 v3.3.7.0
开始支持。
创建流式计算
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]
options: {
trigger_type [FROM [db_name.]table_name] [PARTITION BY col1 [, ...]] [STREAM_OPTIONS(stream_option [|...])] [notification_definition]
}
trigger_type: {
PERIOD(period_time[, offset_time])
| [INTERVAL(interval_val[, interval_offset])] SLIDING(sliding_val[, offset_time])
| SESSION(ts_col, session_val)
| STATE_WINDOW(col) [TRUE_FOR(duration_time)]
| EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(duration_time)]
| COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]])
}
stream_option: {WATERMARK(duration_time) | EXPIRED_TIME(exp_time) | IGNORE_DISORDER | DELETE_RECALC | DELETE_OUTPUT_TABLE | FILL_HISTORY[(start_time)] | FILL_HISTORY_FIRST[(start_time)] | CALC_NOTIFY_ONLY | LOW_LATENCY_CALC | PRE_FILTER(expr) | FORCE_OUTPUT | MAX_DELAY(delay_time) | EVENT_TYPE(event_types) | IGNORE_NODATA_TRIGGER}
notification_definition:
NOTIFY(url [, ...]) [ON (event_types)] [WHERE condition] [NOTIFY_OPTIONS(notify_option[|notify_option])]
notify_option: [NOTIFY_HISTORY | ON_FAILURE_PAUSE]
event_types:
event_type [|event_type]
event_type: {WINDOW_OPEN | WINDOW_CLOSE}
tag_definition:
tag_name type_name [COMMENT 'string_value'] AS expr
流式计算的触发方式
事件触发是流计算的驱动方式,事件触发产生的来源可能多种多样,可以来自于某个表的数据写入,也可以来自于对某个表的计算分析结果,甚至可以不来自于任何表。当流计算引擎检测到符合用户定义的触发条件时,就会触发计算,条件符合次数和计算触发次数是相同的,触发对象与计算对象彼此分离。用户可以灵活的定义和使用各种窗口来产生触发事件,支持在开窗、关窗以及开关窗同时进行触发,支持分组触发,支持对触发数据进行预先过滤处理。
触发类型
触发类型通过 trigger_type
指定,包括定时触发、滑动触发、滑动窗口触发、会话窗口触发、状态窗口触发、事件窗口触发、计数窗口触发。其中,状态窗口、事件窗口和计数窗口搭配超级表时,必须与 partition by tbname
一起使用。
定时触发
PERIOD(period_time[, offset_time])
定时触发通过系统时间的固定间隔来驱动,本质上就是我们常说的定时任务。定时触发不属于窗口触发。各参数含义如下:
- period_time:定时间隔,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h)、天 (d),支持的时间范围为
[10a, 3650d]
。 - offset_time:可选,定时偏移,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h),偏移大小应该小于 1 天。
使用说明:
- 定时间隔小于 1 天时,基准时间点为每日零点加定时偏移,根据定时间隔来确定下次触发的时间点。基准时间点在每日零点重置。每日最后一次触发的时间点与下一日的基准时间点之间的间隔可能小于定时间隔。例如:
- 定时间隔为 5 小时 30 分钟,那么当天的触发时刻为
[00:00, 05:30, 11:00, 16:30, 22:00]
,后续每一天的触发时刻都是相同的。 - 同样的定时间隔,如果指定时间偏移为 1 秒,那么当天的触发时刻为
[00:01, 05:31, 11:01, 16:31, 22:01]
,后续每一天的触发时刻都是相同的。 - 同样条件下,如果建流时当前系统时间为
12:00
,那么当天的触发时刻为[16:31, 22:01]
,后续每一天内的触发时刻为[00:01, 05:31, 11:01, 16:31, 22:01]
。
- 定时间隔为 5 小时 30 分钟,那么当天的触发时刻为
- 定时间隔大于等于 1 天时,基准时间点为当日的零点加定时偏移,后续不会重置。例如:
- 定时间隔为 1 天 1 小时,建流时当前系统时间为
05-01 12:00
,那么在当天及随后几天的触发时刻为[05-02 01:00, 05-03 02:00, 05-04 03:00, 05-05 04:00, ……]
。 - 同样条件下,如果指定时间偏移为 1 秒,那么当天及随后几天的触发时刻为
[05-02 01:01, 05-03 02:02, 05-04 03:03, 05-05 04:04, ……]
。
- 定时间隔为 1 天 1 小时,建流时当前系统时间为
适用场景:需要按照系统时间连续定时驱动计算的场景,例如每小时计算生成一次当天的统计数据,每天定时发送统计报告等。
滑动触发
SLIDING(sliding_val[, offset_time])
滑动触发是指对触发表的写入数据按照事件时间的固定间隔来驱动的触发。不可以指定 INTERVAL 窗口,不属于窗口触发,必须指定触发表。滑动触发的触发时刻、时间偏移规则和定时触发相同,唯一的区别是系统时间变更为事件时间。
各参数含义如下:
- sliding_val:必选,事件时间的滑动时长。
- offset_time:可选,指定滑动触发的时间偏移,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h)。
使用说明:
- 必须指定触发表,触发表为超级表时支持按标签、子表分组,支持不分组。
- 支持对写入数据进行处理过滤后(有条件)的滑动触发。
适用场景:需要按照事件时间连续定时驱动计算的场景,例如每小时计算生成一次当天的统计数据,每天定时发送统计报告等场景。
滑动窗口触发
[INTERVAL(interval_val[, interval_offset])] SLIDING(sliding_val)
滑动窗口触发是指对触发表的写入数据按照事件时间和固定窗口大小滑动而形成的触发,必须指定 INTERVAL 窗口,属于窗口触发,必须指定触发表。
滑动窗口触发的起始时间点是窗口的起始点,窗口默认是从 Unix time 0(1970-01-01 00:00:00 UTC)开始划分,可以通过指定窗口时间偏移的方式来改变窗口的划分起始点。各参数含义如下:
- interval_val:可选,滑动窗口的时长。
- interval_offset:可选,滑动窗口的时间偏移。
- sliding_val:必选,事件时间的滑动时长。
使用说明:
- 必须指定触发表,触发表为超级表时支持按标签、子表分组,支持不分组。
- 支持对写入数据进行处理过滤后(有条件)的滑动窗口触发。
适用场景:需要按照事件时间定时窗口计算的场景,例如每小时计算生成该小时内的统计数据,每隔 1 小时计算最后 5 分钟窗口内的数据等场景。
会话窗口触发
SESSION(ts_col, session_val)
会话窗口触发是指对触发表的写入数据按照会话窗口的方式进行窗口划分,当窗口启动和(或)关闭时进行的触发。各参数含义如下:
- ts_col:主键列名。
- session_val:属于同一个会话的最大时间间隔,间隔小于等于
session_val
的记录都属于同一个会话。
使用说明:
- 必须指定触发表,触发表为超级表时支持按标签、子表分组,支持不分组。
- 支持对写入数据进行处理过滤后(有条件)的窗口触发。
适用场景:需要通过会话窗口驱动计算和(或)通知的场景。
状态窗口触发
STATE_WINDOW(col) [TRUE_FOR(duration_time)]
状态窗口触发是指对触发表的写入数据按照状态窗口的方式进行窗口划分,当窗口启动和(或)关闭时进行的触发。各参数含义如下:
- col:状态列的列名。
- duration_time:可选,指定窗口的最小持续时长,如果某个窗口的时长低于该设定值,则会自动舍弃,不产生触发。
使用说明:
- 必须指定触发表,触发表为超级表时支持按标签、子表分组,支持不分组。
- 搭配超级表时,必须与
partition by tbname
一起使用。 - 支持对写入数据进行处理过滤后(有条件)的窗口触发。
适用场景:需要通过状态窗口驱动计算和(或)通知的场景。
事件窗口触发
EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(duration_time)]
事件窗口触发是指对触发表的写入数据按照事件窗口的方式进行窗口划分,当窗口启动和(或)关闭时进行的触发。各参数含义如下:
- start_condition:事件开始条件的定义。
- end_condition:事件结束条件的定义。
- duration_time:可选,指定窗口的最小持续时长,如果某个窗口的时长低于该设定值,则会自动舍弃,不产生触发。
使用说明:
- 必须指定触发表,触发表为超级表时支持按标签、子表分组,支持不分组。
- 搭配超级表时,必须与
partition by tbname
一起使用。 - 支持对写入数据进行处理过滤后(有条件)的窗口触发。
适用场景:需要通过事件窗口驱动计算和(或)通知的场景。
计数窗口触发
COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]])
计数窗口触发是指对触发表的写入数据按照计数窗口的方式进行窗口划分,当窗口启动和(或)关闭时进行的触发。支持列的触发,只有当指定的列有数据写入时才触发。各参数含义如下:
- count_val:计数条数,当写入数据条目数达到
count_val
时触发,最小值为 1。 - sliding_val:可选,窗口滑动的条数。
- col1 [, ...]:可选,按列触发模式时的触发列列表,只支持普通列,列表中任一列有非空数据写入时才为有效条目,NULL 值视为无效值。
使用说明:
- 必须指定触发表,触发表为超级表时支持按标签、子表分组,支持不分组。
- 搭配超级表时,必须与
partition by tbname
一起使用。 - 支持对写入数据进行处理过滤后(有条件)的窗口触发。
适用场景:
- 需要对每条数据进行处理的场景,例如故障数据写入、采样数据写入等场景。
- 需要根据某些列特定值进行处理的场景,例如异常值写入场景。
- 需要批量处理数据的场景,例如每写入 1000 条电压数据求平均值场景。
触发动作
触发后可以根据需要执行不同的动作,比如发送事件通知、执行计算或者两者同时进行。
- 只通知不计算:通过
WebSocket
方式向外部应用发送事件通知。 - 只计算不通知:执行任意一个查询并保存结果到流计算的输出表中。
- 既通知又计算:执行任意一个查询,同时发送计算结果或事件通知给外部应用。
触发表与分组
通常意义来说,一个流计算只对应一个计算,比如根据一个子表触发和产生一个计算,结果保存到一张表中。根据 TDengine 一个设备一张表的设计理念,如果需要对所有设备分别计算,那就需要为每个子表创建一个流计算,这会造成使用的不便和处理效率的降低。为了解决这个问题,TDengine 的流计算支持触发分组,分组是流计算的最小执行单元,从逻辑上可以认为每个分组对应一个单独的流计算,每个分组对应一个输出表和单独的事件通知。如果未指定分组或未指定触发表(定时触发方式允许),那么整个流计算将只产生一个计算,可以认为此时只有一个分组,最终对应一个输出表和通知。由于每个分组都具有独立的流计算,所以每个分组的计算进度、输出频率等都是不同的。
**总结来说,一个流计算输出表(子表或普通表)的个数与触发表的分组个数相同,未指定分组时只产生一个输出表(普通表)。**目前支持的触发方式与分组组合如下:
触发方式 | 支持的分组类型 |
---|---|
PERIOD、SLIDING、INTERVAL、SESSION | 按子表分组、按标签分组、不分组 |
其他窗口触发 | 按子表分组 |
触发表
触发表可以为普通表、超级表、子表、虚拟表,不支持系统表、视图、查询。除定时触发可不指定触发表外,其他触发方式必须指定触发表。
[FROM [db_name.]table_name]
触发分组
指定触发的分组列,支持多列,目前只支持按照子表和标签进行分组。
[PARTITION BY col1 [, ...]]
流式计算的结果输出
流计算的计算结果默认会保存到输出表中,每个输出表中的计算结果是截至当前时刻已经触发和计算完成的输出。可以指定输出表的结构定义,如果存在分组还可以指定子表的标签值。
[INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])]
tag_definition:
tag_name type_name [COMMENT 'string_value'] AS expr
说明如下:
- INTO [db_name.]table_name:可选,指定输出表的表名为
table_name
和所在数据库名db_name
。- 存在触发分组时该表为超级表。
- 不存在触发分组时该表为普通表。
- 只触发通知不计算,或计算结果只通知不保存时,不需要指定。
- [OUTPUT_SUBTABLE(tbname_expr)]:可选,指定每个触发分组的计算输出表(子表)名,没有触发分组时不可以指定。未指定时自动为每个分组生成唯一的输出表(子表)名。
tbname_expr
为任意输出字符串的表达式,可根据需要选择触发表分组列(来自[PARTITION BY col1[, ...]]
),输出长度不能超过表名最大长度,超过时截断处理。如果不希望不同分组输出到同一子表中,用户需确保每个分组输出表名都是唯一的。 - [(column_name1, column_name2 [COMPOSITE KEY][, ...])]:可选,指定输出表的每列列名,未指定时每列列名与计算结果的每列列名相同。可以通过
[COMPOSITE KEY]
指定第二列为主键列,与第一列共同组成复合主键。 - [TAGS (tag_definition [, ...])]:可选,指定输出超级表的标签列定义与值的列表,只有存在触发分组时才可以指定。未指定时,标签列的定义和值来自于所有分组列,此时分组列中不可以存在相同的列名。当按子表分组时,默认产生的标签列名为
tag_tbname
,类型为VARCHAR(270)
。具体的tag_definition
参数说明如下:- tag_name:标签列名
- type_name:标签列类型
- string_value:标签列说明
- expr:标签值计算表达式,可根据需要选择任意触发表分组列(来自
[PARTITION BY col1[, ...]]
)。
流式计算的计算任务
[AS subquery]
计算任务是流在事件触发后执行的计算动作,可以是任意类型的查询语句,既可以对触发表进行计算,也可以对其他库表进行计算。计算任务的灵活度很高,需在建流前进行合理的设计。注意事项如下:
- 查询输出的第一列将作为输出表的主键列:要求查询输出的第一列为合法的主键数值(Timestamp),如果列类型不符建流时会报错,如果运算过程中出现 NULL 值则对应的计算结果会被丢弃处理。
- 每个触发分组的计算结果会写入到该分组的同一个输出表(子表或普通表):如果查询语句也包含分组子句,分组结果中相同主键的记录会产生覆盖。如果需要使用分组,建议为输出表定义复合主键。
占位符
计算时可能需要使用触发时的关联信息,这些信息在 SQL 语句中以占位符的形式出现,在每次计算时会被作为常量替换到 SQL 语句中。包括:
触发方式 | 占位符 | 含义与说明 |
---|---|---|
定时触发 | _tprev_localtime | 上一次触发时刻的系统时间(精度:ns) |
定时触发 | _tnext_localtime | 下一次触发时刻的系统时间(精度:ns) |
滑动触发 | _tprev_ts | 上一次触发的事件时间(精度同记录) |
滑动触发 | _tcurrent_ts | 本次触发的事件时间(精度同记录) |
滑动触发 | _tnext_ts | 下一次触发的事件时间(精度同记录) |
窗口触发 | _twstart | 本次触发窗口的起始时间戳 |
窗口触发 | _twend | 本次触发窗口的结束时间戳,只适用于 WINDOW_CLOSE 触发使用 |
窗口触发 | _twduration | 本次触发窗口的持续时间,只适用于 WINDOW_CLOSE 触发使用 |
窗口触发 | _twrownum | 本次触发窗口的记录条数,只适用于 WINDOW_CLOSE 触发使用 |
通用 | _tgrpid | 触发分组的 ID 值,类型为 BIGINT |
通用 | _tlocaltime | 本次触发时刻的系统时间(精度:ns) |
通用 | %%n | 触发分组列的引用 n 为分组列(来自 [PARTITION BY col1[, ...]] )的下标(从 1 开始) |
通用 | %%tbname | 触发表每个分组表名的引用 只有触发分组含 tbname 时可用 可作为查询表名使用( FROM %%tbname ) |
通用 | %%trows | 触发表每个分组的触发数据集(满足本次触发的数据集)的引用 定时触发时为上次与本次触发之间写入的触发表数据 只可作为查询表名使用( FROM %%trows )只适用于 WINDOW_CLOSE 触发使用推荐在小数据量场景下使用 |
使用限制:
- %%trows:只能用于 FROM 子句,在使用 %%trows 的语句中不支持 where 条件过滤,不支持对 %%trows 进行关联查询。
- %%tbname:可以用于 FROM、SELECT 和 WHERE 子句。
- 其他占位符:只能用于 SELECT 和 WHERE 子句。
流式计算的控制选项
[STREAM_OPTIONS(stream_option [|...])]
stream_option: {WATERMARK(duration_time) | EXPIRED_TIME(exp_time) | IGNORE_DISORDER | DELETE_RECALC | DELETE_OUTPUT_TABLE | FILL_HISTORY[(start_time)] | FILL_HISTORY_FIRST[(start_time)] | CALC_NOTIFY_ONLY | LOW_LATENCY_CALC | PRE_FILTER(expr) | FORCE_OUTPUT | MAX_DELAY(delay_time) | EVENT_TYPE(event_types) | IGNORE_NODATA_TRIGGER}
控制选项用于控制触发和计算行为,可以多选,同一个选项不可以多次指定。包括:
- WATERMARK(duration_time):指定数据乱序的容忍时长,超过该时长的数据会被当做乱序数据,根据不同触发方式的乱序数据处理策略和用户配置进行处理,未指定时默认
duration_time
值为 0。 - EXPIRED_TIME(exp_time) :指定过期数据间隔并忽略过期数据,未指定时无过期数据。不需要感知超过一定时间范围的数据写入或更新时可以指定。
exp_time
为过期时间间隔,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h)、天 (d)。 - IGNORE_DISORDER:指定忽略触发表的乱序数据,未指定时不忽略乱序数据。注重计算或通知的时效性、触发表乱序数据不影响计算结果等场景可以指定。乱序数据既包括新的乱序数据的写入,也包括对已写入数据的更新操作。
- DELETE_RECALC: 指定触发表的数据删除(包含触发子表被删除场景)需要自动重新计算,只有触发方式支持数据删除的自动重算才可以指定。未指定时忽略数据删除,只有触发表数据删除会影响计算结果的场景才需要指定。
- DELETE_OUTPUT_TABLE:指定触发子表被删除时其对应的输出子表也需要被删除,只适用于按子表分组的场景,未指定时触发子表被删除不会删除其输出子表。
- FILL_HISTORY[(start_time)]:指定需要从
start_time
(事件时间)开始触发历史数据计算,未指定时从最早的记录开始触发计算。如果未指定FILL_HISTORY
和FILL_HISTORY_FIRST
,则不进行历史数据的触发计算。该选项不能与FILL_HISTORY_FIRST
同时指定。定时触发(PERIOD)模式下不支持历史计算。 - FILL_HISTORY_FIRST[(start_time)]:指定需要从
start_time
(事件时间)开始优先触发历史数据计算,未指定时从最早的记录开始触发计算。该选项适合在需要按照时间顺序计算历史数据且历史数据计算完成前不需要实时计算的场景下指定,未指定时优先实时计算,不能与FILL_HISTORY
同时指定。定时触发(PERIOD)模式下不支持历史计算。 - CALC_NOTIFY_ONLY:指定计算结果只发送通知,不保存到输出表,未指定时默认会保存到输出表。
- LOW_LATENCY_CALC:指定触发后需要低延迟的计算或通知,单次触发发生后会立即启动计算或通知。低延迟的计算或通知会保证实时流计算任务的时效性,但是也会造成处理效率的降低,有可能需要更多的处理资源才能满足需求,因此只推荐在业务有强时效性要求时使用。未指定时单次触发发生后有可能不会立即进行计算,采用批量计算与通知的方式来达到较好的资源利用效率。
- PRE_FILTER(expr) :指定在触发进行前对触发表进行数据过滤处理,只有符合条件的数据才会进入触发判断,
expr
中可以包含列、标签、常量及其标量与逻辑运算。例如:col1 > 0
则只有 col1 为正数的数据行可以进行触发,未指定时无触发表数据过滤。 - FORCE_OUTPUT:指定计算结果强制输出选项,当某次触发没有计算结果时将强制输出一行数据,除常量外(含常量对待列)其他列的值都为 NULL,后续版本会增加更多填充策略。
- MAX_DELAY(delay_time):指定在窗口未关闭时的最长等待的时长(处理时间),从窗口开启时每经过该时间段且窗口仍未关闭时产生触发,非窗口触发时自动忽略。当窗口触发存在
TRUE_FOR
条件且TRUE_FOR
时长大于MAX_DELAY
时,MAX_DELAY
仍然生效 (即使最终当前窗口未满足TRUE_FOR
条件)。delay_time
为等待时长,支持的时间单位包括:秒 (s)、分 (m)、小时 (h)、天 (d),最小允许的值为 3 秒,误差范围在 1 秒以内,当计算时长超过delay_time
时忽略期间的MAX_DELAY
触发。WATERMARK
的判断逻辑早于窗口判定,因此可能出现设定max_delay
但仍未产生触发的情况,这是由于窗口并未真正开启。 - EVENT_TYPE(event_types):指定窗口触发的事件类型,可以多选,未指定时默认值为
WINDOW_CLOSE
。SLIDING 触发(不带 INTERVAL)和 PERIOD 触发不适用(自动忽略)。各选项含义如下:- WINDOW_OPEN:窗口启动事件。
- WINDOW_CLOSE:窗口关闭事件。
- IGNORE_NODATA_TRIGGER:指定忽略触发表无输入数据时的触发,适用于滑动触发(SLIDING)、滑动窗口触发(INTERVAL)、定时触发(PERIOD)。
- 滑动触发与定时触发:如果两次触发时刻中间触发表没有数据则忽略该次触发。
- 滑动窗口触发:如果窗口内触发表没有数据则忽略该次触发。
- 没有未指定时:不忽略无输入数据时的触发。
流式计算的通知机制
事件通知是流在事件触发后可选的执行动作,支持通过 WebSocket
协议发送事件通知到应用。用户通过 notification_definition
来指定需要通知的事件,以及用于接收通知消息的目标地址。通知内容可以包含计算结果,也可以在没有计算结果时只通知事件相关信息。
[notification_definition]
notification_definition:
NOTIFY(url [, ...]) [ON (event_types)] [WHERE condition] [NOTIFY_OPTIONS(notify_option[|notify_option])]
event_types:
event_type [|event_type]
event_type: {WINDOW_OPEN | WINDOW_CLOSE | ON_TIME}
详细说明如下:
- url [, ...]:指定通知的目标地址,必须包括协议、IP 或域名、端口号,并允许包含路径、参数,整个 url 需要包含在引号内。目前仅支持 WebSocket 协议。例如:
ws://localhost:8080
、ws://localhost:8080/notify
、ws://localhost:8080/notify?key=foo
。 - [ON (event_types)]:指定需要通知的事件类型,可多选。SLIDING(不带 INTERVAL)和 PERIOD 触发不需要指定,其他触发必须指定,支持的事件类型有:
- WINDOW_OPEN:窗口打开事件,在触发表分组窗口打开时发送通知。
- WINDOW_CLOSE:窗口关闭事件,在触发表分组窗口关闭时发送通知。
- ON_TIME: 定时触发事件,在触发时发送通知。
- [WHERE condition]:指定通知需要满足的条件,
condition
中只能指定含计算结果列和(或)常量的条件。 - [NOTIFY_OPTIONS(notify_option[|notify_option])]:可选,指定通知选项用于控制通知的行为,可以多选,目前支持的通知选项包括:
- NOTIFY_HISTORY:指定计算历史数据时是否发送通知,未指定时默认不发送。
- ON_FAILURE_PAUSE:指定在向通知地址发送通知失败时暂停流计算任务,循环重试发送通知,直至发送成功才恢复流计算运行,未指定时默认直接丢失事件通知(不影响流计算继续运行)。
当触发指定的事件时,taosd 会向指定的 URL 发送 POST 请求,消息体为 JSON 格式。一个请求可能包含若干个流的若干个事件,且事件类型不一定相同。 事件信息视窗口类型而定:
- 时间窗口:开始时发送起始时间;结束时发送起始时间、结束时间、计算结果。
- 状态窗口:开始时发送起始时间、前一个窗口的状态值、当前窗口的状态值;结束时发送起始时间、结束时间、计算结果、当前窗口的状态值、下一个窗口的状态值。
- 会话窗口:开始时发送起始时间;结束时发送起始时间、结束时间、计算结果。
- 事件窗口:开始时发送起始时间,触发窗口打开的数据值和对应条件编号;结束时发送起始时间、结束时间、计算结果、触发窗口关闭的数据值和对应条件编号。
- 计数窗口:开始时发送起始时间;结束时发送起始时间、结束时间、计算结果。
通知消息的结构示例如下:
{
"messageId": "unique-message-id-12345",
"timestamp": 1733284887203,
"streams": [
{
"streamName": "avg_current_stream",
"events": [
{
"tableName": "t_a667a16127d3b5a18988e32f3e76cd30",
"eventType": "WINDOW_OPEN",
"eventTime": 1733284887097,
"triggerId": "window-id-67890",
"triggerType": "Interval",
"groupId": "2650968222368530754",
"windowStart": 1733284800000
},
{
"tableName": "t_a667a16127d3b5a18988e32f3e76cd30",
"eventType": "WINDOW_CLOSE",
"eventTime": 1733284887197,
"triggerId": "window-id-67890",
"triggerType": "Interval",
"groupId": "2650968222368530754",
"windowStart": 1733284800000,
"windowEnd": 1733284860000,
"result": {
"_wstart": 1733284800000,
"avg(current)": 1.3
}
}
]
},
{
"streamName": "max_voltage_stream",
"events": [
{
"tableName": "t_96f62b752f36e9b16dc969fe45363748",
"eventType": "WINDOW_OPEN",
"eventTime": 1733284887231,
"triggerId": "window-id-13579",
"triggerType": "Event",
"groupId": "7533998559487590581",
"windowStart": 1733284800000,
"triggerCondition": {
"conditionIndex": 0,
"fieldValue": {
"c1": 10,
"c2": 15
}
},
},
{
"tableName": "t_96f62b752f36e9b16dc969fe45363748",
"eventType": "WINDOW_CLOSE",
"eventTime": 1733284887231,
"triggerId": "window-id-13579",
"triggerType": "Event",
"groupId": "7533998559487590581",
"windowStart": 1733284800000,
"windowEnd": 1733284810000,
"triggerCondition": {
"conditionIndex": 1,
"fieldValue": {
"c1": 20,
"c2": 3
}
},
"result": {
"_wstart": 1733284800000,
"max(voltage)": 220
}
}
]
}
]
}
后续小节是通知消息中各个字段的说明。
根级字段说明
- messageId:字符串类型,是通知消息的唯一标识符,确保整条消息可以被追踪和去重。
- timestamp:长整型时间戳,表示通知消息生成的时间,精确到毫秒,即:'00:00, Jan 1 1970 UTC' 以来的毫秒数。
- streams:对象数组,包含多个流任务的事件信息。(详细信息见下节)
stream 对象的字段说明
- streamName:字符串类型,流任务的名称,用于标识事件所属的流。
- events:对象数组,该流任务下的事件列表,包含一个或多个事件对象。(详细信息见下节)
event 对象的字段说明
通用字段
这部分是所有 event 对象所共有的字段。
- tableName:字符串类型,是对应目标子表的表名。
- eventType:字符串类型,表示事件类型,支持 WINDOW_OPEN、WINDOW_CLOSE、WINDOW_INVALIDATION 三种类型。
- eventTime:长整型时间戳,表示事件生成时间,精确到毫秒,即:'00:00, Jan 1 1970 UTC' 以来的毫秒数。
- triggerId:字符串类型,触发事件的唯一标识符,确保打开和关闭事件(如果有的话)的 ID 一致,便于外部系统将两者关联。如果 taosd 发生故障重启,部分事件可能会重复发送,会保证同一事件的 triggerId 保持不变。
- triggerType:字符串类型,表示触发类型,支持 Period、SLIDING 两种非窗口触发类型以及 INTERVAL、State、Session、Event、Count 五种窗口类型。
- groupId: 字符串类型,是对应分组的唯一标识符,如果是按子表分组,则与对应表的 uid 一致。
定时触发相关字段
这部分是 triggerType 为 Period 时 event 对象的关键字段。
- eventType 固定为 ON_TIME,包含如下字段:
- result:计算结果,为键值对形式,包含窗口计算的结果列列名及其对应的值。
滑动触发(Sliding)相关字段
这部分是 triggerType 为 Sliding 时 event 对象的关键字段。
- eventType 固定为 ON_TIME,包含如下字段:
- result:计算结果,为键值对形式,包含窗口计算的结果列列名及其对应的值。
滑动触发(Interval)相关字段
这部分是 triggerType 为 Interval 时 event 对象的关键字段。
- 如果 eventType 为 WINDOW_OPEN,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- 如果 eventType 为 WINDOW_CLOSE,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- windowEnd:长整型时间戳,表示窗口的结束时间,精度与结果表的时间精度一致。
状态窗口相关字段
这部分是 triggerType 为 State 时 event 对象才有的字段。
- 如果 eventType 为 WINDOW_OPEN,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- prevState:与状态列的类型相同,表示上一个窗口的状态值。如果没有上一个窗口 (即:现在是第一个窗口),则为 NULL。
- curState:与状态列的类型相同,表示当前窗口的状态值。
- 如果 eventType 为 WINDOW_CLOSE,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- windowEnd:长整型时间戳,表示窗口的结束时间,精度与结果表的时间精度一致。
- curState:与状态列的类型相同,表示当前窗口的状态值。
- nextState:与状态列的类型相同,表示下一个窗口的状态值。
- result:计算结果,为键值对形式,包含窗口计算的结果列列名及其对应的值。
会话窗口相关字段
这部分是 triggerType 为 Session 时 event 对象才有的字段。
- 如果 eventType 为 WINDOW_OPEN,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- 如果 eventType 为 WINDOW_CLOSE,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- windowEnd:长整型时间戳,表示窗口的结束时间,精度与结果表的时间精度一致。
- result:计算结果,为键值对形式,包含窗口计算的结果列列名及其对应的值。
事件窗口相关字段
这部分是 triggerType 为 Event 时 event 对象才有的字段。
- 如果 eventType 为 WINDOW_OPEN,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- triggerCondition:触发窗口开始的条件信息,包括以下字段:
- conditionIndex:整型,表示满足的触发窗口开始的条件的索引,从 0 开始编号。
- fieldValue:键值对形式,包含条件列列名及其对应的值。
- 如果 eventType 为 WINDOW_CLOSE,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- windowEnd:长整型时间戳,表示窗口的结束时间,精度与结果表的时间精度一致。
- triggerCondition:触发窗口关闭的条件信息,包括以下字段:
- conditionIndex:整型,表示满足的触发窗口关闭的条件的索引,从 0 开始编号。
- fieldValue:键值对形式,包含条件列列名及其对应的值。
- result:计算结果,为键值对形式,包含窗口计算的结果列列名及其对应的值。
计数窗口相关字段
这部分是 triggerType 为 Count 时 event 对象才有的字段。
- 如果 eventType 为 WINDOW_OPEN,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- 如果 eventType 为 WINDOW_CLOSE,则包含如下字段:
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- result:计算结果,为键值对形式,包含窗口计算的结果列列名及其对应的值。
窗口失效相关字段
因为流计算过程中会遇到数据乱序、更新、删除等情况,可能造成已生成的窗口被删除,或者结果需要重新计算。此时会向通知地址发送一条 WINDOW_INVALIDATION 的通知,说明哪些窗口已经被删除。
这部分是 eventType 为 WINDOW_INVALIDATION 时,event 对象才有的字段。
- windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。
- windowEnd: 长整型时间戳,表示窗口的结束时间,精度与结果表的时间精度一致。
删除流式计算
仅删除流式计算任务,由流式计算写入的数据不会被删除。
DROP STREAM [IF EXISTS] [db_name.]stream_name;
查看流式计算
查看流计算信息
显示当前数据库或指定数据库的流计算。
SHOW [db_name.]STREAMS;
如需查看更详细的信息,可以查询系统表 information_schema.ins_streams
。
SELECT * from information_schema.`ins_streams`;
查看流计算任务
流计算在执行时由多个任务组成,可以从系统表 information_schema.ins_stream_tasks
中获取具体的任务信息。
SELECT * from information_schema.`ins_stream_tasks`;
操作流式计算
启动操作
START STREAM [IF EXISTS] [IGNORE UNTREATED] [db_name.]stream_name;
说明:
- 没有指定
IF EXISTS
时,如果该 stream 不存在,则报错,如果存在,则启动流计算。 - 指定
IF EXISTS
时,如果 stream 不存在,则返回成功,如果存在,则启动流计算。 - 建流后,流自动启动运行,不需要用户启动,只有在停止操作后才需要通过启动操作恢复流的运行。
- 启动流计算时,流计算停止期间写入的数据会被当做历史数据处理。
停止操作
STOP STREAM [IF EXISTS] [db_name.]stream_name;
说明:
- 没有指定
IF EXISTS
时,如果该 stream 不存在,则报错,如果存在,则停止流计算。 - 指定
IF EXISTS
时,如果该 stream 不存在,则返回成功,如果存在,则停止流计算。 - 停止操作是持久有效的,在用户重启流运行之前不会重新运行。
其他特性及说明
高可用
流式计算从架构上支持流的存算分离,在部署时要求系统中必须部署 snode,除数据读取外,所有流计算功能都只在 snode 上运行。
- snode 是负责运行流计算计算任务的节点,在一个集群中可以部署 1 或多个(至少 1 个),每个 dnode 中最多只能部署一个 snode,每个 snode 都有多个执行线程。
- snode 可以与其他节点(vnode/mnode)在相同的 dnode 上部署,但是从资源隔离的角度考虑,强烈推荐 snode 部署在单独的 dnode 上,这样可以保证资源隔离,流计算不会对写入、查询等业务造成太大干扰。
- 为了保证流计算的高可用,推荐在一个集群的多个物理节点中部署多个 snode:
- 流计算在多个 snode 间进行负载均衡。
- 每两个 snode 间互为副本,负责存储流的状态和进度等信息。
- 如果集群只部署了一个 snode,系统将不提供高可用保证。
部署 snode
创建流式计算之前,必须部署 snode,语法如下:
CREATE SNODE ON DNODE dnode_id;
查看 snode
用户可以通过 SHOW SNODES
查看 snode 相关信息。
SHOW SNODES;
若要展示更详细的信息,可以使用:
SELECT * FROM information_schema.`ins_snodes`;
删除 snode
删除 snode 时,需要确保 snode 及其副本同时在线,以便同步流的状态信息。当 snode 或其副本不在线时,删除会失败。
DROP SNODE ON DNODE dnode_id;
权限控制
流式计算的权限控制只与数据库的权限相关联,每个流计算可能与多个数据库相关,权限要求如下:
关联的数据库 | 数据库个数 | 鉴权行为 | 权限要求 |
---|---|---|---|
流所属的数据库 | 1 个 | 创建、删除、停止、启动、手动重算 | 写入权限 |
触发表所属数据库 | 1 个 | 创建 | 读取权限 |
输出表所属数据库 | 1 个 | 创建 | 写入权限 |
计算数据源所属数据库 | 可能多个 | 创建 | 读取权限 |
重新计算
TDengine 的窗口类型大多与主键列相关联,比如事件窗口要根据按主键列排序后的数据进行窗口启动与关闭的计算。在使用窗口触发时,要求触发表的数据写入是尽可能有序的,这样才能保证流计算处理的效率是最高的。因此,一旦存在乱序写入的数据,这些数据将可能影响已经触发完成的窗口的计算结果正确性。同理,数据如果存在更新和删除的情况,也可能会影响计算结果的正确性。
支持使用 WATERMARK
来解决一定程度的乱序、更新、删除场景带来的问题。WATERMARK
是用户可以指定的基于事件时间的时长,它代表的是事件时间在流计算中的进展,体现了用户对于乱序数据的容忍程度。当前处理的最新事件时间 - WATERMARK 指定的固定间隔
即为当前水位线,只有写入数据的事件时间早于当前水位线才会进入触发判断,只有窗口或其他触发的时间条件早于当前水位线才会启动触发。WATERMARK
对于定时触发(PERIOD)不生效,定时触发模式下不会有重新计算。
对于超出 WATERMARK
的乱序、更新、删除场景,使用重新计算的方式来保证最终结果的正确性,重新计算意味着对于乱序、更新和删除的数据覆盖区间重新进行触发和运算。重算时输出表中已经产生的计算结果不会被删除,会重新写入新的结果,为保证这种方式的有效性,用户需要确保其计算语句和数据源表是与处理时间无关的,也就是说同一个触发即使执行多次其结果依然是有效的。
重新计算可以分为自动重新计算与手动重新计算,如果用户不需要自动重新计算,可以通过选项关闭。
手动重算
手动重新需要用户手动发起,在需要时可以通过 SQL 启动。
RECALCULATE STREAM [db_name.]stream_name FROM start_time [TO end_time];
说明:
- 可以根据需要来指定需要重算流的一段时间区间(事件时间)内的数据。如果未指定结束时间(end_time),重算区间将从指定的开始时间(start_time)一直到用户启动手动重算时刻流的当前计算进度截至。
- 不适用于定时触发(PERIOD),适用于其他所有触发类型。
- 计数窗口触发的手动重算必须同时指定开始时间与结束时间,只适用于当前流进度已经完成部分的时间区间的触发重算,如果指定的重算区间包含流当前还未启动计算的区间将自动忽略该重算请求。流计算会在该时间区间内重新划分触发窗口并进行重算,在这种情况下重新划分的窗口与重算前已经完成计算的窗口可能存在不对齐的情况,因此用户可能需要先手动删除结果表中重算区间内的结果,这样才能保证结果表中不会同时存在多次计算的结果。同理,如果未指定重算结束时间,该重算请求将自动被忽略。对于这种需要从某一时刻开始而没有结束时间的重算需求,可以通过先删流,再重建并指定
FILL_HISTORY_FIRST
的方式来满足。
非典型数据写入场景
数据乱序
数据乱序指的是触发表的乱序写入行为,计算不关心数据源表是否乱序,但是用户需要从业务的角度确保在触发发生时数据源表的数据已经写入完成。根据触发方式的不同,数据乱序的影响和处理也不相同。
触发方式 | 影响和处理 |
---|---|
定时触发 滑动触发 计数窗口触发 | 忽略,不处理 |
其他窗口触发 | 默认处理:通过重算进行处理 可选处理:忽略,不处理 |
数据更新
数据更新是指同一个时戳的数据被重复写入多次,其他列可以更新或不更新,数据更新操作也只针对触发表和触发操作有影响,对计算过程本身无影响。根据触发方式的不同,数据更新的影响和处理也不相同。 触发方式
触发方式 | 影响和处理 |
---|---|
定时触发 滑动触发 计数窗口触发 | 忽略,不处理 |
其他窗口触发 | 当做乱序数据处理(重算) |
数据删除
数据删除也只针对触发表和触发操作有影响,对计算过程本身无影响。根据触发方式的不同,数据删除的影响和处理也不相同。
触发方式 | 影响和处理 |
---|---|
定时触发 滑动触发 计数窗口触发 | 忽略,不处理 |
其他窗口触发 | 默认处理:忽略,不处理 可选处理:当做乱序数据处理(重算) |
过期数据
引入 expired_time
来设置数据的过期时间,流的触发产生的每个分组根据最新数据的事件时间和 expired_time
来判断新数据是否过期,过期数据的临界点通过最新事件时间减去 expired_time
得出,所有早于过期数据临界点的数据视为过期数据。
- 过期数据只针对触发表的实时数据,历史数据与其他表的数据没有过期数据的概念。过期数据的判断是在流触发时进行的,数据是否过期与其写入数据库的时机息息相关,所有按照事件时间顺序写入的数据都不会是过期数据,只有乱序写入的数据才可能是过期数据。
- 过期数据不会自动触发产生新的计算和重算,也就意味着在所有触发方式下,过期数据都将被忽略处理(不进行计算和重算)。如果用户没有需要忽略计算和重算的时间区间,则不需要指定
expired_time
。如果用户指定了过期数据,同时想对部分过期数据进行计算或重算,可以通过手动重算的方式实现。 - 过期数据只对是否产生自动触发有影响,不对计算数据的范围有任何影响。因此如果某次触发的计算范围包含触发表中的过期数据,这部分过期数据仍然会被计算使用。
库表操作说明
流计算创建后,如果用户对每个流计算涉及的库表进行了操作,这些操作对流的影响和流对操作的处理总结如下:
操作 | 影响与流的处理 |
---|---|
用户在触发超级表(非虚拟超级表)下新建子表并写入 | 新的子表自动加入当前流计算,可能加入某个分组或新产生一个分组 |
用户在触发虚拟超级表下新建子表并写入 | 忽略不额外处理 |
用户删除触发超级表的子表 | 默认忽略处理 某些触发方式可指定自动重算,可指定是否删除子表对应的结果表(只适用按子表分组的流) |
用户删除触发表 | 忽略不额外处理 |
用户在触发表上新增列 | 忽略不额外处理 |
用户删除触发表的列 | 忽略不额外处理 |
用户修改触发超级表的子表标签值 | 如果该标签列被流作为触发分组列使用则不允许操作,报错处理 否则忽略不处理 |
用户修改触发表的列 schema | 忽略不额外处理(读取时发现 schema 不一致报错) |
用户修改、删除数据源表 | 忽略不额外处理 |
用户修改、删除输出表 | 忽略不额外处理(写入时发现 schema 不一致报错,发现表不存在重新建表) |
用户拆分 vnode | 当被拆分 vnode 所在库为数据源库或触发表库时不允许 当存在虚拟表触发或虚拟表计算时不允许 用户在确认无影响后可指定强制执行( SPLIT VGROUP N FORCE ) |
用户删除数据库 | 当被删库为某个流的数据源库、或触发表库且不是该流的所在库时不允许 当存在非目标数据库的虚拟表触发或虚拟表计算流时不允许 用户在确认无影响后可指定强制执行( DROP DATABASE name FORCE ) |
除了上表中有明确限制和额外处理的操作外,其他未说明的操作和上表中说忽略不额外处理*的操作都没有限制。但是如果操作对流的计算可能产生影响,需要用户根据情况自行进行处理,可选忽略或者通过手动重算的方式进行重新计算。
配置参数说明
流计算相关配置参数包括,详细参见 taosd 参考手册
- numOfMnodeStreamMgmtThreads:mnode 流计算管理线程个数
- numOfStreamMgmtThreads:vnode/snode 流计算管理线程个数
- numOfVnodeStreamReaderThreads:vnode 流计算读线程个数
- numOfStreamTriggerThreads:流计算触发线程个数
- numOfStreamRunnerThreads:流计算执行线程个数
- streamBufferSize:流计算可以使用的最大缓存大小,只适用于
%%trows
的结果缓存(单位:MB) - streamNotifyMessageSize:用于控制事件通知的消息大小
- streamNotifyFrameSize:用于控制事件通知消息发送时底层的帧大小
规则和限制
流式计算功能在使用上的一些规则和限制说明如下:
- 创建流计算前,集群中必须部署 snode,建流时必须存在可用(正常运行状态)snode。
- 每个流属于一个特定的数据库,因此建流前系统中必须已经存在数据库,同一个数据库中的流不允许重名。
- 流的触发表与数据源表可以相同,也可以不相同,可以跨库使用。
- 流的输出表可以与流、触发表、数据源表在不同的库中,不能与触发表或数据源表相同。
- 流的输出表(超级表和普通表)在建流时自动创建,如需写入到已经存在的表中,需要结构完全匹配。
- 每个分组的输出子表不需要提前创建,运算过程中输出结果时会自动创建。
- 每个触发分组的计算结果会写入到同一个子表中,没有指定触发分组时所有计算结果写入到同一个普通表中。
- 如果不同分组指定生成的子表名相同,那么不同分组的计算结果就会保存到同一个子表中,需要用户自行确认这是预期的行为,否则用户需要确保每个分组生成的子表名都是唯一的。
- 用户除了可以指定子表名外,也可以根据需要指定输出超级表的标签列以及每个子表的标签列的值。
- 流计算可以嵌套使用,也就是说可以基于一个流的输出表再创建一个新的流计算。
- 计数窗口触发不支持乱序、更新、删除的自动处理(采取忽略处理的方式),在非
FILL_HISTORY_FIRST
模式下历史与实时窗口可能不对齐。 - 对于超级表的窗口触发方式,只有
interval
和session
窗口支持按照标签、子表分组和不分组,其他窗口只支持按子表分组。 - 不支持在查询中使用伪列,
qstart
、qend
、qduration
。
短期使用限制
- 暂不支持按普通数据列分组的场景。
- 暂不支持
Geometry
数据类型。 - 暂不支持
Interp
、Percentile
、Forecast
和 UDF 函数。 - 暂不支持
DELETE_OUTPUT_TABLE
选项。 - 暂不支持在
NOTIFY_OPTIONS
中使用ON_FAILURE_PAUSE
选项。 - 暂不支持在状态窗口触发中使用
Cast
函数。 - 暂不支持
Windows
平台。
兼容性说明
相比于 3.3.6.0
版本,流计算进行了全新设计。老版本升级之前要做如下动作后,在新的流计算版本上进行重建。
- 删除所有的流计算任务
- 删除所有的 TSMA
- 删除所有的 snode
- 删除 snode 存储目录(原配置项
checkpointBackupDir
指定的路径,默认值为/var/lib/taos/backup/checkpoint/
) - 删除所有的结果表
注意:如果未进行以上操作,taosd 会启动失败。
最佳实践
重构后的流式计算提供了更大的功能灵活性,取消了很多使用限制。在提升可用性的同时,也对使用流计算提出了更高的要求。
部署方面
- 在单独的 dnode 中部署 snode,降低流式计算对数据库读写的影响,该 dnode 不部署 vnode、mnode、qnode。
- 在集群内部署多个 snode 以保证高可用。
- 在建流前先预先完成多个 snode 的创建,以便更好的达到负载均衡。
- 流计算负载高时,可以通过增加 snode 来实现负载均衡。
配置方面
- 根据部署方式和负载情况,合理配置流相关的线程个数,线程数越大可能占用的 CPU 资源越高,反之则越低。
- 根据部署方式合理配置流最大的缓存大小,负载越大,并发流越多,单独部署时都可以调大配置。
建流前设计
用户在建流前需要依次明确下列主要功能检查点,明确后可以按照如下应对方式进行使用:
- 根据业务特点选择触发方式:如需每写入一条或多条数据进行处理则选择计数触发,如需满足窗口条件则选择窗口触发,如需根据事件时间定时运算则选择滑动触发,如需根据处理时间定时运算则选择定时触发。
- 根据业务计算的时效性要求选择窗口触发的可选项:窗口触发除了要选择窗口类型外,还需要选择是开窗触发、关窗触发还是两者都触发,除此之外还可以选择在窗口未关闭时是否需要及时计算(
MAX_DELAY(delay_time)
)。 - 根据事件来源选择触发表:事件触发的来源表选做触发表,对于定时触发可以没有触发表,但是如果需要分组输出或使用定时期间触发表数据集(
%%trows
),则必须指定触发表。 - 确保数据来源表与触发表的时序关系:如果数据来源表与触发表不相同,那么需要确保在触发表事件触发时数据来源表中数据已经可用,否则将可能影响计算结果的正确性。
- 根据业务最终计算的需求决定分组:以超级表计算为例,如果业务最终的需求是全局聚合则不需要分组,如果需要某些表聚合则选择按标签分组,如果需要单个子表的计算结果则可以选择按子表分组。
- 根据流计算结果的应用方式选择分组输出子表:
- 每个分组都可以有自己独立的输出子表,如果分组过多可能导致结果表过多,因此可以根据后续流计算结果的应用方式和系统资源限制,选择是否每个分组需要单独的输出子表。
- 多个分组可合并输出到一个子表时,可以设定这些分组使用相同的输出表名(
[OUTPUT_SUBTABLE(tbname_expr)]
),结合输出表的复合主键设计,即可实现分组结果合并。
- 用最优的数据写入方式:每个分组的数据都能顺序写入是流计算最佳的写入方式,如果存在大量的乱序写入、更新写入、数据删除操作,将可能造成大量的重算处理,因此如果有条件能够保证写入顺序将可以有效提升流计算的计算效能。
- 确认写入数据的乱序情况:根据每个子表的写入乱序情况确定是否需要指定
WATERMARK
以及确定合适的WATERMARK
时长。 - 确认乱序数据写入对流计算的影响:
- 如果存在乱序写入数据,需要确认这些乱序写入的数据的计算结果的影响,对于业务非常注重计算或通知的时效性、触发表乱序数据不影响计算结果等场景,可以指定
STREAM_OPTIONS(IGNORE_DISORDER)
忽略这些乱序数据。 - 如果存在严重的过去时间的乱序写入数据(写入数据的事件时间与当前已经处理的事件时间相差太大),且这些数据不影响计算结果或时效性已经丧失,可以通过
STREAM_OPTIONS(EXPIRED_TIME(exp_time))
指定其为过期数据且忽略处理。
- 如果存在乱序写入数据,需要确认这些乱序写入的数据的计算结果的影响,对于业务非常注重计算或通知的时效性、触发表乱序数据不影响计算结果等场景,可以指定
- 确认重算对流计算结果的有效性:乱序、更新、删除场景主要是通过重算方式来解决,如果重算结果不具有幂等性或有效性,则会影响结果的正确性,需可结合业务特点进行判断。
- 确认删除数据对流计算的影响:如果存在数据删除,且需要根据删除的数据重新计算结果时,可以通过
STREAM_OPTIONS(DELETE_RECALC)
指定。 - 确认历史数据是否需要计算和计算方式:
- 在建流前,数据库中已经写入的数据需要进行计算,需要根据业务特点和处理逻辑进一步确认是优先计算历史数据还是实时数据,例如
COUNT_WINDOW
触发需优先历史数据计算,否则可能造成窗口无法衔接。 - 如果优先历史数据计算,可以指定
STREAM_OPTIONS(FILL_HISTORY_FIRST)
,否则指定STREAM_OPTIONS(FILL_HISTORY)
。
- 在建流前,数据库中已经写入的数据需要进行计算,需要根据业务特点和处理逻辑进一步确认是优先计算历史数据还是实时数据,例如
- 确认业务对流计算实时性要求程度:如果业务对流计算通知或计算的实时性要求很高,可以指定
OPSTREAM_OPTIONSTIONS(LOW_LATENCY_CALC)
,在这种模式下会对计算资源有更高的要求。 - 确认使用流计算的用途:如果只需要一个事件触发通知,而不需要做计算,那可以使用只通知不计算模式(即不指定计算语句)。
- 确认计算后结果的应用方式:如果只需要结果通知而不需要保存,可以使用只通知不保存选项(
STREAM_OPTIONS(CALC_NOTIFY_ONLY)
)。 - 确认结果写入的可靠性:运算过程中如果计算结果的主键为 NULL 值,则对应的计算结果会被丢弃。
- 如果查询语句也包含分组子句,且分组结果写入到同一个子表,则不同分组产生的相同时间戳记录会出现数据覆盖。
- 如果同一个分组多次触发计算产生的主键时间戳相同,那么它们会互相覆盖。
- 如果同一个分组多次触发(重算)产生的主键时间戳不相同,那么它们就不会被更新。
流的维护
在流的状态展示中(查询表 information_schema.ins_streams
),会列举流的一些具体的状态信息,例如:流的实时计算是否能保持进度一致,流的重算次数多少(比例),流的错误信息等等,需要用户或运维人员关注展示的信息,判断流计算业务是否正常并据此进行分析优化处理。
建流示例
计数窗口触发
- 表 tb1 每写入 1 行数据时,计算表 tb2 在同一时刻前 5 分钟内 col1 的平均值,计算结果写入表 tb3。
CREATE stream sm1 count_window(1) FROM tb1
INTO tb3 AS
SELECT _twstart, avg(col1) FROM tb2
WHERE _c0 >= _twend - 5m AND _c0 <= _twend;
- 表 tb1 每写入 10 行大于 0 的 col1 列数据时,计算这 10 条数据 col1 列的平均值,计算结果不需要保存,需要通知到
ws://localhost:8080/notify
。
CREATE stream sm2 count_window(10, 1, col1) FROM tb1
STREAM_OPTIONS(CALC_ONTIFY_ONLY | PRE_FILTER(col1 > 0))
NOTIFY("ws://localhost:8080/notify") ON (WINDOW_CLOSE)
AS
SELECT avg(col1) FROM %%trows;
事件窗口触发
- 当环境温度超过 80 度持续超过 10 分钟时,计算环境温度的平均值。
CREATE STREAM `idmp`.`ana_temp` EVENT_WINDOW(start with `环境温度` > 80 end with `环境温度` <= 80 ) TRUE_FOR(10m) FROM `idmp`.`vt_气象传感器02_471544`
STREAM_OPTIONS( IGNORE_DISORDER)
INTO `idmp`.`ana_temp`
AS
SELECT _twstart+0s as output_timestamp, avg(`环境温度`) as `平均环境温度` FROM idmp.`vt_气象传感器02_471544` where ts >= _twstart and ts <= _twend;
滑动触发
- 超级表 stb1 的每个子表在每 5 分钟的时间窗口结束后,计算这 5 分钟的 col1 的平均值,每个子表的计算结果分别写入超级表 stb2 的不同子表中。
CREATE stream sm1 INTERVAL(5m) SLIDING(5m) FROM stb1 PARTITION BY tbname
INTO stb2
AS
SELECT _twstart, avg(col1) FROM %%tbname
WHERE _c0 >= _twstart AND _c0 <= _twend;
上面 SQL 中的
from %%tbname where _c0 >= _twstart and _c0 <= _twend
与from %%trows
的含义是不完全相同的。前者表示计算使用触发分组对应的表中在触发窗口时间段内的数据,窗口内的数据在计算时与%%trows
相比较是有可能有变化的,后者则表示只使用触发时读取到的窗口数据。
- 超级表 stb1 的每个子表从最早的数据开始,在每 5 分钟的时间窗口结束后或从窗口启动 1 分钟后窗口仍然未关闭时,计算窗口内的 col1 的平均值,每个子表的计算结果分别写入超级表 stb2 的不同子表中。
CREATE stream sm2 INTERVAL(5m) SLIDING(5m) FROM stb1 PARTITION BY tbname
STREAM_OPTIONS(MAX_DELAY(1m) | FILL_HISTORY_FIRST)
INTO stb2
AS
SELECT _twstart, avg(col1) FROM %%tbname WHERE _c0 >= _twstart AND _c0 <= _twend;
- 计算电表电流的每分钟平均值,并在窗口打开、关闭时向两个通知地址发送通知,计算历史数据时不发送通知,不允许在通知发送失败时丢弃通知。
CREATE STREAM avg_stream INTERVAL(1m) SLIDING(1m) FROM meters
NOTIFY ('ws://localhost:8080/notify', 'wss://192.168.1.1:8080/notify?key=foo') ON ('WINDOW_OPEN', 'WINDOW_CLOSE') NOTIFY_OPTIONS(NOTIFY_HISTORY | ON_FAILURE_PAUSE)
INTO avg_stb
AS
SELECT _twstart, _twend, AVG(current) FROM %%trows;
定时触发
- 每过 1 小时计算表 tb1 中总的数据量,计算结果写入表 tb2 (毫秒库)。
CREATE stream sm1 PERIOD(1h)
INTO tb2
AS
SELECT cast(_tlocaltime/1000000 AS TIMESTAMP), count(*) FROM tb1;
- 每过 1 小时通知
ws://localhost:8080/notify
当前系统时间。
CREATE stream sm1 PERIOD(1h)
NOTIFY("ws://localhost:8080/notify");