执行 SQL
TDengine 对 SQL 语言提供了全面的支持,允许用户以熟悉的 SQL 语法进行数据的查询、插入和删除操作。 TDengine 的 SQL 还支持对数据库和数据表的管理操作,如创建、修改和删除数据库及数据表。TDengine 扩展了标准 SQL,引入了时序数据处理特有的功能,如时间序列数据的聚合查询、降采样、插值查询等,以适应时序数据的特点。这些扩展使得用户可以更高效地处理时间序列数据,进行复杂的数据分析和处理。 具体支持的 SQL 语法请参考 TDengine SQL。
下面介绍使用各语言连接器通过执行 SQL 完成建库、建表、写入数据和查询数据。
REST 连接:各编程语言的连接器封装使用 HTTP 请求的连接,支持数据写入和查询操作,开发者依然使用连接器提供的接口访问 TDengine。
REST API:直接调用 taosadapter 提供的 REST API 接口,进行数据写入和查询操作。代码示例使用 curl 命令来演示。
建库和表
下面以智能电表为例,展示使用各语言连接器如何执行 SQL 命令创建一个名为 power
的数据库,然后使用 power
数据库为默认数据库。
接着创建一个名为 meters
的超级表(STABLE),其表结构包含时间戳、电流、电压、相位等列,以及分组 ID 和位置作为标签。
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
final String createDatabaseSql = "CREATE DATABASE IF NOT EXISTS power";
final String createStableSql = "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))";

// Both resources are closed automatically when the try block exits.
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
     Statement stmt = connection.createStatement()) {
    // Step 1: create the database; the returned update count can be inspected here.
    int dbRows = stmt.executeUpdate(createDatabaseSql);
    System.out.println("Create database power successfully, rowsAffected: " + dbRows);

    // Step 2: create the super table; the returned update count can be inspected here.
    int stableRows = stmt.executeUpdate(createStableSql);
    System.out.println("Create stable power.meters successfully, rowsAffected: " + stableRows);
} catch (Exception ex) {
    // Only SQLException exposes an error code; any other failure prints its message alone.
    // Please refer to the JDBC specifications for detailed exception info.
    String errCodePrefix = "";
    if (ex instanceof SQLException) {
        errCodePrefix = "ErrCode: " + ((SQLException) ex).getErrorCode() + ", ";
    }
    System.out.printf("Failed to create database power or stable meters, %sErrMessage: %s%n",
            errCodePrefix, ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}
import taosws

# Create database `power` and super table `power.meters` over a WebSocket connection.
conn = None
host = "localhost"
port = 6041
try:
    conn = taosws.connect(user="root",
                          password="taosdata",
                          host=host,
                          port=port)

    # create database
    rowsAffected = conn.execute("CREATE DATABASE IF NOT EXISTS power")
    print(f"Create database power successfully, rowsAffected: {rowsAffected}")

    # create super table; the location tag is BINARY(24) so that values such as
    # 'California.SanFrancisco' (23 characters) used by the insert examples fit
    rowsAffected = conn.execute(
        "CREATE STABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"
    )
    print(f"Create stable power.meters successfully, rowsAffected: {rowsAffected}")

except Exception as err:
    print(f"Failed to create database power or stable meters, ErrMessage:{err}")
    # re-raise the original exception with its traceback intact
    raise
finally:
    # close the connection whether or not the statements succeeded
    if conn:
        conn.close()
import taos

# Create database `power` and super table `power.meters` over a native connection.
conn = None
host = "localhost"
port = 6030
try:
    conn = taos.connect(host=host,
                        port=port,
                        user="root",
                        password="taosdata")

    # create database
    rowsAffected = conn.execute("CREATE DATABASE IF NOT EXISTS power")
    print(f"Create database power successfully, rowsAffected: {rowsAffected}")

    # create super table; the location tag is BINARY(24) so that values such as
    # 'California.SanFrancisco' (23 characters) used by the insert examples fit
    rowsAffected = conn.execute(
        "CREATE STABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"
    )
    print(f"Create stable power.meters successfully, rowsAffected: {rowsAffected}")

except Exception as err:
    print(f"Failed to create database power or stable meters, ErrMessage:{err}")
    # re-raise the original exception with its traceback intact
    raise
finally:
    # close the connection whether or not the statements succeeded
    if conn:
        conn.close()
import taosrest

# Create database `power` and super table `power.meters` over a REST connection.
conn = None
url = "http://localhost:6041"
try:
    conn = taosrest.connect(url=url,
                            user="root",
                            password="taosdata",
                            timeout=30)

    # create database
    rowsAffected = conn.execute("CREATE DATABASE IF NOT EXISTS power")
    print(f"Create database power successfully, rowsAffected: {rowsAffected}")

    # create super table; the location tag is BINARY(24) so that values such as
    # 'California.SanFrancisco' (23 characters) used by the insert examples fit
    rowsAffected = conn.execute(
        "CREATE STABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"
    )
    print(f"Create stable power.meters successfully, rowsAffected: {rowsAffected}")

except Exception as err:
    print(f"Failed to create database power or stable meters, ErrMessage:{err}")
    # re-raise the original exception with its traceback intact
    raise
finally:
    # close the connection whether or not the statements succeeded
    if conn:
        conn.close()
// create database
res, err := db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
	log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
rowsAffected, err := res.RowsAffected()
if err != nil {
	log.Fatalln("Failed to get create database rowsAffected, ErrMessage: " + err.Error())
}
// you can check rowsAffected here
// (Println already separates operands with a space, so the literal carries no trailing one)
fmt.Println("Create database power successfully, rowsAffected:", rowsAffected)

// create super table
res, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
	log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error())
}
rowsAffected, err = res.RowsAffected()
if err != nil {
	log.Fatalln("Failed to get create stable rowsAffected, ErrMessage: " + err.Error())
}
// you can check rowsAffected here
fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected)
let taos = TaosBuilder::from_dsn(url)?.build().await?;

