参数绑定写入
通过参数绑定方式写入数据时,能避免SQL语法解析的资源消耗,从而显著提升写入性能。参数绑定能提高写入效率的原因主要有以下几点:
- 减少解析时间:通过参数绑定,SQL 语句的结构在第一次执行时就已经确定,后续的执行只需要替换参数值,这样可以避免每次执行时都进行语法解析,从而减少解析时间。
- 预编译:当使用参数绑定时,SQL 语句可以被预编译并缓存,后续使用不同的参数值执行时,可以直接使用预编译的版本,提高执行效率。
- 减少网络开销:参数绑定还可以减少发送到数据库的数据量,因为只需要发送参数值而不是完整的 SQL 语句,特别是在执行大量相似的插入或更新操作时,这种差异尤为明显。
Tips: 数据写入推荐使用参数绑定方式
备注
我们只推荐使用下面两种形式的 SQL 进行参数绑定写入:
一、确定子表存在:
1. INSERT INTO meters (tbname, ts, current, voltage, phase) VALUES(?, ?, ?, ?, ?)
二、自动建表:
1. INSERT INTO meters (tbname, ts, current, voltage, phase, location, group_id) VALUES(?, ?, ?, ?, ?, ?, ?)
2. INSERT INTO ? USING meters TAGS (?, ?) VALUES (?, ?, ?, ?)
下面我们继续以智能电表为例,展示各语言连接器使用参数绑定高效写入的功能:
- 准备一个参数化的 SQL 插入语句,用于向超级表 `meters` 中插入数据。这个语句允许动态地指定子表名、标签和列值。
- 循环生成多个子表及其对应的数据行。对于每个子表:
- 设置子表的名称和标签值(分组 ID 和位置)。
- 生成多行数据,每行包括一个时间戳、随机生成的电流、电压和相位值。
- 执行批量插入操作,将这些数据行插入到对应的子表中。
- 最后打印实际插入表中的行数。
WebSocket 连接
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
参数绑定有两种接口使用方式,一种是 JDBC 标准接口,一种是扩展接口,扩展接口性能更好一些。
/**
 * Writes rows into the {@code power.meters} super table over a WebSocket connection,
 * binding every value through the standard JDBC {@link PreparedStatement} interface.
 */
public class WSParameterBindingStdInterfaceDemo {
    // modify host to your own
    private static final String host = "127.0.0.1";
    private static final Random random = new Random(System.currentTimeMillis());
    private static final int numOfSubTable = 10, numOfRow = 10;

