SQL 写入
SQL 写入简介
应用通过连接器执行 INSERT 语句来插入数据,用户还可以通过 TAOS Shell,手动输入 INSERT 语句插入数据。
一次写入一条
下面这条 INSERT 就将一条记录写入到表 d1001 中:
INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31);
一次写入多条
TDengine 支持一次写入多条记录,比如下面这条命令就将两条记录写入到表 d1001 中:
INSERT INTO d1001 VALUES (1538548684000, 10.2, 220, 0.23) (1538548696650, 10.3, 218, 0.25);
一次写入多表
TDengine 也支持一次向多个表写入数据,比如下面这条命令就向 d1001 写入两条记录,向 d1002 写入一条记录:
INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6, 218, 0.33) d1002 VALUES (1538548696800, 12.3, 221, 0.31);
详细的 SQL INSERT 语法规则参考 TAOS SQL 的数据写入。
信息
- 要提高写入效率,需要批量写入。一批写入的记录条数越多,插入效率就越高。但一条记录不能超过 48K,一条 SQL 语句总长度不能超过 1M 。
- TDengine 支持多线程同时写入,要进一步提高写入速度,一个客户端需要打开 20 个以上的线程同时写。但线程数达到一定数量后,无法再提高,甚至还会下降,因为线程频繁切换,带来额外开销。
注意
- 对同一张表,如果新插入记录的时间戳已经存在,默认情形下(UPDATE=0)新记录将被直接抛弃,也就是说,在一张表里,时间戳必须是唯一的。如果应用自动生成记录,很有可能生成的时间戳是一样的,这样,成功插入的记录条数会小于应用插入的记录条数。如果在创建数据库时使用了 UPDATE 1 选项,插入相同时间戳的新记录将覆盖原有记录。
- 写入的数据的时间戳必须大于当前时间减去配置参数 keep 的时间。如果 keep 配置为 3650 天,那么无法写入比 3650 天还早的数据。写入数据的时间戳也不能大于当前时间加配置参数 days。如果 days 为 2,那么无法写入比当前时间还晚 2 天的数据。
示例程序
普通 SQL 写入
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- PHP
package com.taos.example;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
public class RestInsertExample {
    /**
     * Opens a JDBC connection to the local TDengine server using the {@code jdbc:TAOS-RS} URL.
     *
     * @return an open connection; the caller is responsible for closing it
     * @throws SQLException if the connection cannot be established
     */
    private static Connection getConnection() throws SQLException {
        String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata";
        return DriverManager.getConnection(jdbcUrl);
    }

    /**
     * Returns the sample records as CSV lines laid out as
     * {@code table,ts,current,voltage,phase,location,groupId}.
     */
    private static List<String> getRawData() {
        return Arrays.asList(
                "d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,California.SanFrancisco,2",
                "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,California.SanFrancisco,2",
                "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,California.SanFrancisco,2",
                "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,California.SanFrancisco,3",
                "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,California.LosAngeles,2",
                "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,California.LosAngeles,2",
                "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,California.LosAngeles,3",
                "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,California.LosAngeles,3"
        );
    }

    /**
     * Builds a single multi-table INSERT statement from {@link #getRawData()}.
     * The {@code location} tag is a BINARY column, so its value is emitted as a
     * single-quoted string literal.
     *
     * The generated SQL is:
     * INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000)
     * power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000)
     * power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000)
     * power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000)
     * power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000)
     * power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000)
     * power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000)
     * power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000)
     *
     * @return the complete INSERT statement
     */
    private static String getSQL() {
        StringBuilder sb = new StringBuilder("INSERT INTO ");
        for (String line : getRawData()) {
            String[] ps = line.split(",");
            sb.append("power.").append(ps[0]).append(" USING power.meters TAGS(")
                    .append('\'').append(ps[5]).append("', ") // tag: location (quoted BINARY literal)
                    .append(ps[6]) // tag: groupId
                    .append(") VALUES(")
                    .append('\'').append(ps[1]).append('\'').append(",") // ts
                    .append(ps[2]).append(",") // current
                    .append(ps[3]).append(",") // voltage
                    .append(ps[4]).append(") "); // phase
        }
        return sb.toString();
    }

    /**
     * Creates the {@code power} database and the {@code meters} super table, then
     * writes all sample rows with one INSERT statement and prints the affected row count.
     *
     * @throws SQLException if connecting or any statement fails
     */
    public static void insertData() throws SQLException {
        try (Connection conn = getConnection()) {
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE DATABASE power KEEP 3650");
                stmt.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
                        "TAGS (location BINARY(64), groupId INT)");
                String sql = getSQL();
                int rowCount = stmt.executeUpdate(sql);
                System.out.println("rowCount=" + rowCount); // rowCount=8
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        insertData();
    }
}
import taos
# Raw sample records, one CSV string per row, laid out as:
# table name, ts, current, voltage, phase, location tag, groupId tag.
# The rows are intentionally not grouped by table; get_sql() sorts them.
lines = ["d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,California.SanFrancisco,2",
"d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,California.LosAngeles,3",
"d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,California.LosAngeles,2",
"d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,California.LosAngeles,3",
"d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,California.SanFrancisco,3",
"d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,California.SanFrancisco,2",
"d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,California.SanFrancisco,2",
"d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,California.LosAngeles,2"]
def get_connection() -> taos.TaosConnection:
    """
    Open a connection with no explicit arguments, i.e. using firstEp from
    taos.cfg together with the default user and password.
    """
    conn = taos.connect()
    return conn
def create_stable(conn: taos.TaosConnection):
    """Create the `power` database, switch to it and create the `meters` super table."""
    statements = (
        "CREATE DATABASE power",
        "USE power",
        "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
        "TAGS (location BINARY(64), groupId INT)",
    )
    # Order matters: the stable is created unqualified, so USE must come first.
    for statement in statements:
        conn.execute(statement)
