
Efficient Writing

This section describes how to write data to TDengine efficiently.

Principles of Efficient Writing

From the Perspective of the Client Application

From the perspective of the client application, writing data efficiently involves the following factors:

  1. The amount of data per write. Generally, the larger each batch, the more efficient the write (though the benefit disappears beyond a certain threshold). When writing to TDengine with SQL, try to concatenate as much data as possible into a single SQL statement. Currently, the maximum length of a single SQL statement supported by TDengine is 1,048,576 (1 MB) characters.
  2. The number of concurrent connections. Generally, the more connections writing data concurrently, the more efficient the write (though efficiency drops beyond a certain threshold, depending on the server's processing capacity).
  3. The distribution of data across different tables (or sub-tables), that is, the adjacency of the data being written. Generally, writing each batch to only one table (or sub-table) is more efficient than writing to multiple tables (or sub-tables).
  4. The write method. Generally:
    • Parameter binding is more efficient than writing with SQL, because it avoids SQL parsing.
    • Writing with SQL without automatic table creation is more efficient than with automatic table creation, because the latter frequently checks whether tables exist.
    • Writing with SQL is more efficient than schemaless writing, because schemaless writing automatically creates tables and supports dynamic schema changes.

The client application should make full and appropriate use of these factors: write to only one table (or sub-table) per request, set the batch size to a value tuned to the processing capacity of the current system, and likewise tune the number of concurrent write connections, so as to achieve the best write speed on the current system. For example, a simple way to exploit factors 1 and 3 is to concatenate many rows of the same sub-table into one SQL statement, as in the sketch below.
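The following is a minimal sketch of this idea, assuming a database test and an existing sub-table d_0 of the meters supertable; the table name, URL, and values are illustrative assumptions, not part of the example programs below.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class BatchInsertSketch {
    public static void main(String[] args) throws Exception {
        // modify host/port to match your own deployment
        String url = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement()) {
            long ts = System.currentTimeMillis();
            // Concatenate many rows of the same sub-table into a single INSERT statement,
            // keeping the total SQL length below the 1 MB limit.
            StringBuilder sql = new StringBuilder("INSERT INTO test.d_0 VALUES ");
            for (int i = 0; i < 1000; i++) {
                sql.append("(").append(ts + i).append(", 10.5, 220, 0.31) ");
            }
            stmt.executeUpdate(sql.toString());
        }
    }
}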

From the Perspective of the Data Source

A client application usually reads data from a data source and then writes it to TDengine. From the data source's perspective, a queue should be added between the read threads and the write threads in the following cases (a minimal sketch follows the list):

  1. There are multiple data sources, each generating data far more slowly than a single thread can write, but the overall data volume is fairly large. Here the queue aggregates data from multiple sources to increase the amount of data per write.
  2. A single data source generates data far faster than a single thread can write. Here the queue increases write concurrency.
  3. The data of a single table is scattered across multiple data sources. Here the queue gathers the data of the same table in advance, improving the adjacency of the data at write time.
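The sketch below shows one reader and one writer decoupled by a bounded in-memory queue. readFromSource and writeBatchToTDengine are hypothetical placeholders for real I/O code, and the queue and batch sizes are illustrative assumptions.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueSketch {
    // bounded queue between the read thread and the write thread
    static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100_000);

    public static void main(String[] args) {
        Thread reader = new Thread(() -> {
            while (true) {
                try {
                    queue.put(readFromSource()); // blocks when the queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        Thread writer = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            while (true) {
                try {
                    batch.add(queue.take());      // block until data is available
                    queue.drainTo(batch, 9_999);  // then take up to one full batch
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                writeBatchToTDengine(batch);
                batch.clear();
            }
        });
        reader.start();
        writer.start();
    }

    static String readFromSource() { return "mock row"; }

    static void writeBatchToTDengine(List<String> batch) { /* write the batch with JDBC */ }
}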

If the data source of the writing application is Kafka and the application itself is a Kafka consumer, Kafka's features can be used to achieve efficient writing. For example:

  1. Write the data of the same table to the same partition of the same topic to increase data adjacency.
  2. Aggregate data by subscribing to multiple topics.
  3. Increase write concurrency by increasing the number of consumer threads.
  4. Increase the maximum amount of data per write by increasing the maximum amount of data fetched per request.

From the Perspective of Server Configuration

On the server side, set an appropriate number of vgroups when creating the database, based on the number of disks, their I/O capability, and the processor capacity, in order to fully utilize system performance. With too few vgroups, system performance cannot be fully exploited; with too many, unnecessary resource contention arises. A common recommendation is to set the number of vgroups to twice the number of CPU cores, but it should still be tuned according to the actual system resources, as in the sketch below.

For more tuning parameters, see Database Management and Server Configuration.
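The following minimal sketch sets the number of vgroups when creating a database over JDBC; the database name test and the value 20 are illustrative assumptions and should be chosen according to your hardware.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateDatabaseSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement()) {
            // The number of vgroups is fixed at creation time; tune it to CPU cores and disk I/O.
            stmt.execute("CREATE DATABASE IF NOT EXISTS test VGROUPS 20");
        }
    }
}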

Efficient Writing Example

Scenario Design

The following example programs demonstrate how to write data efficiently, with the scenario designed as follows:

  • The TDengine client application continuously reads data from other data sources. In the example programs, mock data generation simulates reading from a data source; an example of pulling data from Kafka and writing it to TDengine is also provided.
  • To increase the speed at which the client application reads data, multiple reading threads are used. To avoid out-of-order data, the sets of tables handled by different reading threads should not overlap.
  • To match the reading speed of each data-reading thread, a group of write threads is started in the background; each write thread has an exclusive, fixed-size message queue.

Figure: thread model of the TDengine efficient writing example scenario

Sample Code

This section provides sample code for the scenario above. The principles of efficient writing are the same for other scenarios, but the code needs to be modified accordingly.

The sample code assumes that the source data belongs to different sub-tables of the same supertable (meters). The program creates this supertable and the corresponding sub-tables in the test database before writing any data. If your actual scenario involves multiple supertables, simply create multiple supertables and start multiple groups of tasks as needed.

Introduction to the JDBC Efficient Writing Feature

Starting from version 3.6.0, the JDBC driver provides an efficient writing feature over WebSocket connections; see Efficient Writing Configuration for its configuration parameters. The JDBC efficient writing feature has the following characteristics:

  • Supports the standard JDBC parameter binding interface.
  • With sufficient resources, write throughput scales linearly with the configured number of write threads.
  • Supports configuring the write timeout as well as the number of retries and the retry interval after disconnection and reconnection.
  • Supports calling the executeUpdate interface to obtain the number of rows written; if an error occurred during writing, an exception can be caught at that point.

Using JDBC Efficient Writing

The following is a simple example of JDBC efficient writing, illustrating the related configuration options and interfaces.

import com.taosdata.jdbc.TSDBDriver;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Properties;
import java.util.Random;

public class WSHighVolumeDemo {

// modify host to your own
private static final String HOST = "127.0.0.1";
private static final int port = 6041;
private static final Random random = new Random(System.currentTimeMillis());
private static final int NUM_OF_SUB_TABLE = 10000;
private static final int NUM_OF_ROW = 10;

public static void main(String[] args) throws SQLException {

String url = "jdbc:TAOS-WS://" + HOST + ":" + port + "/?user=root&password=taosdata";
Properties properties = new Properties();
// Use an efficient writing mode
properties.setProperty(TSDBDriver.PROPERTY_KEY_ASYNC_WRITE, "stmt");
// The maximum number of rows to be batched in a single write request
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_SIZE_BY_ROW, "10000");
// The maximum number of rows to be cached in the queue (for each backend write
// thread)
properties.setProperty(TSDBDriver.PROPERTY_KEY_CACHE_SIZE_BY_ROW, "100000");
// Number of backend write threads
properties.setProperty(TSDBDriver.PROPERTY_KEY_BACKEND_WRITE_THREAD_NUM, "5");
// Enable this option to automatically reconnect when the connection is broken
properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
// The maximum time to wait for a write request to be processed by the server in
// milliseconds
properties.setProperty(TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT, "5000");
// Enable this option to copy data when modifying binary data after the
// `addBatch` method is called
properties.setProperty(TSDBDriver.PROPERTY_KEY_COPY_DATA, "false");
// Enable this option to check the length of the sub-table name and the length
// of variable-length data types
properties.setProperty(TSDBDriver.PROPERTY_KEY_STRICT_CHECK, "false");

try (Connection conn = DriverManager.getConnection(url, properties)) {
init(conn);

// If you are certain that the child table exists, you can avoid binding the tag
// column to improve performance.
String sql = "INSERT INTO power.meters (tbname, groupid, location, ts, current, voltage, phase) VALUES (?,?,?,?,?,?,?)";

try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
long current = System.currentTimeMillis();

int rows = 0;
for (int j = 0; j < NUM_OF_ROW; j++) {
// To simulate real-world scenarios, we adopt the approach of writing a batch of
// sub-tables, with one record per sub-table.
for (int i = 1; i <= NUM_OF_SUB_TABLE; i++) {
pstmt.setString(1, "d_" + i);

pstmt.setInt(2, i);
pstmt.setString(3, "location_" + i);

pstmt.setTimestamp(4, new Timestamp(current + j));
pstmt.setFloat(5, random.nextFloat() * 30);
pstmt.setInt(6, random.nextInt(300));
pstmt.setFloat(7, random.nextFloat());

// when the queue of backend cached data reaches the maximum size, this method
// will be blocked
pstmt.addBatch();
rows++;
}

pstmt.executeBatch();

if (rows % 50000 == 0) {
// The semantics of executeUpdate in efficient writing mode is to synchronously
// retrieve the number of rows written between the previous call and the current
// one.
int affectedRows = pstmt.executeUpdate();
// affectedRows should equal the number of rows written since the previous call
if (affectedRows != 50000) {
System.out.println("Unexpected number of affected rows: " + affectedRows);
}
}
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert to table meters using efficient writing, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}

private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}


Program Listing

Class name          Description
FastWriteExample    Main program: parses command-line arguments, creates the thread pools, and waits for the tasks to finish
WorkTask            Reads data from the mock data source and writes it using the standard JDBC interface
MockDataSource      Generates mock data for a given range of meters sub-tables
StatTask            Collects write statistics and prints the current write speed to the console every 10 seconds
CreateSubTableTask  Creates sub-tables within a given range, invoked by the main program
Meters              Serialization and deserialization of a single meters row, used when sending messages to and receiving messages from Kafka
ProducerTask        Producer that sends messages to Kafka
ConsumerTask        Consumer that receives messages from Kafka, writes the data to TDengine using the JDBC efficient writing interface, and commits offsets according to progress
Util                Basic utilities, including creating connections, creating Kafka topics, and counting written rows

The complete code and a more detailed description of each class follow.

FastWriteExample

Command-line arguments of the main program:

   -b,--batchSizeByRow <arg>             Set the batchSizeByRow parameter of efficient writing; default 1000
   -c,--cacheSizeByRow <arg>             Set the cacheSizeByRow parameter of efficient writing; default 10000
   -d,--dbName <arg>                     Set the database name; default `test`
      --help                             Print help information
   -K,--useKafka                         Use Kafka: producers send messages and consumers receive them and write to TDengine; otherwise worker threads subscribe to data generated by the mock data source and write it to TDengine
   -r,--readThreadCount <arg>            Set the number of worker threads; default 5. In Kafka mode this also determines the number of producer and consumer threads
   -R,--rowsPerSubTable <arg>            Set the number of rows written per sub-table; default 100
   -s,--subTableNum <arg>                Set the total number of sub-tables; default 1000000
   -w,--writeThreadPerReadThread <arg>   Set the number of write threads per worker thread; default 5

JDBC URL and Kafka cluster address configuration:

  1. The JDBC URL is configured through an environment variable, for example: export TDENGINE_JDBC_URL="jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata"
  2. The Kafka cluster address is configured through an environment variable, for example: KAFKA_BOOTSTRAP_SERVERS=localhost:9092

Usage:

1. Mock data write mode: java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 1000
2. Kafka subscription write mode: java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 100 -K

The main program is responsible for:

  1. Parsing command-line arguments
  2. Creating sub-tables
  3. Creating worker threads, or Kafka producers and consumers
  4. Collecting write-speed statistics
  5. Waiting for writing to finish and releasing resources
package com.taos.example.highvolume;

import org.apache.commons.cli.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;


public class FastWriteExample {
static final Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
static ThreadPoolExecutor writerThreads;
static ThreadPoolExecutor producerThreads;
static final ThreadPoolExecutor statThread = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
private static final List<Stoppable> allTasks = new ArrayList<>();

private static int readThreadCount = 5;
private static int writeThreadPerReadThread = 5;
private static int batchSizeByRow = 1000;
private static int cacheSizeByRow = 10000;
private static int subTableNum = 1000000;
private static int rowsPerSubTable = 100;
private static String dbName = "test";


public static void forceStopAll() {
logger.info("shutting down");

for (Stoppable task : allTasks) {
task.stop();
}

if (producerThreads != null) {
producerThreads.shutdown();
}

if (writerThreads != null) {
writerThreads.shutdown();
}

statThread.shutdown();
}

private static void createSubTables(){
writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-CreateSubTable-thread-"));

int range = (subTableNum + readThreadCount - 1) / readThreadCount;

for (int i = 0; i < readThreadCount; i++) {
int startIndex = i * range;
int endIndex;
if (i == readThreadCount - 1) {
endIndex = subTableNum - 1;
} else {
endIndex = startIndex + range - 1;
}

logger.debug("create sub table task {} {} {}", i, startIndex, endIndex);

CreateSubTableTask createSubTableTask = new CreateSubTableTask(i,
startIndex,
endIndex,
dbName);
writerThreads.submit(createSubTableTask);
}

logger.info("create sub table task started.");

while (writerThreads.getActiveCount() != 0) {
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
logger.info("create sub table task finished.");

}

public static void startStatTask() throws SQLException {
StatTask statTask = new StatTask(dbName, subTableNum);
allTasks.add(statTask);
statThread.submit(statTask);
}
public static ThreadFactory getNamedThreadFactory(String namePrefix) {
return new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, namePrefix + threadNumber.getAndIncrement());
}
};
}

private static void invokeKafkaDemo() throws SQLException {
producerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-kafka-producer-thread-"));
writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-kafka-consumer-thread-"));

int range = (subTableNum + readThreadCount - 1) / readThreadCount;

for (int i = 0; i < readThreadCount; i++) {
int startIndex = i * range;
int endIndex;
if (i == readThreadCount - 1) {
endIndex = subTableNum - 1;
} else {
endIndex = startIndex + range - 1;
}

ProducerTask producerTask = new ProducerTask(i,
rowsPerSubTable,
startIndex,
endIndex);
allTasks.add(producerTask);
producerThreads.submit(producerTask);

ConsumerTask consumerTask = new ConsumerTask(i,
writeThreadPerReadThread,
batchSizeByRow,
cacheSizeByRow,
dbName);
allTasks.add(consumerTask);
writerThreads.submit(consumerTask);
}

startStatTask();
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::forceStopAll));

while (writerThreads.getActiveCount() != 0) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
private static void invokeMockDataDemo() throws SQLException {
ThreadFactory namedThreadFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "FW-work-thread-";

@Override
public Thread newThread(Runnable r) {
return new Thread(r, namePrefix + threadNumber.getAndIncrement());
}
};

writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, namedThreadFactory);

int range = (subTableNum + readThreadCount - 1) / readThreadCount;

for (int i = 0; i < readThreadCount; i++) {
int startIndex = i * range;
int endIndex;
if (i == readThreadCount - 1) {
endIndex = subTableNum - 1;
} else {
endIndex = startIndex + range - 1;
}

WorkTask task = new WorkTask(i,
writeThreadPerReadThread,
batchSizeByRow,
cacheSizeByRow,
rowsPerSubTable,
startIndex,
endIndex,
dbName);
allTasks.add(task);
writerThreads.submit(task);
}

startStatTask();
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::forceStopAll));

while (writerThreads.getActiveCount() != 0) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}

// print help
private static void printHelp(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("java -jar highVolume.jar", options);
System.out.println();
}

public static void main(String[] args) throws SQLException, InterruptedException {
Options options = new Options();

Option readThdcountOption = new Option("r", "readThreadCount", true, "Specify the readThreadCount, default is 5");
readThdcountOption.setRequired(false);
options.addOption(readThdcountOption);

Option writeThdcountOption = new Option("w", "writeThreadPerReadThread", true, "Specify the writeThreadPerReadThread, default is 5");
writeThdcountOption.setRequired(false);
options.addOption(writeThdcountOption);

Option batchSizeOption = new Option("b", "batchSizeByRow", true, "Specify the batchSizeByRow, default is 1000");
batchSizeOption.setRequired(false);
options.addOption(batchSizeOption);

Option cacheSizeOption = new Option("c", "cacheSizeByRow", true, "Specify the cacheSizeByRow, default is 10000");
cacheSizeOption.setRequired(false);
options.addOption(cacheSizeOption);

Option subTablesOption = new Option("s", "subTableNum", true, "Specify the subTableNum, default is 1000000");
subTablesOption.setRequired(false);
options.addOption(subTablesOption);

Option rowsPerTableOption = new Option("R", "rowsPerSubTable", true, "Specify the rowsPerSubTable, default is 100");
rowsPerTableOption.setRequired(false);
options.addOption(rowsPerTableOption);

Option dbNameOption = new Option("d", "dbName", true, "Specify the database name, default is test");
dbNameOption.setRequired(false);
options.addOption(dbNameOption);

Option kafkaOption = new Option("K", "useKafka", false, "use kafka demo to test");
kafkaOption.setRequired(false);
options.addOption(kafkaOption);


Option helpOption = new Option(null, "help", false, "print help information");
helpOption.setRequired(false);
options.addOption(helpOption);

CommandLineParser parser = new DefaultParser();
CommandLine cmd;

try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
System.out.println(e.getMessage());
printHelp(options);
System.exit(1);
return;
}

if (cmd.hasOption("help")) {
printHelp(options);
return;
}

if (cmd.getOptionValue("readThreadCount") != null) {
readThreadCount = Integer.parseInt(cmd.getOptionValue("readThreadCount"));
if (readThreadCount <= 0){
logger.error("readThreadCount must be greater than 0");
return;
}
}

if (cmd.getOptionValue("writeThreadPerReadThread") != null) {
writeThreadPerReadThread = Integer.parseInt(cmd.getOptionValue("writeThreadPerReadThread"));
if (writeThreadPerReadThread <= 0){
logger.error("writeThreadPerReadThread must be greater than 0");
return;
}
}

if (cmd.getOptionValue("batchSizeByRow") != null) {
batchSizeByRow = Integer.parseInt(cmd.getOptionValue("batchSizeByRow"));
if (batchSizeByRow <= 0){
logger.error("batchSizeByRow must be greater than 0");
return;
}
}

if (cmd.getOptionValue("cacheSizeByRow") != null) {
cacheSizeByRow = Integer.parseInt(cmd.getOptionValue("cacheSizeByRow"));
if (cacheSizeByRow <= 0){
logger.error("cacheSizeByRow must be greater than 0");
return;
}
}

if (cmd.getOptionValue("subTableNum") != null) {
subTableNum = Integer.parseInt(cmd.getOptionValue("subTableNum"));
if (subTableNum <= 0){
logger.error("subTableNum must be greater than 0");
return;
}
}

if (cmd.getOptionValue("rowsPerSubTable") != null) {
rowsPerSubTable = Integer.parseInt(cmd.getOptionValue("rowsPerSubTable"));
if (rowsPerSubTable <= 0){
logger.error("rowsPerSubTable must be greater than 0");
return;
}
}

if (cmd.getOptionValue("dbName") != null) {
dbName = cmd.getOptionValue("dbName");
}

logger.info("readThreadCount={}, writeThreadPerReadThread={} batchSizeByRow={} cacheSizeByRow={}, subTableNum={}, rowsPerSubTable={}",
readThreadCount, writeThreadPerReadThread, batchSizeByRow, cacheSizeByRow, subTableNum, rowsPerSubTable);

logger.info("create database begin.");
Util.prepareDatabase(dbName);

logger.info("create database end.");

logger.info("create sub tables start.");
createSubTables();
logger.info("create sub tables end.");


if (cmd.hasOption("K")) {
Util.createKafkaTopic();
// use kafka demo
invokeKafkaDemo();

} else {
// use mock data source demo
invokeMockDataDemo();
}

}
}


WorkTask

The worker thread reads data from the mock data source. Each read task is associated with one mock data source, each mock data source generates data for a certain range of sub-tables, and different mock data sources generate data for different tables.
The worker thread calls the standard JDBC interface addBatch in a blocking manner: once the corresponding efficient-writing backend queue is full, the write call blocks.

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Iterator;

class WorkTask implements Runnable, Stoppable {
private static final Logger logger = LoggerFactory.getLogger(WorkTask.class);
private final int taskId;
private final int writeThreadCount;
private final int batchSizeByRow;
private final int cacheSizeByRow;
private final int rowsPerTable;
private final int subTableStartIndex;
private final int subTableEndIndex;
private final String dbName;
private volatile boolean active = true;
public WorkTask(int taskId,
int writeThradCount,
int batchSizeByRow,
int cacheSizeByRow,
int rowsPerTable,
int subTableStartIndex,
int subTableEndIndex,
String dbName) {
this.taskId = taskId;
this.writeThreadCount = writeThradCount;
this.batchSizeByRow = batchSizeByRow;
this.cacheSizeByRow = cacheSizeByRow;
this.rowsPerTable = rowsPerTable;
this.subTableStartIndex = subTableStartIndex; // for this task, the start index of sub table
this.subTableEndIndex = subTableEndIndex; // for this task, the end index of sub table
this.dbName = dbName;
}

@Override
public void run() {
logger.info("task {} started", taskId);
Iterator<Meters> it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable);
try (Connection connection = Util.getConnection(batchSizeByRow, cacheSizeByRow, writeThreadCount);
PreparedStatement pstmt = connection.prepareStatement("INSERT INTO " + dbName +".meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) {
long i = 0L;
while (it.hasNext() && active) {
i++;
Meters meters = it.next();
pstmt.setString(1, meters.getTableName());
pstmt.setTimestamp(2, meters.getTs());
pstmt.setFloat(3, meters.getCurrent());
pstmt.setInt(4, meters.getVoltage());
pstmt.setFloat(5, meters.getPhase());
pstmt.addBatch();

if (i % batchSizeByRow == 0) {
pstmt.executeBatch();
}

if (i % (10L * batchSizeByRow) == 0){
pstmt.executeUpdate();
}
}
} catch (Exception e) {
logger.error("Work Task {} Error", taskId, e);
}
logger.info("task {} stopped", taskId);
}

public void stop() {
logger.info("task {} stopping", taskId);
this.active = false;
}
}


MockDataSource

The mock data generator produces data for a given range of sub-tables. To simulate real-world conditions, it generates data in a round-robin fashion, one row per sub-table at a time.

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
* Generate test data
*/
class MockDataSource implements Iterator<Meters> {
private final static Logger logger = LoggerFactory.getLogger(MockDataSource.class);

private final int tableStartIndex;
private final int tableEndIndex;
private final long maxRowsPerTable;

long currentMs = System.currentTimeMillis();
private int index = 0;

// mock values

public MockDataSource(int tableStartIndex, int tableEndIndex, int maxRowsPerTable) {
this.tableStartIndex = tableStartIndex;
this.tableEndIndex = tableEndIndex;
this.maxRowsPerTable = maxRowsPerTable;
}

@Override
public boolean hasNext() {
return index < (tableEndIndex - tableStartIndex + 1) * maxRowsPerTable;
}

@Override
public Meters next() {
// use interlace rows to simulate the data distribution in real world
if (index % (tableEndIndex - tableStartIndex + 1) == 0) {
currentMs += 1000;
}

long currentTbId = index % (tableEndIndex - tableStartIndex + 1) + tableStartIndex;

Meters meters = new Meters();

meters.setTableName(Util.getTableNamePrefix() + currentTbId);
meters.setTs(new java.sql.Timestamp(currentMs));
meters.setCurrent((float) (Math.random() * 100));
meters.setVoltage(ThreadLocalRandom.current().nextInt());
meters.setPhase((float) (Math.random() * 100));

index ++;
return meters;
}
}


CreateSubTableTask

Creates sub-tables within the given range, using batched SQL statements that create multiple sub-tables per statement.

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

class CreateSubTableTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(CreateSubTableTask.class);
private final int taskId;
private final int subTableStartIndex;
private final int subTableEndIndex;
private final String dbName;

public CreateSubTableTask(int taskId,
int subTableStartIndex,
int subTableEndIndex,
String dbName) {
this.taskId = taskId;
this.subTableStartIndex = subTableStartIndex;
this.subTableEndIndex = subTableEndIndex;
this.dbName = dbName;
}

@Override
public void run() {
try (Connection connection = Util.getConnection();
Statement statement = connection.createStatement()){
statement.execute("use " + dbName);
StringBuilder sql = new StringBuilder();
sql.append("create table");
int i = 0;
for (int tableNum = subTableStartIndex; tableNum <= subTableEndIndex; tableNum++) {
sql.append(" if not exists " + Util.getTableNamePrefix() + tableNum + " using meters" + " tags(" + tableNum + ", " + "\"location_" + tableNum + "\"" + ")");

if (i < 1000) {
i++;
} else {
statement.execute(sql.toString());
sql = new StringBuilder();
sql.append("create table");
i = 0;
}
}
if (sql.length() > "create table".length()) {
statement.execute(sql.toString());
}
} catch (SQLException e) {
logger.error("task id {}, failed to create sub table", taskId, e);
}
}


}


Meters

The data model class, providing serialization and deserialization methods for sending data to and receiving data from Kafka.

package com.taos.example.highvolume;

import java.sql.Timestamp;

public class Meters {
String tableName;
Timestamp ts;
float current;
int voltage;
float phase;

public String getTableName() {
return tableName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
}

public Timestamp getTs() {
return ts;
}

public void setTs(Timestamp ts) {
this.ts = ts;
}

public float getCurrent() {
return current;
}

public void setCurrent(float current) {
this.current = current;
}

public int getVoltage() {
return voltage;
}

public void setVoltage(int voltage) {
this.voltage = voltage;
}

public float getPhase() {
return phase;
}

public void setPhase(float phase) {
this.phase = phase;
}

@Override
public String toString() {
return tableName + "," +
ts.toString() + "," +
current + "," +
voltage + "," +
phase;
}

public static Meters fromString(String str) {
String[] parts = str.split(",");
if (parts.length != 5) {
throw new IllegalArgumentException("Invalid input format");
}
Meters meters = new Meters();
meters.setTableName(parts[0]);
meters.setTs(Timestamp.valueOf(parts[1]));
meters.setCurrent(Float.parseFloat(parts[2]));
meters.setVoltage(Integer.parseInt(parts[3]));
meters.setPhase(Float.parseFloat(parts[4]));
return meters;
}

}


ProducerTask

The message producer. It uses a hash function different from the one used by JDBC efficient writing to distribute the data generated by the mock data source across all partitions.

package com.taos.example.highvolume;

import com.taosdata.jdbc.utils.ReqId;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Properties;

class ProducerTask implements Runnable, Stoppable {
private final static Logger logger = LoggerFactory.getLogger(ProducerTask.class);
private final int taskId;
private final int subTableStartIndex;
private final int subTableEndIndex;
private final int rowsPerTable;
private volatile boolean active = true;
public ProducerTask(int taskId,
int rowsPerTable,
int subTableStartIndex,
int subTableEndIndex) {
this.taskId = taskId;
this.subTableStartIndex = subTableStartIndex;
this.subTableEndIndex = subTableEndIndex;
this.rowsPerTable = rowsPerTable;
}

@Override
public void run() {
logger.info("kafak producer {}, started", taskId);
Iterator<Meters> it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable);

Properties props = new Properties();
props.put("bootstrap.servers", Util.getKafkaBootstrapServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", 1024 * 1024);
props.put("linger.ms", 500);

// create a Kafka producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

try {
while (it.hasNext() && active) {
Meters meters = it.next();
String key = meters.getTableName();
String value = meters.toString();
// To avoid out-of-order data within a sub-table, use the partition key to ensure that data of the same sub-table is sent to the same partition.
// Because efficient writing uses the String hash code, a different hash algorithm is used here to compute the partition key.
long hashCode = Math.abs(ReqId.murmurHash32(key.getBytes(), 0));
ProducerRecord<String, String> metersRecord = new ProducerRecord<>(Util.getKafkaTopic(), (int)(hashCode % Util.getPartitionCount()), key, value);
producer.send(metersRecord);
}
} catch (Exception e) {
logger.error("task id {}, send message error: ", taskId, e);
}
finally {
producer.close();
}
logger.info("kafka producer {} stopped", taskId);
}

public void stop() {
logger.info("kafka producer {} stopping", taskId);
this.active = false;
}
}


ConsumerTask

The message consumer: receives messages from Kafka and writes the data to TDengine.

package com.taos.example.highvolume;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

class ConsumerTask implements Runnable, Stoppable {
private static final Logger logger = LoggerFactory.getLogger(ConsumerTask.class);
private final int taskId;
private final int writeThreadCount;
private final int batchSizeByRow;
private final int cacheSizeByRow;
private final String dbName;
private volatile boolean active = true;

public ConsumerTask(int taskId,
int writeThreadCount,
int batchSizeByRow,
int cacheSizeByRow,
String dbName) {
this.taskId = taskId;
this.writeThreadCount = writeThreadCount;
this.batchSizeByRow = batchSizeByRow;
this.cacheSizeByRow = cacheSizeByRow;
this.dbName = dbName;
}

@Override
public void run() {
logger.info("Consumer Task {} started", taskId);

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Util.getKafkaBootstrapServers());

props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(batchSizeByRow));
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000");

props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(2 * 1024 * 1024));

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "15000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

List<String> topics = Collections.singletonList(Util.getKafkaTopic());

try {
consumer.subscribe(topics);
} catch (Exception e) {
logger.error("Consumer Task {} Error", taskId, e);
return;
}

try (Connection connection = Util.getConnection(batchSizeByRow, cacheSizeByRow, writeThreadCount);
PreparedStatement pstmt = connection.prepareStatement("INSERT INTO " + dbName +".meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) {
long i = 0L;
long lastTimePolled = System.currentTimeMillis();
while (active) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> metersRecord : records) {
i++;
Meters meters = Meters.fromString(metersRecord.value());
pstmt.setString(1, meters.getTableName());
pstmt.setTimestamp(2, meters.getTs());
pstmt.setFloat(3, meters.getCurrent());
pstmt.setInt(4, meters.getVoltage());
pstmt.setFloat(5, meters.getPhase());
pstmt.addBatch();

if (i % batchSizeByRow == 0) {
pstmt.executeBatch();
}
if (i % (10L * batchSizeByRow) == 0){
pstmt.executeUpdate();
consumer.commitSync();
}
}

if (!records.isEmpty()){
lastTimePolled = System.currentTimeMillis();
} else {
if (System.currentTimeMillis() - lastTimePolled > 1000 * 60) {
logger.error("Consumer Task {} has been idle for more than 60 seconds, stopping", taskId);
break;
}
}
}
} catch (Exception e) {
logger.error("Consumer Task {} Error", taskId, e);
} finally {
consumer.close();
}

logger.info("Consumer Task {} stopped", taskId);
}

public void stop() {
logger.info("consumer task {} stopping", taskId);
this.active = false;
}
}


StatTask

Periodically collects and reports the number of rows written.

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

class StatTask implements Runnable, Stoppable {
private final static Logger logger = LoggerFactory.getLogger(StatTask.class);
private final int subTableNum;
private final String dbName;
private final Connection conn;
private final Statement stmt;
private volatile boolean active = true;


public StatTask(String dbName,
int subTableNum) throws SQLException {
this.dbName = dbName;
this.subTableNum = subTableNum;
this.conn = Util.getConnection();
this.stmt = conn.createStatement();
}

@Override
public void run() {
long lastCount = 0;

while (active) {
try {
Thread.sleep(10000);

long count = Util.count(stmt, dbName);
logger.info("numberOfTable={} count={} speed={}", subTableNum, count, (count - lastCount) / 10);
lastCount = count;
} catch (InterruptedException e) {
logger.error("interrupted", e);
break;
} catch (SQLException e) {
logger.error("execute sql error: ", e);
break;
}
}

try {
stmt.close();
conn.close();
} catch (SQLException e) {
logger.error("close connection error: ", e);
}
}

public void stop() {
active = false;
}


}


Util

Utility class providing functions such as creating connections, creating the database, and creating the Kafka topic.

package com.taos.example.highvolume;

import com.taosdata.jdbc.TSDBDriver;

import java.sql.*;
import java.util.Properties;

import org.apache.kafka.clients.admin.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class Util {
private final static Logger logger = LoggerFactory.getLogger(Util.class);

public static String getTableNamePrefix() {
return "d_";
}

public static Connection getConnection() throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL == "") {
jdbcURL = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata";
}
return DriverManager.getConnection(jdbcURL);
}

public static Connection getConnection(int batchSize, int cacheSize, int writeThreadNum) throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL == "") {
jdbcURL = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata";
}
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_ASYNC_WRITE, "stmt");
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_SIZE_BY_ROW, String.valueOf(batchSize));
properties.setProperty(TSDBDriver.PROPERTY_KEY_CACHE_SIZE_BY_ROW, String.valueOf(cacheSize));
properties.setProperty(TSDBDriver.PROPERTY_KEY_BACKEND_WRITE_THREAD_NUM, String.valueOf(writeThreadNum));
properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
return DriverManager.getConnection(jdbcURL, properties);
}

public static void prepareDatabase(String dbName) throws SQLException {
try (Connection conn = Util.getConnection();
Statement stmt = conn.createStatement()) {
stmt.execute("DROP DATABASE IF EXISTS " + dbName);
stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName + " vgroups 20");
stmt.execute("use " + dbName);
stmt.execute("CREATE STABLE " + dbName
+ ".meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(64))");
}
}

public static long count(Statement stmt, String dbName) throws SQLException {
try (ResultSet result = stmt.executeQuery("SELECT count(*) from " + dbName + ".meters")) {
result.next();
return result.getLong(1);
}
}

public static String getKafkaBootstrapServers() {
String kafkaBootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
if (kafkaBootstrapServers == null || kafkaBootstrapServers == "") {
kafkaBootstrapServers = "localhost:9092";
}

return kafkaBootstrapServers;
}

public static String getKafkaTopic() {
return "test-meters-topic";
}

public static void createKafkaTopic() {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers());

try (AdminClient adminClient = AdminClient.create(config)) {
String topicName = getKafkaTopic();
int numPartitions = getPartitionCount();
short replicationFactor = 1;

ListTopicsResult topics = adminClient.listTopics();
Set<String> existingTopics = topics.names().get();

if (!existingTopics.contains(topicName)) {
NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
createTopicsResult.all().get();
logger.info("Topic " + topicName + " created successfully.");
}

} catch (InterruptedException | ExecutionException e) {
logger.error("Failed to delete/create topic: " + e.getMessage());
throw new RuntimeException(e);
}
}

public static int getPartitionCount() {
return 5;
}

}


Execution Steps

Running the Java Example Program

Running the example program in a local IDE

  1. Clone the TDengine repository
    git clone git@github.com:taosdata/TDengine.git --depth 1
  2. Open the TDengine/docs/examples/JDBC/highvolume directory in your IDE.
  3. Configure the environment variable TDENGINE_JDBC_URL in the development environment. If TDENGINE_JDBC_URL is already configured globally, this step can be skipped.
  4. To run the Kafka example, also set the environment variable KAFKA_BOOTSTRAP_SERVERS with the Kafka cluster address.
  5. Specify command-line arguments, for example -r 3 -w 3 -b 100 -c 1000 -s 1000 -R 100.
  6. Run the class com.taos.example.highvolume.FastWriteExample.

Running the example program on a remote server

To run the example program on a server, follow these steps:

  1. Package the example code. Run the following command in the TDengine/docs/examples/JDBC/highvolume directory to generate highVolume.jar:

    mvn package
  2. Copy the program to the specified directory on the server:

    scp -r .\target\highVolume.jar <user>@<host>:~/dest-path
  3. Configure environment variables. Edit ~/.bash_profile or ~/.bashrc and add content such as:

    export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"

    The above is the default JDBC URL for a locally deployed TDengine Server; change it according to your actual environment.
    To use the Kafka subscription mode, also add the Kafka cluster environment variable:

    export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
  4. Start the example program with the java command. Command template (append -K for the Kafka subscription mode):

    java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 1000
  5. Stop the test program. The test program does not exit on its own; once a stable write speed has been reached for the current configuration, press CTRL + C to terminate it. Below is a log output from an actual run on a machine with a 40-core CPU, 256 GB of RAM, and SSDs.

    $ java -jar highVolume.jar -r 2 -w 10 -b 10000 -c 100000 -s 1000000 -R 100
    [INFO ] 2025-03-24 18:03:17.980 com.taos.example.highvolume.FastWriteExample main 309 main readThreadCount=2, writeThreadPerReadThread=10 batchSizeByRow=10000 cacheSizeByRow=100000, subTableNum=1000000, rowsPerSubTable=100
    [INFO ] 2025-03-24 18:03:17.983 com.taos.example.highvolume.FastWriteExample main 312 main create database begin.
    [INFO ] 2025-03-24 18:03:34.499 com.taos.example.highvolume.FastWriteExample main 315 main create database end.
    [INFO ] 2025-03-24 18:03:34.500 com.taos.example.highvolume.FastWriteExample main 317 main create sub tables start.
    [INFO ] 2025-03-24 18:03:34.502 com.taos.example.highvolume.FastWriteExample createSubTables 73 main create sub table task started.
    [INFO ] 2025-03-24 18:03:55.777 com.taos.example.highvolume.FastWriteExample createSubTables 82 main create sub table task finished.
    [INFO ] 2025-03-24 18:03:55.778 com.taos.example.highvolume.FastWriteExample main 319 main create sub tables end.
    [INFO ] 2025-03-24 18:03:55.781 com.taos.example.highvolume.WorkTask run 41 FW-work-thread-2 started
    [INFO ] 2025-03-24 18:03:55.781 com.taos.example.highvolume.WorkTask run 41 FW-work-thread-1 started
    [INFO ] 2025-03-24 18:04:06.580 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=12235906 speed=1223590
    [INFO ] 2025-03-24 18:04:17.531 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=31185614 speed=1894970
    [INFO ] 2025-03-24 18:04:28.490 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=51464904 speed=2027929
    [INFO ] 2025-03-24 18:04:40.851 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=71498113 speed=2003320
    [INFO ] 2025-03-24 18:04:51.948 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=91242103 speed=1974399