    /**
     * Creates the schema, queues {@code numOfSubTable * numOfRow} rows with
     * {@code addBatch()} and sends them with a single {@code executeBatch()}.
     *
     * @param args unused
     * @throws SQLException rethrown after the failure has been printed
     */
    public static void main(String[] args) throws SQLException {
        final String jdbcUrl = "jdbc:TAOS-WS://" + host + ":6041";
        try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
            init(conn);

            // If you are certain that the child table exists, you can avoid binding the tag column to improve performance.
            final String insertSql = "INSERT INTO power.meters (tbname, groupid, location, ts, current, voltage, phase) VALUES (?,?,?,?,?,?,?)";

            try (PreparedStatement insert = conn.prepareStatement(insertSql)) {
                final long baseMillis = System.currentTimeMillis();

                for (int tableNo = 1; tableNo <= numOfSubTable; tableNo++) {
                    for (int rowNo = 0; rowNo < numOfRow; rowNo++) {
                        // Parameters 1-3 identify the child table and its tags.
                        insert.setString(1, "d_bind_" + tableNo);
                        insert.setInt(2, tableNo);
                        insert.setString(3, "location_" + tableNo);
                        // Parameters 4-7 are the row itself; timestamps are 1 ms apart.
                        insert.setTimestamp(4, new Timestamp(baseMillis + rowNo));
                        insert.setFloat(5, random.nextFloat() * 30);
                        insert.setInt(6, random.nextInt(300));
                        insert.setFloat(7, random.nextFloat());
                        insert.addBatch();
                    }
                }

                // One array entry per queued row; inspect it for per-row results.
                int[] batchResult = insert.executeBatch();
                System.out.println("Successfully inserted " + batchResult.length + " rows to power.meters.");
            }
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            String errCode = "";
            if (ex instanceof SQLException) {
                errCode = "ErrCode: " + ((SQLException) ex).getErrorCode() + ", ";
            }
            System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
                    errCode,
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    /**
     * Creates the {@code power} database and the {@code power.meters} super table if missing.
     *
     * @param conn open connection used to run the DDL statements
     * @throws SQLException if any statement fails
     */
    private static void init(Connection conn) throws SQLException {
        try (Statement ddl = conn.createStatement()) {
            ddl.execute("CREATE DATABASE IF NOT EXISTS power");
            ddl.execute("USE power");
            ddl.execute(
                    "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
        }
    }
}
/**
 * Writes rows into {@code power.meters} over a WebSocket connection using the
 * column-wise extension interface exposed by {@code TSWSPreparedStatement}.
 */
public class WSParameterBindingExtendInterfaceDemo {
    // modify host to your own
    private static final String host = "127.0.0.1";
    private static final Random random = new Random(System.currentTimeMillis());
    private static final int numOfSubTable = 10, numOfRow = 10;

    /**
     * Creates the schema, binds one column batch per child table, then executes all batches.
     *
     * @param args unused
     * @throws SQLException rethrown after the failure has been printed
     */
    public static void main(String[] args) throws SQLException {
        final String jdbcUrl = "jdbc:TAOS-WS://" + host + ":6041";
        try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
            init(conn);

            final String insertSql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";

            try (TSWSPreparedStatement insert = conn.prepareStatement(insertSql).unwrap(TSWSPreparedStatement.class)) {
                for (int tableNo = 1; tableNo <= numOfSubTable; tableNo++) {
                    // child table name and its two tags
                    insert.setTableName("d_bind_" + tableNo);
                    insert.setTagInt(0, tableNo);
                    insert.setTagString(1, "location_" + tableNo);

                    // column 0: timestamps, 1 ms apart
                    ArrayList<Long> timestamps = new ArrayList<>();
                    long baseMillis = System.currentTimeMillis();
                    for (int rowNo = 0; rowNo < numOfRow; rowNo++) {
                        timestamps.add(baseMillis + rowNo);
                    }
                    insert.setTimestamp(0, timestamps);

                    // column 1: current
                    ArrayList<Float> currents = new ArrayList<>();
                    for (int rowNo = 0; rowNo < numOfRow; rowNo++) {
                        currents.add(random.nextFloat() * 30);
                    }
                    insert.setFloat(1, currents);

                    // column 2: voltage
                    ArrayList<Integer> voltages = new ArrayList<>();
                    for (int rowNo = 0; rowNo < numOfRow; rowNo++) {
                        voltages.add(random.nextInt(300));
                    }
                    insert.setInt(2, voltages);

                    // column 3: phase
                    ArrayList<Float> phases = new ArrayList<>();
                    for (int rowNo = 0; rowNo < numOfRow; rowNo++) {
                        phases.add(random.nextFloat());
                    }
                    insert.setFloat(3, phases);

                    // queue the columns bound for this child table
                    insert.columnDataAddBatch();
                }

                // send every queued batch
                insert.columnDataExecuteBatch();
                // The printed count is computed from the constants, not read back from the server.
                System.out.println("Successfully inserted " + (numOfSubTable * numOfRow) + " rows to power.meters.");
            }
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            String errCode = "";
            if (ex instanceof SQLException) {
                errCode = "ErrCode: " + ((SQLException) ex).getErrorCode() + ", ";
            }
            System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
                    errCode,
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    /**
     * Creates the {@code power} database and the {@code power.meters} super table if missing.
     *
     * @param conn open connection used to run the DDL statements
     * @throws SQLException if any statement fails
     */
    private static void init(Connection conn) throws SQLException {
        try (Statement ddl = conn.createStatement()) {
            ddl.execute("CREATE DATABASE IF NOT EXISTS power");
            ddl.execute("USE power");
            ddl.execute(
                    "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
        }
    }
}
这是一个更详细的参数绑定示例
from datetime import datetime
import random
import taosws

# Number of child tables to create and number of rows written to each of them.
numOfSubTable = 10
numOfRow = 10

conn = None
stmt = None
host="localhost"
port=6041
try:
    conn = taosws.connect(user="root",
                          password="taosdata",
                          host=host,
                          port=port)

    conn.execute("CREATE DATABASE IF NOT EXISTS power")
    conn.execute("USE power")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
    )

    # ANCHOR: stmt
    sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
    stmt = conn.statement()
    stmt.prepare(sql)

    for i in range(numOfSubTable):
        # Child table name and tag values for this iteration.
        tbname = f"d_bind_{i}"
        tags = [
            taosws.int_to_tag(i),
            taosws.varchar_to_tag(f"location_{i}"),
        ]
        stmt.set_tbname_tags(tbname, tags)

        current = int(datetime.now().timestamp() * 1000)
        timestamps = []
        currents = []
        voltages = []
        phases = []
        for j in range(numOfRow):
            # Offset by the row index so every row of a child table gets its own
            # timestamp; offsetting by the table index would give all rows the
            # same timestamp.
            timestamps.append(current + j)
            currents.append(random.random() * 30)
            voltages.append(random.randint(100, 300))
            phases.append(random.random())

        # Bind the four data columns column-wise, then queue and send the batch.
        stmt.bind_param(
            [
                taosws.millis_timestamps_to_column(timestamps),
                taosws.floats_to_column(currents),
                taosws.ints_to_column(voltages),
                taosws.floats_to_column(phases),
            ]
        )
        stmt.add_batch()
        stmt.execute()

    print("Successfully inserted to power.meters.")

except Exception as err:
    print(f"Failed to insert to table meters using stmt, ErrMessage:{err}")
    raise err
finally:
    # Release the statement before the connection it was created from.
    if stmt:
        stmt.close()
    if conn:
        conn.close()
package main
import (
"database/sql"
"fmt"
"log"
"math/rand"
"time"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"github.com/taosdata/driver-go/v3/ws/stmt"
)
// main creates the power database and meters super table through the REST
// driver, then writes numOfRow rows into each of numOfSubTable child tables
// with a WebSocket prepared statement.
func main() {
	host := "127.0.0.1"
	numOfSubTable := 10
	numOfRow := 10

	taosDSN := fmt.Sprintf("root:taosdata@http(%s:6041)/", host)
	db, err := sql.Open("taosRestful", taosDSN)
	if err != nil {
		log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
	}
	defer db.Close()

	// prepare database and table
	if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS power"); err != nil {
		log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
	}
	if _, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); err != nil {
		log.Fatalln("Failed to create stable power.meters, ErrMessage: " + err.Error())
	}

	// WebSocket statement connector, bound to the power database
	wsConfig := stmt.NewConfig(fmt.Sprintf("ws://%s:6041", host), 0)
	wsConfig.SetConnectUser("root")
	wsConfig.SetConnectPass("taosdata")
	wsConfig.SetConnectDB("power")
	wsConfig.SetMessageTimeout(common.DefaultMessageTimeout)
	wsConfig.SetWriteWait(common.DefaultWriteWait)
	connector, err := stmt.NewConnector(wsConfig)
	if err != nil {
		log.Fatalln("Failed to create stmt connector,url: " + taosDSN + "; ErrMessage: " + err.Error())
	}

	// prepare statement (local names avoid shadowing the sql and stmt packages)
	insertSQL := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
	statement, err := connector.Init()
	if err != nil {
		log.Fatalln("Failed to init stmt, sql: " + insertSQL + ", ErrMessage: " + err.Error())
	}
	if err = statement.Prepare(insertSQL); err != nil {
		log.Fatal("Failed to prepare sql, sql: " + insertSQL + ", ErrMessage: " + err.Error())
	}

	for tableNo := 1; tableNo <= numOfSubTable; tableNo++ {
		tableName := fmt.Sprintf("d_bind_%d", tableNo)
		tags := param.NewParam(2).AddInt(tableNo).AddBinary([]byte(fmt.Sprintf("location_%d", tableNo)))
		tagsType := param.NewColumnType(2).AddInt().AddBinary(24)
		columnType := param.NewColumnType(4).AddTimestamp().AddFloat().AddInt().AddFloat()

		// set tableName
		if err = statement.SetTableName(tableName); err != nil {
			log.Fatal("Failed to set table name, tableName: " + tableName + "; ErrMessage: " + err.Error())
		}
		// set tags
		if err = statement.SetTags(tags, tagsType); err != nil {
			log.Fatal("Failed to set tags, ErrMessage: " + err.Error())
		}

		// bind column data, one row per call, timestamps 1 ms apart
		startTime := time.Now()
		for rowNo := 0; rowNo < numOfRow; rowNo++ {
			rowTime := startTime.Add(time.Millisecond * time.Duration(rowNo))
			columnData := []*param.Param{
				param.NewParam(1).AddTimestamp(rowTime, common.PrecisionMilliSecond),
				param.NewParam(1).AddFloat(rand.Float32() * 30),
				param.NewParam(1).AddInt(rand.Intn(300)),
				param.NewParam(1).AddFloat(rand.Float32()),
			}
			if err = statement.BindParam(columnData, columnType); err != nil {
				log.Fatal("Failed to bind params, ErrMessage: " + err.Error())
			}
		}

		// add batch
		if err = statement.AddBatch(); err != nil {
			log.Fatal("Failed to add batch, ErrMessage: " + err.Error())
		}
		// execute batch
		if err = statement.Exec(); err != nil {
			log.Fatal("Failed to exec, ErrMessage: " + err.Error())
		}
		// report affected rows for this child table
		fmt.Printf("Successfully inserted %d rows to %s.\n", statement.GetAffectedRows(), tableName)
	}

	if err = statement.Close(); err != nil {
		log.Fatal("Failed to close stmt, ErrMessage: " + err.Error())
	}
}
use taos::*;
/// Recreates the `power` database, then writes `NUM_ROWS` rows into each of
/// `NUM_TABLES` child tables of `meters` through one prepared statement that is
/// executed once at the end.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    const NUM_TABLES: usize = 10;
    const NUM_ROWS: usize = 10;