# The generated SQL is:
# INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
# d1002 USING meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
# d1003 USING meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
# d1004 USING meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)
def get_sql(raw_lines=None):
    """
    Build one multi-table INSERT statement from CSV records.

    :param raw_lines: iterable of CSV strings laid out as
        table,ts,current,voltage,phase,location,groupId. Defaults to the
        module-level ``lines``.
    :return: the INSERT statement, with rows grouped per table.

    The input is never modified, so the function can be called repeatedly
    (the previous version rebound the global ``lines`` to already-split lists
    and failed on the second call).
    """
    if raw_lines is None:
        raw_lines = lines
    # sorted() is stable, so rows of one table keep their original relative order.
    rows = sorted((line.split(',') for line in raw_lines), key=lambda ps: ps[0])
    parts = ["INSERT INTO "]
    tb_name = None
    for ps in rows:
        if tb_name != ps[0]:
            # First row of a new table: emit the table header once.
            tb_name = ps[0]
            # location is a BINARY tag, so it is written as a quoted string literal.
            parts.append(f"{tb_name} USING meters TAGS('{ps[5]}', {ps[6]}) VALUES ")
        parts.append(f"('{ps[1]}', {ps[2]}, {ps[3]}, {ps[4]}) ")
    return "".join(parts)
def insert_data(conn: taos.TaosConnection):
    """Execute the statement produced by get_sql() and print the affected row count."""
    affected_rows = conn.execute(get_sql())
    print("affected_rows", affected_rows)  # 8
if __name__ == '__main__':
    # Create the schema, write the sample rows, and always release the connection.
    conn = get_connection()
    try:
        create_stable(conn)
        insert_data(conn)
    finally:
        conn.close()