// Step 1: create the database; on failure report the error and bail out.
let create_db = ["CREATE DATABASE IF NOT EXISTS power"];
match taos.exec_many(create_db).await {
    Err(err) => {
        eprintln!("Failed to create database power, ErrMessage: {}", err);
        return Err(err.into());
    }
    Ok(affected) => {
        println!("Create database power successfully, rowsAffected: {}", affected)
    }
}

// Step 2: create the super table inside that database.
let create_stable = [
    "CREATE STABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))",
];
match taos.exec_many(create_stable).await {
    Err(err) => {
        eprintln!("Failed to create stable power.meters, ErrMessage: {}", err);
        return Err(err.into());
    }
    Ok(affected) => {
        println!("Create stable power.meters successfully, rowsAffected: {}", affected)
    }
}
Ok(())
/**
 * Connects with the module-level `dsn`, creates the `power` database and the
 * `power.meters` super table, and always closes the connection afterwards.
 *
 * @returns {Promise<void>}
 * @throws Rethrows any connection or execution error after logging it.
 */
async function createDbAndTable() {
    const createDbSql = 'CREATE DATABASE IF NOT EXISTS power';
    const createStableSql = 'CREATE STABLE IF NOT EXISTS power.meters ' +
        '(ts timestamp, current float, voltage int, phase float) ' +
        'TAGS (location binary(64), groupId int);';

    let wsSql = null;
    try {
        const conf = new taos.WSConfig(dsn);
        conf.setUser('root');
        conf.setPwd('taosdata');

        wsSql = await taos.sqlConnect(conf);
        console.log("Connected to " + dsn + " successfully.");

        // Database first, then the super table that lives in it.
        await wsSql.exec(createDbSql);
        console.log("Create database power successfully.");

        await wsSql.exec(createStableSql);
        console.log("Create stable power.meters successfully");
    } catch (err) {
        console.error(`Failed to create database power or stable meters, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        // wsSql stays null when the connection itself failed.
        if (wsSql) {
            await wsSql.close();
        }
    }
}
try
{
    // Step 1: create the database.
    const string createDbSql = "CREATE DATABASE IF NOT EXISTS power";
    var dbRows = client.Exec(createDbSql);
    Console.WriteLine($"Create database power successfully, rowsAffected: {dbRows}");

    // Step 2: create the super table.
    const string createStableSql =
        "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))";
    var stableRows = client.Exec(createStableSql);
    Console.WriteLine($"Create stable power.meters successfully, rowsAffected: {stableRows}");
}
catch (TDengineError e)
{
    // TDengine errors carry an error code in addition to the message.
    Console.WriteLine(
        $"Failed to create database power or stable meters, ErrCode: {e.Code}, ErrMessage: {e.Error}");
    throw;
}
catch (Exception e)
{
    // Any other failure: report the message only.
    Console.WriteLine($"Failed to create database power or stable meters, ErrMessage: {e.Message}");
    throw;
}
// Create database `power` and super table `power.meters` over a WebSocket connection.
// Returns 0 on success and -1 on any failure.
int   code = 0;
char *dsn = "ws://localhost:6041";

// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
  fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
  return -1;
}

// create database
WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = ws_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
  // release the result only after its error text has been printed
  ws_free_result(result);
  ws_close(taos);
  return -1;
}
ws_free_result(result);
fprintf(stdout, "Create database power successfully.\n");

// create super table
const char *sql =
    "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId "
    "INT, location BINARY(24))";
result = ws_query(taos, sql);
code = ws_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
  // release the result only after its error text has been printed
  ws_free_result(result);
  ws_close(taos);
  return -1;
}
ws_free_result(result);
fprintf(stdout, "Create stable power.meters successfully.\n");

// close & clean
ws_close(taos);
return 0;
// Create database `power` and super table `power.meters` over a native connection.
// Returns 0 on success and -1 on any failure.
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t    port = 6030;
int         code = 0;

// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
  fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
          taos_errstr(NULL));
  taos_cleanup();
  return -1;
}

// create database
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = taos_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
  // release the result only after its error text has been printed
  taos_free_result(result);
  taos_close(taos);
  taos_cleanup();
  return -1;
}
taos_free_result(result);
fprintf(stdout, "Create database power successfully.\n");

// create super table
const char *sql =
    "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId "
    "INT, location BINARY(24))";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
  // release the result only after its error text has been printed
  taos_free_result(result);
  taos_close(taos);
  taos_cleanup();
  return -1;
}
taos_free_result(result);
fprintf(stdout, "Create stable power.meters successfully.\n");

// close & clean
taos_close(taos);
taos_cleanup();
return 0;
创建数据库
curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql' \
--data 'CREATE DATABASE IF NOT EXISTS power'
创建表,在 url 中指定数据库为 power
curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql/power' \
--data 'CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))'
注意:建议采用 <dbName>.<tableName> 的格式构造 SQL 语句,不推荐在应用中采用 USE DBName 方式访问。
插入数据
下面以智能电表为例,展示如何使用连接器执行 SQL 来插入数据到 power
数据库的 meters
超级表。样例使用 TDengine 自动建表 SQL 语法,写入 d1001 子表中 3 条数据,写入 d1002 子表中 1 条数据,然后打印出实际插入数据条数。
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
// Insert data using the auto-create-table syntax; the database and super table must already exist.
String insertQuery = "INSERT INTO " +
        "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 219, 0.31000) " +
        "(NOW + 2a, 12.60000, 218, 0.33000) " +
        "(NOW + 3a, 12.30000, 221, 0.31000) " +
        "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 218, 0.25000) ";

// Both resources are closed automatically when the try block exits.
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
     Statement stmt = connection.createStatement()) {
    // The update count returned here is what gets reported as the inserted row count.
    int insertedRows = stmt.executeUpdate(insertQuery);
    System.out.println("Successfully inserted " + insertedRows + " rows to power.meters.");
} catch (Exception ex) {
    // Only SQLException exposes an error code; any other failure prints its message alone.
    // Please refer to the JDBC specifications for detailed exception info.
    String errCodePrefix = "";
    if (ex instanceof SQLException) {
        errCodePrefix = "ErrCode: " + ((SQLException) ex).getErrorCode() + ", ";
    }
    System.out.printf("Failed to insert data to power.meters, sql: %s, %sErrMessage: %s%n",
            insertQuery, errCodePrefix, ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}
Note NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW + 1s 代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。
import taosws

# Insert rows into child tables d1001 and d1002 over a WebSocket connection.
conn = None
host = "localhost"
port = 6041
# Defined before the try block so the error handler can always print it,
# even when the connection attempt itself fails.
sql = """
    INSERT INTO
    power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
        VALUES (NOW + 1a, 10.30000, 219, 0.31000)
        (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
    power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
        VALUES (NOW + 1a, 10.30000, 218, 0.25000)
    """
try:
    conn = taosws.connect(user="root",
                          password="taosdata",
                          host=host,
                          port=port)

    affectedRows = conn.execute(sql)
    print(f"Successfully inserted {affectedRows} rows to power.meters.")

except Exception as err:
    print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.")
    # re-raise the original exception with its traceback intact
    raise
finally:
    # close the connection whether or not the insert succeeded
    if conn:
        conn.close()
import taos

# Insert rows into child tables d1001 and d1002 over a native connection.
conn = None
host = "localhost"
port = 6030
# Defined before the try block so the error handler can always print it,
# even when the connection attempt itself fails.
sql = """
    INSERT INTO
    power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
        VALUES (NOW + 1a, 10.30000, 219, 0.31000)
        (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
    power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
        VALUES (NOW + 1a, 10.30000, 218, 0.25000)
    """
try:
    conn = taos.connect(host=host,
                        port=port,
                        user="root",
                        password="taosdata")

    affectedRows = conn.execute(sql)
    print(f"Successfully inserted {affectedRows} rows to power.meters.")

except Exception as err:
    print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.")
    # re-raise the original exception with its traceback intact
    raise
finally:
    # close the connection whether or not the insert succeeded
    if conn:
        conn.close()
import taosrest

# Insert rows into child tables d1001 and d1002 over a REST connection.
conn = None
url = "http://localhost:6041"
# Defined before the try block so the error handler can always print it,
# even when the connection attempt itself fails.
sql = """
    INSERT INTO
    power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
        VALUES (NOW + 1a, 10.30000, 219, 0.31000)
        (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
    power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
        VALUES (NOW + 1a, 10.30000, 218, 0.25000)
    """
try:
    conn = taosrest.connect(url=url,
                            user="root",
                            password="taosdata",
                            timeout=30)

    affectedRows = conn.execute(sql)
    print(f"Successfully inserted {affectedRows} rows to power.meters.")

except Exception as err:
    print(f"Failed to insert data to power.meters, sql:{sql}, ErrMessage:{err}.")
    # re-raise the original exception with its traceback intact
    raise
finally:
    # close the connection whether or not the insert succeeded
    if conn:
        conn.close()
// Insert rows with the auto-create-table syntax; the database and super table must already exist.
insertQuery := "INSERT INTO " +
	"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
	"VALUES " +
	"(NOW + 1a, 10.30000, 219, 0.31000) " +
	"(NOW + 2a, 12.60000, 218, 0.33000) " +
	"(NOW + 3a, 12.30000, 221, 0.31000) " +
	"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
	"VALUES " +
	"(NOW + 1a, 10.30000, 218, 0.25000) "

// Execute the statement; any failure terminates the example.
if res, err = db.Exec(insertQuery); err != nil {
	log.Fatalf("Failed to insert data to power.meters, sql: %s, ErrMessage: %s\n", insertQuery, err.Error())
}

// Fetch the affected-row count from the result; it can be checked here.
if rowsAffected, err = res.RowsAffected(); err != nil {
	log.Fatalf("Failed to get insert rowsAffected, sql: %s, ErrMessage: %s\n", insertQuery, err.Error())
}
fmt.Printf("Successfully inserted %d rows to power.meters.\n", rowsAffected)
// Insert rows with the auto-create-table syntax; the database and super table must already exist.
let insert_sql = r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 219, 0.31000)
(NOW + 2a, 12.60000, 218, 0.33000)
(NOW + 3a, 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 218, 0.25000) "#;

// Log the failure with the offending SQL, then propagate it to the caller.
let inserted = match taos.exec(insert_sql).await {
    Ok(count) => count,
    Err(err) => {
        eprintln!("Failed to insert data to power.meters, sql: {}, ErrMessage: {}", insert_sql, err);
        return Err(err.into());
    }
};
println!("Successfully inserted {} rows to power.meters.", inserted);
/**
 * Inserts sample rows into child tables `power.d1001` and `power.d1002` using
 * the auto-create-table syntax, logs the affected-row count, and always closes
 * the connection afterwards.
 *
 * @returns {Promise<void>}
 * @throws Rethrows any connection or execution error after logging it.
 */
async function insertData() {
    let wsSql = null;
    const insertQuery = "INSERT INTO " +
        "power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 219, 0.31000) " +
        "(NOW + 2a, 12.60000, 218, 0.33000) " +
        "(NOW + 3a, 12.30000, 221, 0.31000) " +
        "power.d1002 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 3) " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 218, 0.25000) ";

    try {
        wsSql = await createConnect();
        // Declared locally: assigning to an undeclared name throws a
        // ReferenceError in strict mode / ES modules.
        const taosResult = await wsSql.exec(insertQuery);
        console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to power.meters.");
    } catch (err) {
        console.error(`Failed to insert data to power.meters, sql: ${insertQuery}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        // wsSql stays null when the connection itself failed.
        if (wsSql) {
            await wsSql.close();
        }
    }
}
// Insert rows with the auto-create-table syntax; the database and super table must already exist.
var insertQuery = "INSERT INTO " +
                  "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
                  "VALUES " +
                  "(NOW + 1a, 10.30000, 219, 0.31000) " +
                  "(NOW + 2a, 12.60000, 218, 0.33000) " +
                  "(NOW + 3a, 12.30000, 221, 0.31000) " +
                  "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
                  "VALUES " +
                  "(NOW + 1a, 10.30000, 218, 0.25000) ";
try
{
    var insertedRows = client.Exec(insertQuery);
    Console.WriteLine($"Successfully inserted {insertedRows} rows to power.meters.");
}
catch (TDengineError e)
{
    // TDengine errors carry an error code in addition to the message.
    Console.WriteLine(
        $"Failed to insert data to power.meters, sql: {insertQuery}, ErrCode: {e.Code}, ErrMessage: {e.Error}");
    throw;
}
catch (Exception e)
{
    // Any other failure: report the message only.
    Console.WriteLine(
        $"Failed to insert data to power.meters, sql: {insertQuery}, ErrMessage: {e.Message}");
    throw;
}
// Insert rows into child tables d1001 and d1002 over a WebSocket connection.
// Returns 0 on success and -1 on any failure.
int   code = 0;
char *dsn = "ws://localhost:6041";

// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
  fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
  return -1;
}