    let dsn = "ws://";
    let taos = TaosBuilder::from_dsn(dsn)?.build().await?;

    taos.exec("DROP DATABASE IF EXISTS power").await?;
    taos.create_database("power").await?;
    taos.use_database("power").await?;
    taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))").await?;

    let mut stmt = Stmt::init(&taos).await?;
    stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;

    for table_idx in 0..NUM_TABLES {
        let table_name = format!("d_bind_{}", table_idx);
        let tags = vec![
            Value::Int(table_idx as i32),
            Value::VarChar(format!("location_{}", table_idx).into()),
        ];

        // set table name and tags for the prepared statement.
        if let Err(err) = stmt.set_tbname_tags(&table_name, &tags).await {
            eprintln!("Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", table_name, tags, err);
            return Err(err.into());
        }

        for row_idx in 0..NUM_ROWS {
            // Fixed base values offset by the row index; each bind carries one row.
            let values = vec![
                ColumnView::from_millis_timestamp(vec![1648432611249 + row_idx as i64]),
                ColumnView::from_floats(vec![10.3 + row_idx as f32]),
                ColumnView::from_ints(vec![219 + row_idx as i32]),
                ColumnView::from_floats(vec![0.31 + row_idx as f32]),
            ];
            // bind values to the prepared statement.
            if let Err(err) = stmt.bind(&values).await {
                eprintln!("Failed to bind values, values:{:?}, ErrMessage: {}", values, err);
                return Err(err.into());
            }
        }

        if let Err(err) = stmt.add_batch().await {
            eprintln!("Failed to add batch, ErrMessage: {}", err);
            return Err(err.into());
        }
    }

    // execute.
    let affected_rows = match stmt.execute().await {
        Ok(count) => count,
        Err(err) => {
            eprintln!("Failed to insert to table meters using stmt, ErrMessage: {}", err);
            return Err(err.into());
        }
    };
    println!("Successfully inserted {} rows to power.meters.", affected_rows);

    Ok(())
}
const taos = require("@tdengine/websocket");
// Database name: passed to conf.setDb() and interpolated into the SQL statements.
let db = 'power';
// Super table name interpolated into the CREATE STABLE and INSERT statements.
let stable = 'meters';
// Number of child tables written by test().
let numOfSubTable = 10;
// Number of rows bound per child table.
let numOfRow = 10;
// WebSocket endpoint handed to taos.WSConfig.
let dsn = 'ws://localhost:6041'
/**
 * Returns a random integer between ceil(min) and floor(max), both inclusive.
 *
 * @param {number} min lower bound (rounded up to an integer)
 * @param {number} max upper bound (rounded down to an integer)
 * @returns {number} random integer within the rounded bounds
 */
function getRandomInt(min, max) {
    const lower = Math.ceil(min);
    const upper = Math.floor(max);
    const span = upper - lower + 1;
    return Math.floor(Math.random() * span) + lower;
}
/**
 * Opens a WebSocket SQL connection and makes sure the database and the super
 * table exist.
 *
 * @returns the connection returned by taos.sqlConnect; the caller closes it
 */
async function prepare() {
    const config = new taos.WSConfig(dsn);
    config.setUser('root');
    config.setPwd('taosdata');
    config.setDb(db);

    const connection = await taos.sqlConnect(config);
    await connection.exec(`CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`);
    await connection.exec(`CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`);
    return connection;
}
/**
 * Writes numOfRow rows into each of numOfSubTable child tables through one
 * prepared statement, executing once per child table. Errors are logged and
 * rethrown; the statement, the connection and the taos module are always
 * released in the finally block.
 */
