跳到主要内容

Pulsar

本节讲述如何通过 Explorer 界面创建数据迁移任务,从 Pulsar 迁移数据到当前 TDengine TSDB 集群。

功能概述

Apache Pulsar 是一个云原生的开源分布式消息与流处理平台。

TDengine TSDB 可以高效地从 Pulsar 读取数据并将其写入 TDengine TSDB,以实现历史数据迁移或实时数据流入库。

创建任务

1. 新增数据源

在数据写入页面中,点击 +新增数据源 按钮,进入新增数据源页面。

image-20260103095152670

2. 配置基本信息

名称 中输入任务名称,如:“test_pulsar”;

类型 下拉列表中选择 Pulsar

代理 是非必填项,如有需要,可以在下拉框中选择指定的代理,也可以先点击右侧的 +创建新的代理

目标数据库 下拉列表中选择一个目标数据库,也可以先点击右侧的 +创建数据库 按钮

image-20260103095451790

3. 配置连接信息

Broker Server,例如:192.168.2.131:6650

只需要填写一个有效的 broker server 地址。

image-20260103095925872

4. 认证机制

如果服务端开启了相关认证机制,此处需要填写认证信息,目前支持 Basic Auth/JWT/mTLS/Custom Authentication 四种认证机制,请按实际情况进行选择。

如果服务端没有配置任何认证,可跳过此步骤不用填写。

4.1. Basic Auth 认证

选择 Basic-Auth 认证机制,输入用户名和密码:

image-20260103114752040

4.2. JWT 认证

选择 JWT 认证机制,输入 JWT token 信息:

image-20260103114952662

4.3. 配置 mTLS 证书认证

如果服务端开启了 mTLS 加密认证,此处需要启用 mTLS 并配置相关内容。

image-20260103115353359

4.4. Custom Authentication 认证

选择 Custom Authentication ,输入服务器自定义的认证信息即可:

image-20260104111359714

5. 配置采集信息

采集配置 区域填写采集任务相关的配置参数。

超时时间 中填写超时时间。当从 Pulsar 消费不到任何数据,超过 timeout 后,数据采集任务会退出。默认值是 0 ms。当 timeout 设置为 0 时,会一直等待,直到有数据可用,或者发生错误。

主题 中填写要消费的 Topic 名称。可以配置多个 Topic,Topic 之间用逗号分隔。例如:persistent://public/default/tp1,persistent://public/default/tp2

消费者名称 中填写消费者标识,填写后会生成带有 taosx 前缀的消费者 ID。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 taosx 之后,输入的标识之前。

订阅名称 中填写订阅名标识,填写后会生成带有 taosx 前缀的订阅 ID。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 taosx 之后,输入的标识之前。

Initial Position 的下拉列表中选择从哪个位置开始消费数据。有两个选项:EarliestLatest。默认值为 Earliest。

  • Earliest:用于请求最早的位置。
  • Latest:用于请求最晚的位置。

字符编码 中,配置消息体编码格式,taosX 在接收到消息后,使用对应的编码格式对消息体进行解码获取原始数据。可选项 UTF_8, GBK, GB18030, BIG5,默认为 UTF_8

点击 连通性检查 按钮,检查数据源是否可用。

image-20260103120055362

6. 配置 Payload 解析

Payload 解析 区域填写 Payload 解析相关的配置参数。

6.1 解析

有三种获取示例数据的方法:

点击 从服务器检索 按钮,从 Pulsar 获取示例数据。

点击 文件上传 按钮,上传 CSV 文件,获取示例数据。

消息体 中填写 Pulsar 消息体中的示例数据。

json 数据支持 JSONObject 或者 JSONArray,使用 json 解析器可以解析一下数据:

{"id": 1, "message": "hello-world"}
{"id": 2, "message": "hello-world"}

或者

[{"id": 1, "message": "hello-world"},{"id": 2, "message": "hello-world"}]

解析结果如下所示:

kafka-07.png

点击 放大镜图标 可查看预览解析结果。

kafka-08.png

6.2 字段拆分

从列中提取或拆分 中填写从消息体中提取或拆分的字段,例如:将 message 字段拆分成 message_0message_1 这 2 个字段,选择 split 提取器,separator 填写 -, number 填写 2。

点击 新增,可以添加更多提取规则。

点击 删除,可以删除当前提取规则。

kafka-09.png

点击 放大镜图标 可查看预览提取/拆分结果。

kafka-10.png

6.3 数据过滤

过滤 中,填写过滤条件,例如:填写 id != 1,则只有 id 不为 1 的数据才会被写入 TDengine TSDB。

点击 新增,可以添加更多过滤规则。

点击 删除,可以删除当前过滤规则。

kafka-11.png

点击 放大镜图标 可查看预览过滤结果。

kafka-12.png

6.4 表映射

目标超级表 的下拉列表中选择一个目标超级表,也可以先点击右侧的 创建超级表 按钮

