跳到主要内容

高效写入

本节介绍如何高效地向 TDengine 写入数据。

高效写入原理

客户端程序的角度

从客户端程序的角度来说,高效写入数据要考虑以下几个因素:

  1. 单次写入的数据量。一般来讲,每批次写入的数据量越大越高效(但超过一定阈值其优势会消失)。使用 SQL 写入 TDengine 时,尽量在一条 SQL 中拼接更多数据。目前,TDengine 支持的一条 SQL 的最大长度为 1,048,576(1M)个字符。可通过配置客户端参数 maxSQLLength(默认值为 65480)进行修改。
  2. 并发连接数。一般来讲,同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。
  3. 数据在不同表(或子表)之间的分布,即要写入数据的相邻性。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;
  4. 写入方式。一般来讲:
    • 参数绑定写入比 SQL 写入更高效。因参数绑定方式避免了 SQL 解析。(但增加了 C 接口的调用次数,对于连接器也有性能损耗)。
    • SQL 写入不自动建表比自动建表更高效。因自动建表要频繁检查表是否存在
    • SQL 写入比无模式写入更高效。因无模式写入会自动建表且支持动态更改表结构

客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。

数据源的角度

客户端程序通常需要从数据源读数据再写入 TDengine。从数据源角度来说,以下几种情况需要在读线程和写线程之间增加队列:

  1. 有多个数据源,单个数据源生成数据的速度远小于单线程写入的速度,但数据量整体比较大。此时队列的作用是把多个数据源的数据汇聚到一起,增加单次写入的数据量。
  2. 单个数据源生成数据的速度远大于单线程写入的速度。此时队列的作用是增加写入的并发度。
  3. 单张表的数据分散在多个数据源。此时队列的作用是将同一张表的数据提前汇聚到一起,提高写入时数据的相邻性。

如果写应用的数据源是 Kafka, 写应用本身即 Kafka 的消费者,则可利用 Kafka 的特性实现高效写入。比如:

  1. 将同一张表的数据写到同一个 Topic 的同一个 Partition,增加数据的相邻性
  2. 通过订阅多个 Topic 实现数据汇聚
  3. 通过增加 Consumer 线程数增加写入的并发度
  4. 通过增加每次 fetch 的最大数据量来增加单次写入的最大数据量

服务器配置的角度

从服务器配置的角度来说,也有很多优化写入性能的方法。

如果总表数不多(远小于核数乘以1000), 且无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那么很可能是因为表在各个 vgroup 分布不均。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么所有的表都会分布在 1 个 vgroup 上。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则可将表分布至 10 个 vgroup。(假设 maxVgroupsPerDb 大于等于 10)。

如果总表数比较大(比如大于500万),适当增加 maxVgroupsPerDb 也能显著提高建表的速度。maxVgroupsPerDb 默认值为 0, 自动配置为 CPU 的核数。 如果表的数量巨大,也建议调节 maxTablesPerVnode 参数,以免超过单个 vnode 建表的上限。

更多调优参数,请参考性能优化配置参考部分。

高效写入示例

场景设计

下面的示例程序展示了如何高效写入数据,场景设计如下:

  • TDengine 客户端程序从其它数据源不断读入数据,在示例程序中采用生成模拟数据的方式来模拟读取数据源
  • 单个连接向 TDengine 写入的速度无法与读数据的速度相匹配,因此客户端程序启动多个线程,每个线程都建立了与 TDengine 的连接,每个线程都有一个独占的固定大小的消息队列
  • 客户端程序将接收到的数据根据所属的表名(或子表名)HASH 到不同的线程,即写入该线程所对应的消息队列,以此确保属于某个表(或子表)的数据一定会被一个固定的线程处理
  • 各个子线程在将所关联的消息队列中的数据读空后或者读取数据量达到一个预定的阈值后将该批数据写入 TDengine,并继续处理后面接收到的数据

TDengine 高效写入示例场景的线程模型

示例代码

这一部分是针对以上场景的示例代码。对于其它场景高效写入原理相同,不过代码需要适当修改。

本示例代码假设源数据属于同一张超级表(meters)的不同子表。程序在开始写入数据之前已经在 test 库创建了这个超级表。对于子表,将根据收到的数据,由应用程序自动创建。如果实际场景是多个超级表,只需修改写任务自动建表的代码。

程序清单

类名功能说明
FastWriteExample主程序
ReadTask从模拟源中读取数据,将表名经过 hash 后得到 Queue 的 index,写入对应的 Queue
WriteTask从 Queue 中获取数据,组成一个 Batch,写入 TDengine
MockDataSource模拟生成一定数量 meters 子表的数据
SQLWriterWriteTask 依赖这个类完成 SQL 拼接、自动建表、 SQL 写入、SQL 长度检查
StmtWriter实现参数绑定方式批量写入(暂未完成)
DataBaseMonitor统计写入速度,并每隔 10 秒把当前写入速度打印到控制台