// insert data, please make sure the database and table are already created
const char *sql =
    "INSERT INTO "
    "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') "
    "VALUES "
    "(NOW + 1a, 10.30000, 219, 0.31000) "
    "(NOW + 2a, 12.60000, 218, 0.33000) "
    "(NOW + 3a, 12.30000, 221, 0.31000) "
    "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') "
    "VALUES "
    "(NOW + 1a, 10.30000, 218, 0.25000) ";
WS_RES *result = ws_query(taos, sql);
code = ws_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to insert data to power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s.\n", sql, code,
          ws_errstr(result));
  // release the result only after its error text has been printed
  ws_free_result(result);
  ws_close(taos);
  return -1;
}

// you can check affectedRows here; read it BEFORE freeing the result,
// otherwise the call would touch released memory
int rows = ws_affected_rows(result);
fprintf(stdout, "Successfully inserted %d rows into power.meters.\n", rows);
ws_free_result(result);

// close & clean
ws_close(taos);
return 0;
// Insert rows into child tables d1001 and d1002 over a native connection.
// Returns 0 on success and -1 on any failure.
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t    port = 6030;
int         code = 0;

// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
  fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
          taos_errstr(NULL));
  taos_cleanup();
  return -1;
}

// insert data, please make sure the database and table are already created
const char *sql =
    "INSERT INTO "
    "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') "
    "VALUES "
    "(NOW + 1a, 10.30000, 219, 0.31000) "
    "(NOW + 2a, 12.60000, 218, 0.33000) "
    "(NOW + 3a, 12.30000, 221, 0.31000) "
    "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') "
    "VALUES "
    "(NOW + 1a, 10.30000, 218, 0.25000) ";
