Skip to main content

“Kafka”数据源

Kafka 数据写入,是通过连接代理把数据从 Kafka 服务器写入到当前选择的 TDengine Cloud 实例。

先决条件

  • 创建一个空数据库来存储 Kafka 数据。更多信息,请参阅 数据库
  • 确保连接代理运行在与 Kafka 服务器位于同一网络的机器上。更多信息,请参阅 安装连接代理

具体步骤

  1. 在 TDengine Cloud 中,在左边菜单中打开 数据写入 页面,在 数据源 选项卡上,单击 添加数据源打开新增页面。在名称输入框里面填写这个数据源的名称,并选择 Kafka 类型,在代理选择框里面选择已经创建的代理,如果没有创建代理,请点击旁边的创建新的代理按钮去创建新代理。
  2. 目标数据库里面选择一个当前所在的 TDengine Cloud 实例里面的数据库作为目标数据库。
  3. bootstrap-servers 栏目里,配置 Kafka 的 bootstrap 服务器,例如 192.168.1.92:9092,这个是必填字段。
  4. SSL 证书栏目中,如果开启了 SSL 认证,请上传对应的客户端证书和客户端私钥文件。
  5. 采集配置 栏目中,需要配置消费者的超时时间,消费者组的 ID 等参数,
    • 超时时间 中填写超时时间。当从 Kafka 消费不到任何数据,超过 timeout 后,数据采集任务会退出。
    • 主题 中填写要消费的 Topic 名称。可以配置多个 Topic , Topic 之间用逗号分隔。例如:tp1,tp2
    • Offset 的下拉列表中选择从哪个 Offset 开始消费数据。有三个选项:EarliestLatestByTime(ms)。 默认值为 Earliest,Earliest:用于请求最早的 offset,Latest:用于请求最晚的 offset,ByTime:用于请求在特定时间(毫秒)之前的所有消息; 时间戳为毫秒精度。
    • 获取数据的最大时长 中设置获取消息时等待数据不足的最长时间(以毫秒为单位),默认值为 100ms。
  6. 可以点击连通性检查, 检查 Cloud 实例 与 Kafka 服务之间是否可以连通。
  7. 如果消费的 Kafka 数据是 JSON 格式,可以配置 Payload 解析卡片,对数据进行解析转换。
    • 消息体 中填写 Kafka 消息体中的示例数据,例如:{"id": 1, "message": "hello-word"}{"id": 2, "message": "hello-word"}。之后会使用这条示例数据来配置提取和过滤条件。点击 放大镜图标 可查看预览解析结果。
    • 从列中提取或拆分 中填写从消息体中提取或拆分的字段,例如:将 message 字段拆分成 message_0message_1 这 2 个字段,选择 split 提取器,seperator 填写 -, number 填写 2。点击删除,可以删除当前提取规则。点击新增,可以添加更多提取规则。点击放大镜图标可查看预览提取/拆分结果。
    • 过滤中,填写过滤条件,例如:填写id != 1,则只有 id 不为 1 的数据才会被写入 TDengine。点击 删除,可以删除当前过滤规则。点击放大镜图标可查看预览过滤结果。
    • 目标超级表的下拉列表中选择一个目标超级表,也可以先点击右侧的创建超级表按钮,创建一个新的超级表。在映射中,填写目标超级表中的子表名称,例如:t_{id}。点击预览,可以查看映射的结果。
  8. 高级选项卡片,可以配置以下信息:
    • 最大读取并发数配置连接器从源 InfluxDB 数据库中读取数据时的最大并发数。
  9. 填写完以上信息后,点击提交按钮,即可启动从 Kafka 到 TDengine 的数据同步。