以下是各类的完整代码和更详细的功能说明。

Details

FastWriteExample 主程序负责:

  1. 创建消息队列
  2. 启动写线程
  3. 启动读线程
  4. 每隔 10 秒统计一次写入速度

主程序默认暴露了 4 个参数,每次启动程序都可调节,用于测试和调优:

  1. 读线程个数。默认为 1。
  2. 写线程个数。默认为 3。
  3. 模拟生成的总表数。默认为 1000。将会平分给各个读线程。如果总表数较大,建表需要花费较长,开始统计的写入速度可能较慢。
  4. 每批最多写入记录数量。默认为 3000。

队列容量(taskQueueCapacity)也是与性能有关的参数,可通过修改程序调节。一般来讲,队列容量越大,入队被阻塞的概率越小,队列的吞吐量越大,但是内存占用也会越大。 示例程序默认值已经设置地足够大。

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);

final static int taskQueueCapacity = 1000000;
final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>();
final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();

public static void stopAll() {
logger.info("shutting down");
readTasks.forEach(task -> task.stop());
writeTasks.forEach(task -> task.stop());
databaseMonitor.close();
}

public static void main(String[] args) throws InterruptedException, SQLException {
int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 3;
int tableCount = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int maxBatchSize = args.length > 3 ? Integer.parseInt(args[3]) : 3000;

logger.info("readTaskCount={}, writeTaskCount={} tableCount={} maxBatchSize={}",
readTaskCount, writeTaskCount, tableCount, maxBatchSize);

databaseMonitor.init().prepareDatabase();

// Create task queues, whiting tasks and start writing threads.
for (int i = 0; i < writeTaskCount; ++i) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);
taskQueues.add(queue);
WriteTask task = new WriteTask(queue, maxBatchSize);
Thread t = new Thread(task);
t.setName("WriteThread-" + i);
t.start();
}

// create reading tasks and start reading threads
int tableCountPerTask = tableCount / readTaskCount;
for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(i, taskQueues, tableCountPerTask);
Thread t = new Thread(task);
t.setName("ReadThread-" + i);
t.start();
}

Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));

long lastCount = 0;
while (true) {
Thread.sleep(10000);
long numberOfTable = databaseMonitor.getTableCount();
long count = databaseMonitor.count();
logger.info("numberOfTable={} count={} speed={}", numberOfTable, count, (count - lastCount) / 10);
lastCount = count;
}
}
}

查看源码

ReadTask

读任务负责从数据源读数据。每个读任务都关联了一个模拟数据源。每个模拟数据源可生成一点数量表的数据。不同的模拟数据源生成不同表的数据。

读任务采用阻塞的方式写消息队列。也就是说,一旦队列满了,写操作就会阻塞。

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;

class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId;
private final List<BlockingQueue<String>> taskQueues;
private final int queueCount;
private final int tableCount;
private boolean active = true;

public ReadTask(int readTaskId, List<BlockingQueue<String>> queues, int tableCount) {
this.taskId = readTaskId;
this.taskQueues = queues;
this.queueCount = queues.size();
this.tableCount = tableCount;
}

/**
* Assign data received to different queues.
* Here we use the suffix number in table name.
* You are expected to define your own rule in practice.
*
* @param line record received
* @return which queue to use
*/
public int getQueueId(String line) {
String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101
String suffixNumber = tbName.split("_")[1];
return Integer.parseInt(suffixNumber) % this.queueCount;
}

@Override
public void run() {
logger.info("started");
Iterator<String> it = new MockDataSource("tb" + this.taskId, tableCount);
try {
while (it.hasNext() && active) {
String line = it.next();
int queueId = getQueueId(line);
taskQueues.get(queueId).put(line);
}
} catch (Exception e) {
logger.error("Read Task Error", e);
}
}

public void stop() {
logger.info("stop");
this.active = false;
}
}

查看源码

WriteTask
package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;

class WriteTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final int maxBatchSize;

// the queue from which this writing task get raw data.
private final BlockingQueue<String> queue;

// A flag indicate whether to continue.
private boolean active = true;

public WriteTask(BlockingQueue<String> taskQueue, int maxBatchSize) {
this.queue = taskQueue;
this.maxBatchSize = maxBatchSize;
}