package main
import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v2/taosRestful"
)
// createStable creates the power database and the power.meters super table.
// Failures are only reported on stdout; execution continues afterwards.
func createStable(taos *sql.DB) {
	if _, err := taos.Exec("CREATE DATABASE power"); err != nil {
		fmt.Println("failed to create database, err:", err)
	}
	const createStableSQL = "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
	if _, err := taos.Exec(createStableSQL); err != nil {
		fmt.Println("failed to create stable, err:", err)
	}
}
// insertData writes the sample rows for four sub-tables with a single INSERT
// statement and prints the number of affected rows. On any error it prints
// the error and returns.
func insertData(taos *sql.DB) {
	const insertSQL = `INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
	execResult, execErr := taos.Exec(insertSQL)
	if execErr != nil {
		fmt.Println("failed to insert, err:", execErr)
		return
	}
	affected, countErr := execResult.RowsAffected()
	if countErr != nil {
		fmt.Println("failed to get affected rows, err:", countErr)
		return
	}
	fmt.Println("RowsAffected", affected)
}
// main opens a handle through the taosRestful driver, then creates the schema
// and inserts the sample rows. The handle is closed when main returns.
func main() {
	const dsn = "root:taosdata@http(localhost:6041)/"
	taos, err := sql.Open("taosRestful", dsn)
	if err != nil {
		fmt.Println("failed to connect TDengine, err:", err)
		return
	}
	defer taos.Close()

	createStable(taos)
	insertData(taos)
}
use libtaos::*;
/// Creates the `power` database and `meters` super table, then writes the
/// sample rows with one INSERT statement and prints the query result.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let taos = TaosCfg::default().connect().expect("fail to connect");

    // Schema setup.
    taos.create_database("power").await?;
    let create_stable = "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)";
    taos.exec(create_stable).await?;

    // One statement covering all four sub-tables.
    let insert = "INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
    let outcome = taos.query(insert).await?;
    println!("{:?}", outcome);
    Ok(())
}
// output:
// TaosQueryData { column_meta: [ColumnMeta { name: "affected_rows", type_: Int, bytes: 4 }], rows: [[Int(8)]] }
const taos = require("td2.0-connector");
// Connect to the local server and obtain a cursor for running statements.
const conn = taos.connect({ host: "localhost" });
const cursor = conn.cursor();
try {
  // Schema setup; the stable is created unqualified, so USE must precede it.
  const setupStatements = [
    "CREATE DATABASE power",
    "USE power",
    "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)",
  ];
  for (const statement of setupStatements) {
    cursor.execute(statement);
  }
  // One INSERT statement covering all four sub-tables.
  const insertSql = `INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`;
  cursor.execute(insertSql);
} finally {
  // Release the cursor before the connection that owns it.
  cursor.close();
  conn.close();
}
// run with: node insert_example.js
// output:
// Successfully connected to TDengine
// Query OK, 0 row(s) affected (0.00509570s)
// Query OK, 0 row(s) affected (0.00130880s)
// Query OK, 0 row(s) affected (0.00467900s)
// Query OK, 8 row(s) affected (0.04043550s)
// Connection is closed
using TDengineDriver;
namespace TDengineExample
{
    /// <summary>
    /// Creates the power database and meters super table, then writes the
    /// sample rows with one INSERT statement and prints the affected row count.
    /// </summary>
    internal class SQLInsertExample
    {
        static void Main()
        {
            IntPtr conn = GetConnection();
            IntPtr res = TDengine.Query(conn, "CREATE DATABASE power");
            CheckRes(conn, res, "failed to create database");
            // Each query returns a native result handle that must be released.
            TDengine.FreeResult(res);
            res = TDengine.Query(conn, "USE power");
            CheckRes(conn, res, "failed to change database");
            TDengine.FreeResult(res);
            res = TDengine.Query(conn, "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
            CheckRes(conn, res, "failed to create stable");
            TDengine.FreeResult(res);
            var sql = "INSERT INTO d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
                      "d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
                      "d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000)('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
                      "d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000)('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
            res = TDengine.Query(conn, sql);
            CheckRes(conn, res, "failed to insert data");
            // Read the row count before releasing the result handle.
            int affectedRows = TDengine.AffectRows(res);
            TDengine.FreeResult(res);
            Console.WriteLine("affectedRows " + affectedRows);
            ExitProgram(conn, 0);
        }

        /// <summary>
        /// Connects to localhost:6030 as root without selecting a database.
        /// Terminates the process with a non-zero exit code if the connection fails.
        /// </summary>
        static IntPtr GetConnection()
        {
            string host = "localhost";
            short port = 6030;
            string username = "root";
            string password = "taosdata";
            string dbname = "";
            var conn = TDengine.Connect(host, username, password, dbname, port);
            if (conn == IntPtr.Zero)
            {
                Console.WriteLine("Connect to TDengine failed");
                // A failed connection is an error, so do not report success to the shell.
                Environment.Exit(1);
            }
            else
            {
                Console.WriteLine("Connect to TDengine success");
            }
            return conn;
        }

        /// <summary>
        /// Prints <paramref name="errorMsg"/> with the server error and exits
        /// with code 1 when <paramref name="res"/> carries a non-zero error number.
        /// </summary>
        static void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
        {
            if (TDengine.ErrorNo(res) != 0)
            {
                Console.Write(errorMsg + " since: " + TDengine.Error(res));
                ExitProgram(conn, 1);
            }
        }

        /// <summary>Closes the connection, cleans up the driver and exits.</summary>
        static void ExitProgram(IntPtr conn, int exitCode)
        {
            TDengine.Close(conn);
            TDengine.Cleanup();
            Environment.Exit(exitCode);
        }
    }
}
// output:
// Connect to TDengine success
// affectedRows 8
// compile with
// gcc -o insert_example insert_example.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include "taos.h"
/**
 * @brief Run one SQL statement and print how many rows it affected.
 *
 * On failure the error code and message are printed, the result and the
 * connection are released, and the process exits with EXIT_FAILURE.
 *
 * @param taos connection handle
 * @param sql  statement text
 */
void executeSQL(TAOS *taos, const char *sql) {
  TAOS_RES *result = taos_query(taos, sql);
  int errCode = taos_errno(result);
  if (errCode == 0) {
    printf("affected rows %d\n", taos_affected_rows(result));
    taos_free_result(result);
    return;
  }
  printf("Error code: %d; Message: %s\n", errCode, taos_errstr(result));
  taos_free_result(result);
  taos_close(taos);
  exit(EXIT_FAILURE);
}
/* Connects to the local server, creates the schema and inserts the sample rows. */
int main() {
  /* Adjacent literals are concatenated by the compiler into one statement. */
  const char *insertSql =
      "INSERT INTO d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)"
      "d1002 USING meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)"
      "d1003 USING meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)"
      "d1004 USING meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
  const char *createStableSql =
      "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)";

  TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  if (taos == NULL) {
    printf("failed to connect to server\n");
    exit(EXIT_FAILURE);
  }

  /* executeSQL exits the process itself if a statement fails. */
  executeSQL(taos, "CREATE DATABASE power");
  executeSQL(taos, "USE power");
  executeSQL(taos, createStableSql);
  executeSQL(taos, insertSql);

  taos_close(taos);
  taos_cleanup();
}
// output:
// affected rows 0
// affected rows 0
// affected rows 0
// affected rows 8
<?php
use TDengine\Connection;
use TDengine\Exception\TDengineException;
try {
    // instantiate
    $host = 'localhost';
    $port = 6030;
    $username = 'root';
    $password = 'taosdata';
    // Do not select a default database: `power` is only created below, and
    // connecting to a database that does not exist yet would fail on first run.
    $dbname = null;
    $connection = new Connection($host, $port, $username, $password, $dbname);

    // connect
    $connection->connect();

    // create schema; names are fully qualified because no database is selected
    $connection->query('CREATE DATABASE if not exists power');
    $connection->query('CREATE STABLE if not exists power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)');

    // insert
    $resource = $connection->query(<<<'SQL'
INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)
SQL);

    // get affected rows
    var_dump($resource->affectedRows());
} catch (TDengineException $e) {
    // throw exception
    throw $e;
}
备注
- 无论 RESTful 方式建立连接还是本地驱动方式建立连接,以上示例代码都能正常工作。
- 唯一需要注意的是:由于 RESTful 接口无状态,不能使用 `use db` 语句来切换数据库,所以在上面示例中使用了 `dbName.tbName` 指定表名。
参数绑定写入
TDengine 也提供了支持参数绑定的 Prepare API,与 MySQL 类似,这些 API 目前也仅支持用问号 `?` 来代表待绑定的参数。从 2.1.1.0 和 2.1.2.0 版本开始,TDengine 大幅改进了参数绑定接口对数据写入(INSERT)场景的支持。这样在通过参数绑定接口写入数据时,就避免了 SQL 语法解析的资源消耗,从而在绝大多数情况下显著提升写入性能。
需要注意的是,只有使用原生连接的连接器,才能使用参数绑定功能。
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- PHP
package com.taos.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class StmtInsertExample {
    /** Layout of the timestamp field in the raw sample records. */
    private static final DateTimeFormatter TS_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /**
     * Wraps a single value in a one-element list, the shape used for the
     * column-wise bind calls below.
     */
    private static <T> ArrayList<T> toArray(T v) {
        ArrayList<T> single = new ArrayList<>();
        single.add(v);
        return single;
    }

    /**
     * Parses {@code ts} (e.g. {@code 2018-10-03 14:38:05.000}), interprets it
     * at UTC offset +8 and returns the epoch milliseconds as a one-element list.
     */
    private static ArrayList<Long> tsToLongArray(String ts) {
        long epochMillis = LocalDateTime.parse(ts, TS_FORMAT)
                .toInstant(ZoneOffset.of("+8"))
                .toEpochMilli();
        return toArray(epochMillis);
    }

    /**
     * Returns the sample records as CSV lines laid out as
     * {@code table,ts,current,voltage,phase,location,groupId}.
     */
    private static List<String> getRawData() {
        return Arrays.asList(
                "d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,California.SanFrancisco,2",
                "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,California.SanFrancisco,2",
                "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,California.SanFrancisco,2",
                "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,California.SanFrancisco,3",
                "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,California.LosAngeles,2",
                "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,California.LosAngeles,2",
                "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,California.LosAngeles,3",
                "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,California.LosAngeles,3"
        );
    }

    /** Opens a connection using the {@code jdbc:TAOS} URL on localhost:6030. */
    private static Connection getConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:TAOS://localhost:6030?user=root&password=taosdata");
    }

    /** Creates the {@code power} database, switches to it and creates the {@code meters} super table. */
    private static void createTable(Connection conn) throws SQLException {
        try (Statement ddl = conn.createStatement()) {
            ddl.execute("CREATE DATABASE power KEEP 3650");
            ddl.executeUpdate("USE power");
            ddl.execute("CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
                    "TAGS (location BINARY(64), groupId INT)");
        }
    }

    /**
     * Binds every sample record to the prepared INSERT (table name, tags,
     * then one value per column), adds it to the batch and finally executes
     * the whole batch.
     */
    private static void insertData() throws SQLException {
        final String insertTemplate = "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)";
        try (Connection conn = getConnection()) {
            createTable(conn);
            try (TSDBPreparedStatement pst = (TSDBPreparedStatement) conn.prepareStatement(insertTemplate)) {
                for (String record : getRawData()) {
                    String[] fields = record.split(",");

                    // table name and tags
                    pst.setTableName(fields[0]);
                    pst.setTagString(0, fields[5]);
                    pst.setTagInt(1, Integer.valueOf(fields[6]));

                    // column values; fields[1] looks like: 2018-10-03 14:38:05.000
                    pst.setTimestamp(0, tsToLongArray(fields[1]));
                    pst.setFloat(1, toArray(Float.valueOf(fields[2])));
                    pst.setInt(2, toArray(Integer.valueOf(fields[3])));
                    pst.setFloat(3, toArray(Float.valueOf(fields[4])));
                    pst.columnDataAddBatch();
                }
                pst.columnDataExecuteBatch();
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        insertData();
    }
}
import taos
from datetime import datetime
# Sample rows as tuples: (table name, ts, current, voltage, phase, location tag, groupId tag).
# note: lines have already been sorted by table name
# (bind_row_by_row relies on this to set the table name and tags once per table).
lines = [('d1001', '2018-10-03 14:38:05.000', 10.30000, 219, 0.31000, 'California.SanFrancisco', 2),
('d1001', '2018-10-03 14:38:15.000', 12.60000, 218, 0.33000, 'California.SanFrancisco', 2),
('d1001', '2018-10-03 14:38:16.800', 12.30000, 221, 0.31000, 'California.SanFrancisco', 2),
('d1002', '2018-10-03 14:38:16.650', 10.30000, 218, 0.25000, 'California.SanFrancisco', 3),
('d1003', '2018-10-03 14:38:05.500', 11.80000, 221, 0.28000, 'California.LosAngeles', 2),
('d1003', '2018-10-03 14:38:16.600', 13.40000, 223, 0.29000, 'California.LosAngeles', 2),
('d1004', '2018-10-03 14:38:05.000', 10.80000, 223, 0.29000, 'California.LosAngeles', 3),
('d1004', '2018-10-03 14:38:06.500', 11.50000, 221, 0.35000, 'California.LosAngeles', 3)]
def get_ts(ts: str):
    """
    Convert a 'YYYY-mm-dd HH:MM:SS.fff' string to epoch milliseconds.

    The string is parsed as a naive datetime, so it is interpreted in the
    local time zone of the machine running the script.

    :param ts: timestamp text, e.g. '2018-10-03 14:38:16.650'
    :return: integer number of milliseconds since the epoch
    """
    dt = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S.%f')
    # round() rather than int(): the float product can land a hair below the
    # exact millisecond (e.g. ...649.9998), and truncation would lose 1 ms.
    return round(dt.timestamp() * 1000)
def create_stable():
    """Create the `power` database and the `power.meters` super table on a short-lived connection."""
    ddl_statements = (
        "CREATE DATABASE power",
        "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
        "TAGS (location BINARY(64), groupId INT)",
    )
    conn = taos.connect()
    try:
        for ddl in ddl_statements:
            conn.execute(ddl)
    finally:
        conn.close()
def bind_row_by_row(stmt: taos.TaosStmt):
    """
    Bind every row of `lines` to `stmt`, one row per bind_param call.

    The table name and tags are set only when the table changes, which relies
    on `lines` being sorted by table name.
    """
    current_table = None
    for table, ts, current, voltage, phase, location, group_id in lines:
        if table != current_table:
            current_table = table
            tag_binds = taos.new_bind_params(2)  # 2 is count of tags
            tag_binds[0].binary(location)  # location
            tag_binds[1].int(group_id)  # groupId
            stmt.set_tbname_tags(current_table, tag_binds)
        row_binds = taos.new_bind_params(4)  # 4 is count of columns
        row_binds[0].timestamp(get_ts(ts))
        row_binds[1].float(current)
        row_binds[2].int(voltage)
        row_binds[3].float(phase)
        stmt.bind_param(row_binds)
def insert_data():
    """Connect to `power`, bind all rows one by one to a prepared INSERT and execute it."""
    conn = taos.connect(database="power")
    try:
        insert_stmt = conn.statement("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")
        bind_row_by_row(insert_stmt)
        insert_stmt.execute()
        insert_stmt.close()
    finally:
        conn.close()
if __name__ == '__main__':
    # Schema first, then the data.
    for step in (create_stable, insert_data):
        step()
# Tag values per sub-table: table name -> (location, groupId).
table_tags = {
"d1001": ('California.SanFrancisco', 2),
"d1002": ('California.SanFrancisco', 3),
"d1003": ('California.LosAngeles', 2),
"d1004": ('California.LosAngeles', 3)
}
# Column-wise values per sub-table: table name -> [ts list, current list,
# voltage list, phase list]. All four lists of a table have the same length.
# The values mirror the row-by-row sample data (same timestamps and phases).
table_values = {
    "d1001": [
        ['2018-10-03 14:38:05.000', '2018-10-03 14:38:15.000', '2018-10-03 14:38:16.800'],
        [10.3, 12.6, 12.3],
        [219, 218, 221],
        [0.31, 0.33, 0.31]
    ],
    "d1002": [
        ['2018-10-03 14:38:16.650'], [10.3], [218], [0.25]
    ],
    "d1003": [
        ['2018-10-03 14:38:05.500', '2018-10-03 14:38:16.600'],
        [11.8, 13.4],
        [221, 223],
        [0.28, 0.29]
    ],
    "d1004": [
        ['2018-10-03 14:38:05.000', '2018-10-03 14:38:06.500'],
        [10.8, 11.5],
        [223, 221],
        [0.29, 0.35]
    ]
}
def bind_multi_rows(stmt: taos.TaosStmt):
    """
    Batch bind example: for each table, set its name and tags once and bind
    all of its rows column-wise with a single bind_param_batch call.
    """
    for tb_name, columns in table_values.items():
        location, group_id = table_tags[tb_name]
        tag_params = taos.new_bind_params(2)
        tag_params[0].binary(location)
        tag_params[1].int(group_id)
        stmt.set_tbname_tags(tb_name, tag_params)

        ts_column, current_column, voltage_column, phase_column = columns
        value_params = taos.new_multi_binds(4)
        value_params[0].timestamp([get_ts(t) for t in ts_column])
        value_params[1].float(current_column)
        value_params[2].int(voltage_column)
        value_params[3].float(phase_column)
        stmt.bind_param_batch(value_params)
def insert_data():
    """Connect to `power`, batch-bind all tables to a prepared INSERT and execute it."""
    conn = taos.connect(database="power")
    try:
        insert_stmt = conn.statement("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")
        bind_multi_rows(insert_stmt)
        insert_stmt.execute()
        insert_stmt.close()
    finally:
        conn.close()
信息
一次绑定一行效率不如一次绑定多行,但支持非 INSERT 语句。一次绑定多行效率更高,但仅支持 INSERT 语句。
package main
import (
"fmt"
"time"
"github.com/taosdata/driver-go/v2/af"
"github.com/taosdata/driver-go/v2/af/param"
"github.com/taosdata/driver-go/v2/common"
)
func checkErr(err error, prompt string) {
if err != nil {
fmt.Printf("%s\n", prompt)
panic(err)
}
}
// prepareStable creates the power database and power.meters super table and
// then switches the connection to the power database. Any failure panics
// through checkErr.
func prepareStable(conn *af.Connector) {
	steps := []struct{ statement, prompt string }{
		{"CREATE DATABASE power", "failed to create database"},
		{"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)", "failed to create stable"},
		{"USE power", "failed to change database"},
	}
	for _, step := range steps {
		_, err := conn.Exec(step.statement)
		checkErr(err, step.prompt)
	}
}
// main prepares the schema, then writes two rows into d1001 through a
// prepared INSERT statement: set table name and tags once, bind and batch
// each row, execute at the end. Every failure panics through checkErr.
func main() {
	conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
	checkErr(err, "fail to connect")
	defer conn.Close()
	prepareStable(conn)

	// create stmt
	insertStmt := conn.InsertStmt()
	defer insertStmt.Close()
	checkErr(
		insertStmt.Prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)"),
		"failed to create prepare statement")

	// bind table name and tags
	tags := param.NewParam(2).AddBinary([]byte("California.SanFrancisco")).AddInt(2)
	checkErr(insertStmt.SetTableNameWithTags("d1001", tags), "failed to execute SetTableNameWithTags")

	// specify ColumnType
	colTypes := param.NewColumnType(4).AddTimestamp().AddFloat().AddInt().AddFloat()

	// bind values. note: can only bind one row each time.
	firstRow := []*param.Param{
		param.NewParam(1).AddTimestamp(time.Unix(1648432611, 249300000), common.PrecisionMilliSecond),
		param.NewParam(1).AddFloat(10.3),
		param.NewParam(1).AddInt(219),
		param.NewParam(1).AddFloat(0.31),
	}
	checkErr(insertStmt.BindParam(firstRow, colTypes), "BindParam error")
	checkErr(insertStmt.AddBatch(), "AddBatch error")

	// bind one more row
	secondRow := []*param.Param{
		param.NewParam(1).AddTimestamp(time.Unix(1648432611, 749300000), common.PrecisionMilliSecond),
		param.NewParam(1).AddFloat(12.6),
		param.NewParam(1).AddInt(218),
		param.NewParam(1).AddFloat(0.33),
	}
	checkErr(insertStmt.BindParam(secondRow, colTypes), "BindParam error")
	checkErr(insertStmt.AddBatch(), "AddBatch error")

	// execute
	checkErr(insertStmt.Execute(), "Execute batch error")
}
提示
driver-go 的模块 `github.com/taosdata/driver-go/v2/wrapper` 是 C 接口的底层封装。使用这个模块也可以实现参数绑定写入。
use bstr::BString;
use libtaos::*;
/// Creates the schema, then writes two rows into `d1001` through a prepared
/// INSERT statement: set table name and tags, bind each row, execute.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let taos = TaosCfg::default().connect().expect("fail to connect");

    // Schema setup; the stable is created unqualified after selecting `power`.
    taos.create_database("power").await?;
    taos.use_database("power").await?;
    let create_stable = "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)";
    taos.exec(create_stable).await?;

    let mut stmt = taos.stmt("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;

    // bind table name and tags
    let tags = [
        Field::Binary(BString::from("California.SanFrancisco")),
        Field::Int(2),
    ];
    stmt.set_tbname_tags("d1001", tags)?;

    // bind values.
    let first_row = vec![
        Field::Timestamp(Timestamp::new(1648432611249, TimestampPrecision::Milli)),
        Field::Float(10.3),
        Field::Int(219),
        Field::Float(0.31),
    ];
    stmt.bind(&first_row)?;

    // bind one more row
    let second_row = vec![
        Field::Timestamp(Timestamp::new(1648432611749, TimestampPrecision::Milli)),
        Field::Float(12.6),
        Field::Int(218),
        Field::Float(0.33),
    ];
    stmt.bind(&second_row)?;

    // execute
    stmt.execute()?;
    Ok(())
}
const taos = require("td2.0-connector");
// Connection to the server on localhost; no other connection options are passed.
const conn = taos.connect({
host: "localhost",
});
// Cursor used by prepareSTable() and insertData() below to run statements.
const cursor = conn.cursor();
/**
 * Creates the `power` database, switches to it and creates the `meters`
 * super table, in that order.
 */
function prepareSTable() {
  const statements = [
    "CREATE DATABASE power",
    "USE power",
    "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)",
  ];
  for (const statement of statements) {
    cursor.execute(statement);
  }
}
/**
 * Writes two rows into d1001 through a prepared INSERT, binding one row per
 * stmtBindParam call and adding each bound row to the batch.
 */
function insertData() {
  // init + prepare
  cursor.stmtInit();
  cursor.stmtPrepare(
    "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)"
  );

  // bind table name and tags
  const tagBind = new taos.TaosBind(2);
  tagBind.bindBinary("California.SanFrancisco");
  tagBind.bindInt(2);
  cursor.stmtSetTbnameTags("d1001", tagBind.getBind());

  // bind values: [ts, current, voltage, phase]
  const rows = [
    [1648432611249, 10.3, 219, 0.31],
    [1648432611749, 12.6, 218, 0.33],
  ];
  rows.forEach(([ts, current, voltage, phase]) => {
    const valueBind = new taos.TaosBind(4);
    valueBind.bindTimestamp(ts);
    valueBind.bindFloat(current);
    valueBind.bindInt(voltage);
    valueBind.bindFloat(phase);
    cursor.stmtBindParam(valueBind.getBind());
    cursor.stmtAddBatch();
  });

  // execute
  cursor.stmtExecute();
  cursor.stmtClose();
}
// Entry point: create the schema, then insert the rows.
try {
prepareSTable();
insertData();
} finally {
// Always release the cursor first, then the connection it was created from.
cursor.close();
conn.close();
}
/**
 * Writes two rows into d1001 through a prepared INSERT, binding all rows at
 * once column by column with a multi-bind array.
 */
function insertData() {
  // init + prepare
  cursor.stmtInit();
  cursor.stmtPrepare(
    "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)"
  );

  // bind table name and tags
  const tagBind = new taos.TaosBind(2);
  tagBind.bindBinary("California.SanFrancisco");
  tagBind.bindInt(2);
  cursor.stmtSetTbnameTags("d1001", tagBind.getBind());

  // bind values: one array per column, in column order
  const timestamps = [1648432611249, 1648432611749];
  const currents = [10.3, 12.6];
  const voltages = [219, 218];
  const phases = [0.31, 0.33];
  const columnBinds = new taos.TaosMultiBindArr(4);
  columnBinds.multiBindTimestamp(timestamps);
  columnBinds.multiBindFloat(currents);
  columnBinds.multiBindInt(voltages);
  columnBinds.multiBindFloat(phases);
  cursor.stmtBindParamBatch(columnBinds.getMultiBindArr());
  cursor.stmtAddBatch();

  // execute
  cursor.stmtExecute();
  cursor.stmtClose();
}
信息
一次绑定一行效率不如一次绑定多行,但支持非 INSERT 语句。一次绑定多行效率更高,但仅支持 INSERT 语句。
using TDengineDriver;
namespace TDengineExample
{
    /// <summary>
    /// Writes two rows into d1001 through the parameter-binding (stmt) API:
    /// init and prepare, set table name and tags, multi-bind the column
    /// values, add the batch, execute, then release every native resource.
    /// </summary>
    internal class StmtInsertExample
    {
        private static IntPtr conn;
        private static IntPtr stmt;

        static void Main()
        {
            conn = GetConnection();
            PrepareSTable();
            // 1. init and prepare
            stmt = TDengine.StmtInit(conn);
            if (stmt == IntPtr.Zero)
            {
                Console.WriteLine("failed to init stmt, " + TDengine.Error(stmt));
                ExitProgram();
            }
            int res = TDengine.StmtPrepare(stmt, "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)");
            CheckStmtRes(res, "failed to prepare stmt");

            // 2. bind table name and tags
            TAOS_BIND[] tags = new TAOS_BIND[2] { TaosBind.BindBinary("California.SanFrancisco"), TaosBind.BindInt(2) };
            res = TDengine.StmtSetTbnameTags(stmt, "d1001", tags);
            CheckStmtRes(res, "failed to bind table name and tags");

            // 3. bind values
            TAOS_MULTI_BIND[] values = new TAOS_MULTI_BIND[4] {
                TaosMultiBind.MultiBindTimestamp(new long[2] { 1648432611249, 1648432611749}),
                TaosMultiBind.MultiBindFloat(new float?[2] { 10.3f, 12.6f}),
                TaosMultiBind.MultiBindInt(new int?[2] { 219, 218}),
                TaosMultiBind.MultiBindFloat(new float?[2]{ 0.31f, 0.33f})
            };
            res = TDengine.StmtBindParamBatch(stmt, values);
            CheckStmtRes(res, "failed to bind params");

            // 4. add batch
            res = TDengine.StmtAddBatch(stmt);
            CheckStmtRes(res, "failed to add batch");

            // 5. execute
            res = TDengine.StmtExecute(stmt);
            CheckStmtRes(res, "faild to execute");

            // 6. free: the statement handle must be closed on the success path
            // too, not only when CheckStmtRes detects an error.
            int closeCode = TDengine.StmtClose(stmt);
            if (closeCode != 0)
            {
                Console.WriteLine("failed to close stmt, code " + closeCode);
            }
            TaosBind.FreeTaosBind(tags);
            TaosMultiBind.FreeTaosBind(values);
            TDengine.Close(conn);
            TDengine.Cleanup();
        }

        /// <summary>
        /// Connects to localhost:6030 as root without selecting a database.
        /// Terminates the process with a non-zero exit code if the connection fails.
        /// </summary>
        static IntPtr GetConnection()
        {
            string host = "localhost";
            short port = 6030;
            string username = "root";
            string password = "taosdata";
            string dbname = "";
            var conn = TDengine.Connect(host, username, password, dbname, port);
            if (conn == IntPtr.Zero)
            {
                Console.WriteLine("Connect to TDengine failed");
                // A failed connection is an error, so do not report success to the shell.
                Environment.Exit(1);
            }
            else
            {
                Console.WriteLine("Connect to TDengine success");
            }
            return conn;
        }

        /// <summary>
        /// Creates the power database, switches to it and creates the meters
        /// super table, releasing each query result after it has been checked.
        /// </summary>
        static void PrepareSTable()
        {
            IntPtr res = TDengine.Query(conn, "CREATE DATABASE power");
            CheckResPtr(res, "failed to create database");
            TDengine.FreeResult(res);
            res = TDengine.Query(conn, "USE power");
            CheckResPtr(res, "failed to change database");
            TDengine.FreeResult(res);
            res = TDengine.Query(conn, "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
            CheckResPtr(res, "failed to create stable");
            TDengine.FreeResult(res);
        }

        /// <summary>
        /// When a stmt call returned a non-zero code: prints the message with
        /// the stmt error text, closes the statement and exits the program.
        /// </summary>
        static void CheckStmtRes(int res, string errorMsg)
        {
            if (res != 0)
            {
                Console.WriteLine(errorMsg + ", " + TDengine.StmtErrorStr(stmt));
                int code = TDengine.StmtClose(stmt);
                if (code != 0)
                {
                    Console.WriteLine($"falied to close stmt, {code} reason: {TDengine.StmtErrorStr(stmt)} ");
                }
                ExitProgram();
            }
        }

        /// <summary>
        /// When a query result carries a non-zero error number: prints the
        /// message with the server error and exits the program.
        /// </summary>
        static void CheckResPtr(IntPtr res, string errorMsg)
        {
            if (TDengine.ErrorNo(res) != 0)
            {
                Console.WriteLine(errorMsg + " since:" + TDengine.Error(res));
                ExitProgram();
            }
        }

        /// <summary>Closes the connection, cleans up the driver and exits with code 1.</summary>
        static void ExitProgram()
        {
            TDengine.Close(conn);
            TDengine.Cleanup();
            Environment.Exit(1);
        }
    }
}
// compile with
// gcc -o stmt_example stmt_example.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h"
/**
 * @brief Run one SQL statement and discard its result.
 *
 * On failure the error message is printed, the result and the connection
 * are released, and the process exits with EXIT_FAILURE.
 *
 * @param taos connection handle
 * @param sql  statement text
 */
void executeSQL(TAOS *taos, const char *sql) {
  TAOS_RES *result = taos_query(taos, sql);
  if (taos_errno(result) == 0) {
    taos_free_result(result);
    return;
  }
  printf("%s\n", taos_errstr(result));
  taos_free_result(result);
  taos_close(taos);
  exit(EXIT_FAILURE);
}
/**
 * @brief Abort the program when a stmt API call reported an error.
 *
 * A zero code is a no-op. Otherwise the message and the statement's error
 * text are printed, the statement is closed and the process exits with
 * EXIT_FAILURE.
 *
 * @param stmt statement handle the failing call was made on
 * @param code return value of the stmt API call
 * @param msg  description printed in front of the error text
 */
void checkErrorCode(TAOS_STMT *stmt, int code, const char* msg) {
  if (code == 0) {
    return;
  }
  printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
  taos_stmt_close(stmt);
  exit(EXIT_FAILURE);
}
// One record to insert. insertData fills an array of these with positional
// initializers, so the field order must not change.
typedef struct {
int64_t ts;       // bound as TSDB_DATA_TYPE_TIMESTAMP
float current;    // bound as TSDB_DATA_TYPE_FLOAT
int voltage;      // bound as TSDB_DATA_TYPE_INT
float phase;      // bound as TSDB_DATA_TYPE_FLOAT
} Row;
/**
 * @brief insert two rows into table d1001 using the stmt API, binding and
 *        batching one row at a time.
 *
 * The table is created from super table meters with tags
 * ("California.SanFrancisco", 2) through the USING clause of the statement.
 * Any failing step prints a message and exits the program.
 *
 * @param taos connection handle; its current database must contain meters
 */
void insertData(TAOS *taos) {
  // init
  TAOS_STMT *stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    // without a statement handle none of the later stmt calls are meaningful
    printf("failed to execute taos_stmt_init\n");
    taos_close(taos);
    exit(EXIT_FAILURE);
  }
  // prepare
  const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)";
  int code = taos_stmt_prepare(stmt, sql, 0);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
  // bind table name and tags
  TAOS_BIND tags[2];
  // zero every field so members that are not assigned below hold no stack garbage
  memset(tags, 0, sizeof(tags));
  char* location = "California.SanFrancisco";
  int groupId = 2;
  tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
  tags[0].buffer_length = strlen(location);
  tags[0].length = &tags[0].buffer_length;
  tags[0].buffer = location;
  tags[0].is_null = NULL;
  tags[1].buffer_type = TSDB_DATA_TYPE_INT;
  tags[1].buffer_length = sizeof(int);
  tags[1].length = &tags[1].buffer_length;
  tags[1].buffer = &groupId;
  tags[1].is_null = NULL;
  code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");
  // insert two rows
  Row rows[2] = {
      {1648432611249, 10.3, 219, 0.31},
      {1648432611749, 12.6, 218, 0.33},
  };
  TAOS_BIND values[4];
  memset(values, 0, sizeof(values));
  // type and length of each column are the same for every row, so set them once
  values[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
  values[0].buffer_length = sizeof(int64_t);
  values[0].length = &values[0].buffer_length;
  values[0].is_null = NULL;
  values[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
  values[1].buffer_length = sizeof(float);
  values[1].length = &values[1].buffer_length;
  values[1].is_null = NULL;
  values[2].buffer_type = TSDB_DATA_TYPE_INT;
  values[2].buffer_length = sizeof(int);
  values[2].length = &values[2].buffer_length;
  values[2].is_null = NULL;
  values[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
  values[3].buffer_length = sizeof(float);
  values[3].length = &values[3].buffer_length;
  values[3].is_null = NULL;
  for (int i = 0; i < 2; ++i) {
    // only the data pointers change from row to row
    values[0].buffer = &rows[i].ts;
    values[1].buffer = &rows[i].current;
    values[2].buffer = &rows[i].voltage;
    values[3].buffer = &rows[i].phase;
    code = taos_stmt_bind_param(stmt, values);  // bind param
    checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param");
    code = taos_stmt_add_batch(stmt);  // add batch
    checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");
  }
  // execute
  code = taos_stmt_execute(stmt);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");
  int affectedRows = taos_stmt_affected_rows(stmt);
  printf("successfully inserted %d rows\n", affectedRows);
  // close
  taos_stmt_close(stmt);
}
/**
 * @brief connect to the local server, (re)create database power with super
 *        table meters, and insert the sample rows.
 *
 * The database is dropped first so the example can be run repeatedly;
 * without that, CREATE DATABASE fails on every run after the first.
 */
int main() {
  TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  if (taos == NULL) {
    printf("failed to connect to server\n");
    exit(EXIT_FAILURE);
  }
  executeSQL(taos, "DROP DATABASE IF EXISTS power");
  executeSQL(taos, "CREATE DATABASE power");
  executeSQL(taos, "USE power");
  executeSQL(taos, "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
  insertData(taos);
  taos_close(taos);
  taos_cleanup();
  return 0;
}
// output:
// successfully inserted 2 rows
// compile with
// gcc -o multi_bind_example multi_bind_example.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h"
/**
 * @brief execute one SQL statement, ignoring any result set it produces.
 *
 * A failing statement prints its error text, closes the connection and
 * ends the process with EXIT_FAILURE.
 *
 * @param taos connection to execute on
 * @param sql  statement to execute
 */
void executeSQL(TAOS *taos, const char *sql) {
  TAOS_RES *res = taos_query(taos, sql);
  const int failed = (taos_errno(res) != 0);
  if (failed) {
    printf("%s\n", taos_errstr(res));
  }
  // the result is released on both the success and the failure path
  taos_free_result(res);
  if (failed) {
    taos_close(taos);
    exit(EXIT_FAILURE);
  }
}
/**
 * @brief stop the program if a stmt API call failed.
 *
 * Does nothing when code is 0. For any other code it prints msg together
 * with the statement error text, closes the statement and exits with
 * EXIT_FAILURE.
 *
 * @param stmt statement handle that produced the code
 * @param code return value to check
 * @param msg  description of the step that was attempted
 */
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
  if (!code) {
    return;
  }
  printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
  taos_stmt_close(stmt);
  exit(EXIT_FAILURE);
}
/**
 * @brief insert two rows into table d1001 using the stmt API with one
 *        multi-bind (column-wise) batch.
 *
 * The table is created from super table meters with tags
 * ("California.SanFrancisco", 2) through the USING clause of the statement.
 * Any failing step prints a message and exits the program.
 *
 * @param taos connection handle; its current database must contain meters
 */
void insertData(TAOS *taos) {
  // init
  TAOS_STMT *stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    // without a statement handle none of the later stmt calls are meaningful
    printf("failed to execute taos_stmt_init\n");
    taos_close(taos);
    exit(EXIT_FAILURE);
  }
  // prepare
  const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)";
  int code = taos_stmt_prepare(stmt, sql, 0);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
  // bind table name and tags
  TAOS_BIND tags[2];
  // zero every field so members that are not assigned below hold no stack garbage
  memset(tags, 0, sizeof(tags));
  char *location = "California.SanFrancisco";
  int groupId = 2;
  tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
  tags[0].buffer_length = strlen(location);
  tags[0].length = &tags[0].buffer_length;
  tags[0].buffer = location;
  tags[0].is_null = NULL;
  tags[1].buffer_type = TSDB_DATA_TYPE_INT;
  tags[1].buffer_length = sizeof(int);
  tags[1].length = &tags[1].buffer_length;
  tags[1].buffer = &groupId;
  tags[1].is_null = NULL;
  code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");
  // insert two rows with multi binds
  TAOS_MULTI_BIND params[4];
  // values to bind, one array per column
  int64_t ts[] = {1648432611249, 1648432611749};
  float current[] = {10.3, 12.6};
  int voltage[] = {219, 218};
  float phase[] = {0.31, 0.33};
  // is_null array: no value is NULL
  char is_null[2] = {0};
  // length arrays: one entry per row. A single-element initializer would
  // leave the second entry at 0, so both entries are written out.
  int32_t int64Len[2] = {sizeof(int64_t), sizeof(int64_t)};
  int32_t floatLen[2] = {sizeof(float), sizeof(float)};
  int32_t intLen[2] = {sizeof(int), sizeof(int)};
  params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
  params[0].buffer_length = sizeof(int64_t);
  params[0].buffer = ts;
  params[0].length = int64Len;
  params[0].is_null = is_null;
  params[0].num = 2;
  params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
  params[1].buffer_length = sizeof(float);
  params[1].buffer = current;
  params[1].length = floatLen;
  params[1].is_null = is_null;
  params[1].num = 2;
  params[2].buffer_type = TSDB_DATA_TYPE_INT;
  params[2].buffer_length = sizeof(int);
  params[2].buffer = voltage;
  params[2].length = intLen;
  params[2].is_null = is_null;
  params[2].num = 2;
  params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
  params[3].buffer_length = sizeof(float);
  params[3].buffer = phase;
  params[3].length = floatLen;
  params[3].is_null = is_null;
  params[3].num = 2;
  code = taos_stmt_bind_param_batch(stmt, params);  // bind batch
  checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch");
  code = taos_stmt_add_batch(stmt);  // add batch
  checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");
  // execute
  code = taos_stmt_execute(stmt);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");
  int affectedRows = taos_stmt_affected_rows(stmt);
  printf("successfully inserted %d rows\n", affectedRows);
  // close
  taos_stmt_close(stmt);
}
/**
 * @brief connect to the local server, recreate database power with super
 *        table meters, and insert the sample rows.
 */
int main() {
  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  if (!conn) {
    printf("failed to connect to server\n");
    exit(EXIT_FAILURE);
  }
  // setup statements, executed in order; executeSQL exits on the first failure
  const char *setup[] = {
      "DROP DATABASE IF EXISTS power",
      "CREATE DATABASE power",
      "USE power",
      "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), "
      "groupId INT)",
  };
  for (size_t i = 0; i < sizeof(setup) / sizeof(setup[0]); ++i) {
    executeSQL(conn, setup[i]);
  }
  insertData(conn);
  taos_close(conn);
  taos_cleanup();
}
// output:
// successfully inserted 2 rows
<?php
use TDengine\Connection;
use TDengine\Exception\TDengineException;

// Inserts two rows into table d1001 through a prepared statement and dumps
// the number of affected rows.
try {
    // instantiate: host, port, user, password, database
    $conn = new Connection('localhost', 6030, 'root', 'taosdata', 'power');
    // connect
    $conn->connect();

    // make sure the database and the super table exist
    $conn->query('CREATE DATABASE if not exists power');
    $conn->query('CREATE STABLE if not exists meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)');

    $statement = $conn->prepare('INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)');

    // set table name and tags; each tag is a [type, value] pair,
    // the same format used for parameter binding
    $statement->setTableNameTags('d1001', [
        [TDengine\TSDB_DATA_TYPE_BINARY, 'California.SanFrancisco'],
        [TDengine\TSDB_DATA_TYPE_INT, 2],
    ]);

    // rows to insert: ts, current, voltage, phase
    $rows = [
        [1648432611249, 10.3, 219, 0.31],
        [1648432611749, 12.6, 218, 0.33],
    ];
    foreach ($rows as [$ts, $current, $voltage, $phase]) {
        $statement->bindParams([
            [TDengine\TSDB_DATA_TYPE_TIMESTAMP, $ts],
            [TDengine\TSDB_DATA_TYPE_FLOAT, $current],
            [TDengine\TSDB_DATA_TYPE_INT, $voltage],
            [TDengine\TSDB_DATA_TYPE_FLOAT, $phase],
        ]);
    }

    $result = $statement->execute();
    // get affected rows
    var_dump($result->affectedRows());
} catch (TDengineException $e) {
    // throw exception
    throw $e;
}