TAOS_RES *result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to insert data to power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s.\n", sql, code,
          taos_errstr(result));
  // release the result only after its error text has been printed
  taos_free_result(result);
  taos_close(taos);
  taos_cleanup();
  return -1;
}

// you can check affectedRows here; read it BEFORE freeing the result,
// otherwise the call would touch released memory
int rows = taos_affected_rows(result);
fprintf(stdout, "Successfully inserted %d rows into power.meters.\n", rows);
taos_free_result(result);

// close & clean
taos_close(taos);
taos_cleanup();
return 0;
Note NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW + 1s 代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。
写入数据
curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql' \
--data 'INSERT INTO power.d1001 USING power.meters TAGS(2,'\''California.SanFrancisco'\'') VALUES (NOW + 1a, 10.30000, 219, 0.31000) (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000) power.d1002 USING power.meters TAGS(3, '\''California.SanFrancisco'\'') VALUES (NOW + 1a, 10.30000, 218, 0.25000)'
查询数据
下面以智能电表为例,展示如何使用各语言连接器执行 SQL 来查询数据,从 power
数据库 meters
超级表中查询最多 100 行数据,并将获取到的结果按行打印出来。
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
String sql = "SELECT ts, current, location FROM power.meters limit 100";

// Query data; the database and table must already exist.
// All three resources are closed automatically when the try block exits.
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
     Statement stmt = connection.createStatement();
     ResultSet resultSet = stmt.executeQuery(sql)) {
    while (resultSet.next()) {
        // Column indexes start at 1; reading by column name is the recommended form.
        Timestamp ts = resultSet.getTimestamp(1);
        float current = resultSet.getFloat(2);
        String location = resultSet.getString("location");

        // you can check data here
        System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location);
    }
} catch (Exception ex) {
    // Only SQLException exposes an error code; any other failure prints its message alone.
    // Please refer to the JDBC specifications for detailed exception info.
    String errCodePrefix = "";
    if (ex instanceof SQLException) {
        errCodePrefix = "ErrCode: " + ((SQLException) ex).getErrorCode() + ", ";
    }
    System.out.printf("Failed to query data from power.meters, sql: %s, %sErrMessage: %s%n",
            sql, errCodePrefix, ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}
Note 查询和操作关系型数据库一致,使用下标获取返回字段内容时从 1 开始,建议使用字段名称获取。
import taosws

# Query up to 100 rows from power.meters over a WebSocket connection.
conn = None
host = "localhost"
port = 6041
# Defined before the try block so the error handler can always print it,
# even when the connection attempt itself fails.
sql = "SELECT ts, current, location FROM power.meters limit 100"
try:
    conn = taosws.connect(user="root",
                          password="taosdata",
                          host=host,
                          port=port)

    result = conn.query(sql)
    for row in result:
        print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")

except Exception as err:
    print(f"Failed to query data from power.meters, sql: {sql}, ErrMessage:{err}")
    # re-raise the original exception with its traceback intact
    raise
finally:
    # close the connection whether or not the query succeeded
    if conn:
        conn.close()
import taos

# Query up to 100 rows from power.meters over a native connection.
host = "localhost"
port = 6030
conn = None
# Defined before the try block so the error handler can always print it,
# even when the connection attempt itself fails.
sql = "SELECT ts, current, location FROM power.meters limit 100"
try:
    conn = taos.connect(host=host,
                        port=port,
                        user="root",
                        password="taosdata")

    result = conn.query(sql)
    # Get data from result as list of tuple
    data = result.fetch_all()
    for row in data:
        print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")

except Exception as err:
    print(f"Failed to query data from power.meters, sql: {sql}, ErrMessage:{err}")
    # re-raise the original exception with its traceback intact
    raise
finally:
    # close the connection whether or not the query succeeded
    if conn:
        conn.close()
import taosrest

# Query up to 100 rows from power.meters through the REST client.
client = None
url = "http://localhost:6041"
# Defined before the try block so the error handler can always print it,
# even when creating the client itself fails.
sql = "SELECT ts, current, location FROM power.meters limit 100"
try:
    client = taosrest.RestClient(url=url,
                                 user="root",
                                 password="taosdata",
                                 timeout=30)

    result = client.sql(sql)
    # rows are read from the "data" entry of the response; skip printing when it is empty
    if result["data"]:
        for row in result["data"]:
            print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")

except Exception as err:
    print(f"Failed to query data from power.meters, sql: {sql}, ErrMessage:{err}")
    # re-raise the original exception with its traceback intact
    raise
// query data, make sure the database and table are created before
sql := "SELECT ts, current, location FROM power.meters limit 100"
rows, err := db.Query(sql)
if err != nil {
	log.Fatalf("Failed to query data from power.meters, sql: %s, ErrMessage: %s\n", sql, err.Error())
}
// release the result set when the enclosing function returns
defer rows.Close()

for rows.Next() {
	// Add your data processing logic here
	var (
		ts       time.Time
		current  float32
		location string
	)
	// Scan copies the columns of the current row into the destinations, in SELECT order
	err = rows.Scan(&ts, &current, &location)
	if err != nil {
		log.Fatalf("Failed to scan data, sql: %s, ErrMessage: %s\n", sql, err)
	}
	fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
}
// Next returns false both at end-of-data and on error; Err tells the two apart
if err = rows.Err(); err != nil {
	log.Fatalf("Failed to iterate rows, sql: %s, ErrMessage: %s\n", sql, err)
}
// Query data; the database and table must already exist.
let sql = "SELECT ts, current, location FROM power.meters limit 100";

// Log the failure with the offending SQL, then propagate it to the caller.
let mut result = match taos.query(sql).await {
    Ok(result_set) => result_set,
    Err(err) => {
        eprintln!("Failed to query data from power.meters, sql: {}, ErrMessage: {}", sql, err);
        return Err(err.into());
    }
};

// Print the column names of the result set first.
for field in result.fields() {
    println!("got field: {}", field.name());
}

// Then walk the rows one by one, printing every (column name, value) pair.
let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
    for (col, (name, value)) in row.enumerate() {
        println!(
            "[{}] got value in col {} (named `{:>8}`): {}",
            nrows, col, name, value
        );
    }
    nrows += 1;
}
Rust 连接器还支持使用 serde 将查询结果的每一行反序列化为结构体:
// query data, make sure the database and table are created before
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
    // deserialize timestamp to chrono::DateTime<Local>
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // binary/varchar to String
    location: String,
}