映射 中,填写目标超级表中的子表名称,例如:t_{id}。根据需求填写映射规则,其中 mapping 支持设置缺省值。

kafka-13.png

点击 预览,可以查看映射的结果。

kafka-14.png

7. 配置高级选项

高级选项 区域是默认折叠的,点击右侧 > 可以展开,如下图所示:

最大读取并发数 数据源连接数或读取线程数限制,当默认参数不满足需要或需要调整资源使用量时修改此参数。

批次大小 单次发送的最大消息数或行数。默认是 10000。

advanced_options.png

8. 异常处理策略

异常处理策略区域是对数据异常时的处理策略进行配置,默认折叠的,点击右侧 > 可以展开,如下图所示:

exception-handling-strategy.png

各异常项说明及相应可选处理策略如下:

通用处理策略说明:
归档:将异常数据写入归档文件(默认路径为 ${data_dir}/tasks/_id/.datetime),不写入目标库
丢弃:将异常数据忽略,不写入目标库
报错:任务报错

  • 目标库连接超时 目标库连接失败,可选处理策略:归档、丢弃、报错、缓存

    缓存:当目标库状态异常(连接错误或资源不足等情况)时写入缓存文件(默认路径为 ${data_dir}/tasks/_id/.datetime),目标库恢复正常后重新入库

  • 目标库不存在 写入报错目标库不存在,可选处理策略:归档、丢弃、报错
  • 表不存在 写入报错表不存在,可选处理策略:归档、丢弃、报错、自动建表

    自动建表:自动建表,建表成功后重试

  • 主键时间戳溢出 检查数据中第一列时间戳是否在正确的时间范围内(now - keep1, now + 100y),可选处理策略:归档、丢弃、报错
  • 主键时间戳空 检查数据中第一列时间戳是否为空,可选处理策略:归档、丢弃、报错、使用当前时间

    使用当前时间:使用当前时间填充到空的时间戳字段中

  • 复合主键空 写入报错复合主键空,可选处理策略:归档、丢弃、报错
  • 表名长度溢出 检查子表表名的长度是否超出限制(最大 192 字符),可选处理策略:归档、丢弃、报错、截断、截断且归档

    截断:截取原始表名的前 192 个字符作为新的表名
    截断且归档:截取原始表名的前 192 个字符作为新的表名,并且将此行记录写入归档文件

  • 表名非法字符 检查子表表名中是否包含特殊字符(符号 . 等),可选处理策略:归档、丢弃、报错、非法字符替换为指定字符串

    非法字符替换为指定字符串:将原始表名中的特殊字符替换为后方输入框中的指定字符串,例如 a.b 替换为 a_b

  • 表名模板变量空值 检查子表表名模板中的变量是否为空,可选处理策略:丢弃、留空、变量替换为指定字符串

    留空:变量位置不做任何特殊处理,例如 a_{x} 转换为 a_ 变量替换为指定字符串:变量位置使用后方输入框中的指定字符串,例如 a_{x} 转换为 a_b

  • 列名不存在 写入报错列名不存在,可选处理策略:归档、丢弃、报错、自动增加缺失列

    自动增加缺失列:根据数据信息,自动修改表结构增加列,修改成功后重试

  • 列名长度溢出 检查列名的长度是否超出限制(最大 64 字符),可选处理策略:归档、丢弃、报错
  • 列自动扩容 开关选项,打开时,列数据长度超长时将自动修改表结构并重试
  • 列长度溢出 写入报错列长度溢出,可选处理策略:归档、丢弃、报错、截断、截断且归档

    截断:截取数据中符合长度限制的前 n 个字符
    截断且归档:截取数据中符合长度限制的前 n 个字符,并且将此行记录写入归档文件

  • 数据异常 其他数据异常(未在上方列出的其他异常)的处理策略,可选处理策略:归档、丢弃、报错
  • 连接超时 配置目标库连接超时时间,单位“秒”取值范围 1~600
  • 临时存储文件位置 配置缓存文件的位置,实际生效位置 $DATA_DIR/tasks/:id/{location}
  • 归档数据保留天数 非负整数,0 表示无限制
  • 归档数据可用空间 0~65535,其中 0 表示无限制
  • 归档数据文件位置 配置归档文件的位置,实际生效位置 $DATA_DIR/tasks/:id/{location}
  • 归档数据失败处理策略 当写入归档文件报错时的处理策略,可选处理策略:删除旧文件、丢弃、报错并停止任务

    删除旧文件:删除旧文件,如果删除旧文件后仍然无法写入,则报错并停止任务 丢弃:丢弃即将归档的数据 报错并停止任务:报错并停止当前任务

9. 创建完成

点击 提交 按钮,完成创建 Pulsar 到 TDengine TSDB 的数据同步任务,回到数据源列表页面可查看任务执行情况。