跳到主要内容

SparkplugB

本节讲述如何通过 Explorer 界面创建数据写入任务,从 SparkplugB 读取数据写入到当前 TDengine 集群。

功能概述

SparkplugB 是一种开放消息规范,专为工业物联网 (IIoT) 应用设计,基于 MQTT 协议。

TDengine 可以通过 SparkplugB 连接器从 MQTT 代理订阅数据并将其写入 TDengine,以实现实时数据流入库。

创建任务

1. 新增数据源

在数据写入页面中,点击 +新增数据源 按钮,进入新增数据源页面。

spb-01.png

2. 配置基本信息

名称 中输入任务名称,如:“test_spb”;

类型 下拉列表中选择 SparkplugB

代理 是非必填项,如有需要,可以在下拉框中选择指定的代理,也可以先点击右侧的 +创建新的代理 按钮

目标数据库 下拉列表中选择一个目标数据库,也可以先点击右侧的 +创建数据库 按钮

spb-02.png

3. 配置连接和认证信息

Brokers 中填写 MQTT 代理的地址,例如:localhost:1883, 可以填写多个,用 ',' 分隔,用于连接多个 broker。

MQTT 协议 中选择使用的 MQTT 协议版本,默认 5.0 版本。

客户端 ID 中填写连接到每个 broker 所使用的客户端标识符。

Keep Alive 中输入保持活动间隔。如果代理在保持活动间隔内没有收到来自客户端的任何消息,它将假定客户端已断开连接,并关闭连接。 保持活动间隔是指客户端和代理之间协商的时间间隔,用于检测客户端是否活动。如果客户端在保持活动间隔内没有向代理发送消息,则代理将断开连接。

用户 中填写 MQTT 代理的用户名。

密码 中填写 MQTT 代理的密码。

TLS 校验 中选择 TLS 证书的校验方式

  1. 不开启:表示不进行 TLS 证书认证。在连接 MQTT 时,会先进行 TCP 连接,如果连接失败,会进行无证书认证模式的 TLS 连接。

  2. 单向认证:开启 TLS 连接,并验证服务端证书,此时需要上传 CA 证书。

  3. 双向认证:开启 TLS 连接,并与服务端进行双向认证,此时需要上传 CA 证书,客户端证书以及客户端密钥。

点击 检查连通性 按钮,检查数据源是否可用。

spb-03.png

4. 订阅配置

Group ID 中填写 SparkplugB 规定的 group id 字段,通常一个 group id 代表一个集团/公司/工厂/流水线 等概念。

节点/设备列表 中填写需要订阅的节点和设备的列表,以逗号分隔,其中节点直接填写 ID 即可,设备需要按照 节点 ID/设备 ID 格式填写。

消息类型 中填写需要订阅的 SparkplugB 消息类型,以逗号分隔,有 NBIRTH/NDEATH/NDATA/NCMD/DBIRTH/DDEATH/DDATA/DCMD/STATE。在订阅时,NBIRTH/NDEATH/NDATA/NCMD 类型的消息只会匹配 "节点/设备列表" 中的节点,而 DBIRTH/DDEATH/DDATA/DCMD 只会匹配 "节点/设备列表" 中的设备。

下发 REBIRTH 命令 开启后,taosX 会自动下发 NCMD 中的 "Node Control/Rebirth" 命令,获取节点和设备的所有 metric 信息,从而可以获取 metric name 和 metric alias 的对应关系。如果节点/设备在上报数据时不使用 alias 别名机制,可以不开启此选项。

spb-04.png

5. 配置 Payload 转换

Payload 解析 区域填写 Payload 解析相关的配置参数。

5.1 解析

有三种获取示例数据的方法:

点击 从服务器检索 按钮,从 MQTT 获取示例数据。

点击 文件上传 按钮,上传 CSV 文件,获取示例数据。

消息体 中填写 MQTT 消息体中的示例数据,由于 SparkplugB 消息使用 protobuf 进行编码,因此从服务器检索的数据是经过编码为 json 格式的数据。

json 数据支持 JSONObject 或者 JSONArray,可以用于解析 SparkplugB 中的 metadata 和 properties 等 json 格式的字段。

spb-05.png

点击 放大镜图标 可查看预览解析结果。

spb-06.png

5.2 字段拆分

从列中提取或拆分 中填写从消息体中提取或拆分的字段,例如:将 datatype_str 字段的值转换为 TDengine 类型,在 rule 输入框中填写如下 json 值,在 name 中填写 td_datatype

{
"Int8": "TINYINT",
"UInt8": "TINYINT UNSIGNED",
"Int16": "SMALLINT",
"UInt16": "SMALLINT UNSIGNED",
"Int32": "INT",
"UInt32": "INT UNSIGNED",
"Int64": "BIGINT",
"UInt64": "BIGINT UNSIGNED",
"Float": "FLOAT",
"DOUBLE": "DOUBLE",
"Boolean": "BOOL",
"String": "VARCHAR(128)",
"DateTime": "TIMESTAMP"
}

会将 datatype_str 列的字段的值如 "Int8" 转换为对应的 "TINYINT",新的列命名为 td_datatype

spb-07.png

点击 删除,可以删除当前提取规则。

点击 新增,可以添加更多提取规则。

点击 放大镜图标 可查看预览提取/拆分结果。

spb-08.png

5.3 数据过滤

过滤 中,填写过滤条件,例如:填写datatype_str != "Int8",则只有 datatype_str 不为 Int8 的数据才会被写入 TDengine。

spb-9.png

点击 删除,可以删除当前过滤规则。

点击 放大镜图标 可查看预览过滤结果。

spb-10.png

5.4 表映射