let sql = "SELECT ts, current, location FROM power.meters limit 100";
// Run the statement held in `sql`, so the text reported on failure is
// always the text that was actually executed.
match taos.query(sql).await {
    Ok(mut query) => {
        // deserialize every row into a Record and collect them into a Vec
        match query.deserialize::<Record>().try_collect::<Vec<_>>().await {
            Ok(records) => {
                dbg!(records);
            }
            Err(err) => {
                eprintln!("Failed to deserialize query results; ErrMessage: {}", err);
                return Err(err.into());
            }
        }
    }
    Err(err) => {
        eprintln!("Failed to query data from power.meters, sql: {}, ErrMessage: {}", sql, err);
        return Err(err.into());
    }
}
/**
 * Queries up to 100 rows from `power.meters`, prints each row, and always
 * closes the result set and the connection afterwards.
 *
 * @returns {Promise<void>}
 * @throws Rethrows any connection or query error after logging it.
 */
async function queryData() {
    const sql = 'SELECT ts, current, location FROM power.meters limit 100';
    let wsSql = null;
    let wsRows = null;

    try {
        wsSql = await createConnect();
        wsRows = await wsSql.query(sql);

        // next() advances the cursor and resolves to false once the rows run out.
        for (let hasRow = await wsRows.next(); hasRow; hasRow = await wsRows.next()) {
            const row = wsRows.getData();
            console.log('ts: ' + row[0] + ', current: ' + row[1] + ', location: ' + row[2]);
        }
    } catch (err) {
        console.error(`Failed to query data from power.meters, sql: ${sql}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        // Close the result set before the connection; either may still be null.
        if (wsRows) {
            await wsRows.close();
        }
        if (wsSql) {
            await wsSql.close();
        }
    }
}
// Query data; the database and table must already exist.
var query = "SELECT ts, current, location FROM power.meters limit 100";
try
{
    using (var rows = client.Query(query))
    {
        while (rows.Read())
        {
            // Add your data processing logic here.
            // Values are read by zero-based column index, in SELECT order.
            var ts = (DateTime)rows.GetValue(0);
            var current = (float)rows.GetValue(1);
            // The location column arrives as bytes and is decoded as UTF-8.
            var locationBytes = (byte[])rows.GetValue(2);
            var location = Encoding.UTF8.GetString(locationBytes);
            Console.WriteLine(
                $"ts: {ts:yyyy-MM-dd HH:mm:ss.fff}, current: {current}, location: {location}");
        }
    }
}
catch (TDengineError e)
{
    // TDengine errors carry an error code in addition to the message.
    Console.WriteLine(
        $"Failed to query data from power.meters, sql: {query}, ErrCode: {e.Code}, ErrMessage: {e.Error}");
    throw;
}
catch (Exception e)
{
    // Any other failure: report the message only.
    Console.WriteLine(
        $"Failed to query data from power.meters, sql: {query}, ErrMessage: {e.Message}");
    throw;
}
// Query up to 100 rows from power.meters over a WebSocket connection and count them.
// Returns 0 on success and -1 on any failure.
int   code = 0;
char *dsn = "ws://localhost:6041";

// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
  fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
  return -1;
}

// query data, please make sure the database and table are already created
const char *sql = "SELECT ts, current, location FROM power.meters limit 100";
WS_RES     *result = ws_query(taos, sql);
code = ws_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s.\n", sql, code,
          ws_errstr(result));
  // release the result only after its error text has been printed
  ws_free_result(result);
  ws_close(taos);
  return -1;
}

WS_ROW row = NULL;
int    rows = 0;
int    num_fields = ws_field_count(result);
// call ws_fetch_fields(result) here if the column metadata is needed
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);

// fetch the records row by row
while ((row = ws_fetch_row(result))) {
  // Add your data processing logic here
  rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
ws_free_result(result);

// close & clean
ws_close(taos);
return 0;
// Query up to 100 rows from power.meters over a native connection and count them.
// Returns 0 on success and -1 on any failure.
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t    port = 6030;
int         code = 0;

// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
  fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
          taos_errstr(NULL));
  taos_cleanup();
  return -1;
}

// query data, please make sure the database and table are already created
const char *sql = "SELECT ts, current, location FROM power.meters limit 100";
TAOS_RES   *result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s.\n", sql, code,
          taos_errstr(result));
  // release the result only after its error text has been printed
  taos_free_result(result);
  taos_close(taos);
  taos_cleanup();
  return -1;
}

TAOS_ROW row = NULL;
int      rows = 0;
int      num_fields = taos_field_count(result);
// call taos_fetch_fields(result) here if the column metadata is needed
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);

// fetch the records row by row
while ((row = taos_fetch_row(result))) {
  // Add your data processing logic here
  rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
taos_free_result(result);

// close & clean
taos_close(taos);
taos_cleanup();
return 0;
查询数据
curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql' \
--data 'SELECT ts, current, location FROM power.meters limit 100'
执行带有 reqId 的 SQL
reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
使用 reqId 有下面好处:
- 请求追踪:通过将同一个 reqId 关联到一个请求的所有相关操作,可以追踪请求在系统中的完整路径
- 性能分析:通过分析一个请求的 reqId,可以了解请求在各个服务和模块中的处理时间,从而找出性能瓶颈
- 故障诊断:当一个请求失败时,可以通过查看与该请求关联的 reqId 来找出问题发生的位置
如果用户不设置 reqId,连接器会在内部随机生成一个,但建议由用户显式设置,以便更好地跟用户请求关联起来。
下面是各语言连接器设置 reqId 执行 SQL 的代码样例。
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
long reqId = 3L;
String querySql = "SELECT ts, current, location FROM power.meters limit 1";

// The statement is cast to AbstractStatement because that type offers the
// executeQuery overload taking a request ID.
// All three resources are closed automatically when the try block exits.
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
     AbstractStatement aStmt = (AbstractStatement) connection.createStatement();
     ResultSet resultSet = aStmt.executeQuery(querySql, reqId)) {
    while (resultSet.next()) {
        // Column indexes start at 1; reading by column name is the recommended form.
        Timestamp ts = resultSet.getTimestamp(1);
        float current = resultSet.getFloat(2);
        String location = resultSet.getString("location");

        // you can check data here
        System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location);
    }
} catch (Exception ex) {
    // Only SQLException exposes an error code; any other failure prints its message alone.
    // Please refer to the JDBC specifications for detailed exception info.
    String errCodePrefix = "";
    if (ex instanceof SQLException) {
        errCodePrefix = "ErrCode: " + ((SQLException) ex).getErrorCode() + ", ";
    }
    System.out.printf("Failed to execute sql with reqId: %s, %sErrMessage: %s%n", reqId,
            errCodePrefix, ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}
import taosws

# Query power.meters over a WebSocket connection, tagging the request with a reqId.
conn = None
reqId = 3
host = "localhost"
port = 6041
try:
    conn = taosws.connect(
        user="root",
        password="taosdata",
        host=host,
        port=port,
    )

    # pass the reqId variable (not a literal) so the id sent with the request
    # is always the one reported by the error handler below
    result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=reqId)
    for row in result:
        print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")

except Exception as err:
    print(f"Failed to execute sql with reqId:{reqId}, ErrMessage:{err}")
    # re-raise the original exception with its traceback intact
    raise
finally:
    # close the connection whether or not the query succeeded
    if conn:
        conn.close()
import taos

# Query power.meters over a native connection, tagging the request with a reqId.
conn = None
reqId = 3
host = "localhost"
port = 6030
try:
    conn = taos.connect(host=host, port=port, user="root", password="taosdata")

    # the request id is passed as the second positional argument of query()
    result = conn.query("SELECT ts, current, location FROM power.meters limit 100", reqId)
    # fetch_all() returns the rows, which are printed one per line
    for ts, current, location, *_ in result.fetch_all():
        print(f"ts: {ts}, current: {current}, location: {location}")

except Exception as err:
    print(f"Failed to execute sql with reqId:{reqId}, ErrMessage:{err}")
    raise err
finally:
    # close the connection whether or not the query succeeded
    if conn:
        conn.close()
import taosrest

# Query power.meters through the REST client, tagging the request with a reqId.
client = None
url = "http://localhost:6041"
reqId = 3
try:
    client = taosrest.RestClient(url=url, user="root", password="taosdata", timeout=30)

    # the request id is passed as the second positional argument of sql()
    result = client.sql("SELECT ts, current, location FROM power.meters limit 100", reqId)
    # rows are read from the "data" entry of the response; nothing is printed when it is empty
    rows = result["data"]
    if rows:
        for row in rows:
            print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")

except Exception as err:
    print(f"Failed to execute sql with reqId:{reqId}, ErrMessage:{err}")
    raise err
// use context to set request id
reqId := int64(3)
ctx := context.WithValue(context.Background(), "taos_req_id", reqId)
// execute query with context
querySql := "SELECT ts, current, location FROM power.meters limit 1"
rows, err := db.QueryContext(ctx, querySql)
if err != nil {
	log.Fatalf("Failed to execute sql with reqId: %d, url: %s, sql: %s, ErrMessage: %s\n", reqId, taosDSN, querySql, err.Error())
}
// release the result set when the enclosing function returns
defer rows.Close()

for rows.Next() {
	// Add your data processing logic here
	var (
		ts       time.Time
		current  float32
		location string
	)
	// Scan copies the columns of the current row into the destinations, in SELECT order
	err = rows.Scan(&ts, &current, &location)
	if err != nil {
		log.Fatalf("Failed to scan data, reqId: %d, url:%s, sql: %s, ErrMessage: %s\n", reqId, taosDSN, querySql, err)
	}
	fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
}
// Next returns false both at end-of-data and on error; Err tells the two apart
if err = rows.Err(); err != nil {
	log.Fatalf("Failed to iterate rows, reqId: %d, url:%s, sql: %s, ErrMessage: %s\n", reqId, taosDSN, querySql, err)
}
// Query with an explicit request id; on failure report the id and propagate
// the error, on success dump the column names and then every cell.
let req_id = 3_u64;
match taos
    .query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", req_id)
    .await
{
    Err(err) => {
        eprintln!("Failed to execute sql with reqId: {}, ErrMessage: {}", req_id, err);
        return Err(err.into());
    }
    Ok(mut result) => {
        // Column metadata first.
        for field in result.fields() {
            println!("got field: {}", field.name());
        }

        // Then walk the rows; `row_index` counts rows already printed.
        let mut rows = result.rows();
        let mut row_index = 0;
        while let Some(row) = rows.try_next().await? {
            for (col_index, (col_name, cell)) in row.enumerate() {
                println!(
                    "[{}] got value in col {} (named `{:>8}`): {}",
                    row_index, col_index, col_name, cell
                );
            }
            row_index += 1;
        }
    }
}
/**
 * Queries power.meters with an explicit request id and prints every row.
 *
 * Opens a connection via createConnect(), runs the SELECT with reqId 1,
 * logs each row, and closes the result set and the connection afterwards.
 * Any error is logged with the request id and then rethrown to the caller.
 *
 * @returns {Promise<void>}
 */
async function sqlWithReqid() {
    const reqId = 1;
    let connection = null;
    let resultSet = null;
    try {
        connection = await createConnect();
        resultSet = await connection.query(
            'SELECT ts, current, location FROM power.meters limit 100',
            reqId,
        );
        // next() resolves to a falsy value once the result set is exhausted.
        while (await resultSet.next()) {
            const record = resultSet.getData();
            console.log(`ts: ${record[0]}, current: ${record[1]}, location: ${record[2]}`);
        }
    } catch (err) {
        console.error(`Failed to query data from power.meters, reqId: ${reqId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        // Release the result set first, then the connection that produced it.
        if (resultSet) {
            await resultSet.close();
        }
        if (connection) {
            await connection.close();
        }
    }
}
// Request id attached to the query below.
long reqId = 3L;
// query data
string query = "SELECT ts, current, location FROM power.meters limit 1";
try
{
    // query with request id 3
    using (var reader = client.Query(query, reqId))
    {
        while (reader.Read())
        {
            // Columns come back in SELECT order: ts, current, location.
            var ts = (DateTime)reader.GetValue(0);
            var current = (float)reader.GetValue(1);
            // location is read as raw bytes and decoded as UTF-8.
            var locationBytes = (byte[])reader.GetValue(2);
            var location = Encoding.UTF8.GetString(locationBytes);
            var line = $"ts: {ts:yyyy-MM-dd HH:mm:ss.fff}, current: {current}, location: {location}";
            Console.WriteLine(line);
        }
    }
}
catch (TDengineError e)
{
    // handle TDengine error: report the error code as well, then rethrow
    Console.WriteLine(
        $"Failed to execute sql with reqId: {reqId}, sql: {query}, ErrCode: {e.Code}, ErrMessage: {e.Error}");
    throw;
}
catch (Exception e)
{
    // handle other exceptions: report the message, then rethrow
    Console.WriteLine(
        $"Failed to execute sql with reqId: {reqId}, sql: {query}, ErrMessage: {e.Message}");
    throw;
}
// Example: query over the WebSocket connection with an explicit request id.
int   code = 0;
char *dsn = "ws://localhost:6041";

// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
  fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
  return -1;
}

const char *sql = "SELECT ts, current, location FROM power.meters limit 1";
// query data with reqid
long    reqid = 3L;
WS_RES *result = ws_query_with_reqid(taos, sql, reqid);
// the error state is read from the result handle, not from the return value
code = ws_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to execute sql with reqId: %ld, ErrCode: 0x%x, ErrMessage: %s.\n", reqid, code,
          ws_errstr(result));
  // the result handle is still allocated on failure; release it before closing
  ws_free_result(result);
  ws_close(taos);
  return -1;
}

