Skip to main content

SQL 写入

SQL 写入简介

应用通过连接器执行 INSERT 语句来插入数据,用户还可以通过 TDengine CLI,手动输入 INSERT 语句插入数据。

一次写入一条

下面这条 INSERT 就将一条记录写入到表 d1001 中:

INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31);

这里的ts1为Unix时间戳(Unix timestamp),允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值。时间戳详情规则参考 TDengine SQL数据写入 关于时间戳一节

一次写入多条

TDengine 支持一次写入多条记录,比如下面这条命令就将两条记录写入到表 d1001 中:

INSERT INTO d1001 VALUES (ts1, 10.2, 220, 0.23) (ts2, 10.3, 218, 0.25);

这里的ts1ts2为Unix时间戳(Unix timestamp),允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值。时间戳详情规则参考 TDengine SQL数据写入 关于时间戳一节

一次写入多表

TDengine 也支持一次向多个表写入数据,比如下面这条命令就向 d1001 写入两条记录,向 d1002 写入一条记录:

INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31) (ts2, 12.6, 218, 0.33) d1002 VALUES (ts3, 12.3, 221, 0.31);

这里的ts1ts2ts3为Unix时间戳(Unix timestamp),允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值。时间戳详情规则参考 TDengine SQL数据写入 关于时间戳一节

详细的 SQL INSERT 语法规则参考 TDengine SQL 的数据写入

info
  • 要提高写入效率,需要批量写入。一般来说一批写入的记录条数越多,插入效率就越高。但一条记录不能超过 48KB,一条 SQL 语句总长度不能超过 1MB。
  • TDengine 支持多线程同时写入,要进一步提高写入速度,一个客户端需要打开多个同时写。但线程数达到一定数量后,无法再提高,甚至还会下降,因为线程频繁切换,会带来额外开销,合适的线程数量与服务端的处理能力,服务端的具体配置,数据库的参数,数据定义的 Schema,写入数据的 Batch Size 等很多因素相关。一般来说,服务端和客户端处理能力越强,所能支持的并发写入的线程可以越多;数据库配置时的 vgroups 参数值越多(但仍然要在服务端的处理能力以内)则所能支持的并发写入越多;数据定义的 Schema 越简单,所能支持的并发写入越多。
warning
  • 对同一张表,如果新插入记录的时间戳已经存在,则指定了新值的列会用新值覆盖旧值,而没有指定新值的列则不受影响。
  • 写入的数据的时间戳必须大于当前时间减去数据库配置参数 KEEP 的时间。如果 KEEP 配置为 3650 天,那么无法写入比 3650 天还早的数据。写入数据的时间戳也不能大于当前时间加配置参数 DURATION。如果 DURATION 为 2,那么无法写入比当前时间还晚 2 天的数据。

示例程序

普通 SQL 写入

package com.taos.example;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;


public class RestInsertExample {
private static Connection getConnection() throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata";
return DriverManager.getConnection(jdbcUrl);
}

private static List<String> getRawData() {
return Arrays.asList(
"d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,'California.SanFrancisco',2",
"d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,'California.SanFrancisco',2",
"d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,'California.SanFrancisco',2",
"d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,'California.SanFrancisco',3",
"d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,'California.LosAngeles',2",
"d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,'California.LosAngeles',2",
"d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,'California.LosAngeles',3",
"d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,'California.LosAngeles',3"
);
}


/**
* The generated SQL is:
* INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000)
* power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000)
* power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000)
* power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000)
* power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000)
* power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000)
* power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000)
* power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000)
*/
private static String getSQL() {
StringBuilder sb = new StringBuilder("INSERT INTO ");
for (String line : getRawData()) {
String[] ps = line.split(",");
sb.append("power." + ps[0]).append(" USING power.meters TAGS(")
.append(ps[5]).append(", ") // tag: location
.append(ps[6]) // tag: groupId
.append(") VALUES(")
.append('\'').append(ps[1]).append('\'').append(",") // ts
.append(ps[2]).append(",") // current
.append(ps[3]).append(",") // voltage
.append(ps[4]).append(") "); // phase
}
return sb.toString();
}