目标超级表 的下拉列表中选择一个目标超级表,也可以先点击右侧的 创建超级表 按钮创建新的超级表。

当超级表需要根据消息动态生成时,可以选择 创建模板。其中,超级表名称,列名,列类型等均可以使用模板变量,当接收到数据后,程序会自动计算模板变量并生成对应的超级表模板,当数据库中超级表不存在时,会使用此模板创建超级表;对于已创建的超级表,如果缺少通过模板变量计算得到的列,也会自动创建对应列。

spb-11.png

映射 中,填写目标超级表中的子表名称,例如:t_{id}。根据需求填写映射规则,其中 mapping 支持设置缺省值。

spb-12.png

点击 预览,可以查看映射的结果。

spb-13.png

6. 高级选项

处理批次上限 中填写可以同时进行数据处理流程的批次数量,当到达此上限后,不再从消息缓存队列中获取消息,会导致缓存队列的消息积压,最小值为 1。

批次大小 中填写每次发送给数据处理流程的消息数量,和 批次延时 配合使用,当读取的 MQTT 消息数量达到批次大小时,就算 批次延时 没有到达也立即向数据处理流程发送数据,最小值为 1。

批次延时 中填写每次生成批次消息的超时时间(单位:毫秒),从每批次接收到的第一个消息开始算起,和 批次大小 配合使用,当读取消息到达超时时间时,就算 批次大小 不满足数量也立即向数据处理流程发送数据,最小值为 1。

spb-14

7. 异常处理策略

异常处理策略区域是对数据异常时的处理策略进行配置,默认折叠的,点击右侧 > 可以展开,如下图所示:

exception-handling-strategy.png

各异常项说明及相应可选处理策略如下:

通用处理策略说明:
归档:将异常数据写入归档文件(默认路径为 ${data_dir}/tasks/_id/.datetime),不写入目标库
丢弃:将异常数据忽略,不写入目标库
报错:任务报错

  • 目标库连接超时 目标库连接失败,可选处理策略:归档、丢弃、报错、缓存

    缓存:当目标库状态异常(连接错误或资源不足等情况)时写入缓存文件(默认路径为 ${data_dir}/tasks/_id/.datetime),目标库恢复正常后重新入库

  • 目标库不存在 写入报错目标库不存在,可选处理策略:归档、丢弃、报错
  • 表不存在 写入报错表不存在,可选处理策略:归档、丢弃、报错、自动建表

    自动建表:自动建表,建表成功后重试

  • 主键时间戳溢出 检查数据中第一列时间戳是否在正确的时间范围内(now - keep1, now + 100y),可选处理策略:归档、丢弃、报错
  • 主键时间戳空 检查数据中第一列时间戳是否为空,可选处理策略:归档、丢弃、报错、使用当前时间

    使用当前时间:使用当前时间填充到空的时间戳字段中

  • 复合主键空 写入报错复合主键空,可选处理策略:归档、丢弃、报错
  • 表名长度溢出 检查子表表名的长度是否超出限制(最大 192 字符),可选处理策略:归档、丢弃、报错、截断、截断且归档

    截断:截取原始表名的前 192 个字符作为新的表名
    截断且归档:截取原始表名的前 192 个字符作为新的表名,并且将此行记录写入归档文件

  • 表名非法字符 检查子表表名中是否包含特殊字符(符号 . 等),可选处理策略:归档、丢弃、报错、非法字符替换为指定字符串

    非法字符替换为指定字符串:将原始表名中的特殊字符替换为后方输入框中的指定字符串,例如 a.b 替换为 a_b

  • 表名模板变量空值 检查子表表名模板中的变量是否为空,可选处理策略:丢弃、留空、变量替换为指定字符串

    留空:变量位置不做任何特殊处理,例如 a_{x} 转换为 a_ 变量替换为指定字符串:变量位置使用后方输入框中的指定字符串,例如 a_{x} 转换为 a_b

  • 列名不存在 写入报错列名不存在,可选处理策略:归档、丢弃、报错、自动增加缺失列

    自动增加缺失列:根据数据信息,自动修改表结构增加列,修改成功后重试

  • 列名长度溢出 检查列名的长度是否超出限制(最大 64 字符),可选处理策略:归档、丢弃、报错
  • 列自动扩容 开关选项,打开时,列数据长度超长时将自动修改表结构并重试
  • 列长度溢出 写入报错列长度溢出,可选处理策略:归档、丢弃、报错、截断、截断且归档

    截断:截取数据中符合长度限制的前 n 个字符
    截断且归档:截取数据中符合长度限制的前 n 个字符,并且将此行记录写入归档文件

  • 数据异常 其他数据异常(未在上方列出的其他异常)的处理策略,可选处理策略:归档、丢弃、报错
  • 连接超时 配置目标库连接超时时间,单位“秒”取值范围 1~600
  • 临时存储文件位置 配置缓存文件的位置,实际生效位置 $DATA_DIR/tasks/:id/{location}
  • 归档数据保留天数 非负整数,0 表示无限制
  • 归档数据可用空间 0~65535,其中 0 表示无限制
  • 归档数据文件位置 配置归档文件的位置,实际生效位置 $DATA_DIR/tasks/:id/{location}
  • 归档数据失败处理策略 当写入归档文件报错时的处理策略,可选处理策略:删除旧文件、丢弃、报错并停止任务

    删除旧文件:删除旧文件,如果删除旧文件后仍然无法写入,则报错并停止任务 丢弃:丢弃即将归档的数据 报错并停止任务:报错并停止当前任务

8. 创建完成

点击 提交 按钮,完成创建 MQTT 到 TDengine 的数据同步任务,回到数据源列表页面可查看任务执行情况。

请您留言

客服暂时离开,有问题请留言。

提交