WS_ROW row = NULL;
int    rows = 0;
int    num_fields = ws_field_count(result);
// column metadata; fetched for illustration, not used further in this snippet
const WS_FIELD *fields = ws_fetch_fields(result);

fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);

// fetch the records row by row
while ((row = ws_fetch_row(result))) {
  // Add your data processing logic here

  rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
ws_free_result(result);

// close & clean
ws_close(taos);
return 0;
// Example: query over the native connection with an explicit request id.
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t    port = 6030;
int         code = 0;

// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
  fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
          taos_errstr(NULL));
  taos_cleanup();
  return -1;
}

const char *sql = "SELECT ts, current, location FROM power.meters limit 1";
// query data with reqid
long      reqid = 3L;
TAOS_RES *result = taos_query_with_reqid(taos, sql, reqid);
// the error state is read from the result handle, not from the return value
code = taos_errno(result);
if (code != 0) {
  fprintf(stderr, "Failed to execute sql with reqId: %ld, ErrCode: 0x%x, ErrMessage: %s.\n", reqid, code,
          taos_errstr(result));
  // the result handle is still allocated on failure; release it before closing
  taos_free_result(result);
  taos_close(taos);
  taos_cleanup();
  return -1;
}

TAOS_ROW row = NULL;
int      rows = 0;
int      num_fields = taos_field_count(result);
// column metadata; fetched for illustration, not used further in this snippet
TAOS_FIELD *fields = taos_fetch_fields(result);

fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);

// fetch the records row by row
while ((row = taos_fetch_row(result))) {
  // Add your data processing logic here

  rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
taos_free_result(result);

// close & clean
taos_close(taos);
taos_cleanup();
return 0;
查询数据,指定 reqId 为 3
# POST the SQL text to the REST endpoint; the request id goes in the req_id query parameter.
curl --location --user root:taosdata \
  --data 'SELECT ts, current, location FROM power.meters limit 1' \
  'http://127.0.0.1:6041/rest/sql?req_id=3'