跳到主要内容

数据同步 SQL 手册

本文档介绍用于管理 TDengine 数据同步功能的 SQL 命令,包括 Xnode 节点、同步 Task 任务以及 Job 分片的管理。

XNODE 节点管理

XNODE 节点是数据同步服务的基本执行单元,负责具体的数据传输工作。

创建节点

语法

CREATE XNODE 'url'
CREATE XNODE 'url' USER name PASS 'password';

参数说明

  • url: Xnode 节点的地址,格式为 host:port
  • 首次创建需要指定用户名和密码,用于 xnoded 连接 taosd

示例

taos> CREATE XNODE "h1:6050";
Create OK, 0 row(s) affected (0.050798s)

taos> CREATE XNODE 'x1:6050' USER root PASS 'taosdata';
Create OK, 0 row(s) affected (0.050798s)

查看节点

语法

SHOW XNODES

示例

taos> SHOW XNODES;

输出结果:

id | url     | status | create_time                 | update_time             |
===============================================================================
1 | h1:6050 | online | 2025-12-14 01:01:34.655 | 2025-12-14 01:01:34.655 |
Query OK, 1 row(s) in set (0.005518s)

排空节点

将一个节点已有任务重新分配到其他节点中执行。

语法

DRAIN XNODE id

参数说明

  • id: Xnode 节点的 ID

示例

taos> DRAIN XNODE 4;
Query OK, 0 row(s) affected (0.014246s)

删除节点

语法

DROP XNODE [FORCE] id | 'url'

参数说明

  • id: Xnode 节点的 ID
  • url: Xnode 节点的地址
  • FORCE: 强制删除节点

示例

taos> DROP XNODE 1;
Drop OK, 0 row(s) affected (0.038173s)

taos> DROP XNODE "h2:6050";
Drop OK, 0 row(s) affected (0.038593s)

TASK 任务管理

TASK 任务定义了数据同步的源端、目标端以及数据解析规则。

创建任务

语法

CREATE XNODE TASK 'name'
FROM { 'from_dns' | DATABASE 'dbname' | TOPIC 'topic' }
TO { 'to_dns' | DATABASE 'dbname' }
[ WITH task_options ]

task_options:
[ PARSER 'parser' ]
[ STATUS 'status' ]
[ VIA viaId ]
[ XNODE_ID xnodeId ]
[ REASON 'reason' ]

语法说明:task_options 各选项可同时使用,空格分隔,顺序无关

参数说明

参数说明
name任务名称
from_dns源端连接字符串(如 mqtt://...
dbname数据库名称
topicTopic 名称
to_dns目标端连接字符串(如 taos://...
parser数据解析配置(JSON 格式)
status任务状态
xnodeId任务所在的 xnode 节点 ID
viaId任务所在的 agent 的 ID
reason任务最近执行失败原因

示例

taos> CREATE XNODE TASK "t4" FROM 'kafka://localhost:9092?topics=abc&group=abcgroup' TO 'taos+ws://localhost:6041/test' WITH parser '{"model":{"name":"cc_abc","using":"cc","tags":["g"],"columns":["ts","b"]},"mutate":[{"map":{"ts":{"cast":"ts","as":"TIMESTAMP(ms)"},"b":{"cast":"a","as":"VARCHAR"},"g":{"value":"1","as":"INT"}}}]}';
Create OK, 0 row(s) affected (0.038959s)

查看任务

语法

SHOW XNODE TASKS;

示例

taos> SHOW XNODE TASKS;

输出结果:

taos> SHOW XNODE TASKS \G;
#### ************************* 1.row *************************
id: 3
name: t4
from: kafka://localhost:9092?topics=abc&group=abcgroup
to: taos+ws://localhost:6041/test
parser: {"model":{"name":"cc_abc","using":"cc","tags":["g"],"columns":["ts","b"]},"mutate":[{"map":{"ts":{"cast":"ts","as":"TIMESTAMP(ms)"},"b":{"cast":"a","as":"VARCHAR"},"g":{"value":"1","as":"INT"}}}]}
via: NULL
xnode_id: NULL
status: NULL
reason: NULL
create_time: 2025-12-29 13:48:21.058
update_time: 2025-12-29 13:48:21.058
Query OK, 1 row(s) in set (0.005281s)

启动任务

语法

START XNODE TASK id | 'name';

示例

taos> START XNODE TASK 1;
DB error: Xnode url response http code not 200 error [0x8000800C] (0.002160s)

停止任务

语法

STOP XNODE TASK id | 'name';

示例

taos> STOP XNODE TASK 1;
DB error: Xnode url response http code not 200 error [0x8000800C] (0.002047s)

修改任务

语法

ALTER XNODE TASK { id | 'name' }
[ FROM { 'from_dns' | DATABASE 'dbname' | TOPIC 'topic' } ]
[ TO { 'to_dns' | DATABASE 'dbname' } ]
[ WITH alter_options ]

alter_options:
[ PARSER 'parser' ]
[ NAME 'name' ]
[ STATUS 'status' ]
[ VIA viaId ]
[ XNODE_ID xnodeId ]
[ REASON 'reason' ]

语法说明:task_options 各选项含义与创建任务相同

示例

taos> ALTER XNODE TASK 3 FROM 'pulsar://zgc...' TO 'testdb' WITH xnode_id 33 via 333 reason 'zgc_test';
Query OK, 0 row(s) affected (0.036077s)

删除任务

语法

DROP XNODE TASK id | 'name';

示例

taos> DROP XNODE TASK 3;
Drop OK, 0 row(s) affected (0.038191s)

JOB 任务分片管理

JOB 是 TASK 任务的执行分片,支持手动和自动负载均衡。

查看 JOB 分片

语法

SHOW XNODE JOBS;

示例

taos> SHOW XNODE JOBS\G;
#### ************************* 1.row *************************
id: 1
task_id: 3
config: config_json
via: -1
xnode_id: 11
status: running
reason: NULL
create_time: 2025-12-14 02:52:31.281
update_time: 2025-12-14 02:52:31.281
Query OK, 1 row(s) in set (0.004714s)

手动负载均衡

语法

REBALANCE XNODE JOB jid WITH XNODE_ID xnodeId;

语法说明:手动负载均衡当前只支持 xnode_id 参数,必须附带 xnode id 信息。

示例

taos> REBALANCE XNODE JOB 1 WITH xnode_id 1;
Query OK, 0 row(s) affected (0.011808s)

自动负载均衡

语法

REBALANCE XNODE JOBS [ WHERE job_conditions ]

语法说明:WHERE job_conditions 可选,是用来过滤符合条件的 job 数据。不支持函数,支持 SHOW XNODE JOBS 命令中出现的所有字段。没有 WHERE 条件语句时表示所有 job 均进行自动负载均衡。

示例

taos> REBALANCE XNODE JOBS WHERE id>1;
Query OK, 0 row(s) affected (0.014246s)

taos> REBALANCE XNODE JOBS WHERE task_id=1 and (xnode_id=3 or xnode_id=4);
Query OK, 0 row(s) affected (0.007237s)

taos> REBALANCE XNODE JOBS;
Query OK, 0 row(s) affected (0.023245s)