@Override
public void run() {
logger.info("started");
String line = null; // data getting from the queue just now.
SQLWriter writer = new SQLWriter(maxBatchSize);
try {
writer.init();
while (active) {
line = queue.poll();
if (line != null) {
// parse raw data and buffer the data.
writer.processLine(line);
} else if (writer.hasBufferedValues()) {
// write data immediately if no more data in the queue
writer.flush();
} else {
// sleep a while to avoid high CPU usage if no more data in the queue and no buffered records, .
Thread.sleep(100);
}
}
if (writer.hasBufferedValues()) {
writer.flush();
}
} catch (Exception e) {
String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount());
logger.error(msg, e);
} finally {
writer.close();
}
}

public void stop() {
logger.info("stop");
this.active = false;
}
}

查看源码

MockDataSource
package com.taos.example.highvolume;

import java.util.Iterator;

/**
* Generate test data
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount;
private long maxRowsPerTable = 1000000000L;

// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;

// mock values
String[] location = {"California.LosAngeles", "California.SanDiego", "California.SanJose", "California.Campbell", "California.SanFrancisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};

public MockDataSource(String tbNamePrefix, int tableCount) {
this.tbNamePrefix = tbNamePrefix;
this.tableCount = tableCount;
}

@Override
public boolean hasNext() {
currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
}

@Override
public String next() {
long ts = startMs + 100 * currentRow;
int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID

return sb.toString();
}
}

查看源码

SQLWriter

SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是在 catch 到表不存在异常的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。对于其它异常,这里简单地记录当时执行的 SQL 语句到日志中,你也可以记录更多线索到日志,已便排查错误和故障恢复。

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.HashMap;
import java.util.Map;

/**
* A helper class encapsulate the logic of writing using SQL.
* <p>
* The main interfaces are two methods:
* <ol>
* <li>{@link SQLWriter#processLine}, which receive raw lines from WriteTask and group them by table names.</li>
* <li>{@link SQLWriter#flush}, which assemble INSERT statement and execute it.</li>
* </ol>
* <p>
* There is a technical skill worth mentioning: we create table as needed when "table does not exist" error occur instead of creating table automatically using syntax "INSET INTO tb USING stb".
* This ensure that checking table existence is a one-time-only operation.
* </p>
*
* </p>
*/
public class SQLWriter {
final static Logger logger = LoggerFactory.getLogger(SQLWriter.class);

private Connection conn;
private Statement stmt;

/**
* current number of buffered records
*/
private int bufferedCount = 0;
/**
* Maximum number of buffered records.
* Flush action will be triggered if bufferedCount reached this value,
*/
private int maxBatchSize;


/**
* Maximum SQL length.
*/
private int maxSQLLength;

/**
* Map from table name to column values. For example:
* "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)"
*/
private Map<String, String> tbValues = new HashMap<>();

/**
* Map from table name to tag values in the same order as creating stable.
* Used for creating table.
*/
private Map<String, String> tbTags = new HashMap<>();

public SQLWriter(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}


/**
* Get Database Connection
*
* @return Connection
* @throws SQLException
*/
private static Connection getConnection() throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
return DriverManager.getConnection(jdbcURL);
}

/**
* Create Connection and Statement
*
* @throws SQLException
*/
public void init() throws SQLException {
conn = getConnection();
stmt = conn.createStatement();
stmt.execute("use test");
ResultSet rs = stmt.executeQuery("show variables");
while (rs.next()) {
String configName = rs.getString(1);
if ("maxSQLLength".equals(configName)) {
maxSQLLength = Integer.parseInt(rs.getString(2));
logger.info("maxSQLLength={}", maxSQLLength);
}
}
}

/**
* Convert raw data to SQL fragments, group them by table name and cache them in a HashMap.
* Trigger writing when number of buffered records reached maxBachSize.
*
* @param line raw data get from task queue in format: tbName,ts,current,voltage,phase,location,groupId
*/
public void processLine(String line) throws SQLException {
bufferedCount += 1;
int firstComma = line.indexOf(',');
String tbName = line.substring(0, firstComma);
int lastComma = line.lastIndexOf(',');
int secondLastComma = line.lastIndexOf(',', lastComma - 1);
String value = "(" + line.substring(firstComma + 1, secondLastComma) + ") ";
if (tbValues.containsKey(tbName)) {
tbValues.put(tbName, tbValues.get(tbName) + value);
} else {
tbValues.put(tbName, value);
}
if (!tbTags.containsKey(tbName)) {
String location = line.substring(secondLastComma + 1, lastComma);
String groupId = line.substring(lastComma + 1);
String tagValues = "('" + location + "'," + groupId + ')';
tbTags.put(tbName, tagValues);
}
if (bufferedCount == maxBatchSize) {
flush();
}
}