public static void insertData() throws SQLException {
try (Connection conn = getConnection()) {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE power KEEP 3650");
stmt.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
"TAGS (location BINARY(64), groupId INT)");
String sql = getSQL();
int rowCount = stmt.executeUpdate(sql);
System.out.println("rowCount=" + rowCount); // rowCount=8
}
}
}

public static void main(String[] args) throws SQLException {
insertData();
}
}

查看源码

note
  1. 无论 RESTful 方式建立连接还是本地驱动方式建立连接,以上示例代码都能正常工作。
  2. 唯一需要注意的是:由于 RESTful 接口无状态, 不能使用 USE db; 语句来切换数据库, 所以在上面示例中使用了dbName.tbName指定表名。

参数绑定写入

TDengine 也提供了支持参数绑定的 Prepare API,与 MySQL 类似,这些 API 目前也仅支持用问号 ? 来代表待绑定的参数。在通过参数绑定接口写入数据时,就避免了 SQL 语法解析的资源消耗,从而在绝大多数情况下显著提升写入性能。

需要注意的是,只有使用原生连接的连接器,才能使用参数绑定功能。

package com.taos.example;

import com.taosdata.jdbc.TSDBPreparedStatement;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class StmtInsertExample {
private static String datePattern = "yyyy-MM-dd HH:mm:ss.SSS";
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);

private static List<String> getRawData(int size) {
SimpleDateFormat format = new SimpleDateFormat(datePattern);
List<String> result = new ArrayList<>();
long current = System.currentTimeMillis();
Random random = new Random();
for (int i = 0; i < size; i++) {
String time = format.format(current + i);
int id = random.nextInt(10);
result.add("d" + id + "," + time + ",10.30000,219,0.31000,California.SanFrancisco,2");
}
return result.stream()
.sorted(Comparator.comparing(s -> s.split(",")[0])).collect(Collectors.toList());
}

private static Connection getConnection() throws SQLException {
String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
return DriverManager.getConnection(jdbcUrl);
}

private static void createTable(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE if not exists power KEEP 3650");
stmt.executeUpdate("use power");
stmt.execute("CREATE STABLE if not exists meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
"TAGS (location BINARY(64), groupId INT)");
}
}

private static void insertData() throws SQLException {
try (Connection conn = getConnection()) {
createTable(conn);
String psql = "INSERT INTO ? USING power.meters TAGS(?, ?) VALUES(?, ?, ?, ?)";
try (TSDBPreparedStatement pst = (TSDBPreparedStatement) conn.prepareStatement(psql)) {
String tableName = null;
ArrayList<Long> ts = new ArrayList<>();
ArrayList<Float> current = new ArrayList<>();
ArrayList<Integer> voltage = new ArrayList<>();
ArrayList<Float> phase = new ArrayList<>();
for (String line : getRawData(100000)) {
String[] ps = line.split(",");
if (tableName == null) {
// bind table name and tags
tableName = "power." + ps[0];
pst.setTableName(ps[0]);
pst.setTagString(0, ps[5]);
pst.setTagInt(1, Integer.valueOf(ps[6]));
} else {
if (!tableName.equals(ps[0])) {
pst.setTimestamp(0, ts);
pst.setFloat(1, current);
pst.setInt(2, voltage);
pst.setFloat(3, phase);
pst.columnDataAddBatch();
pst.columnDataExecuteBatch();

// bind table name and tags
tableName = ps[0];
pst.setTableName(ps[0]);
pst.setTagString(0, ps[5]);
pst.setTagInt(1, Integer.valueOf(ps[6]));
ts.clear();
current.clear();
voltage.clear();
phase.clear();
}
}
// bind values
// ps[1] looks like: 2018-10-03 14:38:05.000
LocalDateTime localDateTime = LocalDateTime.parse(ps[1], formatter);
ts.add(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
current.add(Float.valueOf(ps[2]));
voltage.add(Integer.valueOf(ps[3]));
phase.add(Float.valueOf(ps[4]));
}
pst.setTimestamp(0, ts);
pst.setFloat(1, current);
pst.setInt(2, voltage);
pst.setFloat(3, phase);
pst.columnDataAddBatch();
pst.columnDataExecuteBatch();
}
}
}

public static void main(String[] args) throws SQLException {
insertData();
}
}

查看源码