Package taos
Expand source code
# encoding:UTF-8
from .connection import TaosConnection
# For some reason, the following is needed for VS Code (through PyLance) to
# recognize that "error" is a valid module of the "taos" package.
from .error import *
from .bind import *
from .field import *
from .cursor import *
from .result import *
from .statement import *
from .subscription import *
from .schemaless import *
try:
from .sqlalchemy import *
except:
pass
from taos._version import __version__
# Globals
threadsafety = 0
"""sqlalchemy will read this attribute"""
paramstyle = "pyformat"
"""sqlalchemy will read this attribute"""
__all__ = [
"__version__",
"IS_V3",
# functions
"connect",
"new_bind_param",
"new_bind_params",
"new_multi_binds",
"new_multi_bind",
# objects
"TaosBind",
"TaosConnection",
"TaosCursor",
"TaosResult",
"TaosRows",
"TaosRow",
"TaosStmt",
"PrecisionEnum",
"SmlPrecision",
"SmlProtocol"
]
def connect(*args, **kwargs):
# type: (..., ...) -> TaosConnection
"""Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TaosConnection(*args, **kwargs)
Sub-modules
taos.bind
taos.cinterface
taos.connection
taos.constants
-
Constants in TDengine python
taos.cursor
taos.error
-
Python exceptions
taos.field
taos.field_v3
taos.precision
taos.result
taos.schemaless
taos.sqlalchemy
taos.statement
taos.subscription
taos.timestamp
taos.tmq
Functions
def connect(*args, **kwargs)
-
Function to return a TDengine connector object
Current supporting keyword parameters: @dsn: Data source name as string @user: Username as string(optional) @password: Password as string(optional) @host: Hostname(optional) @database: Database name(optional)
@rtype: TDengineConnector
Expand source code
def connect(*args, **kwargs): # type: (..., ...) -> TaosConnection """Function to return a TDengine connector object Current supporting keyword parameters: @dsn: Data source name as string @user: Username as string(optional) @password: Password as string(optional) @host: Hostname(optional) @database: Database name(optional) @rtype: TDengineConnector """ return TaosConnection(*args, **kwargs)
def new_bind_param()
-
Expand source code
def new_bind_param(): # type: () -> TaosBind if IS_V3: return TaosMultiBind() else: return TaosBind()
def new_bind_params(size)
-
Expand source code
def new_bind_params(size): # type: (int) -> Array[TaosBind] if IS_V3: return (TaosMultiBind * size)() else: return (TaosBind * size)()
def new_multi_bind()
-
Expand source code
def new_multi_bind(): # type: () -> TaosMultiBind return TaosMultiBind()
def new_multi_binds(size)
-
Expand source code
def new_multi_binds(size): # type: (int) -> Array[TaosMultiBind] return (TaosMultiBind * size)()
Classes
class PrecisionEnum
-
Precision enums
Expand source code
class PrecisionEnum(object): """Precision enums""" Milliseconds = 0 Microseconds = 1 Nanoseconds = 2
Class variables
var Microseconds
var Milliseconds
var Nanoseconds
class SmlPrecision
-
Schemaless timestamp precision constants
Expand source code
class SmlPrecision: """Schemaless timestamp precision constants""" NOT_CONFIGURED = 0 # C.TSDB_SML_TIMESTAMP_NOT_CONFIGURED HOURS = 1 MINUTES = 2 SECONDS = 3 MILLI_SECONDS = 4 MICRO_SECONDS = 5 NANO_SECONDS = 6
Class variables
var HOURS
var MICRO_SECONDS
var MILLI_SECONDS
var MINUTES
var NANO_SECONDS
var NOT_CONFIGURED
var SECONDS
class SmlProtocol
-
Schemaless protocol constants
Expand source code
class SmlProtocol: """Schemaless protocol constants""" UNKNOWN_PROTOCOL = 0 LINE_PROTOCOL = 1 TELNET_PROTOCOL = 2 JSON_PROTOCOL = 3
Class variables
var JSON_PROTOCOL
var LINE_PROTOCOL
var TELNET_PROTOCOL
var UNKNOWN_PROTOCOL
class TaosBind (*args, **kwargs)
-
Structure base class
Expand source code
class TaosBind(ctypes.Structure): _fields_ = [ ("buffer_type", c_int), ("buffer", c_void_p), ("buffer_length", c_size_t), ("length", POINTER(c_size_t)), ("is_null", POINTER(c_int)), ("is_unsigned", c_int), ("error", POINTER(c_int)), ("u", c_int64), ("allocated", c_int), ] def bool(self, value): self.buffer_type = FieldType.C_BOOL if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_bool(value)), c_void_p) self.buffer_length = sizeof(c_bool) def tinyint(self, value): self.buffer_type = FieldType.C_TINYINT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_int8(value)), c_void_p) self.buffer_length = sizeof(c_int8) def smallint(self, value): self.buffer_type = FieldType.C_SMALLINT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_int16(value)), c_void_p) self.buffer_length = sizeof(c_int16) def int(self, value): self.buffer_type = FieldType.C_INT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_int32(value)), c_void_p) self.buffer_length = sizeof(c_int32) def bigint(self, value): self.buffer_type = FieldType.C_BIGINT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_int64(value)), c_void_p) self.buffer_length = sizeof(c_int64) def float(self, value): self.buffer_type = FieldType.C_FLOAT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_float(value)), c_void_p) self.buffer_length = sizeof(c_float) def double(self, value): self.buffer_type = FieldType.C_DOUBLE if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_double(value)), c_void_p) self.buffer_length = sizeof(c_double) def binary(self, value): buffer = None length = 0 self.buffer_type = FieldType.C_BINARY if value is None: self.is_null = pointer(c_int(1)) else: if isinstance(value, str): bytes = value.encode("utf-8") buffer = create_string_buffer(bytes) length = len(bytes) else: buffer = value length = len(value) self.buffer = cast(buffer, c_void_p) self.buffer_length = length self.length = pointer(c_size_t(self.buffer_length)) def timestamp(self, value, precision=PrecisionEnum.Milliseconds): self.buffer_type = FieldType.C_TIMESTAMP if value is None: self.is_null = pointer(c_int(1)) else: if type(value) is datetime: if precision == PrecisionEnum.Milliseconds: ts = int(round((value - _datetime_epoch).total_seconds() * 1000)) elif precision == PrecisionEnum.Microseconds: ts = int(round((value - _datetime_epoch).total_seconds() * 10000000)) else: raise PrecisionError("datetime do not support nanosecond precision") elif type(value) is float: if precision == PrecisionEnum.Milliseconds: ts = int(round(value * 1000)) elif precision == PrecisionEnum.Microseconds: ts = int(round(value * 10000000)) else: raise PrecisionError("time float do not support nanosecond precision") elif isinstance(value, int) and not isinstance(value, bool): ts = value elif isinstance(value, str): value = datetime.fromisoformat(value) if precision == PrecisionEnum.Milliseconds: ts = int(round(value * 1000)) elif precision == PrecisionEnum.Microseconds: ts = int(round(value * 10000000)) else: raise PrecisionError("datetime do not support nanosecond precision") self.buffer = cast(pointer(c_int64(ts)), c_void_p) self.buffer_length = sizeof(c_int64) def nchar(self, value): buffer = None length = 0 self.buffer_type = FieldType.C_NCHAR if value is None: self.is_null = pointer(c_int(1)) else: if isinstance(value, str): bytes = value.encode("utf-8") buffer = create_string_buffer(bytes) length = len(bytes) else: buffer = value length = len(value) self.buffer = cast(buffer, c_void_p) self.buffer_length = length self.length = pointer(c_size_t(self.buffer_length)) def json(self, value): buffer = None length = 0 self.buffer_type = FieldType.C_JSON if value is None: self.is_null = pointer(c_int(1)) else: if isinstance(value, str): bytes = value.encode("utf-8") buffer = create_string_buffer(bytes) length = len(bytes) else: buffer = value length = len(value) self.buffer = cast(buffer, c_void_p) self.buffer_length = length self.length = pointer(c_size_t(self.buffer_length)) def tinyint_unsigned(self, value): self.buffer_type = FieldType.C_TINYINT_UNSIGNED if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_uint8(value)), c_void_p) self.buffer_length = sizeof(c_uint8) def smallint_unsigned(self, value): self.buffer_type = FieldType.C_SMALLINT_UNSIGNED if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_uint16(value)), c_void_p) self.buffer_length = sizeof(c_uint16) def int_unsigned(self, value): self.buffer_type = FieldType.C_INT_UNSIGNED if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_uint32(value)), c_void_p) self.buffer_length = sizeof(c_uint32) def bigint_unsigned(self, value): self.buffer_type = FieldType.C_BIGINT_UNSIGNED if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_uint64(value)), c_void_p) self.buffer_length = sizeof(c_uint64)
Ancestors
- _ctypes.Structure
- _ctypes._CData
Instance variables
var allocated
-
Structure/Union member
var buffer
-
Structure/Union member
var buffer_length
-
Structure/Union member
var buffer_type
-
Structure/Union member
var error
-
Structure/Union member
var is_null
-
Structure/Union member
var is_unsigned
-
Structure/Union member
var length
-
Structure/Union member
var u
-
Structure/Union member
Methods
def bigint(self, value)
-
Expand source code
def bigint(self, value): self.buffer_type = FieldType.C_BIGINT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_int64(value)), c_void_p) self.buffer_length = sizeof(c_int64)
def bigint_unsigned(self, value)
-
Expand source code
def bigint_unsigned(self, value): self.buffer_type = FieldType.C_BIGINT_UNSIGNED if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_uint64(value)), c_void_p) self.buffer_length = sizeof(c_uint64)
def binary(self, value)
-
Expand source code
def binary(self, value): buffer = None length = 0 self.buffer_type = FieldType.C_BINARY if value is None: self.is_null = pointer(c_int(1)) else: if isinstance(value, str): bytes = value.encode("utf-8") buffer = create_string_buffer(bytes) length = len(bytes) else: buffer = value length = len(value) self.buffer = cast(buffer, c_void_p) self.buffer_length = length self.length = pointer(c_size_t(self.buffer_length))
def bool(self, value)
-
Expand source code
def bool(self, value): self.buffer_type = FieldType.C_BOOL if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_bool(value)), c_void_p) self.buffer_length = sizeof(c_bool)
def double(self, value)
-
Expand source code
def double(self, value): self.buffer_type = FieldType.C_DOUBLE if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_double(value)), c_void_p) self.buffer_length = sizeof(c_double)
def float(self, value)
-
Expand source code
def float(self, value): self.buffer_type = FieldType.C_FLOAT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_float(value)), c_void_p) self.buffer_length = sizeof(c_float)
def int(self, value)
-
Expand source code
def int(self, value): self.buffer_type = FieldType.C_INT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_int32(value)), c_void_p) self.buffer_length = sizeof(c_int32)
def int_unsigned(self, value)
-
Expand source code
def int_unsigned(self, value): self.buffer_type = FieldType.C_INT_UNSIGNED if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_uint32(value)), c_void_p) self.buffer_length = sizeof(c_uint32)
def json(self, value)
-
Expand source code
def json(self, value): buffer = None length = 0 self.buffer_type = FieldType.C_JSON if value is None: self.is_null = pointer(c_int(1)) else: if isinstance(value, str): bytes = value.encode("utf-8") buffer = create_string_buffer(bytes) length = len(bytes) else: buffer = value length = len(value) self.buffer = cast(buffer, c_void_p) self.buffer_length = length self.length = pointer(c_size_t(self.buffer_length))
def nchar(self, value)
-
Expand source code
def nchar(self, value): buffer = None length = 0 self.buffer_type = FieldType.C_NCHAR if value is None: self.is_null = pointer(c_int(1)) else: if isinstance(value, str): bytes = value.encode("utf-8") buffer = create_string_buffer(bytes) length = len(bytes) else: buffer = value length = len(value) self.buffer = cast(buffer, c_void_p) self.buffer_length = length self.length = pointer(c_size_t(self.buffer_length))
def smallint(self, value)
-
Expand source code
def smallint(self, value): self.buffer_type = FieldType.C_SMALLINT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_int16(value)), c_void_p) self.buffer_length = sizeof(c_int16)
def smallint_unsigned(self, value)
-
Expand source code
def smallint_unsigned(self, value): self.buffer_type = FieldType.C_SMALLINT_UNSIGNED if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_uint16(value)), c_void_p) self.buffer_length = sizeof(c_uint16)
def timestamp(self, value, precision=0)
-
Expand source code
def timestamp(self, value, precision=PrecisionEnum.Milliseconds): self.buffer_type = FieldType.C_TIMESTAMP if value is None: self.is_null = pointer(c_int(1)) else: if type(value) is datetime: if precision == PrecisionEnum.Milliseconds: ts = int(round((value - _datetime_epoch).total_seconds() * 1000)) elif precision == PrecisionEnum.Microseconds: ts = int(round((value - _datetime_epoch).total_seconds() * 10000000)) else: raise PrecisionError("datetime do not support nanosecond precision") elif type(value) is float: if precision == PrecisionEnum.Milliseconds: ts = int(round(value * 1000)) elif precision == PrecisionEnum.Microseconds: ts = int(round(value * 10000000)) else: raise PrecisionError("time float do not support nanosecond precision") elif isinstance(value, int) and not isinstance(value, bool): ts = value elif isinstance(value, str): value = datetime.fromisoformat(value) if precision == PrecisionEnum.Milliseconds: ts = int(round(value * 1000)) elif precision == PrecisionEnum.Microseconds: ts = int(round(value * 10000000)) else: raise PrecisionError("datetime do not support nanosecond precision") self.buffer = cast(pointer(c_int64(ts)), c_void_p) self.buffer_length = sizeof(c_int64)
def tinyint(self, value)
-
Expand source code
def tinyint(self, value): self.buffer_type = FieldType.C_TINYINT if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_int8(value)), c_void_p) self.buffer_length = sizeof(c_int8)
def tinyint_unsigned(self, value)
-
Expand source code
def tinyint_unsigned(self, value): self.buffer_type = FieldType.C_TINYINT_UNSIGNED if value is None: self.is_null = pointer(c_int(1)) else: self.buffer = cast(pointer(c_uint8(value)), c_void_p) self.buffer_length = sizeof(c_uint8)
class TaosConnection (*args, **kwargs)
-
TDengine connection object
Expand source code
class TaosConnection(object): """TDengine connection object""" def __init__(self, *args, **kwargs): self._conn = None self._host = None self._user = "root" self._password = "taosdata" self._database = None self._port = 0 self._config = None self._tz = None self._init_config(**kwargs) self._chandle = CTaosInterface(self._config, self._tz) self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port) def _init_config(self, **kwargs): # host if "host" in kwargs: self._host = kwargs["host"] # user if "user" in kwargs: self._user = kwargs["user"] # password if "password" in kwargs: self._password = kwargs["password"] # database if "database" in kwargs: self._database = kwargs["database"] # port if "port" in kwargs: self._port = kwargs["port"] # config if "config" in kwargs: self._config = kwargs["config"] # timezone if "timezone" in kwargs: self._tz = kwargs["timezone"] def close(self): """Close current connection.""" if self._conn: taos_close(self._conn) self._conn = None @property def client_info(self): # type: () -> str return taos_get_client_info() @property def server_info(self): # type: () -> str return taos_get_server_info(self._conn) def select_db(self, database): # type: (str) -> None taos_select_db(self._conn, database) def execute(self, sql): # type: (str) -> int """Simplely execute sql ignoring the results""" return self.query(sql).affected_rows def query(self, sql): # type: (str) -> TaosResult result = taos_query(self._conn, sql) return TaosResult(result, True, self) def query_a(self, sql, callback, param): # type: (str, async_query_callback_type, c_void_p) -> None """Asynchronously query a sql with callback function""" taos_query_a(self._conn, sql, callback, param) def subscribe(self, restart, topic, sql, interval, callback=None, param=None): # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription """Create a subscription.""" if self._conn is None: return None sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param) if not sub: errno = taos_errno(c_void_p(None)) msg = taos_errstr(c_void_p(None)) raise Error(msg, errno) return TaosSubscription(sub, callback != None) def statement(self, sql=None): # type: (str | None) -> TaosStmt if self._conn is None: return None stmt = taos_stmt_init(self._conn) if sql != None: taos_stmt_prepare(stmt, sql) return TaosStmt(stmt) def load_table_info(self, tables): # type: (str) -> None taos_load_table_info(self._conn, tables) def schemaless_insert(self, lines, protocol, precision): # type: (list[str], SmlProtocol, SmlPrecision) -> int """ 1.Line protocol and schemaless support ## Example ```python import taos conn = taos.connect() conn.exec("drop database if exists test") conn.select_db("test") lines = [ 'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532', ] conn.schemaless_insert(lines, 0, "ns") ``` 2.OpenTSDB telnet style API format support ## Example ```python import taos conn = taos.connect() conn.exec("drop database if exists test") conn.select_db("test") lines = [ 'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"', ] conn.schemaless_insert(lines, 1, None) ``` 3.OpenTSDB HTTP JSON format support ## Example ```python import taos conn = taos.connect() conn.exec("drop database if exists test") conn.select_db("test") payload = [''' { "metric": "cpu_load_0", "timestamp": 1626006833610123, "value": 55.5, "tags": { "host": "ubuntu", "interface": "eth0", "Id": "tb0" } } '''] conn.schemaless_insert(lines, 2, None) ``` """ return taos_schemaless_insert(self._conn, lines, protocol, precision) def cursor(self): # type: () -> TaosCursor """Return a new Cursor object using the connection.""" return TaosCursor(self) def commit(self): """Commit any pending transaction to the database. Since TDengine do not support transactions, the implement is void functionality. """ pass def rollback(self): """Void functionality""" pass def clear_result_set(self): """Clear unused result set on this connection.""" pass def __del__(self): self.close()
Instance variables
var client_info
-
Expand source code
@property def client_info(self): # type: () -> str return taos_get_client_info()
var server_info
-
Expand source code
@property def server_info(self): # type: () -> str return taos_get_server_info(self._conn)
Methods
def clear_result_set(self)
-
Clear unused result set on this connection.
Expand source code
def clear_result_set(self): """Clear unused result set on this connection.""" pass
def close(self)
-
Close current connection.
Expand source code
def close(self): """Close current connection.""" if self._conn: taos_close(self._conn) self._conn = None
def commit(self)
-
Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
Expand source code
def commit(self): """Commit any pending transaction to the database. Since TDengine do not support transactions, the implement is void functionality. """ pass
def cursor(self)
-
Return a new Cursor object using the connection.
Expand source code
def cursor(self): # type: () -> TaosCursor """Return a new Cursor object using the connection.""" return TaosCursor(self)
def execute(self, sql)
-
Simplely execute sql ignoring the results
Expand source code
def execute(self, sql): # type: (str) -> int """Simplely execute sql ignoring the results""" return self.query(sql).affected_rows
def load_table_info(self, tables)
-
Expand source code
def load_table_info(self, tables): # type: (str) -> None taos_load_table_info(self._conn, tables)
def query(self, sql)
-
Expand source code
def query(self, sql): # type: (str) -> TaosResult result = taos_query(self._conn, sql) return TaosResult(result, True, self)
def query_a(self, sql, callback, param)
-
Asynchronously query a sql with callback function
Expand source code
def query_a(self, sql, callback, param): # type: (str, async_query_callback_type, c_void_p) -> None """Asynchronously query a sql with callback function""" taos_query_a(self._conn, sql, callback, param)
def rollback(self)
-
Void functionality
Expand source code
def rollback(self): """Void functionality""" pass
def schemaless_insert(self, lines, protocol, precision)
-
1.Line protocol and schemaless support
Example
import taos conn = taos.connect() conn.exec("drop database if exists test") conn.select_db("test") lines = [ 'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532', ] conn.schemaless_insert(lines, 0, "ns")
2.OpenTSDB telnet style API format support
Example
import taos conn = taos.connect() conn.exec("drop database if exists test") conn.select_db("test") lines = [ 'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"', ] conn.schemaless_insert(lines, 1, None)
3.OpenTSDB HTTP JSON format support
Example
import taos conn = taos.connect() conn.exec("drop database if exists test") conn.select_db("test") payload = [''' { "metric": "cpu_load_0", "timestamp": 1626006833610123, "value": 55.5, "tags": { "host": "ubuntu", "interface": "eth0", "Id": "tb0" } } '''] conn.schemaless_insert(lines, 2, None)
Expand source code
def schemaless_insert(self, lines, protocol, precision): # type: (list[str], SmlProtocol, SmlPrecision) -> int """ 1.Line protocol and schemaless support ## Example ```python import taos conn = taos.connect() conn.exec("drop database if exists test") conn.select_db("test") lines = [ 'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532', ] conn.schemaless_insert(lines, 0, "ns") ``` 2.OpenTSDB telnet style API format support ## Example ```python import taos conn = taos.connect() conn.exec("drop database if exists test") conn.select_db("test") lines = [ 'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"', ] conn.schemaless_insert(lines, 1, None) ``` 3.OpenTSDB HTTP JSON format support ## Example ```python import taos conn = taos.connect() conn.exec("drop database if exists test") conn.select_db("test") payload = [''' { "metric": "cpu_load_0", "timestamp": 1626006833610123, "value": 55.5, "tags": { "host": "ubuntu", "interface": "eth0", "Id": "tb0" } } '''] conn.schemaless_insert(lines, 2, None) ``` """ return taos_schemaless_insert(self._conn, lines, protocol, precision)
def select_db(self, database)
-
Expand source code
def select_db(self, database): # type: (str) -> None taos_select_db(self._conn, database)
def statement(self, sql=None)
-
Expand source code
def statement(self, sql=None): # type: (str | None) -> TaosStmt if self._conn is None: return None stmt = taos_stmt_init(self._conn) if sql != None: taos_stmt_prepare(stmt, sql) return TaosStmt(stmt)
def subscribe(self, restart, topic, sql, interval, callback=None, param=None)
-
Create a subscription.
Expand source code
def subscribe(self, restart, topic, sql, interval, callback=None, param=None): # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription """Create a subscription.""" if self._conn is None: return None sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param) if not sub: errno = taos_errno(c_void_p(None)) msg = taos_errstr(c_void_p(None)) raise Error(msg, errno) return TaosSubscription(sub, callback != None)
class TaosCursor (connection=None)
-
Database cursor which is used to manage the context of a fetch operation.
Attributes
.description: Read-only attribute consists of 7-item sequences:
> name (mandatory) > type_code (mandatory) > display_size > internal_size > precision > scale > null_ok This attribute will be None for operations that do not return rows or if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last .execute*() produced (for DQL statements like SELECT) or affected
Expand source code
class TaosCursor(object): """Database cursor which is used to manage the context of a fetch operation. Attributes: .description: Read-only attribute consists of 7-item sequences: > name (mandatory) > type_code (mandatory) > display_size > internal_size > precision > scale > null_ok This attribute will be None for operations that do not return rows or if the cursor has not had an operation invoked via the .execute*() method yet. .rowcount:This read-only attribute specifies the number of rows that the last .execute*() produced (for DQL statements like SELECT) or affected """ def __init__(self, connection=None): self._description = [] self._rowcount = -1 self._connection = None self._result = None self._fields = None self._block = None self._block_rows = -1 self._block_iter = 0 self._affected_rows = 0 self._logfile = "" if connection is not None: self._connection = connection def __iter__(self): return self def __next__(self): return self._taos_next() def next(self): return self._taos_next() def _taos_next(self): if self._result is None or self._fields is None: raise OperationalError("Invalid use of fetch iterator") if self._block_rows <= self._block_iter: block, self._block_rows = taos_fetch_row( self._result, self._fields) if self._block_rows == 0: raise StopIteration self._block = list(map(tuple, zip(*block))) self._block_iter = 0 data = self._block[self._block_iter] self._block_iter += 1 return data @property def description(self): """Return the description of the object.""" return self._description @property def rowcount(self): """ For INSERT statement, rowcount is assigned immediately after execute the statement. For SELECT statement, rowcount will not get correct value until fetched all data. """ return self._rowcount @property def affected_rows(self): """Return the rowcount of insertion""" return self._affected_rows def callproc(self, procname, *args): """Call a stored database procedure with the given name. Void functionality since no stored procedures. """ pass def log(self, logfile): self._logfile = logfile def close(self): """Close the cursor.""" if self._connection is None: return False self._reset_result() self._connection = None return True def execute(self, operation, params=None): # type: (str, Any) -> int """Prepare and execute a database operation (query or command).""" if not operation: return None if not self._connection: # TODO : change the exception raised here raise ProgrammingError("Cursor is not connected") self._reset_result() stmt = operation if params is not None: pass # global querySeqNum # querySeqNum += 1 # localSeqNum = querySeqNum # avoid race condition # print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt))) self._result = taos_query(self._connection._conn, stmt) # print(" << Query ({}) Exec Done".format(localSeqNum)) if self._logfile: with open(self._logfile, "a", encoding="utf-8") as logfile: logfile.write("%s;\n" % operation) if taos_field_count(self._result) == 0: affected_rows = taos_affected_rows(self._result) self._affected_rows = affected_rows self._rowcount = affected_rows return affected_rows else: self._fields = taos_fetch_fields(self._result) return self._handle_result() def executemany(self, operation, seq_of_parameters): """Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.""" self.execute(operation) pass def fetchone(self): """Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.""" pass def fetchmany(self): pass def istype(self, col, dataType): if dataType.upper() == "BOOL": if self._description[col][1] == FieldType.C_BOOL: return True if dataType.upper() == "TINYINT": if self._description[col][1] == FieldType.C_TINYINT: return True if dataType.upper() == "TINYINT UNSIGNED": if self._description[col][1] == FieldType.C_TINYINT_UNSIGNED: return True if dataType.upper() == "SMALLINT": if self._description[col][1] == FieldType.C_SMALLINT: return True if dataType.upper() == "SMALLINT UNSIGNED": if self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED: return True if dataType.upper() == "INT": if self._description[col][1] == FieldType.C_INT: return True if dataType.upper() == "INT UNSIGNED": if self._description[col][1] == FieldType.C_INT_UNSIGNED: return True if dataType.upper() == "BIGINT": if self._description[col][1] == FieldType.C_BIGINT: return True if dataType.upper() == "BIGINT UNSIGNED": if self._description[col][1] == FieldType.C_BIGINT_UNSIGNED: return True if dataType.upper() == "FLOAT": if self._description[col][1] == FieldType.C_FLOAT: return True if dataType.upper() == "DOUBLE": if self._description[col][1] == FieldType.C_DOUBLE: return True if dataType.upper() == "BINARY": if self._description[col][1] == FieldType.C_BINARY: return True if dataType.upper() == "TIMESTAMP": if self._description[col][1] == FieldType.C_TIMESTAMP: return True if dataType.upper() == "NCHAR": if self._description[col][1] == FieldType.C_NCHAR: return True if dataType.upper() == "JSON": if self._description[col][1] == FieldType.C_JSON: return True return False def fetchall_row(self): """Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.""" if self._result is None or self._fields is None: raise OperationalError("Invalid use of fetchall") buffer = [[] for i in range(len(self._fields))] self._rowcount = 0 while True: block, num_of_rows = taos_fetch_row(self._result, self._fields) errno = taos_errno(self._result) if errno != 0: raise ProgrammingError(taos_errstr(self._result), errno) if num_of_rows == 0: break self._rowcount += num_of_rows for i in range(len(self._fields)): buffer[i].extend(block[i]) return list(map(tuple, zip(*buffer))) def fetchall(self): if self._result is None: raise OperationalError("Invalid use of fetchall") fields = self._fields if self._fields is not None else taos_fetch_fields( self._result) buffer = [[] for i in range(len(fields))] self._rowcount = 0 while True: block, num_of_rows = taos_fetch_block(self._result, self._fields) errno = taos_errno(self._result) if errno != 0: raise ProgrammingError(taos_errstr(self._result), errno) if num_of_rows == 0: break self._rowcount += num_of_rows for i in range(len(self._fields)): buffer[i].extend(block[i]) return list(map(tuple, zip(*buffer))) def stop_query(self): if self._result is not None: taos_stop_query(self._result) def nextset(self): """ """ pass def setinputsize(self, sizes): pass def setutputsize(self, size, column=None): pass def _reset_result(self): """Reset the result to unused version.""" self._description = [] self._rowcount = -1 if self._result is not None: taos_free_result(self._result) self._result = None self._fields = None self._block = None self._block_rows = -1 self._block_iter = 0 self._affected_rows = 0 def _handle_result(self): """Handle the return result from query.""" self._description = [] for ele in self._fields: self._description.append( (ele["name"], ele["type"], None, None, None, None, False)) return self._result def __del__(self): self.close()
Instance variables
var affected_rows
-
Return the rowcount of insertion
Expand source code
@property def affected_rows(self): """Return the rowcount of insertion""" return self._affected_rows
var description
-
Return the description of the object.
Expand source code
@property def description(self): """Return the description of the object.""" return self._description
var rowcount
-
For INSERT statement, rowcount is assigned immediately after execute the statement. For SELECT statement, rowcount will not get correct value until fetched all data.
Expand source code
@property def rowcount(self): """ For INSERT statement, rowcount is assigned immediately after execute the statement. For SELECT statement, rowcount will not get correct value until fetched all data. """ return self._rowcount
Methods
def callproc(self, procname, *args)
-
Call a stored database procedure with the given name.
Void functionality since no stored procedures.
Expand source code
def callproc(self, procname, *args): """Call a stored database procedure with the given name. Void functionality since no stored procedures. """ pass
def close(self)
-
Close the cursor.
Expand source code
def close(self): """Close the cursor.""" if self._connection is None: return False self._reset_result() self._connection = None return True
def execute(self, operation, params=None)
-
Prepare and execute a database operation (query or command).
Expand source code
def execute(self, operation, params=None): # type: (str, Any) -> int """Prepare and execute a database operation (query or command).""" if not operation: return None if not self._connection: # TODO : change the exception raised here raise ProgrammingError("Cursor is not connected") self._reset_result() stmt = operation if params is not None: pass # global querySeqNum # querySeqNum += 1 # localSeqNum = querySeqNum # avoid race condition # print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt))) self._result = taos_query(self._connection._conn, stmt) # print(" << Query ({}) Exec Done".format(localSeqNum)) if self._logfile: with open(self._logfile, "a", encoding="utf-8") as logfile: logfile.write("%s;\n" % operation) if taos_field_count(self._result) == 0: affected_rows = taos_affected_rows(self._result) self._affected_rows = affected_rows self._rowcount = affected_rows return affected_rows else: self._fields = taos_fetch_fields(self._result) return self._handle_result()
def executemany(self, operation, seq_of_parameters)
-
Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
Expand source code
def executemany(self, operation, seq_of_parameters): """Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.""" self.execute(operation) pass
def fetchall(self)
-
Expand source code
def fetchall(self): if self._result is None: raise OperationalError("Invalid use of fetchall") fields = self._fields if self._fields is not None else taos_fetch_fields( self._result) buffer = [[] for i in range(len(fields))] self._rowcount = 0 while True: block, num_of_rows = taos_fetch_block(self._result, self._fields) errno = taos_errno(self._result) if errno != 0: raise ProgrammingError(taos_errstr(self._result), errno) if num_of_rows == 0: break self._rowcount += num_of_rows for i in range(len(self._fields)): buffer[i].extend(block[i]) return list(map(tuple, zip(*buffer)))
def fetchall_row(self)
-
Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
Expand source code
def fetchall_row(self): """Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.""" if self._result is None or self._fields is None: raise OperationalError("Invalid use of fetchall") buffer = [[] for i in range(len(self._fields))] self._rowcount = 0 while True: block, num_of_rows = taos_fetch_row(self._result, self._fields) errno = taos_errno(self._result) if errno != 0: raise ProgrammingError(taos_errstr(self._result), errno) if num_of_rows == 0: break self._rowcount += num_of_rows for i in range(len(self._fields)): buffer[i].extend(block[i]) return list(map(tuple, zip(*buffer)))
def fetchmany(self)
-
Expand source code
def fetchmany(self): pass
def fetchone(self)
-
Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
Expand source code
def fetchone(self): """Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.""" pass
def istype(self, col, dataType)
-
Expand source code
def istype(self, col, dataType): if dataType.upper() == "BOOL": if self._description[col][1] == FieldType.C_BOOL: return True if dataType.upper() == "TINYINT": if self._description[col][1] == FieldType.C_TINYINT: return True if dataType.upper() == "TINYINT UNSIGNED": if self._description[col][1] == FieldType.C_TINYINT_UNSIGNED: return True if dataType.upper() == "SMALLINT": if self._description[col][1] == FieldType.C_SMALLINT: return True if dataType.upper() == "SMALLINT UNSIGNED": if self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED: return True if dataType.upper() == "INT": if self._description[col][1] == FieldType.C_INT: return True if dataType.upper() == "INT UNSIGNED": if self._description[col][1] == FieldType.C_INT_UNSIGNED: return True if dataType.upper() == "BIGINT": if self._description[col][1] == FieldType.C_BIGINT: return True if dataType.upper() == "BIGINT UNSIGNED": if self._description[col][1] == FieldType.C_BIGINT_UNSIGNED: return True if dataType.upper() == "FLOAT": if self._description[col][1] == FieldType.C_FLOAT: return True if dataType.upper() == "DOUBLE": if self._description[col][1] == FieldType.C_DOUBLE: return True if dataType.upper() == "BINARY": if self._description[col][1] == FieldType.C_BINARY: return True if dataType.upper() == "TIMESTAMP": if self._description[col][1] == FieldType.C_TIMESTAMP: return True if dataType.upper() == "NCHAR": if self._description[col][1] == FieldType.C_NCHAR: return True if dataType.upper() == "JSON": if self._description[col][1] == FieldType.C_JSON: return True return False
def log(self, logfile)
-
Expand source code
def log(self, logfile): self._logfile = logfile
def next(self)
-
Expand source code
def next(self): return self._taos_next()
def nextset(self)
-
Expand source code
def nextset(self): """ """ pass
def setinputsize(self, sizes)
-
Expand source code
def setinputsize(self, sizes): pass
def setutputsize(self, size, column=None)
-
Expand source code
def setutputsize(self, size, column=None): pass
def stop_query(self)
-
Expand source code
def stop_query(self): if self._result is not None: taos_stop_query(self._result)
class TaosResult (result, close_after=False, conn=None)
-
TDengine result interface
Expand source code
class TaosResult(object): """TDengine result interface""" def __init__(self, result, close_after=False, conn=None): # type: (c_void_p, bool, TaosConnection) -> TaosResult # to make the __del__ order right self._conn = conn self._close_after = close_after if isinstance(result, c_void_p): self._result = result else: self._result = c_void_p(result) self._fields = None self._field_count = None self._precision = None self._block = None self._block_length = None self._row_count = 0 def __iter__(self): return self def __next__(self): return self._next_row() def next(self): # fetch next row return self._next_row() def _next_row(self): if self._result is None or self.fields is None: raise OperationalError("Invalid use of fetch iterator") if self._block is None or self._block_iter >= self._block_length: self._block, self._block_length = self.fetch_block() self._block_iter = 0 # self._row_count += self._block_length raw = self._block[self._block_iter] self._block_iter += 1 self._row_count += 1 return raw @property def fields(self): """fields definitions of the current result""" if self._result is None: raise ResultError("no result object") if self._fields is None: self._fields = taos_fetch_fields(self._result) return self._fields @property def field_count(self): """Field count of the current result, eq to taos_field_count(result)""" return self.fields.count @property def row_count(self): """Return the rowcount of the object""" return self._row_count @property def precision(self): if self._precision is None: self._precision = taos_result_precision(self._result) return self._precision @property def affected_rows(self): return taos_affected_rows(self._result) # @property def field_lengths(self): return taos_fetch_lengths(self._result, self.field_count) def rows_iter(self, num_of_rows=None): return TaosRows(self, num_of_rows) def blocks_iter(self): return TaosBlocks(self) def fetch_block(self): if self._result is None: raise OperationalError("Invalid use of fetch iterator") blocks, length = taos_fetch_block(self._result) if length == 0: raise StopIteration return list(map(tuple, zip(*blocks))), length def fetch_all(self): if self._result is None: raise OperationalError("Invalid use of fetchall") if self._fields is None: self._fields = taos_fetch_fields(self._result) buffer = [[] for i in range(len(self._fields))] self._row_count = 0 while True: block, num_of_fields = taos_fetch_block(self._result, self._fields) errno = taos_errno(self._result) if errno != 0: raise ProgrammingError(taos_errstr(self._result), errno) if num_of_fields == 0: break self._row_count += num_of_fields for i in range(len(self._fields)): buffer[i].extend(block[i]) return list(map(tuple, zip(*buffer))) def fetch_all_into_dict(self): """Fetch all rows and convert it to dict""" names = [field.name for field in self.fields] rows = self.fetch_all() return list(dict(zip(names, row)) for row in rows) def fetch_rows_a(self, callback, param): taos_fetch_rows_a(self._result, callback, param) def stop_query(self): return taos_stop_query(self._result) def get_topic_name(self): return tmq_get_topic_name(self._result) def get_vgroup_id(self): return tmq_get_vgroup_id(self._result) def get_table_name(self): return tmq_get_table_name(self._result) def get_db_name(self): return tmq_get_db_name(self._result) def errno(self): """**DO NOT** use this directly unless you know what you are doing""" return taos_errno(self._result) def errstr(self): return taos_errstr(self._result) def check_error(self, errno=None, close=True): if errno is None: errno = self.errno() if errno != 0: msg = self.errstr() self.close() raise OperationalError(msg, errno) def close(self): """free result object.""" if self._result is not None and self._close_after: taos_free_result(self._result) self._result = None self._fields = None self._field_count = None self._field_lengths = None def __del__(self): self.close()
Instance variables
var affected_rows
-
Expand source code
@property def affected_rows(self): return taos_affected_rows(self._result)
var field_count
-
Field count of the current result, eq to taos_field_count(result)
Expand source code
@property def field_count(self): """Field count of the current result, eq to taos_field_count(result)""" return self.fields.count
var fields
-
fields definitions of the current result
Expand source code
@property def fields(self): """fields definitions of the current result""" if self._result is None: raise ResultError("no result object") if self._fields is None: self._fields = taos_fetch_fields(self._result) return self._fields
var precision
-
Expand source code
@property def precision(self): if self._precision is None: self._precision = taos_result_precision(self._result) return self._precision
var row_count
-
Return the rowcount of the object
Expand source code
@property def row_count(self): """Return the rowcount of the object""" return self._row_count
Methods
def blocks_iter(self)
-
Expand source code
def blocks_iter(self): return TaosBlocks(self)
def check_error(self, errno=None, close=True)
-
Expand source code
def check_error(self, errno=None, close=True): if errno is None: errno = self.errno() if errno != 0: msg = self.errstr() self.close() raise OperationalError(msg, errno)
def close(self)
-
free result object.
Expand source code
def close(self): """free result object.""" if self._result is not None and self._close_after: taos_free_result(self._result) self._result = None self._fields = None self._field_count = None self._field_lengths = None
def errno(self)
-
DO NOT use this directly unless you know what you are doing
Expand source code
def errno(self): """**DO NOT** use this directly unless you know what you are doing""" return taos_errno(self._result)
def errstr(self)
-
Expand source code
def errstr(self): return taos_errstr(self._result)
def fetch_all(self)
-
Expand source code
def fetch_all(self): if self._result is None: raise OperationalError("Invalid use of fetchall") if self._fields is None: self._fields = taos_fetch_fields(self._result) buffer = [[] for i in range(len(self._fields))] self._row_count = 0 while True: block, num_of_fields = taos_fetch_block(self._result, self._fields) errno = taos_errno(self._result) if errno != 0: raise ProgrammingError(taos_errstr(self._result), errno) if num_of_fields == 0: break self._row_count += num_of_fields for i in range(len(self._fields)): buffer[i].extend(block[i]) return list(map(tuple, zip(*buffer)))
def fetch_all_into_dict(self)
-
Fetch all rows and convert it to dict
Expand source code
def fetch_all_into_dict(self): """Fetch all rows and convert it to dict""" names = [field.name for field in self.fields] rows = self.fetch_all() return list(dict(zip(names, row)) for row in rows)
def fetch_block(self)
-
Expand source code
def fetch_block(self): if self._result is None: raise OperationalError("Invalid use of fetch iterator") blocks, length = taos_fetch_block(self._result) if length == 0: raise StopIteration return list(map(tuple, zip(*blocks))), length
def fetch_rows_a(self, callback, param)
-
Expand source code
def fetch_rows_a(self, callback, param): taos_fetch_rows_a(self._result, callback, param)
def field_lengths(self)
-
Expand source code
def field_lengths(self): return taos_fetch_lengths(self._result, self.field_count)
def get_db_name(self)
-
Expand source code
def get_db_name(self): return tmq_get_db_name(self._result)
def get_table_name(self)
-
Expand source code
def get_table_name(self): return tmq_get_table_name(self._result)
def get_topic_name(self)
-
Expand source code
def get_topic_name(self): return tmq_get_topic_name(self._result)
def get_vgroup_id(self)
-
Expand source code
def get_vgroup_id(self): return tmq_get_vgroup_id(self._result)
def next(self)
-
Expand source code
def next(self): # fetch next row return self._next_row()
def rows_iter(self, num_of_rows=None)
-
Expand source code
def rows_iter(self, num_of_rows=None): return TaosRows(self, num_of_rows)
def stop_query(self)
-
Expand source code
def stop_query(self): return taos_stop_query(self._result)
class TaosRow (result, row)
-
Expand source code
class TaosRow: def __init__(self, result, row): self._result = result self._row = row def __str__(self): return taos_print_row(self._row, self._result.fields, self._result.field_count) def __call__(self): return self.as_tuple() def _astuple(self): return self.as_tuple() def __iter__(self): return self.as_tuple() def as_ptr(self): return self._row def as_tuple(self): precision = self._result.precision field_count = self._result.field_count blocks = [None] * field_count fields = self._result.fields field_lens = self._result.field_lengths() for i in range(field_count): data = ctypes.cast(self._row, ctypes.POINTER(ctypes.c_void_p))[i] if fields[i].type not in CONVERT_FUNC: raise DatabaseError("Invalid data type returned from database") if data is None: blocks[i] = None else: blocks[i] = CONVERT_FUNC[fields[i].type](data, [False], 1, field_lens[i], precision)[0] return tuple(blocks) def as_dict(self): values = self.as_tuple() names = self._result.fields dict(zip(names, values))
Methods
def as_dict(self)
-
Expand source code
def as_dict(self): values = self.as_tuple() names = self._result.fields dict(zip(names, values))
def as_ptr(self)
-
Expand source code
def as_ptr(self): return self._row
def as_tuple(self)
-
Expand source code
def as_tuple(self): precision = self._result.precision field_count = self._result.field_count blocks = [None] * field_count fields = self._result.fields field_lens = self._result.field_lengths() for i in range(field_count): data = ctypes.cast(self._row, ctypes.POINTER(ctypes.c_void_p))[i] if fields[i].type not in CONVERT_FUNC: raise DatabaseError("Invalid data type returned from database") if data is None: blocks[i] = None else: blocks[i] = CONVERT_FUNC[fields[i].type](data, [False], 1, field_lens[i], precision)[0] return tuple(blocks)
class TaosRows (result, num_of_rows=None)
-
TDengine result rows iterator
Expand source code
class TaosRows: """TDengine result rows iterator""" def __init__(self, result, num_of_rows=None): self._result = result self._num_of_rows = num_of_rows def __iter__(self): return self def __next__(self): return self._next_row() def next(self): return self._next_row() def _next_row(self): if self._result is None: raise OperationalError("Invalid use of fetch iterator") if self._num_of_rows is not None and self._num_of_rows <= self._result._row_count: raise StopIteration row = taos_fetch_row_raw(self._result._result) if not row: raise StopIteration self._result._row_count += 1 return TaosRow(self._result, row) @property def row_count(self): """Return the rowcount of the object""" return self._result._row_count
Instance variables
var row_count
-
Return the rowcount of the object
Expand source code
@property def row_count(self): """Return the rowcount of the object""" return self._result._row_count
Methods
def next(self)
-
Expand source code
def next(self): return self._next_row()
class TaosStmt (stmt, conn=None)
-
TDengine STMT interface
Expand source code
class TaosStmt(object): """TDengine STMT interface""" def __init__(self, stmt, conn = None): self._conn = conn self._stmt = stmt def set_tbname(self, name): """Set table name if needed. Note that the set_tbname* method should only used in insert statement """ if self._stmt is None: raise StatementError("Invalid use of set_tbname") taos_stmt_set_tbname(self._stmt, name) def prepare(self, sql): # type: (str) -> None taos_stmt_prepare(self._stmt, sql) def set_tbname_tags(self, name, tags): # type: (str, Array[TaosBind]) -> None """Set table name with tags, tags is array of BindParams""" if self._stmt is None: raise StatementError("Invalid use of set_tbname") taos_stmt_set_tbname_tags(self._stmt, name, tags) def bind_param(self, params, add_batch=True): # type: (Array[TaosBind], bool) -> None if self._stmt is None: raise StatementError("Invalid use of stmt") taos_stmt_bind_param(self._stmt, params) if add_batch: taos_stmt_add_batch(self._stmt) def bind_param_batch(self, binds, add_batch=True): # type: (Array[TaosMultiBind], bool) -> None if self._stmt is None: raise StatementError("Invalid use of stmt") taos_stmt_bind_param_batch(self._stmt, binds) if add_batch: taos_stmt_add_batch(self._stmt) def add_batch(self): if self._stmt is None: raise StatementError("Invalid use of stmt") taos_stmt_add_batch(self._stmt) def execute(self): if self._stmt is None: raise StatementError("Invalid use of execute") taos_stmt_execute(self._stmt) def use_result(self): """NOTE: Don't use a stmt result more than once.""" result = taos_stmt_use_result(self._stmt) return TaosResult(result, True) @property def affected_rows(self): # type: () -> int return taos_stmt_affected_rows(self._stmt) def close(self): """Close stmt.""" if self._stmt is None: return taos_stmt_close(self._stmt) self._stmt = None def __del__(self): self.close()
Instance variables
var affected_rows
-
Expand source code
@property def affected_rows(self): # type: () -> int return taos_stmt_affected_rows(self._stmt)
Methods
def add_batch(self)
-
Expand source code
def add_batch(self): if self._stmt is None: raise StatementError("Invalid use of stmt") taos_stmt_add_batch(self._stmt)
def bind_param(self, params, add_batch=True)
-
Expand source code
def bind_param(self, params, add_batch=True): # type: (Array[TaosBind], bool) -> None if self._stmt is None: raise StatementError("Invalid use of stmt") taos_stmt_bind_param(self._stmt, params) if add_batch: taos_stmt_add_batch(self._stmt)
def bind_param_batch(self, binds, add_batch=True)
-
Expand source code
def bind_param_batch(self, binds, add_batch=True): # type: (Array[TaosMultiBind], bool) -> None if self._stmt is None: raise StatementError("Invalid use of stmt") taos_stmt_bind_param_batch(self._stmt, binds) if add_batch: taos_stmt_add_batch(self._stmt)
def close(self)
-
Close stmt.
Expand source code
def close(self): """Close stmt.""" if self._stmt is None: return taos_stmt_close(self._stmt) self._stmt = None
def execute(self)
-
Expand source code
def execute(self): if self._stmt is None: raise StatementError("Invalid use of execute") taos_stmt_execute(self._stmt)
def prepare(self, sql)
-
Expand source code
def prepare(self, sql): # type: (str) -> None taos_stmt_prepare(self._stmt, sql)
def set_tbname(self, name)
-
Set table name if needed.
Note that the set_tbname* method should only used in insert statement
Expand source code
def set_tbname(self, name): """Set table name if needed. Note that the set_tbname* method should only used in insert statement """ if self._stmt is None: raise StatementError("Invalid use of set_tbname") taos_stmt_set_tbname(self._stmt, name)
-
Set table name with tags, tags is array of BindParams
Expand source code
def set_tbname_tags(self, name, tags): # type: (str, Array[TaosBind]) -> None """Set table name with tags, tags is array of BindParams""" if self._stmt is None: raise StatementError("Invalid use of set_tbname") taos_stmt_set_tbname_tags(self._stmt, name, tags)
def use_result(self)
-
NOTE: Don't use a stmt result more than once.
Expand source code
def use_result(self): """NOTE: Don't use a stmt result more than once.""" result = taos_stmt_use_result(self._stmt) return TaosResult(result, True)