/**
* Assemble INSERT statement using buffered SQL fragments in Map {@link SQLWriter#tbValues} and execute it.
* In case of "Table does not exit" exception, create all tables in the sql and retry the sql.
*/
public void flush() throws SQLException {
StringBuilder sb = new StringBuilder("INSERT INTO ");
for (Map.Entry<String, String> entry : tbValues.entrySet()) {
String tableName = entry.getKey();
String values = entry.getValue();
String q = tableName + " values " + values + " ";
if (sb.length() + q.length() > maxSQLLength) {
executeSQL(sb.toString());
logger.warn("increase maxSQLLength or decrease maxBatchSize to gain better performance");
sb = new StringBuilder("INSERT INTO ");
}
sb.append(q);
}
executeSQL(sb.toString());
tbValues.clear();
bufferedCount = 0;
}

private void executeSQL(String sql) throws SQLException {
try {
stmt.executeUpdate(sql);
} catch (SQLException e) {
// convert to error code defined in taoserror.h
int errorCode = e.getErrorCode() & 0xffff;
if (errorCode == 0x362 || errorCode == 0x218) {
// Table does not exist
createTables();
executeSQL(sql);
} else {
logger.error("Execute SQL: {}", sql);
throw e;
}
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
}

/**
* Create tables in batch using syntax:
* <p>
* CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
* </p>
*/
private void createTables() throws SQLException {
StringBuilder sb = new StringBuilder("CREATE TABLE ");
for (String tbName : tbValues.keySet()) {
String tagValues = tbTags.get(tbName);
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
}
String sql = sb.toString();
try {
stmt.executeUpdate(sql);
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
}

public boolean hasBufferedValues() {
return bufferedCount > 0;
}

public int getBufferedCount() {
return bufferedCount;
}

public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
}

查看源码

DataBaseMonitor
package com.taos.example.highvolume;

import java.sql.*;

/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
public class DataBaseMonitor {
private Connection conn;
private Statement stmt;

public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}

public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}

public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}

public Long count() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
result.next();
return result.getLong(1);
}
return null;
}

/**
* show test.stables;
*
* name | created_time | columns | tags | tables |
* ============================================================================================
* meters | 2022-07-20 08:39:30.902 | 4 | 2 | 620000 |
*/
public Long getTableCount() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("show test.stables");
result.next();
return result.getLong(5);
}
return null;
}
}

查看源码

执行步骤

执行 Java 示例程序

执行程序前需配置环境变量 TDENGINE_JDBC_URL。如果 TDengine Server 部署在本机,且用户名、密码和端口都是默认值,那么可配置:

TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"

本地集成开发环境执行示例程序

  1. clone TDengine 仓库
    git clone git@github.com:taosdata/TDengine.git --depth 1
  2. 用集成开发环境打开 docs/examples/java 目录。
  3. 在开发环境中配置环境变量 TDENGINE_JDBC_URL。如果已配置了全局的环境变量 TDENGINE_JDBC_URL 可跳过这一步。
  4. 运行类 com.taos.example.highvolume.FastWriteExample

远程服务器上执行示例程序

若要在服务器上执行示例程序,可按照下面的步骤操作:

  1. 打包示例代码。在目录 TDengine/docs/examples/java 下执行:

    mvn package
  2. 远程服务器上创建 examples 目录:

    mkdir -p examples/java
  3. 复制依赖到服务器指定目录:

    • 复制依赖包,只用复制一次
      scp -r .\target\lib <user>@<host>:~/examples/java
    • 复制本程序的 jar 包,每次更新代码都需要复制
      scp -r .\target\javaexample-1.0.jar <user>@<host>:~/examples/java
  4. 配置环境变量。 编辑 ~/.bash_profile~/.bashrc 添加如下内容例如:

    export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"

    以上使用的是本地部署 TDengine Server 时默认的 JDBC URL。你需要根据自己的实际情况更改。

  5. 用 java 命令启动示例程序,命令模板:

    java -classpath lib/*:javaexample-1.0.jar  com.taos.example.highvolume.FastWriteExample <read_thread_count>  <white_thread_count> <total_table_count> <max_batch_size>
  6. 结束测试程序。测试程序不会自动结束,在获取到当前配置下稳定的写入速度后,按 CTRL + C 结束程序。 下面是一次实际运行的日志输出,机器配置 16核 + 64G + 固态硬盘。

    root@vm85$ java -classpath lib/*:javaexample-1.0.jar  com.taos.example.highvolume.FastWriteExample 2 12
    18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000
    18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started
    18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
    18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started
    18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
    18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444
    18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521
    18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394
    18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933
    18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696
    18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729
    18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521
    18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788
    18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950