跳到主要内容

TDengine Flink Connector

Apache Flink 是一款由 Apache 软件基金会支持的开源分布式流批一体化处理框架,可用于流处理、批处理、复杂事件处理、实时数据仓库构建及为机器学习提供实时数据支持等诸多大数据处理场景。与此同时,Flink 拥有丰富的连接器与各类工具,可对接众多不同类型的数据源实现数据的读取与写入。在数据处理的过程中,Flink 还提供了一系列可靠的容错机制,有力保障任务即便遭遇意外状况,依然能稳定、持续运行。

借助 TDengine 的 Flink 连接器,Apache Flink 得以与 TDengine 数据库无缝对接,能够将来自不同数据源的数据经过复杂运算和深度分析后所得到的结果精准存入 TDengine 数据库,实现数据的高效存储与管理;为后续进行全面、深入的分析处理,充分挖掘数据的潜在价值,为企业的决策制定提供有力的数据支持和科学依据,极大地提升数据处理的效率和质量,增强企业在数字化时代的竞争力和创新能力。

前置条件

准备以下环境:

  • TDengine 服务已部署并正常运行(企业及社区版均可)
  • taosAdapter 能够正常运行。详细参考 taosAdapter 使用手册
  • Apache Flink v1.19.0 或以上版本已安装。安装 Apache Flink 请参考 官方文档

支持的平台

Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。

版本历史

Flink Connector 版本主要变化TDengine 版本
2.1.0修复不同数据源varchar类型写入问题-
2.0.2Table Sink 支持 RowKind.UPDATE_BEFORE、RowKind.UPDATE_AFTER 和 RowKind.DELETE 类型-
2.0.1Sink 支持对所有继承自 RowData 并已实现的类型进行数据写入-
2.0.01. Sink 支持自定义数据结构序列化,写入 TDengine
2. 支持 Table SQL 方式写入 TDengine 数据库
3.3.5.1 及以上版本
1.0.0支持 Sink 功能,将来着其他数据源的数据写入到 TDengine3.3.2.0 及以上版本

异常和错误码

在任务执行失败后,查看 Flink 任务执行日志确认失败原因

具体的错误码请参考:

Error CodeDescriptionSuggested Actions
0xa000connection param error连接器参数错误。
0xa010database name configuration error数据库名配置错误。
0xa011table name configuration error表名配置错误。
0xa013value.deserializer parameter not set未设置序列化方式。
0xa014list of column names for target table not set未设置目标表的列名列表。
0x2301connection already closed连接已经关闭,检查连接情况,或重新创建连接去执行相关指令。
0x2302this operation is NOT supported currently!当前使用接口不支持,可以更换其他连接方式。
0x2303invalid variables参数不合法,请检查相应接口规范,调整参数类型及大小。
0x2304statement is closedstatement 已经关闭,请检查 statement 是否关闭后再次使用,或是连接是否正常。
0x2305resultSet is closedresultSet 结果集已经释放,请检查 resultSet 是否释放后再次使用。
0x230dparameter index out of range参数越界,请检查参数的合理范围。
0x230econnection already closed连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。
0x230funknown sql type in TDengine请检查 TDengine 支持的 Data Type 类型。
0x2315unknown taos type in TDengine在 TDengine 数据类型与 JDBC 数据类型转换时,是否指定了正确的 TDengine 数据类型。
0x2319user is required创建连接时缺少用户名信息。
0x231apassword is required创建连接时缺少密码信息。
0x231dcan't create connection with server within通过增加参数 httpConnectTimeout 增加连接耗时,或是请检查与 taosAdapter 之间的连接情况。
0x231efailed to complete the task within the specified time通过增加参数 messageWaitTimeout 增加执行耗时,或是请检查与 taosAdapter 之间的连接情况。
0x2352Unsupported encoding本地连接下指定了不支持的字符编码集。
0x2353internal error of database, please see taoslog for more details本地连接执行 prepareStatement 时出现错误,请检查 taos log 进行问题定位。
0x2354connection is NULL本地连接执行命令时,Connection 已经关闭。请检查与 TDengine 的连接情况。
0x2355result set is NULL本地连接获取结果集,结果集异常,请检查连接情况,并重试。
0x2356invalid num of fields本地连接获取结果集的 meta 信息不匹配。