async function test() {
    let stmt = null;
    let connector = null;
    try {
        connector = await prepare();
        stmt = await connector.stmtInit();
        await stmt.prepare(`INSERT INTO ? USING ${db}.${stable} (location, groupId) TAGS (?, ?) VALUES (?, ?, ?, ?)`);

        for (let tableNo = 0; tableNo < numOfSubTable; tableNo++) {
            await stmt.setTableName(`d_bind_${tableNo}`);

            // Tags are bound in the order declared in the INSERT: location, groupId.
            const tags = stmt.newStmtParam();
            tags.setVarchar([`location_${tableNo}`]);
            tags.setInt([tableNo]);
            await stmt.setTags(tags);

            // Build the four columns row by row; timestamps are 1 ms apart.
            const tsColumn = [];
            const currentColumn = [];
            const voltageColumn = [];
            const phaseColumn = [];
            const baseMillis = new Date().getTime();
            for (let rowNo = 0; rowNo < numOfRow; rowNo++) {
                tsColumn.push(baseMillis + rowNo);
                currentColumn.push(Math.random() * 30);
                voltageColumn.push(getRandomInt(100, 300));
                phaseColumn.push(Math.random());
            }

            const columns = stmt.newStmtParam();
            columns.setTimestamp(tsColumn);
            columns.setFloat(currentColumn);
            columns.setInt(voltageColumn);
            columns.setFloat(phaseColumn);

            await stmt.bind(columns);
            await stmt.batch();
            await stmt.exec();
            console.log(`Successfully inserted ${stmt.getLastAffected()} to power.meters.`);
        }
    } catch (err) {
        console.error(`Failed to insert to table meters using stmt, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        if (stmt) {
            await stmt.close();
        }
        if (connector) {
            await connector.close();
        }
        taos.destroy();
    }
}

test()
// Connects over WebSocket, creates the power database and meters super table,
// then writes numOfRow rows into each of numOfSubTable child tables with a
// prepared statement that is executed once per child table.
public static void Main(string[] args)
{
    var host = "127.0.0.1";
    var numOfSubTable = 10;
    var numOfRow = 10;
    var random = new Random();
    var connectionString = $"protocol=WebSocket;host={host};port=6041;useSSL=false;username=root;password=taosdata";
    try
    {
        var builder = new ConnectionStringBuilder(connectionString);
        using (var connection = DbDriver.Open(builder))
        {
            // create database, select it, create the super table
            connection.Exec("CREATE DATABASE IF NOT EXISTS power");
            connection.Exec("USE power");
            connection.Exec(
                "CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");

            using (var statement = connection.StmtInit())
            {
                var insertSql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
                statement.Prepare(insertSql);

                for (var tableNo = 1; tableNo <= numOfSubTable; tableNo++)
                {
                    var tableName = $"d_bind_{tableNo}";
                    // child table name, then its tags (groupId, location)
                    statement.SetTableName(tableName);
                    statement.SetTags(new object[] { tableNo, $"location_{tableNo}" });

                    // bind rows with timestamps 1 ms apart
                    var startTime = DateTime.Now;
                    for (var rowNo = 0; rowNo < numOfRow; rowNo++)
                    {
                        var row = new object[]
                        {
                            startTime.Add(TimeSpan.FromMilliseconds(rowNo)),
                            random.NextSingle() * 30,
                            random.Next(300),
                            random.NextSingle()
                        };
                        statement.BindRow(row);
                    }

                    // queue and execute the rows bound for this child table
                    statement.AddBatch();
                    statement.Exec();

                    var affectedRows = statement.Affected();
                    Console.WriteLine($"Successfully inserted {affectedRows} rows to {tableName}.");
                }
            }
        }
    }
    catch (TDengineError e)
    {
        // handle TDengine error
        Console.WriteLine($"Failed to insert to table meters using stmt, ErrCode: {e.Code}, ErrMessage: {e.Error}");
        throw;
    }
    catch (Exception e)
    {
        // handle other exceptions
        Console.WriteLine($"Failed to insert to table meters using stmt, ErrMessage: {e.Message}");
        throw;
    }
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "taosws.h"
/**
 * @brief run one SQL statement and discard its result; on failure print the
 *        error, close the connection and exit the program.
 *
 * @param taos open connection
 * @param sql  statement text
 */
void executeSQL(WS_TAOS *taos, const char *sql) {
  WS_RES *res = ws_query(taos, sql);
  if (ws_errno(res) == 0) {
    ws_free_result(res);
    return;
  }
  // failure path: report, release everything, terminate
  fprintf(stderr, "%s\n", ws_errstr(res));
  ws_free_result(res);
  ws_close(taos);
  exit(EXIT_FAILURE);
}
/**
 * @brief return silently when code is 0; otherwise print msg with the code and
 *        the statement error text, close the statement and exit the program.
 *
 * @param stmt statement the code came from
 * @param code return code to check
 * @param msg  text printed in front of the error details
 */
void checkErrorCode(WS_STMT *stmt, int code, const char *msg) {
  if (code == 0) {
    return;
  }
  fprintf(stderr, "%s. code: %d, error: %s\n", msg, code, ws_stmt_errstr(stmt));
  ws_stmt_close(stmt);
  exit(EXIT_FAILURE);
}
// One meter reading with the same four columns as the meters super table.
// NOTE(review): Row is not referenced by any code visible in this section.
typedef struct {
int64_t ts;
float current;
int voltage;
float phase;
} Row;
// Number of child tables insertData writes to.
int num_of_sub_table = 10;
// Number of rows bound per child table.
int num_of_row = 10;
// Running total of affected rows, accumulated by insertData.
int total_affected = 0;
/**
 * @brief insert data using stmt API
 *
 * Prepares one INSERT ... USING statement, then for each of num_of_sub_table
 * child tables sets the table name and tags, binds num_of_row rows one at a
 * time, and executes the batch. The affected-row counts are summed into
 * total_affected and printed at the end. Any failing call exits the program
 * through checkErrorCode.
 *
 * @param taos open connection
 */
void insertData(WS_TAOS *taos) {
  // init
  WS_STMT *stmt = ws_stmt_init(taos);
  if (stmt == NULL) {
    fprintf(stderr, "Failed to init ws_stmt, error: %s\n", ws_stmt_errstr(NULL));
    exit(EXIT_FAILURE);
  }

  // prepare
  const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
  int         code = ws_stmt_prepare(stmt, sql, 0);
  checkErrorCode(stmt, code, "Failed to execute ws_stmt_prepare");

  for (int i = 1; i <= num_of_sub_table; i++) {
    // snprintf bounds the write to the buffer size
    char table_name[20];
    snprintf(table_name, sizeof(table_name), "d_bind_%d", i);
    char location[20];
    snprintf(location, sizeof(location), "location_%d", i);

    // set table name and tags
    WS_MULTI_BIND tags[2];
    // groupId
    tags[0].buffer_type = TSDB_DATA_TYPE_INT;
    tags[0].buffer_length = sizeof(int);
    tags[0].length = (int32_t *)&tags[0].buffer_length;
    tags[0].buffer = &i;
    tags[0].is_null = NULL;
    tags[0].num = 1;
    // location
    tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
    tags[1].buffer_length = strlen(location);
    tags[1].length = (int32_t *)&tags[1].buffer_length;
    tags[1].buffer = location;
    tags[1].is_null = NULL;
    tags[1].num = 1;
    code = ws_stmt_set_tbname_tags(stmt, table_name, tags, 2);
    // no trailing newline: checkErrorCode appends ". code: ..." to the message
    checkErrorCode(stmt, code, "Failed to set table name and tags");

    // insert rows: describe the four columns once, point the buffers at each row below
    WS_MULTI_BIND params[4];
    // ts
    params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
    params[0].buffer_length = sizeof(int64_t);
    params[0].length = (int32_t *)&params[0].buffer_length;
    params[0].is_null = NULL;
    params[0].num = 1;
    // current
    params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
    params[1].buffer_length = sizeof(float);
    params[1].length = (int32_t *)&params[1].buffer_length;
    params[1].is_null = NULL;
    params[1].num = 1;
    // voltage
    params[2].buffer_type = TSDB_DATA_TYPE_INT;
    params[2].buffer_length = sizeof(int);
    params[2].length = (int32_t *)&params[2].buffer_length;
    params[2].is_null = NULL;
    params[2].num = 1;
    // phase
    params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
    params[3].buffer_length = sizeof(float);
    params[3].length = (int32_t *)&params[3].buffer_length;
    params[3].is_null = NULL;
    params[3].num = 1;

    for (int j = 0; j < num_of_row; j++) {
      struct timeval tv;
      gettimeofday(&tv, NULL);
      long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000;  // current timestamp in milliseconds
      int64_t   ts = milliseconds + j;
      float     current = (float)rand() / RAND_MAX * 30;
      int       voltage = rand() % 300;
      float     phase = (float)rand() / RAND_MAX;
      params[0].buffer = &ts;
      params[1].buffer = &current;
      params[2].buffer = &voltage;
      params[3].buffer = &phase;
      // bind param
      code = ws_stmt_bind_param_batch(stmt, params, 4);
      checkErrorCode(stmt, code, "Failed to bind param");
    }

    // add batch
    code = ws_stmt_add_batch(stmt);
    checkErrorCode(stmt, code, "Failed to add batch");

    // execute batch
    int affected_rows = 0;
    code = ws_stmt_execute(stmt, &affected_rows);
    checkErrorCode(stmt, code, "Failed to exec stmt");

    // get affected rows
    int affected = ws_stmt_affected_rows_once(stmt);
    total_affected += affected;
  }
  fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
  ws_stmt_close(stmt);
}
/**
 * @brief connect over WebSocket, create the power database and meters super
 *        table, run insertData, then close the connection.
 *
 * @return 0 on success; the failure paths call exit(EXIT_FAILURE).
 */
int main() {
  char    *dsn = "ws://localhost:6041";
  WS_TAOS *taos = ws_connect(dsn);
  if (taos == NULL) {
    fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
    exit(EXIT_FAILURE);
  }

  // create database and table
  executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power");
  executeSQL(taos, "USE power");
  executeSQL(taos,
             "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
             "(groupId INT, location BINARY(24))");

  insertData(taos);
  ws_close(taos);
  return 0;
}
REST API 不支持参数绑定写入。
原生连接
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
/**
 * Writes rows into {@code power.meters} over a native connection using the
 * column-wise extension interface exposed by {@code TSDBPreparedStatement}.
 */
public class ParameterBindingBasicDemo {
    // modify host to your own
    private static final String host = "127.0.0.1";
    private static final Random random = new Random(System.currentTimeMillis());
    private static final int numOfSubTable = 10, numOfRow = 10;

    /**
     * Creates the schema, binds one column batch per child table, then executes all batches.
     *
     * @param args unused
     * @throws SQLException rethrown after the failure has been printed
     */
    public static void main(String[] args) throws SQLException {
        final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
        try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
            init(conn);

            final String insertSql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";

            try (TSDBPreparedStatement insert = conn.prepareStatement(insertSql).unwrap(TSDBPreparedStatement.class)) {
                for (int tableNo = 1; tableNo <= numOfSubTable; tableNo++) {
                    // child table name and its two tags
                    insert.setTableName("d_bind_" + tableNo);
                    insert.setTagInt(0, tableNo);
                    insert.setTagString(1, "location_" + tableNo);

                    // column 0: timestamps, 1 ms apart
                    ArrayList<Long> timestamps = new ArrayList<>();
                    long baseMillis = System.currentTimeMillis();
                    for (int rowNo = 0; rowNo < numOfRow; rowNo++) {
                        timestamps.add(baseMillis + rowNo);
                    }
                    insert.setTimestamp(0, timestamps);

                    // column 1: current
                    ArrayList<Float> currents = new ArrayList<>();
                    for (int rowNo = 0; rowNo < numOfRow; rowNo++) {
                        currents.add(random.nextFloat() * 30);
                    }
                    insert.setFloat(1, currents);

                    // column 2: voltage
                    ArrayList<Integer> voltages = new ArrayList<>();
                    for (int rowNo = 0; rowNo < numOfRow; rowNo++) {
                        voltages.add(random.nextInt(300));
                    }
                    insert.setInt(2, voltages);

                    // column 3: phase
                    ArrayList<Float> phases = new ArrayList<>();
                    for (int rowNo = 0; rowNo < numOfRow; rowNo++) {
                        phases.add(random.nextFloat());
                    }
                    insert.setFloat(3, phases);

                    // queue the columns bound for this child table
                    insert.columnDataAddBatch();
                }

                // send every queued batch
                insert.columnDataExecuteBatch();
                // The printed count is computed from the constants, not read back from the server.
                System.out.println("Successfully inserted " + (numOfSubTable * numOfRow) + " rows to power.meters.");
            }
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            String errCode = "";
            if (ex instanceof SQLException) {
                errCode = "ErrCode: " + ((SQLException) ex).getErrorCode() + ", ";
            }
            System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
                    errCode,
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    /**
     * Creates the {@code power} database and the {@code meters} super table if missing.
     *
     * @param conn open connection used to run the DDL statements
     * @throws SQLException if any statement fails
     */
    private static void init(Connection conn) throws SQLException {
        try (Statement ddl = conn.createStatement()) {
            ddl.execute("CREATE DATABASE IF NOT EXISTS power");
            ddl.execute("USE power");
            ddl.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
        }
    }
}
这是一个更详细的参数绑定示例
import taos
from datetime import datetime
import random

# Number of child tables and number of rows generated for each of them.
numOfSubTable = 10
numOfRow = 10

conn = None
stmt2 = None
host="localhost"
port=6030
try:
    # 1 connect
    conn = taos.connect(
        user="root",
        password="taosdata",
        host=host,
        port=port,
    )

    # 2 create db and table
    conn.execute("CREATE DATABASE IF NOT EXISTS power")
    conn.execute("USE power")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
    )

    # 3 prepare
    sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
    stmt2 = conn.statement2(sql)

    # Parallel lists: entry k of each describes child table k.
    table_names = []
    table_tags = []
    table_columns = []
    for table_idx in range(numOfSubTable):
        table_names.append(f"d_bind_{table_idx}")
        table_tags.append([table_idx, f"location_{table_idx}"])

        # Column-wise data; each table is shifted by one second, each row by 1 ms.
        base_ms = int(datetime.now().timestamp() * 1000)
        ts_col = []
        current_col = []
        voltage_col = []
        phase_col = []
        for row_idx in range(numOfRow):
            ts_col.append(base_ms + table_idx*1000 + row_idx)
            current_col.append(float(random.random() * 30))
            voltage_col.append(random.randint(100, 300))
            phase_col.append(float(random.random()))
        table_columns.append([ts_col, current_col, voltage_col, phase_col])

    # 4 bind param
    stmt2.bind_param(table_names, table_tags, table_columns)

    # 5 execute
    stmt2.execute()

    # show
    print(f"Successfully inserted with stmt2 to power.meters. child={numOfSubTable} rows={numOfRow} \n")

except Exception as err:
    print(f"Failed to insert to table meters using stmt2, ErrMessage:{err}")
    raise err
finally:
    # Release the statement before the connection it was created from.
    if stmt2:
        stmt2.close()
    if conn:
        conn.close()
stmt2 绑定参数的示例代码如下(go 连接器 v3.6.0 及以上,TDengine v3.3.5.0 及以上):
package main
import (
"database/sql/driver"
"fmt"
"log"
"math/rand"
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/stmt"
)
// main creates the power database and meters super table over a native
// connection, then writes numOfRow rows into each of numOfSubTable child tables
// with a stmt2 prepared statement, executing once per child table.
func main() {
	host := "127.0.0.1"
	numOfSubTable := 10
	numOfRow := 10

	db, err := af.Open(host, "root", "taosdata", "", 0)
	if err != nil {
		log.Fatalln("Failed to connect to " + host + "; ErrMessage: " + err.Error())
	}
	defer db.Close()

	// prepare database and table
	if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS power"); err != nil {
		log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
	}
	if _, err = db.Exec("USE power"); err != nil {
		log.Fatalln("Failed to use database power, ErrMessage: " + err.Error())
	}
	if _, err = db.Exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); err != nil {
		log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error())
	}

	// prepare statement
	insertSQL := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
	insertStmt := db.Stmt2(common.GetReqID(), false)
	if err = insertStmt.Prepare(insertSQL); err != nil {
		log.Fatalln("Failed to prepare sql, sql: " + insertSQL + ", ErrMessage: " + err.Error())
	}

	for tableNo := 1; tableNo <= numOfSubTable; tableNo++ {
		// generate column data, timestamps 1 ms apart
		startTime := time.Now()
		var tsCol, currentCol, voltageCol, phaseCol []driver.Value
		for rowNo := 0; rowNo < numOfRow; rowNo++ {
			tsCol = append(tsCol, startTime.Add(time.Millisecond*time.Duration(rowNo)))
			currentCol = append(currentCol, rand.Float32()*30)
			voltageCol = append(voltageCol, rand.Int31n(300))
			phaseCol = append(phaseCol, rand.Float32())
		}
		columns := [][]driver.Value{tsCol, currentCol, voltageCol, phaseCol}

		// generate bind data: table name, tags (groupId, location) and columns
		tableName := fmt.Sprintf("d_bind_%d", tableNo)
		bindData := []*stmt.TaosStmt2BindData{
			{
				TableName: tableName,
				Tags:      []driver.Value{int32(tableNo), []byte(fmt.Sprintf("location_%d", tableNo))},
				Cols:      columns,
			},
		}

		// bind params
		if err = insertStmt.Bind(bindData); err != nil {
			log.Fatalln("Failed to bind params, ErrMessage: " + err.Error())
		}
		// execute batch
		if err = insertStmt.Execute(); err != nil {
			log.Fatalln("Failed to exec, ErrMessage: " + err.Error())
		}
		// report affected rows for this child table
		fmt.Printf("Successfully inserted %d rows to %s.\n", insertStmt.GetAffectedRows(), tableName)
	}

	if err = insertStmt.Close(); err != nil {
		log.Fatal("failed to close statement, err:", err)
	}
}
stmt 绑定参数的示例代码如下:
package main
import (
"fmt"
"log"
"math/rand"
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
)
// main creates the power database and meters super table over a native
// connection, then writes numOfRow rows into each of numOfSubTable child tables
// with a stmt prepared statement, executing once per child table.
func main() {
	host := "127.0.0.1"
	numOfSubTable := 10
	numOfRow := 10

	db, err := af.Open(host, "root", "taosdata", "", 0)
	if err != nil {
		log.Fatalln("Failed to connect to " + host + "; ErrMessage: " + err.Error())
	}
	defer db.Close()

	// prepare database and table
	if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS power"); err != nil {
		log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
	}
	if _, err = db.Exec("USE power"); err != nil {
		log.Fatalln("Failed to use database power, ErrMessage: " + err.Error())
	}
	if _, err = db.Exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); err != nil {
		log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error())
	}

	// prepare statement
	insertSQL := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
	insertStmt := db.Stmt()
	if err = insertStmt.Prepare(insertSQL); err != nil {
		log.Fatalln("Failed to prepare sql, sql: " + insertSQL + ", ErrMessage: " + err.Error())
	}

	for tableNo := 1; tableNo <= numOfSubTable; tableNo++ {
		tableName := fmt.Sprintf("d_bind_%d", tableNo)
		tags := param.NewParam(2).AddInt(tableNo).AddBinary([]byte(fmt.Sprintf("location_%d", tableNo)))

		// set tableName and tags
		if err = insertStmt.SetTableNameWithTags(tableName, tags); err != nil {
			log.Fatalln("Failed to set table name and tags, tableName: " + tableName + "; ErrMessage: " + err.Error())
		}

		// bind column data, one row per call, timestamps 1 ms apart
		startTime := time.Now()
		for rowNo := 0; rowNo < numOfRow; rowNo++ {
			rowTime := startTime.Add(time.Millisecond * time.Duration(rowNo))
			row := param.NewParam(4).
				AddTimestamp(rowTime, common.PrecisionMilliSecond).
				AddFloat(rand.Float32() * 30).
				AddInt(rand.Intn(300)).
				AddFloat(rand.Float32())
			if err = insertStmt.BindRow(row); err != nil {
				log.Fatalln("Failed to bind params, ErrMessage: " + err.Error())
			}
		}

		// add batch
		if err = insertStmt.AddBatch(); err != nil {
			log.Fatalln("Failed to add batch, ErrMessage: " + err.Error())
		}
		// execute batch
		if err = insertStmt.Execute(); err != nil {
			log.Fatalln("Failed to exec, ErrMessage: " + err.Error())
		}
		// report affected rows for this child table
		fmt.Printf("Successfully inserted %d rows to %s.\n", insertStmt.GetAffectedRows(), tableName)
	}

	if err = insertStmt.Close(); err != nil {
		log.Fatal("failed to close statement, err:", err)
	}
}
use taos::*;
/// Recreates the `power` database over a native connection, then writes
/// `NUM_ROWS` rows into each of `NUM_TABLES` child tables of `meters` through
/// one prepared statement that is executed once at the end.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    const NUM_TABLES: usize = 10;
    const NUM_ROWS: usize = 10;

    let dsn = "taos://localhost:6030";
    let taos = TaosBuilder::from_dsn(dsn)?.build().await?;

    taos.exec("DROP DATABASE IF EXISTS power").await?;
    taos.create_database("power").await?;
    taos.use_database("power").await?;
    taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))").await?;

    let mut stmt = Stmt::init(&taos).await?;
    stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;

    for table_idx in 0..NUM_TABLES {
        let table_name = format!("d_bind_{}", table_idx);
        let tags = vec![
            Value::Int(table_idx as i32),
            Value::VarChar(format!("location_{}", table_idx).into()),
        ];

        // set table name and tags for the prepared statement.
        if let Err(err) = stmt.set_tbname_tags(&table_name, &tags).await {
            eprintln!("Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", table_name, tags, err);
            return Err(err.into());
        }

        for row_idx in 0..NUM_ROWS {
            // Fixed base values offset by the row index; each bind carries one row.
            let values = vec![
                ColumnView::from_millis_timestamp(vec![1648432611249 + row_idx as i64]),
                ColumnView::from_floats(vec![10.3 + row_idx as f32]),
                ColumnView::from_ints(vec![219 + row_idx as i32]),
                ColumnView::from_floats(vec![0.31 + row_idx as f32]),
            ];
            // bind values to the prepared statement.
            if let Err(err) = stmt.bind(&values).await {
                eprintln!("Failed to bind values, values:{:?}, ErrMessage: {}", values, err);
                return Err(err.into());
            }
        }

        if let Err(err) = stmt.add_batch().await {
            eprintln!("Failed to add batch, ErrMessage: {}", err);
            return Err(err.into());
        }
    }

    // execute.
    let affected_rows = match stmt.execute().await {
        Ok(count) => count,
        Err(err) => {
            eprintln!("Failed to insert to table meters using stmt, ErrMessage: {}", err);
            return Err(err.into());
        }
    };
    println!("Successfully inserted {} rows to power.meters.", affected_rows);

    Ok(())
}
不支持
/// <summary>
/// Parameter-binding example: creates the power database and the meters
/// super table if missing, then for each child table binds a batch of rows
/// through one prepared statement, executes it, and prints the affected rows.
/// Errors are logged to the console and rethrown.
/// </summary>
public static void Main(string[] args)
{
    const int tableCount = 10;
    const int rowsPerTable = 10;
    const string insertSql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";

    var host = "127.0.0.1";
    var rng = new Random();
    var connectionString = $"host={host};port=6030;username=root;password=taosdata";

    try
    {
        var options = new ConnectionStringBuilder(connectionString);
        using (var client = DbDriver.Open(options))
        {
            // Make sure the database and super table exist before binding.
            client.Exec("CREATE DATABASE IF NOT EXISTS power");
            client.Exec("USE power");
            client.Exec(
                "CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");

            using (var stmt = client.StmtInit())
            {
                stmt.Prepare(insertSql);

                for (var i = 1; i <= tableCount; i++)
                {
                    // Child table name plus its two tag values (groupId, location).
                    var tableName = $"d_bind_{i}";
                    var tagValues = new object[] { i, $"location_{i}" };
                    stmt.SetTableName(tableName);
                    stmt.SetTags(tagValues);

                    // Rows of one table are spaced one millisecond apart from this base time.
                    var baseTime = DateTime.Now;
                    for (var j = 0; j < rowsPerTable; j++)
                    {
                        // Draw the random values in column order: current, voltage, phase.
                        var timestamp = baseTime.Add(TimeSpan.FromMilliseconds(j));
                        var currentValue = rng.NextSingle() * 30;
                        var voltage = rng.Next(300);
                        var phase = rng.NextSingle();
                        stmt.BindRow(new object[] { timestamp, currentValue, voltage, phase });
                    }

                    // Submit this table's batch and report how many rows it wrote.
                    stmt.AddBatch();
                    stmt.Exec();
                    var affectedRows = stmt.Affected();
                    Console.WriteLine($"Successfully inserted {affectedRows} rows to {tableName}.");
                }
            }
        }
    }
    catch (TDengineError e)
    {
        // TDengine-specific failure: include the error code, then rethrow.
        Console.WriteLine("Failed to insert to table meters using stmt, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
        throw;
    }
    catch (Exception e)
    {
        // Any other failure: log the message, then rethrow.
        Console.WriteLine("Failed to insert to table meters using stmt, ErrMessage: " + e.Message);
        throw;
    }
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "taos.h"
/**
 * @brief Run one SQL statement and discard its result set.
 *
 * If the query reports an error, the error string is written to stderr, the
 * result and the connection are released, and the process exits with
 * EXIT_FAILURE.
 *
 * @param taos connection handle used to run the statement
 * @param sql  SQL text to execute
 */
void executeSQL(TAOS *taos, const char *sql) {
  TAOS_RES *result = taos_query(taos, sql);

  // Success path: nothing to read, just release the result.
  if (taos_errno(result) == 0) {
    taos_free_result(result);
    return;
  }

  fprintf(stderr, "%s\n", taos_errstr(result));
  taos_free_result(result);
  taos_close(taos);
  exit(EXIT_FAILURE);
}
/**
 * @brief Abort the program when a stmt API call returned a non-zero code.
 *
 * On failure, prints the caller's message together with the code and the
 * statement's error string to stderr, closes the statement, and exits with
 * EXIT_FAILURE. A zero code is a no-op.
 *
 * @param stmt statement handle the failing call was made on
 * @param code return code of that call
 * @param msg  context message printed in front of the error details
 */
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
  if (code == 0) {
    return;
  }

  const char *detail = taos_stmt_errstr(stmt);
  fprintf(stderr, "%s. code: %d, error: %s\n", msg, code, detail);
  taos_stmt_close(stmt);
  exit(EXIT_FAILURE);
}
// One record with the same four fields as the data columns of the meters
// super table created in main(): ts, current, voltage, phase.
// NOTE(review): none of the functions in this example reference Row; confirm
// whether it is still needed.
typedef struct {
int64_t ts;
float current;
int voltage;
float phase;
} Row;
// Number of child tables insertData() writes to (upper bound of its table loop).
int num_of_sub_table = 10;
// Number of rows insertData() binds for each child table.
int num_of_row = 10;
// Sum of the affected-row counts insertData() collects after each execute.
int total_affected = 0;
/**
 * @brief Insert rows into child tables of `meters` using the stmt
 *        (parameter binding) API.
 *
 * Prepares one "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
 * statement, then for each of num_of_sub_table child tables sets the table
 * name and tags, binds num_of_row rows of random data, adds the batch and
 * executes it. The affected-row counts are accumulated in total_affected and
 * the total is printed at the end. Any stmt failure terminates the process
 * through checkErrorCode().
 *
 * The SQL names `meters` without a database prefix, so the caller is expected
 * to have selected the database on this connection beforehand.
 *
 * @param taos connection handle
 */
void insertData(TAOS *taos) {
  // init
  TAOS_STMT *stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    fprintf(stderr, "Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL));
    exit(EXIT_FAILURE);
  }
  // prepare
  const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
  int         code = taos_stmt_prepare(stmt, sql, 0);
  checkErrorCode(stmt, code, "Failed to execute taos_stmt_prepare");

  // Byte lengths of the bound values. The binds take an int32_t pointer, so
  // real int32_t objects are used rather than casting the address of
  // buffer_length (its type comes from taos.h and need not be int32_t).
  int32_t group_id_len = (int32_t)sizeof(int);
  int32_t ts_len = (int32_t)sizeof(int64_t);
  int32_t current_len = (int32_t)sizeof(float);
  int32_t voltage_len = (int32_t)sizeof(int);
  int32_t phase_len = (int32_t)sizeof(float);

  for (int i = 1; i <= num_of_sub_table; i++) {
    // snprintf bounds the write; 32 bytes fits either prefix plus any int.
    char table_name[32];
    snprintf(table_name, sizeof(table_name), "d_bind_%d", i);
    char location[32];
    snprintf(location, sizeof(location), "location_%d", i);
    int32_t location_len = (int32_t)strlen(location);

    // set table name and tags
    TAOS_MULTI_BIND tags[2];
    // groupId
    tags[0].buffer_type = TSDB_DATA_TYPE_INT;
    tags[0].buffer_length = sizeof(int);
    tags[0].length = &group_id_len;
    tags[0].buffer = &i;
    tags[0].is_null = NULL;
    tags[0].num = 1;
    // location
    tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
    tags[1].buffer_length = strlen(location);
    tags[1].length = &location_len;
    tags[1].buffer = location;
    tags[1].is_null = NULL;
    tags[1].num = 1;
    code = taos_stmt_set_tbname_tags(stmt, table_name, tags);
    // No trailing newline here: checkErrorCode appends the code and error text.
    checkErrorCode(stmt, code, "Failed to set table name and tags");

    // insert rows
    TAOS_MULTI_BIND params[4];
    // ts
    params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
    params[0].buffer_length = sizeof(int64_t);
    params[0].length = &ts_len;
    params[0].is_null = NULL;
    params[0].num = 1;
    // current
    params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
    params[1].buffer_length = sizeof(float);
    params[1].length = &current_len;
    params[1].is_null = NULL;
    params[1].num = 1;
    // voltage
    params[2].buffer_type = TSDB_DATA_TYPE_INT;
    params[2].buffer_length = sizeof(int);
    params[2].length = &voltage_len;
    params[2].is_null = NULL;
    params[2].num = 1;
    // phase
    params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
    params[3].buffer_length = sizeof(float);
    params[3].length = &phase_len;
    params[3].is_null = NULL;
    params[3].num = 1;

    for (int j = 0; j < num_of_row; j++) {
      struct timeval tv;
      gettimeofday(&tv, NULL);
      long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000;  // current timestamp in milliseconds
      // Offset by the row index so rows of one table get distinct timestamps.
      int64_t ts = milliseconds + j;
      float   current = (float)rand() / RAND_MAX * 30;
      int     voltage = rand() % 300;
      float   phase = (float)rand() / RAND_MAX;
      params[0].buffer = &ts;
      params[1].buffer = &current;
      params[2].buffer = &voltage;
      params[3].buffer = &phase;
      // bind param
      code = taos_stmt_bind_param(stmt, params);
      checkErrorCode(stmt, code, "Failed to bind param");
    }
    // add batch
    code = taos_stmt_add_batch(stmt);
    checkErrorCode(stmt, code, "Failed to add batch");
    // execute batch
    code = taos_stmt_execute(stmt);
    checkErrorCode(stmt, code, "Failed to exec stmt");
    // get affected rows
    int affected = taos_stmt_affected_rows_once(stmt);
    total_affected += affected;
  }
  fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
  taos_stmt_close(stmt);
}
/**
 * @brief Entry point: connect to the server, make sure the power database and
 *        the meters super table exist, run the stmt insert example, then
 *        release the connection.
 */
int main() {
  const char *host = "localhost";
  const char *user = "root";
  const char *password = "taosdata";
  uint16_t    port = 6030;

  TAOS *conn = taos_connect(host, user, password, NULL, port);
  if (!conn) {
    fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
    taos_cleanup();
    exit(EXIT_FAILURE);
  }

  // Schema setup statements, run in order; executeSQL exits on any failure.
  const char *setup[] = {
      "CREATE DATABASE IF NOT EXISTS power",
      "USE power",
      "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
      "(groupId INT, location BINARY(24))",
  };
  for (size_t k = 0; k < sizeof(setup) / sizeof(setup[0]); k++) {
    executeSQL(conn, setup[k]);
  }

  insertData(conn);

  taos_close(conn);
  taos_cleanup();
  return 0;
}
不支持