“Kafka”数据源
Kafka 数据写入,是通过连接代理把数据从 Kafka 服务器写入到当前选择的 TDengine Cloud 实例。
先决条件
具体步骤
- 在 TDengine Cloud 中,在左边菜单中打开 数据写入 页面,在 数据源 选项卡上,单击 添加数据源打开新增页面。在名称输入框里面填写这个数据源的名称,并选择 Kafka 类型,在代理选择框里面选择已经创建的代理,如果没有创建代理,请点击旁边的创建新的代理按钮去创建新代理。
- 在目标数据库里面选择一个当前所在的 TDengine Cloud 实例里面的数据库作为目标数据库。
- 在 bootstrap-servers 栏目里,配置 Kafka 的 bootstrap 服务器,例如 192.168.1.92:9092,这个是必填字段。
- 在 SSL 证书栏目中,如果开启了 SSL 认证,请上传对应的客户端证书和客户端私钥文件。
- 在 采集配置 栏目中,需要配置消费者的超时时间,消费者组的 ID 等参数,
- 超时时间 中填写超时时间。当从 Kafka 消费不到任何数据,超过 timeout 后,数据采集任务会退出。
- 主题 中填写要消费的 Topic 名称。可以配置多个 Topic , Topic 之间用逗号分隔。例如:
tp1,tp2
。 - Offset 的下拉列表中选择从哪个 Offset 开始消费数据。有三个选项:
Earliest
、Latest
、ByTime(ms)
。 默认值为 Earliest,Earliest:用于请求最早的 offset,Latest:用于请求最晚的 offset,ByTime:用于请求在特定时间(毫秒)之前的所有消息; 时间戳为毫秒精度。 - 获取数据的最大时长 中设置获取消息时等待数据不足的最长时间(以毫秒为单位),默认值为 100ms。
- 可以点击连通性检查, 检查 Cloud 实例 与 Kafka 服务之间是否可以连通。
- 如果消费的 Kafka 数据是 JSON 格式,可以配置 Payload 解析卡片,对数据进行解析转换。
- 在 消息体 中填写 Kafka 消息体中的示例数据,例如:
{"id": 1, "message": "hello-word"}{"id": 2, "message": "hello-word"}
。之后会使用这条示例数据来配置提取和过滤条件。点击 放大镜图标 可查看预览解析结果。 - 从列中提取或拆分 中填写从消息体中提取或拆分的字段,例如:将 message 字段拆分成
message_0
和message_1
这 2 个字段,选择 split 提取器,seperator 填写 -, number 填写 2。点击删除,可以删除当前提取规则。点击新增,可以添加更多提取规则。点击放大镜图标可查看预览提取/拆分结果。 - 在过滤中,填写过滤条件,例如:填写
id != 1
,则只有 id 不为 1 的数据才会被写入 TDengine。点击 删除,可以删除当前过滤规则。点击放大镜图标可查看预览过滤结果。 - 在目标超级表的下拉列表中选择一个目标超级表,也可以先点击右侧的创建超级表按钮,创建一个新的超级表。在映射中,填写目标超级表中的子表名称,例如:
t_{id}
。点击预览,可以查看映射的结果。
- 在 消息体 中填写 Kafka 消息体中的示例数据,例如:
- 在高级选项卡片,可以配置以下信息:
- 在最大读取并发数配置连接器从源 InfluxDB 数据库中读取数据时的最大并发数。
- 填写完以上信息后,点击提交按钮,即可启动从 Kafka 到 TDengine 的数据同步。