数据类型映射

TDengine 目前支持时间戳、数字、字符、布尔类型,与 Flink RowData Type 对应类型转换如下:

TDengine DataTypeFlink RowDataType
TIMESTAMPTimestampData
INTInteger
BIGINTLong
FLOATFloat
DOUBLEDouble
SMALLINTShort
TINYINTByte
BOOLBoolean
VARCHARStringData
BINARYStringData
NCHARStringData
JSONStringData
VARBINARYbyte[]
GEOMETRYbyte[]

使用说明

采用 At-Least-Once(至少一次)语义原因:

  • TDengine 目前不支持事务,不能进行频繁的检查点操作和复杂的事务协调。
  • 由于 TDengine 采用时间戳作为主键,重复数据下游算子可以进行过滤操作,避免重复计算。
  • 采用 At-Least-Once(至少一次)确保达到较高的数据处理的性能和较低的数据延时,设置方式如下:

使用方式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

如果使用 Maven 管理项目,只需在 pom.xml 中加入以下依赖。

<dependency>
<groupId>com.taosdata.flink</groupId>
<artifactId>flink-connector-tdengine</artifactId>
<version>2.1.0</version>
</dependency>

连接参数

建立连接的参数有 URL 和 Properties。
URL 规范格式为: jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]

参数说明:

  • user:登录 TDengine 用户名,默认值 'root'。
  • password:用户登录密码,默认值 'taosdata'。
  • database_name: 数据库名称。
  • timezone: 时区设置。
  • httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 60000。
  • messageWaitTimeout: 消息超时时间,单位 ms, 默认值为 60000。
  • useSSL: 连接中是否使用 SSL。

Sink

Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自不同数据源或算子的数据写入 TDengine。在这一过程中 TDengine 所具备的高效写入机制发挥了至关重要的作用,有力保障了数据的快速和稳定存储。

备注
  • 写入的数据库必须已经创建。
  • 写入的超级表/普通表必须已经创建。

Properties 中配置参数如下:

  • TDengineConfigParams.PROPERTY_KEY_USER:登录 TDengine 用户名,默认值 'root'。
  • TDengineConfigParams.PROPERTY_KEY_PASSWORD:用户登录密码,默认值 'taosdata'。
  • TDengineConfigParams.PROPERTY_KEY_DBNAME:写入的数据库名称。
  • TDengineConfigParams.TD_SUPERTABLE_NAME:写入的超级表名称。写入的数据必须有 tbname 字段,确定写入那张子表。
  • TDengineConfigParams.TD_TABLE_NAME:写入子表或普通表的表名,此参数和TD_SUPERTABLE_NAME 仅需要设置一个即可。
  • TDengineConfigParams.VALUE_DESERIALIZER:接收结果集反序列化方法, 如果接收结果集类型是 Flink 的 RowData,仅需要设置为 RowData即可。也可继承 TDengineSinkRecordSerializer 并实现 serialize 方法,根据 接收的数据类型自定义反序列化方式。
  • TDengineConfigParams.TD_BATCH_SIZE:设置一次写入 TDengine 数据库的批大小 | 当到达批的数量后进行写入,或是一个 checkpoint 的时间也会触发写入数据库。
  • TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms, 默认值为 60000。
  • TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。true: 启用,false: 不启用。默认为 false。
  • TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。true: 启用,false: 不启用。默认为 false。
  • TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔,单位毫秒,默认值 2000。仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
  • TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数,默认值 3,仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
  • TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: 关闭 SSL 证书验证 。true: 启用,false: 不启用。默认为 false。

使用示例:

RowData 类型的数据写入 power_sink 库的 sink_meters 超级表对应的子表中。

