TDengine Kafka Connector 使用教程
TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDengine Sink Connector。用户只需提供简单的配置文件,就可以将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine, 或将 TDengine 中指定数据库的数据(批量或实时)同步到 Kafka。
什么是 Kafka Connect?
Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。
TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送给 Kafka Connect。TDengine Sink Connector 用于 从 Kafka Connect 接收数据并写入 TDengine。
什么是 Confluent?
Confluent 在 Kafka 的基础上增加很多扩展功能。包括:
- Schema Registry
- REST 代理
- 非 Java 客户端
- 很多打包好的 Kafka Connect 插件
- 管理和监控 Kafka 的 GUI —— Confluent 控制中心
这些扩展功能有的包含在社区版本的 Confluent 中,有的只有企业版能用。
Confluent 企业版提供了 confluent
命令行工具管理各个组件。
前置条件
运行本教程中示例的前提条件。
- Linux 操作系统
- 已安装 Java 8 和 Maven
- 已安装 Git
- 已安装并启动 TDengine。如果还没有可参考安装和卸载
安装 Confluent
Confluent 提供了 Docker 和二进制包两种安装方式。本文仅介绍二进制包方式安装。
在任意目录下执行:
curl -O http://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz
tar xzf confluent-7.1.1.tar.gz -C /opt/test
然后需要把 $CONFLUENT_HOME/bin
目录加入 PATH。
export CONFLUENT_HOME=/opt/confluent-7.1.1
PATH=$CONFLUENT_HOME/bin
export PATH
以上脚本可以追加到当前用户的 profile 文件(~/.profile 或 ~/.bash_profile)
安装完成之后,可以输入confluent version
做简单验证:
# confluent version
confluent - Confluent CLI
Version: v2.6.1
Git Ref: 6d920590
Build Date: 2022-02-18T06:14:21Z
Go Version: go1.17.6 (linux/amd64)
Development: false
安装 TDengine Connector 插件
从源码安装
git clone --branch master https://github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d $CONFLUENT_HOME/share/java/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 target/components/packages/
目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。上面的示例中使用了内置的插件安装路径: $CONFLUENT_HOME/share/java/
。
用 confluent-hub 安装
Confluent Hub 提供下载 Kafka Connect 插件的服务。在 TDengine Kafka Connector 发布到 Confluent Hub 后可以使用命令工具 confluent-hub
安装。
TDengine Kafka Connector 目前没有正式发布,不能用这种方式安装。
启动 Confluent
confluent local services start
一定要先安装插件再启动 Confluent, 否则加载插件会失败。
若某组件启动失败,可尝试清空数据,重新启动。数据目录在启动时将被打印到控制台,比如 :
Using CONFLUENT_CURRENT: /tmp/confluent.106668
Starting ZooKeeper
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
清空数据可执行 rm -rf /tmp/confluent.106668
。
验证各个组件是否启动成功
输入命令:
confluent local services status
如果各组件都启动成功,会得到如下输出:
Connect is [UP]
Control Center is [UP]
Kafka is [UP]
Kafka REST is [UP]
ksqlDB Server is [UP]
Schema Registry is [UP]
ZooKeeper is [UP]
验证插件是否安装成功
在 Kafka Connect 组件完全启动后,可用以下命令列出成功加载的插件:
confluent local services connect plugin list
如果成功安装,会输出如下:
Available Connect Plugins:
[
{
"class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
"type": "sink",
"version": "1.0.0"
},
{
"class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
"type": "source",
"version": "1.0.0"
},
......
如果插件安装失败,请检查 Kafka Connect 的启动日志是否有异常信息,用以下命令输出日志路径:
echo `cat /tmp/confluent.current`/connect/connect.stdout
该命令的输出类似: /tmp/confluent.104086/connect/connect.stdout
。
与日志文件 connect.stdout
同一目录,还有一个文件名为: connect.properties
。在这个文件的末尾,可以看到最终生效的 plugin.path
, 它是一系列用逗号分割的路径。如果插件安装失败,很可能是因为实际的安装路径不包含在 plugin.path
中。
TDengine Sink Connector 的使用
TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字(见配置参数 connection.database), 也可按一定规则生成(见配置参数 connection.database.prefix)。
TDengine Sink Connector 内部使用 TDengine 无模式写入接口写数据到 TDengine,目前支持三种格式的数据:InfluxDB 行协议格式、 OpenTSDB Telnet 协议格式 和 OpenTSDB JSON 协议格式。
下面的示例将主题 meters 的数据,同步到目标数据库 power。数据格式为 InfluxDB Line 协议格式。
添加配置文件
mkdir ~/test
cd ~/test
vi sink-demo.properties
sink-demo.properties 内容如下:
name=TDengineSinkConnector
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
data.precision=ns
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
关键配置说明:
topics=meters
和connection.database=power
, 表示订阅主 题 meters 的数据,并写入数据库 power。db.schemaless=line
, 表示使用 InfluxDB Line 协议格式的数据。
创建 Connector 实例
confluent local services connect connector load TDengineSinkConnector --config ./sink-demo.properties
若以上命令执行成功,则有如下输出:
{
"name": "TDengineSinkConnector",
"config": {
"connection.database": "power",
"connection.password": "taosdata",
"connection.url": "jdbc:TAOS://127.0.0.1:6030",
"connection.user": "root",
"connector.class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
"data.precision": "ns",
"db.schemaless": "line",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"topics": "meters",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"name": "TDengineSinkConnector"
},
"tasks": [],
"type": "sink"
}
写入测试数据
准备测试数据的文本文件,内容如下:
meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000
meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250000000
meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249000000
meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250000000
使用 kafka-console-producer 向主题 meters 添加测试数据。
cat test-data.txt | kafka-console-producer --broker-list localhost:9092 --topic meters
如果目标数据库 power 不存在,那么 TDengine Sink Connector 会自动创建数据库。自动创建数据库使用的时间精度为纳秒,这就要求写入数据的时间戳精度也是纳秒。如果写入数据的时间戳精度不是纳秒,将会抛异常。
验证同步是否成功
使用 TDengine CLI 验证同步是否成功。
taos> use power;
Database changed.
taos> select * from meters;
ts | current | voltage | phase | groupid | location |
===============================================================================================================================================================
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles |
Query OK, 4 row(s) in set (0.004208s)
若看到了以上数据,则说明同步成功。若没有,请检查 Kafka Connect 的日志。配置参数的详细说明见配置参考。
TDengine Source Connector 的使用
TDengine Source Connector 的作用是将 TDengine 某个数据库某一时刻之后的数据全部推送到 Kafka。TDengine Source Connector 的实现原理是,先分批拉取历史数据,再用定时查询的策略同步增量数据。同时会监控表的变化,可以自动同步新增的表。如果重启 Kafka Connect, 会从上次中断的位置继续同步。
TDengine Source Connector 会将 TDengine 数据表中的数据转换成 InfluxDB Line 协议格式 或 OpenTSDB JSON 协议格式, 然后写入 Kafka。
下面的示例程序同步数据库 test 中的数据到主题 tdengine-source-test。