Flink
Apache Flink 是一款由 Apache 软件基金会支持的开源分布式流批一体化处理框架,可用于流处理、批处理、复杂事件处理、实时数据仓库构建及为机器学习提供实时数据支持等诸多大数据处理场景。与此同时,Flink 拥有丰富的连接器与各类工具,可对接众多不同类型的数据源实现数据的读取与写入。在数据处理的过程中,Flink 还提供了一系列可靠的容错机制,有力保障任务即便遭遇意外状况,依然能稳定、持续运行。
借助 TDengine 的 Flink 连接器,Apache Flink 得以与 TDengine 数据库无缝对接,可以从 TDengine 数据库中快速、稳定地读取海量数据,并在此基础上进行全面、深入的分析处理,充分挖掘数据的潜在价值,为企业的决策制定提供有力的数据支持和科学依据,极大地提升数据处理的效率和质量,增强企业在数字化时代的竞争力和创新能力。
注意:本功能仅适用于 TDengine 企业版。
安装和配置 Flink 服务器
安装 Apache Flink v1.19.0 或以上版本。详细请参考 官方文档。
确认企业版服务正常
- 确认 taosd 服务正常;
- 确认 taosAdapter 服务正常;
版本历史
| Flink Connector 版本 | 主要变化 | TDengine TSDB-Enterprise 版本 |
|---|---|---|
| 2.1.4 | 升级 jdbc 驱动到 3.7.3 版本,详细参见 JDBC 历史版本 | - |
| 2.1.3 | 增加数据转换异常信息输出 | - |
| 2.1.2 | 写入字段增加反引号过滤 | - |
| 2.1.1 | 修复 Stmt 相同表的数据绑定失败问题 | - |
| 2.1.0 | 修复不同数据源 varchar 类型写入问题 | - |
| 2.0.2 | Table Sink 支持 RowKind.UPDATE_BEFORE、RowKind.UPDATE_AFTER 和 RowKind.DELETE 类型 | - |
| 2.0.1 | Sink 支持对所有继承自 RowData 并已实现的类型进行数据写入 | - |
| 2.0.0 | 1. Sink 支持自定义数据结构序列化,写入 TDengine TSDB 2. 支持 Table SQL 方式写入 TDengine TSDB 数据库 | 3.3.5.1 及以上版本 |
| 1.0.0 | 支持 Sink 功能,将来着其他数据源的数据写入到 TDengine TSDB | 3.3.2.0 及以上版本 |
异常和错误码
在任务执行失败后,查看 Flink 任务执行日志确认失败原因
具体的错误码请参考:
| Error Code | Description | Suggested Actions |
|---|---|---|
| 0xa000 | connection param error | 连接器参数错误。 |
| 0xa010 | database name configuration error | 数据库名配置错误。 |
| 0xa011 | table name configuration error | 表名配置错误。 |
| 0xa013 | value.deserializer parameter not set | 未设置序列化方式。 |
| 0xa014 | list of column names for target table not set | 未设置目标表的列名列表。 |
| 0x2301 | connection already closed | 连接已经关闭,检查连接情况,或重新创建连接去执行相关指令。 |
| 0x2302 | this operation is NOT supported currently! | 当前使用接口不支持,可以更换其他连接方式。 |
| 0x2303 | invalid variables | 参数不合法,请检查相应接口规范,调整参数类型及大小。 |
| 0x2304 | statement is closed | statement 已经关闭,请检查 statement 是否关闭后再次使用,或是连接是否正常。 |
| 0x2305 | resultSet is closed | resultSet 结果集已经释放,请检查 resultSet 是否释放后再次使用。 |
| 0x230d | parameter index out of range | 参数越界,请检查参数的合理范围。 |
| 0x230e | connection already closed | 连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。 |
| 0x230f | unknown sql type in TDengine | 请检查 TDengine TSDB 支持的 Data Type 类型。 |
| 0x2315 | unknown taos type in TDengine | 在 TDengine TSDB 数据类型与 JDBC 数据类型转换时,是否指定了正确的 TDengine TSDB 数据类型。 |
| 0x2319 | user is required | 创建连接时缺少用户名信息。 |
| 0x231a | password is required | 创建连接时缺少密码信息。 |
| 0x231d | can't create connection with server within | 通过增加参数 httpConnectTimeout 增加连接耗时,或是请检查与 taosAdapter 之间的连接情况。 |
| 0x231e | failed to complete the task within the specified time | 通过增加参数 messageWaitTimeout 增加执行耗时,或是请检查与 taosAdapter 之间的连接情况。 |
| 0x2352 | Unsupported encoding | 本地连接下指定了不支持的字符编码集。 |
| 0x2353 | internal error of database, please see taoslog for more details | 本地连接执行 prepareStatement 时出现错误,请检查 taos log 进行问题定位。 |
| 0x2354 | connection is NULL | 本地连接执行命令时,Connection 已经关闭。请检查与 TDengine TSDB 的连接情况。 |
| 0x2355 | result set is NULL | 本地连接获取结果集,结果集异常,请检查连接情况,并重试。 |
| 0x2356 | invalid num of fields | 本地连接获取结果集的 meta 信息不匹配。 |
数据类型映射
TDengine TSDB 目前支持时间戳、数字、字符、布尔类型,与 Flink RowData Type 对应类型转换如下:
| TDengine TSDB DataType | Flink RowDataType |
|---|---|
| TIMESTAMP | TimestampData |
| INT | Integer |
| BIGINT | Long |
| FLOAT | Float |
| DOUBLE | Double |
| SMALLINT | Short |
| TINYINT | Byte |
| BOOL | Boolean |
| VARCHAR | StringData |
| BINARY | StringData |
| NCHAR | StringData |
| JSON | StringData |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
使用说明
Flink 语义选择说明
采用 At-Least-Once(至少一次)语义原因:
- TDengine TSDB 目前不支持事务,不能进行频繁的检查点操作和复杂的事务协调。
- 由于 TDengine TSDB 采用时间戳作为主键,重复数据下游算子可以进行过滤操作,避免重复计算。
- 采用 At-Least-Once(至少一次)确保达到较高的数据处理的性能和较低的数据延时,设置方式如下:
使用方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
如果使用 Maven 管理项目,只需在 pom.xml 中加入以下依赖。
<dependency>
<groupId>com.taosdata.flink</groupId>
<artifactId>flink-connector-tdengine</artifactId>
<version>2.1.4</version>
</dependency>
连接参数
建立连接的参数有 URL 和 Properties。
URL 规范格式为:jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]
参数说明:
- user:登录 TDengine 用户名,默认值 'root'
- password:用户登录密码,默认值 'taosdata'
- database_name: 数据库名称。
- timezone: 时区设置。
- httpConnectTimeout: 连接超时时间,单位 ms,默认值为 60000。
- messageWaitTimeout: 消息超时时间,单位 ms,默认值为 60000。
- useSSL: 连接中是否使用 SSL。
数据准备
通过命令行工具 taos 或管理界面 Explorer 执行 SQL 语句,创建数据库,超级表,主题,并写入数据,供下一步订阅使用。以下为简单示例:
create database db vgroups 1;
create table db.meters (ts timestamp, f1 int) tags(t1 int);
create topic topic_meters as select ts, tbname, f1, t1 from db.meters;
insert into db.tb using db.meters tags(1) values(now, 1);
Source
Source 拉取 TDengine 数据库中的数据,并将获取到的数据转换为 Flink 内部可处理的格式和类型,并以并行的方式进行读取和分发,为后续的数据处理提供高效的输入。 通过设置数据源的并行度,实现多个线程并行地从数据源中读取数据,提高数据读取的效率和吞吐量,充分利用集群资源进行大规模数据处理能力。
Properties 中配置参数如下:
- TDengineConfigParams.PROPERTY_KEY_USER:登录 TDengine 用户名,默认值 'root'
- TDengineConfigParams.PROPERTY_KEY_PASSWORD:用户登录密码,默认值 'taosdata'
- TDengineConfigParams.VALUE_DESERIALIZER:下游算子接收结果集反序列化方法,如果接收结果集类型是
Flink的RowData,仅需要设置为RowData即可。也可继承TDengineRecordDeserialization并实现convert和getProducedType方法,根据SQL的ResultSet自定义反序列化方式 - TDengineConfigParams.TD_BATCH_MODE:此参数用于批量将数据推送给下游算子,如果设置为 True,创建
TDengineSource对象时需要指定数据类型为SourceRecords类型的泛型形式 - TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间,单位 ms,默认值为 60000。
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。true: 启用,false: 不启用。默认为 false。
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。true: 启用,false: 不启用。默认为 false。
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔,单位毫秒,默认值 2000。仅在
PROPERTY_KEY_ENABLE_AUTO_RECONNECT为 true 时生效。 - TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数,默认值 3,仅在
PROPERTY_KEY_ENABLE_AUTO_RECONNECT为 true 时生效。 - TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: 关闭 SSL 证书验证。true: 启用,false: 不启用。默认为 false。
按时间分片
用户可以对查询的 SQL 按照时间拆分为多个子任务,输入:开始时间,结束时间,拆分间隔,时间字段名称,系统会按照设置的间隔(时间左闭右开)进行拆分并行获取数据
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
按超级表 TAG 分片
用户可以按照超级表的 TAG 字段将查询的 SQL 拆分为多个查询条件,系统会以一个查询条件对应一个子任务的方式对其进行拆分,进而并行获取数据。
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, current, voltage, phase, groupid, location from meters where voltage > 100")
.setTagList(Arrays.asList("groupid >100 and location = 'Shanghai'",
"groupid >50 and groupid < 100 and location = 'Guangzhou'",
"groupid >0 and groupid < 50 and location = 'Beijing'"))
.setSplitType(SplitType.SPLIT_TYPE_TAG);
按表名分片
支持输入多个相同表结构的超级表或普通表进行分片,系统会按照一个表一个任务的方式进行拆分,进而并行获取数据。
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSelect("ts, current, voltage, phase, groupid, location")
.setTableList(Arrays.asList("d1001", "d1002"))
.setOther("order by ts limit 100")
.setSplitType(SplitType.SPLIT_TYPE_TABLE);
使用 Source 连接器
查询结果为 RowData 数据类型示例:
RowData Source
static void testSource() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
TDengineSource<RowData> source = new TDengineSource<>(connProps, sql, RowData.class);
DataStreamSource<RowData> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTimestamp(0, 0) +
", current: " + rowData.getFloat(1) +
", voltage: " + rowData.getInt(2) +
", phase: " + rowData.getFloat(3) +
", location: " + rowData.getString(4).toString());
sb.append("\n");
return sb.toString();
});
resultStream.print();
env.execute("tdengine flink source");
}
批量查询结果示例:
Batch Source
void testBatchSource() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
Class<SourceRecords<RowData>> typeClass = (Class<SourceRecords<RowData>>) (Class<?>) SourceRecords.class;
SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters");
TDengineSource<SourceRecords<RowData>> source = new TDengineSource<>(connProps, sql, typeClass);
DataStreamSource<SourceRecords<RowData>> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<SourceRecords<RowData>, String>) records -> {
StringBuilder sb = new StringBuilder();
Iterator<RowData> iterator = records.iterator();
while (iterator.hasNext()) {
GenericRowData row = (GenericRowData) iterator.next();
sb.append("ts: " + row.getTimestamp(0, 0) +
", current: " + row.getFloat(1) +
", voltage: " + row.getInt(2) +
", phase: " + row.getFloat(3) +
", location: " + rowData.getString(4).toString());
sb.append("\n");
totalVoltage.addAndGet(row.getInt(2));
}
return sb.toString();
});
resultStream.print();
env.execute("flink tdengine source");
}
查询结果为自定义数据类型示例:
Custom Type Source
void testCustomTypeSource() throws Exception {
System.out.println("testTDengineSourceByTimeSplit start!");
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultSourceDeserialization");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
//按照时间分片
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
TDengineSource<ResultBean> source = new TDengineSource<>(connProps, splitSql, ResultBean.class);
DataStreamSource<ResultBean> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTs() +
", current: " + rowData.getCurrent() +
", voltage: " + rowData.getVoltage() +
", phase: " + rowData.getPhase() +
", groupid: " + rowData.getGroupid() +
", location" + rowData.getLocation() +
", tbname: " + rowData.getTbname());
sb.append("\n");
totalVoltage.addAndGet(rowData.getVoltage());
return sb.toString();
});
resultStream.print();
env.execute("flink tdengine source");
}
- ResultBean 自定义的一个内部类,用于定义 Source 查询结果的数据类型。
- ResultSourceDeserialization 是自定义的一个内部类,通过继承
TDengineRecordDeserialization并实现convert和getProducedType方法。
CDC 数据订阅
Flink CDC 主要用于提供数据订阅功能,能实时监控 TDengine 数据库的数据变化,并将这些变更以数据流形式传输到 Flink 中进行处理,同时确保数据的一致性和完整性。
Properties 中配置参数如下:
- TDengineCdcParams.BOOTSTRAP_SERVERS:TDengine 服务端所在的
ip:port,如果使用WebSocket连接,则为 taosAdapter 所在的ip:port - TDengineCdcParams.CONNECT_USER:登录 TDengine 用户名,默认值 'root'
- TDengineCdcParams.CONNECT_PASS:用户登录密码,默认值 'taosdata'
- TDengineCdcParams.POLL_INTERVAL_MS:拉取数据间隔,默认 500ms
- TDengineCdcParams.VALUE_DESERIALIZER:结果集反序列化方法,如果接收结果集类型是
Flink的RowData,仅需要设置为RowData即可。可以继承com.taosdata.jdbc.tmq.ReferenceDeserializer,并指定结果集 bean,实现反序列化 - TDengineCdcParams.TMQ_BATCH_MODE:此参数用于批量将数据推送给下游算子,如果设置为 True,创建
TDengineCdcSource对象时需要指定数据类型为ConsumerRecords类型的泛型形式 - TDengineCdcParams.GROUP_ID:消费组 ID,同一消费组共享消费进度。最大长度:192
- TDengineCdcParams.AUTO_OFFSET_RESET:消费组订阅的初始位置(
earliest从头开始订阅,latest仅从最新数据开始订阅,默认latest) - TDengineCdcParams.ENABLE_AUTO_COMMIT:是否启用消费位点自动提交,true: 自动提交;false:依赖
checkpoint时间来提交,默认 false
注意:自动提交模式 reader 获取完成数据后自动提交,不管下游算子是否正确的处理了数据,存在数据丢失的风险,主要用于为了追求高效的无状态算子场景或是数据一致性要求不高的场景
- TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS:消费记录自动提交消费位点时间间隔,单位为毫秒。默认值为 5000, 此参数在
ENABLE_AUTO_COMMIT为 true 生效 - TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION:传输过程是否启用压缩。true: 启用,false: 不启用。默认为 false
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT:是否启用自动重连。true: 启用,false: 不启用。默认为 false
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS:自动重连重试间隔,单位毫秒,默认值 2000。仅在
PROPERTY_KEY_ENABLE_AUTO_RECONNECT为 true 时生效 - TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT:自动重连重试次数,默认值 3,仅在
PROPERTY_KEY_ENABLE_AUTO_RECONNECT为 true 时生效 - TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS:
consumer心跳丢失后超时时间,超时后会触发rebalance逻辑,成功后该consumer会被删除(从 3.3.3.0 版本开始支持),默认值为 12000,取值范围 [6000, 1800000] - TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS:
consumer poll拉取数据间隔的最长时间,超过该时间,会认为该consumer离线,触发rebalance逻辑,成功后该consumer会被删除。默认值为 300000,[1000,INT32_MAX]
使用 CDC 连接器
CDC 连接器会根据用户设置的并行度进行创建 consumer,因此用户根据资源情况合理设置并行度。
订阅结果为 RowData 数据类型示例:
CDC Source
void testTDengineCdc() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(100, AT_LEAST_ONCE);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.ENABLE_AUTO_COMMIT, "true");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<RowData> tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class);
DataStreamSource<RowData> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("tsxx: " + rowData.getTimestamp(0, 0) +
", current: " + rowData.getFloat(1) +
", voltage: " + rowData.getInt(2) +
", phase: " + rowData.getFloat(3) +
", location: " + rowData.getString(4).toString());
sb.append("\n");
totalVoltage.addAndGet(rowData.getInt(2));
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
// The task submitted by Flink UI cannot be cancle and needs to be stopped on the UI page.
jobClient.cancel().get();
}
将订阅结果批量下发到算子的示例:
CDC Batch Source
void testTDengineCdcBatch() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true");
Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass);
DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ConsumerRecords<RowData>, String>) records -> {
Iterator<ConsumerRecord<RowData>> iterator = records.iterator();
StringBuilder sb = new StringBuilder();
while (iterator.hasNext()) {
GenericRowData row = (GenericRowData) iterator.next().value();
sb.append("tsxx: " + row.getTimestamp(0, 0) +
", current: " + row.getFloat(1) +
", voltage: " + row.getInt(2) +
", phase: " + row.getFloat(3) +
", location: " + rowData.getString(4).toString());
sb.append("\n");
totalVoltage.addAndGet(row.getInt(2));
}
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
jobClient.cancel().get();
}
订阅结果为自定义数据类型示例:
CDC Custom Type
static void testCustomTypeCdc() throws Exception {
System.out.println("testCustomTypeTDengineCdc start!");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(100, AT_LEAST_ONCE);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(4);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultDeserializer");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<ResultBean> tdengineSource = new TDengineCdcSource<>("topic_meters", config, ResultBean.class);
DataStreamSource<ResultBean> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTs() +
", current: " + rowData.getCurrent() +
", voltage: " + rowData.getVoltage() +
", phase: " + rowData.getPhase() +
", groupid: " + rowData.getGroupid() +
", location" + rowData.getLocation() +
", tbname: " + rowData.getTbname());
sb.append("\n");
totalVoltage.addAndGet(rowData.getVoltage());
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
jobClient.cancel().get();
}
- ResultBean 是自定义的一个内部类,其字段名和数据类型与列的名称和数据类型一一对应,这样根据
TDengineCdcParams.VALUE_DESERIALIZER属性对应的反序列化类可以反序列化出 ResultBean 类型的对象。
Table SQL
使用 Table SQL 的方式从多个不同的数据源数据库(如 TDengine、MySQL、Oracle 等)中提取数据后,再进行自定义的算子操作(如数据清洗、格式转换、关联不同表的数据等),然后将处理后的结果加载到目标数据源(如 TDengine、Mysql 等)中。
Table Source 连接器
参数配置说明:
| 参数名称 | 类型 | 参数说明 |
|---|---|---|
| connector | string | 连接器标识,设置 tdengine-connector |
| td.jdbc.url | string | 连接的 url |
| td.jdbc.mode | string | 连接器类型,设置 source、sink |
| table.name | string | 原表或目标表名称 |
| scan.query | string | 获取数据的 SQL 语句 |
| sink.db.name | string | 目标数据库名称 |
| sink.supertable.name | string | 写入的超级表名称 |
| sink.batch.size | integer | 写入的批大小 |
| sink.table.name | string | 写入的普通表或子表名称 |
使用示例:
将 power 库的 meters 表的子表数据,写入 power_sink 库的 sink_meters 超级表对应的子表中。
Table Source
static void testTableToSink() throws Exception {
System.out.println("testTableToSink start!");
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
String tdengineSourceTableDDL = "CREATE TABLE `meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata'," +
" 'td.jdbc.mode' = 'source'," +
" 'table-name' = 'meters'," +
" 'scan.query' = 'SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`'" +
")";
String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'sink'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tableEnv.executeSql(tdengineSourceTableDDL);
tableEnv.executeSql(tdengineSinkTableDDL);
tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`");
}
Table CDC 连接器
参数配置说明:
| 参数名称 | 类型 | 参数说明 |
|---|---|---|
| connector | string | 连接器标识,设置 tdengine-connector |
| user | string | 用户名,默认 root |
| password | string | 密码,默认 taosdata |
| bootstrap.servers | string | 服务器地址 |
| topic | string | 订阅主题 |
| td.jdbc.mode | string | 连接器类型,cdc、sink |
| group.id | string | 消费组 ID,同一消费组共享消费进度 |
| auto.offset.reset | string | 消费组订阅的初始位置earliest: 从头开始订阅latest: 仅从最新数据开始订阅默认 latest |
| poll.interval_ms | integer | 拉取数据间隔,默认 500ms |
| sink.db.name | string | 目标数据库名称 |
| sink.supertable.name | string | 写入的超级表名称 |
| sink.batch.size | integer | 写入的批大小 |
| sink.table.name | string | 写入的普通表或子表名称 |
使用示例:
订阅 power 库的 meters 超级表的子表数据,写入 power_sink 库的 sink_meters 超级表对应的子表中。
Table CDC
static void testCdcTableToSink() throws Exception {
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
String tdengineSourceTableDDL = "CREATE TABLE `meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'bootstrap.servers' = 'localhost:6041'," +
" 'td.jdbc.mode' = 'cdc'," +
" 'group.id' = 'group_22'," +
" 'auto.offset.reset' = 'earliest'," +
" 'enable.auto.commit' = 'false'," +
" 'topic' = 'topic_meters'" +
")";
String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'sink'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tableEnv.executeSql(tdengineSourceTableDDL);
tableEnv.executeSql(tdengineSinkTableDDL);
TableResult tableResult = tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`");
Thread.sleep(5000L);
tableResult.getJobClient().get().cancel().get();
}