RowData Into Super Table
static void testRowDataToSuperTable() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
RowData[] rowDatas = new GenericRowData[10];
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < 10; i++) {
GenericRowData row = new GenericRowData(7);
long current = System.currentTimeMillis() + i * 1000;
row.setField(0, TimestampData.fromEpochMillis(current)); // ts
row.setField(1, random.nextFloat() * 30); // current
row.setField(2, 300 + (i + 1)); // voltage
row.setField(3, random.nextFloat()); // phase
row.setField(4, StringData.fromString("location_" + i)); // location
row.setField(5, i); // groupid
row.setField(6, StringData.fromString("d0" + i)); // tbname
rowDatas[i] = row;
}
DataStream<RowData> dataStream = env.fromElements(RowData.class, rowDatas);

Properties sinkProps = new Properties();
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
sinkProps.setProperty(TDengineConfigParams.PROPERTY_KEY_DBNAME, "power_sink");
sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata");
sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");

TDengineSink<RowData> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname"));
dataStream.sinkTo(sink);
env.execute("flink tdengine sink");
}

查看源码

使用示例

RowData 类型的数据写入 power_sink 库的 sink_normal 普通表表中。

RowData Into Normal Table
static void testRowDataToNormalTable() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
RowData[] rowDatas = new GenericRowData[10];
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < 10; i++) {
GenericRowData row = new GenericRowData(4);
long current = System.currentTimeMillis() + i * 1000;
row.setField(0, TimestampData.fromEpochMillis(current)); // ts
row.setField(1, random.nextFloat() * 30); // current
row.setField(2, 300 + (i + 1)); // voltage
row.setField(3, random.nextFloat()); // phase
rowDatas[i] = row;
}
DataStream<RowData> dataStream = env.fromElements(RowData.class, rowDatas);

Properties sinkProps = new Properties();
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
sinkProps.setProperty(TDengineConfigParams.PROPERTY_KEY_DBNAME, "power_sink");
sinkProps.setProperty(TDengineConfigParams.TD_TABLE_NAME, "sink_normal");
sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata");
sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");

TDengineSink<RowData> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase"));
dataStream.sinkTo(sink);
env.execute("flink tdengine sink");
}

查看源码

使用示例:

将自定义类型的数据写入 power_sink 库的 sink_meters 超级表对应的子表中。

CustomType Into Super Table
static void testCustomTypeToSink() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
ResultBean[] rowDatas = new ResultBean[10];
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < 10; i++) {
ResultBean rowData = new ResultBean();
long current = System.currentTimeMillis() + i * 1000;
rowData.setTs(new Timestamp(current));
rowData.setCurrent(random.nextFloat() * 30);
rowData.setVoltage(300 + (i + 1));
rowData.setPhase(random.nextFloat());
rowData.setLocation("location_" + i);
rowData.setGroupid(i);
rowData.setTbname("d0" + i);
rowDatas[i] = rowData;
}

DataStream<ResultBean> dataStream = env.fromElements(ResultBean.class, rowDatas);

Properties sinkProps = new Properties();
sinkProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
sinkProps.setProperty(TDengineConfigParams.PROPERTY_KEY_CHARSET, "UTF-8");
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultBeanSinkSerializer");
sinkProps.setProperty(TDengineConfigParams.PROPERTY_KEY_DBNAME, "power_sink");
sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata");
sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");

TDengineSink<ResultBean> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname"));
dataStream.sinkTo(sink);
env.execute("flink tdengine sink");
}

查看源码

备注
  • ResultBean 自定义的一个内部类,用于定义写入字段的数据类型。
  • ResultBeanSinkSerializer 是自定义的一个内部类,通过继承 TDengineRecordDeserialization 并实现 serialize 方法完成序列化。

Table Sink

使用 Flink Table 的方式从多个不同的数据源数据库(如 MySQL、Oracle, Kafka 等)中提取数据后, 再进行自定义的算子操作(如数据清洗、格式转换、关联不同表的数据等),然后将处理后的结果写入到 TDengine 中。

参数配置说明:

参数名称类型参数说明
connectorstring连接器标识,设置 tdengine-connector
td.jdbc.urlstring连接的 url 。
td.jdbc.modestring连接器类型, 设置 sink
sink.db.namestring目标数据库名称。
sink.batch.sizeinteger写入的批大小。
sink.supertable.namestring写入的超级表名称。
sink.table.namestring写入的普通表或子表名称。

使用示例:

通过 SQL 语句写入 power_sink 库的 sink_meters 超级表对应的子表中。

Table SQL Into Super Table
static void testTableSqlToSink() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'sink'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tEnv.executeSql(tdengineSinkTableDDL);

String insertQuery = "INSERT INTO sink_meters " +
"VALUES " +
"(CAST('2024-12-19 19:12:45' AS TIMESTAMP(6)), 50.30000, 201, 3.31003, 'California.SanFrancisco', 1, 'd1001')," +
"(CAST('2024-12-19 19:12:46' AS TIMESTAMP(6)), 82.60000, 202, 0.33000, 'California.SanFrancisco', 1, 'd1001')," +
"(CAST('2024-12-19 19:12:47' AS TIMESTAMP(6)), 92.30000, 203, 0.31000, 'California.SanFrancisco', 1, 'd1001')," +
"(CAST('2024-12-19 19:12:45' AS TIMESTAMP(6)), 50.30000, 204, 3.25003, 'Alabama.Montgomery', 2, 'd1002')," +
"(CAST('2024-12-19 19:12:46' AS TIMESTAMP(6)), 62.60000, 205, 0.33000, 'Alabama.Montgomery', 2, 'd1002')," +
"(CAST('2024-12-19 19:12:47' AS TIMESTAMP(6)), 72.30000, 206, 0.31000, 'Alabama.Montgomery', 2, 'd1002');";

TableResult tableResult = tEnv.executeSql(insertQuery);
tableResult.await();
}

查看源码

使用示例:

通过 SQL 语句写入 power_sink 库的 sink_normal 普通表中。

Table SQL Into Normal Table
static void testNormalTableSqlToSink() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

String tdengineSinkTableDDL = "CREATE TABLE `sink_normal` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT"
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'sink'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.table.name' = 'sink_normal'" +
")";
tEnv.executeSql(tdengineSinkTableDDL);

String insertQuery = "INSERT INTO sink_normal " +
"VALUES " +
"(CAST('2024-12-19 19:12:45' AS TIMESTAMP(6)), 50.30000, 201, 3.31003)," +
"(CAST('2024-12-19 19:12:46' AS TIMESTAMP(6)), 82.60000, 202, 0.33000)," +
"(CAST('2024-12-19 19:12:47' AS TIMESTAMP(6)), 92.30000, 203, 0.31000)," +
"(CAST('2024-12-19 19:12:45' AS TIMESTAMP(6)), 50.30000, 204, 3.25003)," +
"(CAST('2024-12-19 19:12:46' AS TIMESTAMP(6)), 62.60000, 205, 0.33000)," +
"(CAST('2024-12-19 19:12:47' AS TIMESTAMP(6)), 72.30000, 206, 0.31000);";

TableResult tableResult = tEnv.executeSql(insertQuery);
tableResult.await();
}

查看源码

使用示例:

Row 类型数据写入 power_sink 库的 sink_meters 超级表对应的子表中。

Table Row To Sink
static void testTableRowToSink() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'sink'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tEnv.executeSql(tdengineSinkTableDDL);

int sum = 0;
String tbname = "d001";
int groupId = 1;
String location = "California.SanFrancisco";
List<Row> rowDatas = new ArrayList<>();
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < 50; i++) {
sum += 300 + (i + 1);
long timestampInMillis = System.currentTimeMillis() + i * 1000;
Row row = Row.of(
new Timestamp(timestampInMillis), // ts
random.nextFloat() * 30, // current
300 + (i + 1), // voltage
random.nextFloat(), // phase
location,
groupId,
tbname

);
rowDatas.add(row);
}

Table inputTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
DataTypes.FIELD("current", DataTypes.FLOAT()),
DataTypes.FIELD("voltage", DataTypes.INT()),
DataTypes.FIELD("phase", DataTypes.FLOAT()),
DataTypes.FIELD("location", DataTypes.STRING()),
DataTypes.FIELD("groupid", DataTypes.INT()),
DataTypes.FIELD("tbname", DataTypes.STRING())
),
rowDatas
);

TableResult result = inputTable.executeInsert("sink_meters");
result.await(); // waiting for task completion
}

查看源码