Package taos

Expand source code
# encoding:UTF-8
from .connection import TaosConnection

# For some reason, the following is needed for VS Code (through PyLance) to
# recognize that "error" is a valid module of the "taos" package.
from .error import *
from .bind import *
from .field import *
from .cursor import *
from .result import *
from .statement import *
from .subscription import *
from .schemaless import *

try:
    # Best-effort import: the dialect module is treated as optional, so a
    # failure to load it must not break ``import taos``.
    from .sqlalchemy import *
except Exception:
    # Deliberately ignored. ``Exception`` (rather than a bare ``except``)
    # keeps KeyboardInterrupt / SystemExit propagating.
    pass

from taos._version import __version__

# Globals
# DB-API 2.0 (PEP 249) module attributes.
# threadsafety = 0 is the PEP 249 level meaning threads may not share the module.
threadsafety = 0
"""sqlalchemy will read this attribute"""
# "pyformat" is the PEP 249 name for Python extended format codes, e.g. %(name)s.
paramstyle = "pyformat"
"""sqlalchemy will read this attribute"""

# Public names exported by ``from taos import *``.
__all__ = [
    "__version__",
    "IS_V3",
    # functions
    "connect",
    "new_bind_param",
    "new_bind_params",
    "new_multi_binds",
    "new_multi_bind",
    # objects
    "TaosBind",
    "TaosConnection",
    "TaosCursor",
    "TaosResult",
    "TaosRows",
    "TaosRow",
    "TaosStmt",
    "PrecisionEnum",
    "SmlPrecision",
    "SmlProtocol"
]


def connect(*args, **kwargs):
    # type: (..., ...) -> TaosConnection
    """Create and return a TDengine connection.

    All arguments are forwarded unchanged to ``TaosConnection``.

    Keyword parameters read by ``TaosConnection`` (all optional):
    @host: Hostname
    @user: Username as string
    @password: Password as string
    @database: Database name
    @port: Server port
    @config: Client configuration passed to the C interface
    @timezone: Time zone passed to the C interface

    @rtype: TaosConnection
    """
    connection = TaosConnection(*args, **kwargs)
    return connection

Sub-modules

taos.bind
taos.cinterface
taos.connection
taos.constants

Constants in TDengine python

taos.cursor
taos.error

Python exceptions

taos.field
taos.field_v3
taos.precision
taos.result
taos.schemaless
taos.sqlalchemy
taos.statement
taos.subscription
taos.timestamp
taos.tmq

Functions

def connect(*args, **kwargs)

Function to return a TDengine connector object

Currently supported keyword parameters: @dsn: Data source name as string @user: Username as string (optional) @password: Password as string (optional) @host: Hostname (optional) @database: Database name (optional)

@rtype: TaosConnection

Expand source code
def connect(*args, **kwargs):
    # type: (..., ...) -> TaosConnection
    """Create and return a TDengine connection.

    All arguments are forwarded unchanged to ``TaosConnection``.

    Keyword parameters read by ``TaosConnection`` (all optional):
    @host: Hostname
    @user: Username as string
    @password: Password as string
    @database: Database name
    @port: Server port
    @config: Client configuration passed to the C interface
    @timezone: Time zone passed to the C interface

    @rtype: TaosConnection
    """
    connection = TaosConnection(*args, **kwargs)
    return connection
def new_bind_param()
Expand source code
def new_bind_param():
    # type: () -> TaosBind
    """Return one empty bind object: ``TaosMultiBind`` when ``IS_V3`` is set, else ``TaosBind``."""
    bind_type = TaosMultiBind if IS_V3 else TaosBind
    return bind_type()
def new_bind_params(size)
Expand source code
def new_bind_params(size):
    # type: (int) -> Array[TaosBind]
    """Return a ctypes array of ``size`` bind objects.

    The element type is ``TaosMultiBind`` when ``IS_V3`` is set, else ``TaosBind``.
    """
    element_type = TaosMultiBind if IS_V3 else TaosBind
    array_type = element_type * size
    return array_type()
def new_multi_bind()
Expand source code
def new_multi_bind():
    # type: () -> TaosMultiBind
    """Return one newly constructed ``TaosMultiBind``."""
    multi_bind = TaosMultiBind()
    return multi_bind
def new_multi_binds(size)
Expand source code
def new_multi_binds(size):
    # type: (int) -> Array[TaosMultiBind]
    """Return a ctypes array holding ``size`` ``TaosMultiBind`` elements."""
    array_type = TaosMultiBind * size
    return array_type()

Classes

class PrecisionEnum

Precision enums

Expand source code
class PrecisionEnum(object):
    """Precision enums"""

    # Plain integer codes, in increasing resolution.
    Milliseconds, Microseconds, Nanoseconds = 0, 1, 2

Class variables

var Microseconds
var Milliseconds
var Nanoseconds
class SmlPrecision

Schemaless timestamp precision constants

Expand source code
class SmlPrecision:
    """Schemaless timestamp precision constants"""

    # Consecutive integer codes starting at 0.
    (
        NOT_CONFIGURED,  # C.TSDB_SML_TIMESTAMP_NOT_CONFIGURED
        HOURS,
        MINUTES,
        SECONDS,
        MILLI_SECONDS,
        MICRO_SECONDS,
        NANO_SECONDS,
    ) = range(7)

Class variables

var HOURS
var MICRO_SECONDS
var MILLI_SECONDS
var MINUTES
var NANO_SECONDS
var NOT_CONFIGURED
var SECONDS
class SmlProtocol

Schemaless protocol constants

Expand source code
class SmlProtocol:
    """Schemaless protocol constants"""

    # Consecutive integer codes starting at 0.
    UNKNOWN_PROTOCOL, LINE_PROTOCOL, TELNET_PROTOCOL, JSON_PROTOCOL = range(4)

Class variables

var JSON_PROTOCOL
var LINE_PROTOCOL
var TELNET_PROTOCOL
var UNKNOWN_PROTOCOL
class TaosBind (*args, **kwargs)

Structure base class

Expand source code
class TaosBind(ctypes.Structure):
    """Single bind parameter, laid out as a ctypes structure.

    Every public setter tags the slot with a ``FieldType`` code and points
    ``buffer`` at a newly boxed copy of the value. Passing ``None`` sets
    ``is_null`` to a pointer to 1 instead and leaves ``buffer`` untouched.

    NOTE(review): the field order presumably mirrors a struct in the C
    client -- confirm against the header before changing ``_fields_``.
    """

    _fields_ = [
        ("buffer_type", c_int),
        ("buffer", c_void_p),
        ("buffer_length", c_size_t),
        ("length", POINTER(c_size_t)),
        ("is_null", POINTER(c_int)),
        ("is_unsigned", c_int),
        ("error", POINTER(c_int)),
        ("u", c_int64),
        ("allocated", c_int),
    ]

    # NOTE: several setters below are named after builtins (bool, int, float).
    # That is safe inside method bodies, which resolve names through module
    # globals and builtins, never through the class namespace.

    def _bind_fixed(self, field_type, ctype, value):
        """Shared body of the fixed-width setters (bool, integers, floats)."""
        self.buffer_type = field_type
        if value is None:
            self.is_null = pointer(c_int(1))
            return
        self.buffer = cast(pointer(ctype(value)), c_void_p)
        self.buffer_length = sizeof(ctype)

    def _bind_varlen(self, field_type, value):
        """Shared body of the variable-length setters (binary, nchar, json).

        ``str`` values are encoded as UTF-8; any other value is handed to
        ``cast`` as-is and must support ``len()``.
        """
        self.buffer_type = field_type
        if value is None:
            self.is_null = pointer(c_int(1))
            return
        if isinstance(value, str):
            encoded = value.encode("utf-8")
            payload = create_string_buffer(encoded)
            size = len(encoded)
        else:
            payload = value
            size = len(value)
        self.buffer = cast(payload, c_void_p)
        self.buffer_length = size
        self.length = pointer(c_size_t(self.buffer_length))

    @staticmethod
    def _seconds_to_ticks(seconds, precision, error_message):
        """Scale epoch seconds to an integer tick count for ``precision``.

        Raises ``PrecisionError(error_message)`` for any precision other than
        milliseconds or microseconds.
        """
        if precision == PrecisionEnum.Milliseconds:
            return int(round(seconds * 1000))
        if precision == PrecisionEnum.Microseconds:
            return int(round(seconds * 1000000))
        raise PrecisionError(error_message)

    def bool(self, value):
        """Bind ``value`` as C_BOOL."""
        self._bind_fixed(FieldType.C_BOOL, c_bool, value)

    def tinyint(self, value):
        """Bind ``value`` as C_TINYINT (``c_int8``)."""
        self._bind_fixed(FieldType.C_TINYINT, c_int8, value)

    def smallint(self, value):
        """Bind ``value`` as C_SMALLINT (``c_int16``)."""
        self._bind_fixed(FieldType.C_SMALLINT, c_int16, value)

    def int(self, value):
        """Bind ``value`` as C_INT (``c_int32``)."""
        self._bind_fixed(FieldType.C_INT, c_int32, value)

    def bigint(self, value):
        """Bind ``value`` as C_BIGINT (``c_int64``)."""
        self._bind_fixed(FieldType.C_BIGINT, c_int64, value)

    def float(self, value):
        """Bind ``value`` as C_FLOAT (``c_float``)."""
        self._bind_fixed(FieldType.C_FLOAT, c_float, value)

    def double(self, value):
        """Bind ``value`` as C_DOUBLE (``c_double``)."""
        self._bind_fixed(FieldType.C_DOUBLE, c_double, value)

    def binary(self, value):
        """Bind ``value`` as C_BINARY; ``str`` input is UTF-8 encoded."""
        self._bind_varlen(FieldType.C_BINARY, value)

    def timestamp(self, value, precision=PrecisionEnum.Milliseconds):
        """Bind ``value`` as C_TIMESTAMP, stored as a ``c_int64`` tick count.

        Accepted values:
          * ``None``      -- marks the slot null.
          * ``datetime``  -- offset from ``_datetime_epoch``, scaled to
            ``precision``.
          * ``float``     -- epoch seconds, scaled to ``precision``.
          * ``int``       -- used verbatim as the tick count (``bool`` is
            rejected).
          * ``str``       -- parsed with ``datetime.fromisoformat`` and then
            treated like a ``datetime``.

        Raises:
            PrecisionError: for datetime/float/str input with a precision
                other than milliseconds or microseconds.
            TypeError: for any other value type.
        """
        self.buffer_type = FieldType.C_TIMESTAMP
        if value is None:
            self.is_null = pointer(c_int(1))
            return

        if isinstance(value, str):
            # Parse first, then share the datetime conversion below.
            value = datetime.fromisoformat(value)

        if isinstance(value, datetime):
            # NOTE(review): the subtraction requires ``value`` and
            # ``_datetime_epoch`` to agree on tz-awareness -- confirm.
            ts = self._seconds_to_ticks(
                (value - _datetime_epoch).total_seconds(),
                precision,
                "datetime do not support nanosecond precision",
            )
        elif isinstance(value, float):
            ts = self._seconds_to_ticks(
                value,
                precision,
                "time float do not support nanosecond precision",
            )
        elif isinstance(value, int) and not isinstance(value, bool):
            ts = value
        else:
            raise TypeError(
                "unsupported timestamp value type: %s" % type(value).__name__
            )

        self.buffer = cast(pointer(c_int64(ts)), c_void_p)
        self.buffer_length = sizeof(c_int64)

    def nchar(self, value):
        """Bind ``value`` as C_NCHAR; ``str`` input is UTF-8 encoded."""
        self._bind_varlen(FieldType.C_NCHAR, value)

    def json(self, value):
        """Bind ``value`` as C_JSON; ``str`` input is UTF-8 encoded."""
        self._bind_varlen(FieldType.C_JSON, value)

    def tinyint_unsigned(self, value):
        """Bind ``value`` as C_TINYINT_UNSIGNED (``c_uint8``)."""
        self._bind_fixed(FieldType.C_TINYINT_UNSIGNED, c_uint8, value)

    def smallint_unsigned(self, value):
        """Bind ``value`` as C_SMALLINT_UNSIGNED (``c_uint16``)."""
        self._bind_fixed(FieldType.C_SMALLINT_UNSIGNED, c_uint16, value)

    def int_unsigned(self, value):
        """Bind ``value`` as C_INT_UNSIGNED (``c_uint32``)."""
        self._bind_fixed(FieldType.C_INT_UNSIGNED, c_uint32, value)

    def bigint_unsigned(self, value):
        """Bind ``value`` as C_BIGINT_UNSIGNED (``c_uint64``)."""
        self._bind_fixed(FieldType.C_BIGINT_UNSIGNED, c_uint64, value)

Ancestors

  • _ctypes.Structure
  • _ctypes._CData

Instance variables

var allocated

Structure/Union member

var buffer

Structure/Union member

var buffer_length

Structure/Union member

var buffer_type

Structure/Union member

var error

Structure/Union member

var is_null

Structure/Union member

var is_unsigned

Structure/Union member

var length

Structure/Union member

var u

Structure/Union member

Methods

def bigint(self, value)
Expand source code
def bigint(self, value):
    """Bind ``value`` as C_BIGINT via a boxed ``c_int64``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_BIGINT
    if value is not None:
        boxed = c_int64(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_int64)
        return
    self.is_null = pointer(c_int(1))
def bigint_unsigned(self, value)
Expand source code
def bigint_unsigned(self, value):
    """Bind ``value`` as C_BIGINT_UNSIGNED via a boxed ``c_uint64``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_BIGINT_UNSIGNED
    if value is not None:
        boxed = c_uint64(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_uint64)
        return
    self.is_null = pointer(c_int(1))
def binary(self, value)
Expand source code
def binary(self, value):
    """Bind ``value`` as C_BINARY.

    ``str`` input is UTF-8 encoded into a new string buffer; anything else is
    passed to ``cast`` unchanged and sized with ``len()``. ``None`` marks the
    slot null.
    """
    self.buffer_type = FieldType.C_BINARY
    if value is None:
        self.is_null = pointer(c_int(1))
        return

    if isinstance(value, str):
        encoded = value.encode("utf-8")
        payload, size = create_string_buffer(encoded), len(encoded)
    else:
        payload, size = value, len(value)

    self.buffer = cast(payload, c_void_p)
    self.buffer_length = size
    self.length = pointer(c_size_t(self.buffer_length))
def bool(self, value)
Expand source code
def bool(self, value):
    """Bind ``value`` as C_BOOL via a boxed ``c_bool``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_BOOL
    if value is not None:
        boxed = c_bool(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_bool)
        return
    self.is_null = pointer(c_int(1))
def double(self, value)
Expand source code
def double(self, value):
    """Bind ``value`` as C_DOUBLE via a boxed ``c_double``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_DOUBLE
    if value is not None:
        boxed = c_double(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_double)
        return
    self.is_null = pointer(c_int(1))
def float(self, value)
Expand source code
def float(self, value):
    """Bind ``value`` as C_FLOAT via a boxed ``c_float``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_FLOAT
    if value is not None:
        boxed = c_float(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_float)
        return
    self.is_null = pointer(c_int(1))
def int(self, value)
Expand source code
def int(self, value):
    """Bind ``value`` as C_INT via a boxed ``c_int32``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_INT
    if value is not None:
        boxed = c_int32(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_int32)
        return
    self.is_null = pointer(c_int(1))
def int_unsigned(self, value)
Expand source code
def int_unsigned(self, value):
    """Bind ``value`` as C_INT_UNSIGNED via a boxed ``c_uint32``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_INT_UNSIGNED
    if value is not None:
        boxed = c_uint32(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_uint32)
        return
    self.is_null = pointer(c_int(1))
def json(self, value)
Expand source code
def json(self, value):
    """Bind ``value`` as C_JSON.

    ``str`` input is UTF-8 encoded into a new string buffer; anything else is
    passed to ``cast`` unchanged and sized with ``len()``. ``None`` marks the
    slot null.
    """
    self.buffer_type = FieldType.C_JSON
    if value is None:
        self.is_null = pointer(c_int(1))
        return

    if isinstance(value, str):
        encoded = value.encode("utf-8")
        payload, size = create_string_buffer(encoded), len(encoded)
    else:
        payload, size = value, len(value)

    self.buffer = cast(payload, c_void_p)
    self.buffer_length = size
    self.length = pointer(c_size_t(self.buffer_length))
def nchar(self, value)
Expand source code
def nchar(self, value):
    """Bind ``value`` as C_NCHAR.

    ``str`` input is UTF-8 encoded into a new string buffer; anything else is
    passed to ``cast`` unchanged and sized with ``len()``. ``None`` marks the
    slot null.
    """
    self.buffer_type = FieldType.C_NCHAR
    if value is None:
        self.is_null = pointer(c_int(1))
        return

    if isinstance(value, str):
        encoded = value.encode("utf-8")
        payload, size = create_string_buffer(encoded), len(encoded)
    else:
        payload, size = value, len(value)

    self.buffer = cast(payload, c_void_p)
    self.buffer_length = size
    self.length = pointer(c_size_t(self.buffer_length))
def smallint(self, value)
Expand source code
def smallint(self, value):
    """Bind ``value`` as C_SMALLINT via a boxed ``c_int16``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_SMALLINT
    if value is not None:
        boxed = c_int16(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_int16)
        return
    self.is_null = pointer(c_int(1))
def smallint_unsigned(self, value)
Expand source code
def smallint_unsigned(self, value):
    """Bind ``value`` as C_SMALLINT_UNSIGNED via a boxed ``c_uint16``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_SMALLINT_UNSIGNED
    if value is not None:
        boxed = c_uint16(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_uint16)
        return
    self.is_null = pointer(c_int(1))
def timestamp(self, value, precision=0)
Expand source code
def timestamp(self, value, precision=PrecisionEnum.Milliseconds):
    """Bind ``value`` as C_TIMESTAMP, stored as a ``c_int64`` tick count.

    Accepted values:
      * ``None``      -- marks the slot null.
      * ``datetime``  -- offset from ``_datetime_epoch``, scaled to
        ``precision``.
      * ``float``     -- epoch seconds, scaled to ``precision``.
      * ``int``       -- used verbatim as the tick count (``bool`` is
        rejected).
      * ``str``       -- parsed with ``datetime.fromisoformat`` and then
        treated like a ``datetime``.

    Raises:
        PrecisionError: for datetime/float/str input with a precision other
            than milliseconds or microseconds.
        TypeError: for any other value type.
    """
    self.buffer_type = FieldType.C_TIMESTAMP
    if value is None:
        self.is_null = pointer(c_int(1))
        return

    def to_ticks(seconds, error_message):
        # Scale epoch seconds to integer ticks for the requested precision.
        if precision == PrecisionEnum.Milliseconds:
            return int(round(seconds * 1000))
        if precision == PrecisionEnum.Microseconds:
            return int(round(seconds * 1000000))
        raise PrecisionError(error_message)

    if isinstance(value, str):
        # Parse first, then share the datetime conversion below.
        value = datetime.fromisoformat(value)

    if isinstance(value, datetime):
        # NOTE(review): the subtraction requires ``value`` and
        # ``_datetime_epoch`` to agree on tz-awareness -- confirm.
        ts = to_ticks(
            (value - _datetime_epoch).total_seconds(),
            "datetime do not support nanosecond precision",
        )
    elif isinstance(value, float):
        ts = to_ticks(value, "time float do not support nanosecond precision")
    elif isinstance(value, int) and not isinstance(value, bool):
        ts = value
    else:
        raise TypeError(
            "unsupported timestamp value type: %s" % type(value).__name__
        )

    self.buffer = cast(pointer(c_int64(ts)), c_void_p)
    self.buffer_length = sizeof(c_int64)
def tinyint(self, value)
Expand source code
def tinyint(self, value):
    """Bind ``value`` as C_TINYINT via a boxed ``c_int8``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_TINYINT
    if value is not None:
        boxed = c_int8(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_int8)
        return
    self.is_null = pointer(c_int(1))
def tinyint_unsigned(self, value)
Expand source code
def tinyint_unsigned(self, value):
    """Bind ``value`` as C_TINYINT_UNSIGNED via a boxed ``c_uint8``; ``None`` marks the slot null."""
    self.buffer_type = FieldType.C_TINYINT_UNSIGNED
    if value is not None:
        boxed = c_uint8(value)
        self.buffer = cast(pointer(boxed), c_void_p)
        self.buffer_length = sizeof(c_uint8)
        return
    self.is_null = pointer(c_int(1))
class TaosConnection (*args, **kwargs)

TDengine connection object

Expand source code
class TaosConnection(object):
    """TDengine connection object"""

    def __init__(self, *args, **kwargs):
        """Read connection settings from ``kwargs`` and open the connection.

        Recognised keywords: ``host``, ``user`` (default ``"root"``),
        ``password`` (default ``"taosdata"``), ``database``, ``port``
        (default ``0``), ``config`` and ``timezone``. Positional arguments
        are accepted but not used.
        """
        # Set first so that close() / __del__ work even if connecting fails.
        self._conn = None
        self._host = None
        self._user = "root"
        self._password = "taosdata"
        self._database = None
        self._port = 0
        self._config = None
        self._tz = None
        self._init_config(**kwargs)
        self._chandle = CTaosInterface(self._config, self._tz)
        self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)

    def _init_config(self, **kwargs):
        """Copy each recognised keyword onto its attribute; ignore the rest."""
        keyword_to_attribute = (
            ("host", "_host"),
            ("user", "_user"),
            ("password", "_password"),
            ("database", "_database"),
            ("port", "_port"),
            ("config", "_config"),
            ("timezone", "_tz"),
        )
        for keyword, attribute in keyword_to_attribute:
            if keyword in kwargs:
                setattr(self, attribute, kwargs[keyword])

    def close(self):
        """Close current connection."""
        if not self._conn:
            return
        taos_close(self._conn)
        self._conn = None

    @property
    def client_info(self):
        # type: () -> str
        """Result of ``taos_get_client_info()``."""
        return taos_get_client_info()

    @property
    def server_info(self):
        # type: () -> str
        """Result of ``taos_get_server_info()`` for this connection."""
        return taos_get_server_info(self._conn)

    def select_db(self, database):
        # type: (str) -> None
        """Pass ``database`` to ``taos_select_db`` for this connection."""
        taos_select_db(self._conn, database)

    def execute(self, sql):
        # type: (str) -> int
        """Simply execute sql, ignoring the results.

        Returns the ``affected_rows`` of the result produced by ``query``.
        """
        return self.query(sql).affected_rows

    def query(self, sql):
        # type: (str) -> TaosResult
        """Run ``sql`` through ``taos_query`` and wrap the outcome in a ``TaosResult``."""
        result = taos_query(self._conn, sql)
        return TaosResult(result, True, self)

    def query_a(self, sql, callback, param):
        # type: (str, async_query_callback_type, c_void_p) -> None
        """Asynchronously query a sql with callback function"""
        taos_query_a(self._conn, sql, callback, param)

    def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
        # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
        """Create a subscription.

        Returns ``None`` when the connection is not open. Raises ``Error``
        with the client's error string and number when ``taos_subscribe``
        returns a falsy handle.
        """
        if self._conn is None:
            return None
        sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
        if not sub:
            errno = taos_errno(c_void_p(None))
            msg = taos_errstr(c_void_p(None))
            raise Error(msg, errno)
        return TaosSubscription(sub, callback is not None)

    def statement(self, sql=None):
        # type: (str | None) -> TaosStmt
        """Create a ``TaosStmt``, preparing ``sql`` on it when one is given.

        Returns ``None`` when the connection is not open.
        """
        if self._conn is None:
            return None
        stmt = taos_stmt_init(self._conn)
        if sql is not None:
            taos_stmt_prepare(stmt, sql)

        return TaosStmt(stmt)

    def load_table_info(self, tables):
        # type: (str) -> None
        """Pass ``tables`` to ``taos_load_table_info`` for this connection."""
        taos_load_table_info(self._conn, tables)

    def schemaless_insert(self, lines, protocol, precision):
        # type: (list[str], SmlProtocol, SmlPrecision) -> int
        """Insert ``lines`` through ``taos_schemaless_insert``.

        ``protocol`` is a ``SmlProtocol`` constant and ``precision`` a
        ``SmlPrecision`` constant; the return value is whatever
        ``taos_schemaless_insert`` returns.

        1.Line protocol and schemaless support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
        ]
        conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NANO_SECONDS)
        ```

        2.OpenTSDB telnet style API format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
        ]
        conn.schemaless_insert(lines, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
        ```

        3.OpenTSDB HTTP JSON format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("create database if not exists test")
        conn.select_db("test")
        payload = ['''
        {
            "metric": "cpu_load_0",
            "timestamp": 1626006833610123,
            "value": 55.5,
            "tags":
                {
                    "host": "ubuntu",
                    "interface": "eth0",
                    "Id": "tb0"
                }
        }
        ''']
        conn.schemaless_insert(payload, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
        ```
        """
        return taos_schemaless_insert(self._conn, lines, protocol, precision)

    def cursor(self):
        # type: () -> TaosCursor
        """Return a new Cursor object using the connection."""
        return TaosCursor(self)

    def commit(self):
        """Commit any pending transaction to the database.

        Since TDengine does not support transactions, this method is a no-op.
        """
        pass

    def rollback(self):
        """No-op counterpart of ``commit``."""
        pass

    def clear_result_set(self):
        """Clear unused result set on this connection."""
        pass

    def __del__(self):
        # Release the native connection when the object is collected.
        self.close()

Instance variables

var client_info
Expand source code
@property
def client_info(self):
    # type: () -> str
    """Result of ``taos_get_client_info()``; takes no connection handle."""
    info = taos_get_client_info()
    return info
var server_info
Expand source code
@property
def server_info(self):
    # type: () -> str
    """Result of ``taos_get_server_info()`` for this connection's handle."""
    handle = self._conn
    return taos_get_server_info(handle)

Methods

def clear_result_set(self)

Clear unused result set on this connection.

Expand source code
def clear_result_set(self):
    """Clear unused result set on this connection.

    The body is empty: calling it does nothing and returns ``None``.
    """
    return None
def close(self)

Close current connection.

Expand source code
def close(self):
    """Close the connection if one is open, then forget the handle."""
    if not self._conn:
        # Nothing to release (never connected, or already closed).
        return
    taos_close(self._conn)
    self._conn = None
def commit(self)

Commit any pending transaction to the database.

Since TDengine does not support transactions, this method is a no-op.

Expand source code
def commit(self):
    """Commit any pending transaction to the database.

    TDengine does not support transactions, so this does nothing and
    returns ``None``.
    """
    return None
def cursor(self)

Return a new Cursor object using the connection.

Expand source code
def cursor(self):
    # type: () -> TaosCursor
    """Create a ``TaosCursor`` bound to this connection and return it."""
    new_cursor = TaosCursor(self)
    return new_cursor
def execute(self, sql)

Simply execute SQL, ignoring the results

Expand source code
def execute(self, sql):
    # type: (str) -> int
    """Run ``sql`` via ``self.query`` and return only its ``affected_rows``."""
    outcome = self.query(sql)
    return outcome.affected_rows
def load_table_info(self, tables)
Expand source code
def load_table_info(self, tables):
    # type: (str) -> None
    """Pass ``tables`` to ``taos_load_table_info`` for this connection; returns nothing."""
    handle = self._conn
    taos_load_table_info(handle, tables)
def query(self, sql)
Expand source code
def query(self, sql):
    # type: (str) -> TaosResult
    """Run ``sql`` through ``taos_query`` and wrap the outcome in a ``TaosResult``."""
    raw_result = taos_query(self._conn, sql)
    wrapped = TaosResult(raw_result, True, self)
    return wrapped
def query_a(self, sql, callback, param)

Asynchronously query a sql with callback function

Expand source code
def query_a(self, sql, callback, param):
    # type: (str, async_query_callback_type, c_void_p) -> None
    """Start an asynchronous query.

    ``sql``, ``callback`` and ``param`` are handed to ``taos_query_a``
    together with this connection's handle; nothing is returned.
    """
    handle = self._conn
    taos_query_a(handle, sql, callback, param)
def rollback(self)

Void functionality

Expand source code
def rollback(self):
    """Do nothing and return ``None``; the method has no body beyond this docstring."""
    return None
def schemaless_insert(self, lines, protocol, precision)

1.Line protocol and schemaless support

Example

import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
lines = [
    'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
]
conn.schemaless_insert(lines, 0, "ns")

2.OpenTSDB telnet style API format support

Example

import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
lines = [
    'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
]
conn.schemaless_insert(lines, 1, None)

3.OpenTSDB HTTP JSON format support

Example

import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
payload = ['''
{
    "metric": "cpu_load_0",
    "timestamp": 1626006833610123,
    "value": 55.5,
    "tags":
        {
            "host": "ubuntu",
            "interface": "eth0",
            "Id": "tb0"
        }
}
''']
conn.schemaless_insert(lines, 2, None)
Expand source code
def schemaless_insert(self, lines, protocol, precision):
    # type: (list[str], SmlProtocol, SmlPrecision) -> int
    """Insert ``lines`` through ``taos_schemaless_insert``.

    ``protocol`` is a ``SmlProtocol`` constant and ``precision`` a
    ``SmlPrecision`` constant; the return value is whatever
    ``taos_schemaless_insert`` returns.

    1.Line protocol and schemaless support

    ## Example

    ```python
    import taos
    conn = taos.connect()
    conn.execute("create database if not exists test")
    conn.select_db("test")
    lines = [
        'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
    ]
    conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NANO_SECONDS)
    ```

    2.OpenTSDB telnet style API format support

    ## Example

    ```python
    import taos
    conn = taos.connect()
    conn.execute("create database if not exists test")
    conn.select_db("test")
    lines = [
        'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
    ]
    conn.schemaless_insert(lines, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
    ```

    3.OpenTSDB HTTP JSON format support

    ## Example

    ```python
    import taos
    conn = taos.connect()
    conn.execute("create database if not exists test")
    conn.select_db("test")
    payload = ['''
    {
        "metric": "cpu_load_0",
        "timestamp": 1626006833610123,
        "value": 55.5,
        "tags":
            {
                "host": "ubuntu",
                "interface": "eth0",
                "Id": "tb0"
            }
    }
    ''']
    conn.schemaless_insert(payload, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
    ```
    """
    return taos_schemaless_insert(self._conn, lines, protocol, precision)
def select_db(self, database)
Expand source code
def select_db(self, database):
    # type: (str) -> None
    """Pass ``database`` to ``taos_select_db`` for this connection; returns nothing."""
    handle = self._conn
    taos_select_db(handle, database)
def statement(self, sql=None)
Expand source code
def statement(self, sql=None):
    # type: (str | None) -> TaosStmt
    """Create a ``TaosStmt``, preparing ``sql`` on it when one is given.

    Returns ``None`` when the connection is not open.
    """
    if self._conn is None:
        return None
    stmt = taos_stmt_init(self._conn)
    # Identity test: only a missing ``sql`` skips preparation.
    if sql is not None:
        taos_stmt_prepare(stmt, sql)

    return TaosStmt(stmt)
def subscribe(self, restart, topic, sql, interval, callback=None, param=None)

Create a subscription.

Expand source code
def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
    # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
    """Create a subscription.

    Returns ``None`` when the connection is not open. Raises ``Error`` with
    the client's error string and number when ``taos_subscribe`` returns a
    falsy handle. The second ``TaosSubscription`` argument records whether a
    callback was supplied.
    """
    if self._conn is None:
        return None
    sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
    if not sub:
        errno = taos_errno(c_void_p(None))
        msg = taos_errstr(c_void_p(None))
        raise Error(msg, errno)
    return TaosSubscription(sub, callback is not None)
class TaosCursor (connection=None)

Database cursor which is used to manage the context of a fetch operation.

Attributes

.description: Read-only attribute consists of 7-item sequences:

> name (mandatory)
> type_code (mandatory)
> display_size
> internal_size
> precision
> scale
> null_ok

This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.

.rowcount:This read-only attribute specifies the number of rows that the last .execute*() produced (for DQL statements like SELECT) or affected

Expand source code
class TaosCursor(object):
    """Database cursor which is used to manage the context of a fetch operation.

    Attributes:
        .description: Read-only list holding one 7-item tuple per result column:

            > name (mandatory)
            > type_code (mandatory)
            > display_size
            > internal_size
            > precision
            > scale
            > null_ok

            Only name and type_code are filled in; the list is empty until an
            .execute*() call has produced a result set with fields.

        .rowcount: Read-only. -1 until a statement has run; set to the
            affected-row count by .execute() for results without fields, and
            to the number of rows collected by .fetchall() / .fetchall_row().
    """

    # Type names understood by istype(). Name "<N>" is compared against
    # FieldType.C_<N>, with spaces in <N> replaced by underscores.
    _TYPE_NAMES = frozenset((
        "BOOL",
        "TINYINT",
        "TINYINT UNSIGNED",
        "SMALLINT",
        "SMALLINT UNSIGNED",
        "INT",
        "INT UNSIGNED",
        "BIGINT",
        "BIGINT UNSIGNED",
        "FLOAT",
        "DOUBLE",
        "BINARY",
        "TIMESTAMP",
        "NCHAR",
        "JSON",
    ))

    def __init__(self, connection=None):
        self._description = []
        self._rowcount = -1
        self._connection = connection
        self._result = None
        self._fields = None
        self._block = None
        self._block_rows = -1
        self._block_iter = 0
        self._affected_rows = 0
        self._logfile = ""

    def __iter__(self):
        return self

    def __next__(self):
        return self._taos_next()

    def next(self):
        """Python 2 spelling of ``__next__``."""
        return self._taos_next()

    def _taos_next(self):
        """Return the next row tuple, refilling the row buffer when it is used up.

        Raises OperationalError when no result with fields is active and
        StopIteration when the fetch reports zero rows.
        """
        if self._result is None or self._fields is None:
            raise OperationalError("Invalid use of fetch iterator")

        if self._block_rows <= self._block_iter:
            block, self._block_rows = taos_fetch_row(
                self._result, self._fields)
            if self._block_rows == 0:
                raise StopIteration
            # The fetch returns per-column sequences; transpose them into rows.
            self._block = list(map(tuple, zip(*block)))
            self._block_iter = 0

        data = self._block[self._block_iter]
        self._block_iter += 1

        return data

    @property
    def description(self):
        """Return the description of the object."""
        return self._description

    @property
    def rowcount(self):
        """
        For INSERT statement, rowcount is assigned immediately after execute the statement.
        For SELECT statement, rowcount will not get correct value until fetched all data.
        """
        return self._rowcount

    @property
    def affected_rows(self):
        """Return the rowcount of insertion"""
        return self._affected_rows

    def callproc(self, procname, *args):
        """Call a stored database procedure with the given name.

        Void functionality since no stored procedures.
        """
        pass

    def log(self, logfile):
        """Set the path of the file to which executed statements are appended."""
        self._logfile = logfile

    def close(self):
        """Close the cursor.

        Returns False when the cursor has no connection (never connected or
        already closed), True after releasing the result and detaching.
        """
        if self._connection is None:
            return False

        self._reset_result()
        self._connection = None

        return True

    def execute(self, operation, params=None):
        # type: (str, Any) -> int
        """Prepare and execute a database operation (query or command).

        ``params`` is accepted but not used. Returns None for an empty
        operation, the affected-row count when the result has no fields, and
        the raw result handle otherwise.

        Raises ProgrammingError when the cursor has no connection.
        """
        if not operation:
            return None

        if not self._connection:
            # TODO : change the exception raised here
            raise ProgrammingError("Cursor is not connected")

        self._reset_result()

        self._result = taos_query(self._connection._conn, operation)
        if self._logfile:
            with open(self._logfile, "a", encoding="utf-8") as logfile:
                logfile.write("%s;\n" % operation)

        if taos_field_count(self._result) == 0:
            affected_rows = taos_affected_rows(self._result)
            self._affected_rows = affected_rows
            self._rowcount = affected_rows
            return affected_rows
        else:
            self._fields = taos_fetch_fields(self._result)
            return self._handle_result()

    def executemany(self, operation, seq_of_parameters):
        """Execute ``operation`` once.

        ``seq_of_parameters`` is accepted but not used.
        """
        self.execute(operation)

    def fetchone(self):
        """Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.

        Raises OperationalError when no result with fields is active.
        """
        try:
            return self._taos_next()
        except StopIteration:
            return None

    def fetchmany(self):
        """Not implemented; always returns None."""
        pass

    def istype(self, col, dataType):
        """Return True when column ``col`` of the description has type ``dataType``.

        ``dataType`` is matched case-insensitively against the names in
        ``_TYPE_NAMES``; any other name yields False.
        """
        type_name = dataType.upper()
        if type_name not in self._TYPE_NAMES:
            return False
        actual = self._description[col][1]
        expected = getattr(FieldType, "C_" + type_name.replace(" ", "_"))
        return bool(actual == expected)

    def _collect_rows(self, fetch, fields):
        """Drain the active result with ``fetch`` and return a list of row tuples.

        Resets and then accumulates ``rowcount``. Raises ProgrammingError
        when the result reports a non-zero error code after a fetch.
        """
        buffer = [[] for _ in range(len(fields))]
        self._rowcount = 0
        while True:
            block, num_of_rows = fetch(self._result, fields)
            errno = taos_errno(self._result)
            if errno != 0:
                raise ProgrammingError(taos_errstr(self._result), errno)
            if num_of_rows == 0:
                break
            self._rowcount += num_of_rows
            for i, column in enumerate(buffer):
                column.extend(block[i])
        # Columns -> rows.
        return list(map(tuple, zip(*buffer)))

    def fetchall_row(self):
        """Fetch all (remaining) rows of a query result via ``taos_fetch_row``, returning them as a list of tuples.

        Raises OperationalError when no result with fields is active.
        """
        if self._result is None or self._fields is None:
            raise OperationalError("Invalid use of fetchall")

        return self._collect_rows(taos_fetch_row, self._fields)

    def fetchall(self):
        """Fetch all (remaining) rows of a query result via ``taos_fetch_block``, returning them as a list of tuples.

        Raises OperationalError when there is no active result.
        """
        if self._result is None:
            raise OperationalError("Invalid use of fetchall")
        # Fall back to asking the result for its fields; the same value must
        # be used for the whole loop, because self._fields may be None here.
        fields = self._fields if self._fields is not None else taos_fetch_fields(
            self._result)
        return self._collect_rows(taos_fetch_block, fields)

    def stop_query(self):
        """Stop the query behind the active result, if there is one."""
        if self._result is not None:
            taos_stop_query(self._result)

    def nextset(self):
        """Not implemented; always returns None."""
        pass

    def setinputsize(self, sizes):
        """No-op."""
        pass

    def setutputsize(self, size, column=None):
        """No-op (historical misspelling, kept for existing callers)."""
        pass

    # DB-API 2.0 (PEP 249) spellings of the two no-ops above.
    setinputsizes = setinputsize
    setoutputsize = setutputsize

    def _reset_result(self):
        """Reset the result to unused version."""
        self._description = []
        self._rowcount = -1
        if self._result is not None:
            taos_free_result(self._result)
        self._result = None
        self._fields = None
        self._block = None
        self._block_rows = -1
        self._block_iter = 0
        self._affected_rows = 0

    def _handle_result(self):
        """Rebuild ``description`` from the fetched fields and return the result handle."""
        self._description = [
            (ele["name"], ele["type"], None, None, None, None, False)
            for ele in self._fields
        ]

        return self._result

    def __del__(self):
        self.close()

Instance variables

var affected_rows

Return the rowcount of insertion

Expand source code
@property
def affected_rows(self):
    """Number of rows affected, as recorded by the last ``execute``."""
    count = self._affected_rows
    return count
var description

Return the description of the object.

Expand source code
@property
def description(self):
    """Column description list built for the current result."""
    current = self._description
    return current
var rowcount

For INSERT statement, rowcount is assigned immediately after execute the statement. For SELECT statement, rowcount will not get correct value until fetched all data.

Expand source code
@property
def rowcount(self):
    """Row count of the last statement.

    For an INSERT the value is available right after ``execute``; for a
    SELECT it is only correct once all data has been fetched.
    """
    count = self._rowcount
    return count

Methods

def callproc(self, procname, *args)

Call a stored database procedure with the given name.

Void functionality since no stored procedures.

Expand source code
def callproc(self, procname, *args):
    """Accept a stored-procedure call and do nothing.

    There are no stored procedures, so every argument is ignored.
    """
    return None
def close(self)

Close the cursor.

Expand source code
def close(self):
    """Release the current result and detach the cursor from its connection.

    Returns ``False`` when the cursor was already detached, ``True`` otherwise.
    """
    was_open = self._connection is not None
    if was_open:
        self._reset_result()
        self._connection = None
    return was_open
def execute(self, operation, params=None)

Prepare and execute a database operation (query or command).

Expand source code
def execute(self, operation, params=None):
    # type: (str, Any) -> int
    """Run ``operation`` on the cursor's connection.

    ``params`` is accepted but not used. Returns ``None`` for an empty
    operation, the affected-row count when the result has no fields, and
    otherwise whatever ``_handle_result()`` returns.

    Raises ``ProgrammingError`` when the cursor has no connection.
    """
    if not operation:
        return None

    if not self._connection:
        # TODO : change the exception raised here
        raise ProgrammingError("Cursor is not connected")

    # Drop any previous result before issuing the new query.
    self._reset_result()
    self._result = taos_query(self._connection._conn, operation)

    # Statement logging is opt-in through log().
    log_path = self._logfile
    if log_path:
        with open(log_path, "a", encoding="utf-8") as sink:
            sink.write("%s;\n" % operation)

    if taos_field_count(self._result) != 0:
        self._fields = taos_fetch_fields(self._result)
        return self._handle_result()

    count = taos_affected_rows(self._result)
    self._affected_rows = count
    self._rowcount = count
    return count
def executemany(self, operation, seq_of_parameters)

Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.

Expand source code
def executemany(self, operation, seq_of_parameters):
    """Execute ``operation`` once through ``execute``.

    ``seq_of_parameters`` is accepted but not consulted, and the result of
    ``execute`` is discarded.
    """
    self.execute(operation)
    return None
def fetchall(self)
Expand source code
def fetchall(self):
    """Fetch all remaining rows with ``taos_fetch_block`` as a list of tuples.

    ``rowcount`` is reset to 0 and then incremented by the size of every
    block fetched.

    Raises ``OperationalError`` when there is no active result and
    ``ProgrammingError`` when the result reports a non-zero error code
    after a fetch.
    """
    if self._result is None:
        raise OperationalError("Invalid use of fetchall")
    # self._fields may be None here, so the fallback value has to be used
    # for the rest of the function; len(self._fields) would raise TypeError.
    fields = self._fields if self._fields is not None else taos_fetch_fields(
        self._result)
    buffer = [[] for _ in range(len(fields))]
    self._rowcount = 0
    while True:
        block, num_of_rows = taos_fetch_block(self._result, fields)
        errno = taos_errno(self._result)
        if errno != 0:
            raise ProgrammingError(taos_errstr(self._result), errno)
        if num_of_rows == 0:
            break
        self._rowcount += num_of_rows
        for i, column in enumerate(buffer):
            column.extend(block[i])
    # Columns -> rows.
    return list(map(tuple, zip(*buffer)))
def fetchall_row(self)

Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.

Expand source code
def fetchall_row(self):
    """Fetch all remaining rows with ``taos_fetch_row`` as a list of tuples.

    ``rowcount`` is reset to 0 and then incremented by the row count of
    every fetch.

    Raises ``OperationalError`` when no result with fields is active and
    ``ProgrammingError`` when the result reports a non-zero error code
    after a fetch.
    """
    if self._result is None or self._fields is None:
        raise OperationalError("Invalid use of fetchall")

    column_count = len(self._fields)
    columns = [[] for _ in range(column_count)]
    self._rowcount = 0
    while True:
        block, fetched = taos_fetch_row(self._result, self._fields)
        code = taos_errno(self._result)
        if code != 0:
            raise ProgrammingError(taos_errstr(self._result), code)
        if fetched == 0:
            # Nothing left: turn the per-column lists into row tuples.
            return list(map(tuple, zip(*columns)))
        self._rowcount += fetched
        for idx in range(column_count):
            columns[idx].extend(block[idx])
def fetchmany(self)
Expand source code
def fetchmany(self):
    """Not implemented; does nothing and returns ``None``."""
    return None
def fetchone(self)

Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.

Expand source code
def fetchone(self):
    """Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.

    The row comes from the cursor's own iterator step (``_taos_next``), so
    any exception it raises other than ``StopIteration`` propagates.
    """
    try:
        return self._taos_next()
    except StopIteration:
        # End of data is reported as None rather than as an exception.
        return None
def istype(self, col, dataType)
Expand source code
def istype(self, col, dataType):
    """Tell whether description column ``col`` has the type named ``dataType``.

    ``dataType`` is upper-cased and must be one of the names listed below;
    name ``N`` is compared against ``FieldType.C_<N>`` with spaces replaced
    by underscores. Unrecognised names give ``False``.
    """
    known_names = (
        "BOOL",
        "TINYINT",
        "TINYINT UNSIGNED",
        "SMALLINT",
        "SMALLINT UNSIGNED",
        "INT",
        "INT UNSIGNED",
        "BIGINT",
        "BIGINT UNSIGNED",
        "FLOAT",
        "DOUBLE",
        "BINARY",
        "TIMESTAMP",
        "NCHAR",
        "JSON",
    )
    wanted = dataType.upper()
    if wanted not in known_names:
        return False

    actual = self._description[col][1]
    # The constant is looked up only for the requested name.
    expected = getattr(FieldType, "C_" + wanted.replace(" ", "_"))
    return bool(actual == expected)
def log(self, logfile)
Expand source code
def log(self, logfile):
    """Remember ``logfile`` as the cursor's statement log path."""
    setattr(self, "_logfile", logfile)
def next(self)
Expand source code
def next(self):
    """Python 2 iterator hook: return the row produced by ``_taos_next``."""
    row = self._taos_next()
    return row
def nextset(self)
Expand source code
def nextset(self):
    """Not implemented; does nothing and returns ``None``."""
    return None
def setinputsize(self, sizes)
Expand source code
def setinputsize(self, sizes):
    """Accept ``sizes`` and do nothing."""
    return None
def setutputsize(self, size, column=None)
Expand source code
def setutputsize(self, size, column=None):
    """Accept ``size`` and ``column`` and do nothing."""
    return None
def stop_query(self)
Expand source code
def stop_query(self):
    """Call ``taos_stop_query`` on the active result; no-op without one."""
    active = self._result
    if active is None:
        return
    taos_stop_query(active)
class TaosResult (result, close_after=False, conn=None)

TDengine result interface

Expand source code
class TaosResult(object):
    """TDengine result interface"""

    def __init__(self, result, close_after=False, conn=None):
        # type: (c_void_p, bool, TaosConnection) -> TaosResult
        """Wrap a result handle.

        ``result`` is used as-is when it already is a ``c_void_p`` and is
        wrapped in one otherwise. ``close_after`` decides whether ``close``
        frees the handle.
        """
        # to make the __del__ order right
        self._conn = conn
        self._close_after = close_after
        if isinstance(result, c_void_p):
            self._result = result
        else:
            self._result = c_void_p(result)

        self._fields = None
        self._field_count = None
        self._field_lengths = None
        self._precision = None

        self._block = None
        self._block_length = None
        # Position inside the current block; set up here so the attribute
        # always exists, not only after the first block has been fetched.
        self._block_iter = 0
        self._row_count = 0

    def __iter__(self):
        return self

    def __next__(self):
        return self._next_row()

    def next(self):
        # fetch next row
        return self._next_row()

    def _next_row(self):
        """Return the next row, fetching a new block when the current one is used up."""
        if self._result is None or self.fields is None:
            raise OperationalError("Invalid use of fetch iterator")

        if self._block is None or self._block_iter >= self._block_length:
            # fetch_block raises StopIteration on an empty block, which ends iteration.
            self._block, self._block_length = self.fetch_block()
            self._block_iter = 0

        raw = self._block[self._block_iter]
        self._block_iter += 1
        self._row_count += 1
        return raw

    @property
    def fields(self):
        """fields definitions of the current result"""
        if self._result is None:
            raise ResultError("no result object")
        if self._fields is None:
            self._fields = taos_fetch_fields(self._result)

        return self._fields

    @property
    def field_count(self):
        """Field count of the current result, eq to taos_field_count(result)"""
        return self.fields.count

    @property
    def row_count(self):
        """Return the rowcount of the object"""
        return self._row_count

    @property
    def precision(self):
        """Precision reported by ``taos_result_precision``, cached after the first read."""
        if self._precision is None:
            self._precision = taos_result_precision(self._result)
        return self._precision

    @property
    def affected_rows(self):
        """Value of ``taos_affected_rows`` for this result."""
        return taos_affected_rows(self._result)

    # @property
    def field_lengths(self):
        """Value of ``taos_fetch_lengths`` for this result and its field count."""
        return taos_fetch_lengths(self._result, self.field_count)

    def rows_iter(self, num_of_rows=None):
        """Return a ``TaosRows`` iterator over this result."""
        return TaosRows(self, num_of_rows)

    def blocks_iter(self):
        """Return a ``TaosBlocks`` iterator over this result."""
        return TaosBlocks(self)

    def fetch_block(self):
        """Fetch one block and return ``(rows, length)`` with rows as tuples.

        Raises OperationalError without a result and StopIteration when the
        fetched block is empty.
        """
        if self._result is None:
            raise OperationalError("Invalid use of fetch iterator")

        blocks, length = taos_fetch_block(self._result)
        if length == 0:
            raise StopIteration

        return list(map(tuple, zip(*blocks))), length

    def fetch_all(self):
        """Fetch every remaining block and return all rows as a list of tuples.

        ``row_count`` is reset to 0 and then incremented per block. Raises
        OperationalError without a result and ProgrammingError when the
        result reports a non-zero error code after a fetch.
        """
        if self._result is None:
            raise OperationalError("Invalid use of fetchall")

        if self._fields is None:
            self._fields = taos_fetch_fields(self._result)
        buffer = [[] for _ in range(len(self._fields))]
        self._row_count = 0
        while True:
            block, num_of_rows = taos_fetch_block(self._result, self._fields)
            errno = taos_errno(self._result)
            if errno != 0:
                raise ProgrammingError(taos_errstr(self._result), errno)
            if num_of_rows == 0:
                break
            self._row_count += num_of_rows
            for i, column in enumerate(buffer):
                column.extend(block[i])
        return list(map(tuple, zip(*buffer)))

    def fetch_all_into_dict(self):
        """Fetch all rows and convert each one to a dict keyed by field name"""
        names = [field.name for field in self.fields]
        rows = self.fetch_all()
        return [dict(zip(names, row)) for row in rows]

    def fetch_rows_a(self, callback, param):
        """Forward to ``taos_fetch_rows_a`` with this result."""
        taos_fetch_rows_a(self._result, callback, param)

    def stop_query(self):
        """Forward to ``taos_stop_query`` with this result."""
        return taos_stop_query(self._result)

    def get_topic_name(self):
        return tmq_get_topic_name(self._result)

    def get_vgroup_id(self):
        return tmq_get_vgroup_id(self._result)

    def get_table_name(self):
        return tmq_get_table_name(self._result)

    def get_db_name(self):
        return tmq_get_db_name(self._result)

    def errno(self):
        """**DO NOT** use this directly unless you know what you are doing"""
        return taos_errno(self._result)

    def errstr(self):
        """Error message of this result as returned by ``taos_errstr``."""
        return taos_errstr(self._result)

    def check_error(self, errno=None, close=True):
        """Raise OperationalError when ``errno`` (default: this result's) is non-zero.

        The message is read before anything is released. When ``close`` is
        true the result is closed before raising; pass ``close=False`` to
        keep it open.
        """
        if errno is None:
            errno = self.errno()
        if errno != 0:
            msg = self.errstr()
            if close:
                self.close()
            raise OperationalError(msg, errno)

    def close(self):
        """free result object."""
        # The handle is only freed when this wrapper was created with close_after.
        if self._result is not None and self._close_after:
            taos_free_result(self._result)
        self._result = None
        self._fields = None
        self._field_count = None
        self._field_lengths = None

    def __del__(self):
        self.close()

Instance variables

var affected_rows
Expand source code
@property
def affected_rows(self):
    """Value of ``taos_affected_rows`` for this result's handle."""
    handle = self._result
    return taos_affected_rows(handle)
var field_count

Field count of the current result, eq to taos_field_count(result)

Expand source code
@property
def field_count(self):
    """Number of fields in the current result, read from ``self.fields.count``."""
    current_fields = self.fields
    return current_fields.count
var fields

fields definitions of the current result

Expand source code
@property
def fields(self):
    """Field definitions of the current result, fetched once and then cached.

    Raises ``ResultError`` when the result handle is unset.
    """
    if self._result is None:
        raise ResultError("no result object")

    cached = self._fields
    if cached is None:
        cached = taos_fetch_fields(self._result)
        self._fields = cached
    return cached
var precision
Expand source code
@property
def precision(self):
    """Precision from ``taos_result_precision``, looked up once and then cached."""
    cached = self._precision
    if cached is None:
        cached = taos_result_precision(self._result)
        self._precision = cached
    return cached
var row_count

Return the rowcount of the object

Expand source code
@property
def row_count(self):
    """Number of rows counted on this result so far."""
    counted = self._row_count
    return counted

Methods

def blocks_iter(self)
Expand source code
def blocks_iter(self):
    """Return a new ``TaosBlocks`` built from this result."""
    block_iterator = TaosBlocks(self)
    return block_iterator
def check_error(self, errno=None, close=True)
Expand source code
def check_error(self, errno=None, close=True):
    """Raise ``OperationalError`` when ``errno`` is non-zero.

    ``errno`` defaults to this result's own error code. The message is
    read before anything is released. When ``close`` is true (the default)
    the result is closed before raising; ``close=False`` leaves it open.
    Returns ``None`` when there is no error.
    """
    if errno is None:
        errno = self.errno()
    if errno != 0:
        msg = self.errstr()
        # Honour the caller's choice instead of closing unconditionally.
        if close:
            self.close()
        raise OperationalError(msg, errno)
def close(self)

free result object.

Expand source code
def close(self):
    """Free the result handle if this wrapper owns it, then clear cached state.

    The handle is passed to ``taos_free_result`` only when it is set and
    ``_close_after`` is true.
    """
    if self._result is not None:
        if self._close_after:
            taos_free_result(self._result)
    for attr in ("_result", "_fields", "_field_count", "_field_lengths"):
        setattr(self, attr, None)
def errno(self)

DO NOT use this directly unless you know what you are doing

Expand source code
def errno(self):
    """Error code of this result from ``taos_errno``.

    **DO NOT** use this directly unless you know what you are doing.
    """
    handle = self._result
    return taos_errno(handle)
def errstr(self)
Expand source code
def errstr(self):
    """Error message of this result from ``taos_errstr``."""
    handle = self._result
    return taos_errstr(handle)
def fetch_all(self)
Expand source code
def fetch_all(self):
    """Fetch every remaining block and return all rows as a list of tuples.

    Field definitions are fetched and cached first if missing.
    ``_row_count`` is reset to 0 and then incremented per block.

    Raises ``OperationalError`` without a result and ``ProgrammingError``
    when the result reports a non-zero error code after a fetch.
    """
    if self._result is None:
        raise OperationalError("Invalid use of fetchall")

    if self._fields is None:
        self._fields = taos_fetch_fields(self._result)

    column_count = len(self._fields)
    columns = [[] for _ in range(column_count)]
    self._row_count = 0
    while True:
        block, fetched = taos_fetch_block(self._result, self._fields)
        code = taos_errno(self._result)
        if code != 0:
            raise ProgrammingError(taos_errstr(self._result), code)
        if fetched == 0:
            # Nothing left: turn the per-column lists into row tuples.
            return list(map(tuple, zip(*columns)))
        self._row_count += fetched
        for idx in range(column_count):
            columns[idx].extend(block[idx])
def fetch_all_into_dict(self)

Fetch all rows and convert each of them to a dict

Expand source code
def fetch_all_into_dict(self):
    """Return every row from ``fetch_all`` as a dict keyed by field name."""
    keys = [column.name for column in self.fields]
    records = []
    for values in self.fetch_all():
        records.append(dict(zip(keys, values)))
    return records
def fetch_block(self)
Expand source code
def fetch_block(self):
    """Fetch one block and return ``(rows, length)``, rows being a list of tuples.

    Raises ``OperationalError`` without a result and ``StopIteration``
    when the fetched block has length 0.
    """
    handle = self._result
    if handle is None:
        raise OperationalError("Invalid use of fetch iterator")

    columns, length = taos_fetch_block(handle)
    if length == 0:
        raise StopIteration

    # Columns -> rows.
    rows = list(map(tuple, zip(*columns)))
    return rows, length
def fetch_rows_a(self, callback, param)
Expand source code
def fetch_rows_a(self, callback, param):
    """Forward ``callback`` and ``param`` to ``taos_fetch_rows_a`` for this result."""
    handle = self._result
    taos_fetch_rows_a(handle, callback, param)
def field_lengths(self)
Expand source code
def field_lengths(self):
    """Value of ``taos_fetch_lengths`` for this result and its field count."""
    handle = self._result
    return taos_fetch_lengths(handle, self.field_count)
def get_db_name(self)
Expand source code
def get_db_name(self):
    """Value of ``tmq_get_db_name`` for this result's handle."""
    handle = self._result
    return tmq_get_db_name(handle)
def get_table_name(self)
Expand source code
def get_table_name(self):
    """Value of ``tmq_get_table_name`` for this result's handle."""
    handle = self._result
    return tmq_get_table_name(handle)
def get_topic_name(self)
Expand source code
def get_topic_name(self):
    """Value of ``tmq_get_topic_name`` for this result's handle."""
    handle = self._result
    return tmq_get_topic_name(handle)
def get_vgroup_id(self)
Expand source code
def get_vgroup_id(self):
    """Value of ``tmq_get_vgroup_id`` for this result's handle."""
    handle = self._result
    return tmq_get_vgroup_id(handle)
def next(self)
Expand source code
def next(self):
    """Python 2 iterator hook: return the row produced by ``_next_row``."""
    row = self._next_row()
    return row
def rows_iter(self, num_of_rows=None)
Expand source code
def rows_iter(self, num_of_rows=None):
    """Return a new ``TaosRows`` built from this result and ``num_of_rows``."""
    row_iterator = TaosRows(self, num_of_rows)
    return row_iterator
def stop_query(self)
Expand source code
def stop_query(self):
    """Return the value of ``taos_stop_query`` for this result's handle."""
    handle = self._result
    return taos_stop_query(handle)
class TaosRow (result, row)
Expand source code
class TaosRow:
    """One raw row of a result, convertible to Python values on demand."""

    def __init__(self, result, row):
        # result: the owning result object (fields, field_count, precision
        # and field_lengths() are read from it); row: the raw row pointer.
        self._result = result
        self._row = row

    def __str__(self):
        return taos_print_row(self._row, self._result.fields, self._result.field_count)

    def __call__(self):
        return self.as_tuple()

    def _astuple(self):
        return self.as_tuple()

    def __iter__(self):
        # __iter__ must return an iterator; a bare tuple makes iter(row) raise TypeError.
        return iter(self.as_tuple())

    def as_ptr(self):
        """Return the raw row pointer this object was built with."""
        return self._row

    def as_tuple(self):
        """Convert the raw row into a tuple with one Python value per field.

        Raises DatabaseError when a field's type has no entry in CONVERT_FUNC.
        """
        precision = self._result.precision
        field_count = self._result.field_count
        blocks = [None] * field_count
        fields = self._result.fields
        field_lens = self._result.field_lengths()
        # The row is read as an array of per-field data pointers; the cast
        # does not depend on the field index, so it is done once.
        cells = ctypes.cast(self._row, ctypes.POINTER(ctypes.c_void_p))
        for i in range(field_count):
            data = cells[i]
            if fields[i].type not in CONVERT_FUNC:
                raise DatabaseError("Invalid data type returned from database")
            if data is None:
                blocks[i] = None
            else:
                blocks[i] = CONVERT_FUNC[fields[i].type](data, [False], 1, field_lens[i], precision)[0]
        return tuple(blocks)

    def as_dict(self):
        """Return the row as a dict mapping field name to converted value."""
        values = self.as_tuple()
        names = [field.name for field in self._result.fields]
        return dict(zip(names, values))

Methods

def as_dict(self)
Expand source code
def as_dict(self):
    """Return the row as a dict mapping field name to converted value.

    Values come from ``as_tuple()``; keys are the ``name`` of each entry
    in the owning result's ``fields``, in the same order.
    """
    values = self.as_tuple()
    # Key by the field *name*, and actually return the mapping.
    names = [field.name for field in self._result.fields]
    return dict(zip(names, values))
def as_ptr(self)
Expand source code
def as_ptr(self):
    """Return the raw row pointer held by this object."""
    pointer = self._row
    return pointer
def as_tuple(self)
Expand source code
def as_tuple(self):
    """Convert the raw row into a tuple with one Python value per field.

    Each cell is read from the row (cast to an array of ``c_void_p``) and
    converted with the ``CONVERT_FUNC`` entry for the field's type; a null
    cell becomes ``None``.

    Raises ``DatabaseError`` when a field's type has no converter.
    """
    owner = self._result
    precision = owner.precision
    field_count = owner.field_count
    values = [None] * field_count
    fields = owner.fields
    lengths = owner.field_lengths()
    for idx in range(field_count):
        cell = ctypes.cast(self._row, ctypes.POINTER(ctypes.c_void_p))[idx]
        if fields[idx].type not in CONVERT_FUNC:
            raise DatabaseError("Invalid data type returned from database")
        if data_is_null(cell):
            continue
        convert = CONVERT_FUNC[fields[idx].type]
        values[idx] = convert(cell, [False], 1, lengths[idx], precision)[0]
    return tuple(values)


def data_is_null(cell):
    """Tell whether a cell read from the row pointer array is null (``None``)."""
    return cell is None
class TaosRows (result, num_of_rows=None)

TDengine result rows iterator

Expand source code
class TaosRows:
    """Iterator yielding ``TaosRow`` objects from a result, optionally capped.

    Iteration stops once the result's ``_row_count`` reaches ``num_of_rows``
    (when given) or when the raw fetch returns a falsy row.
    """

    def __init__(self, result, num_of_rows=None):
        self._num_of_rows = num_of_rows
        self._result = result

    def __iter__(self):
        return self

    def _next_row(self):
        """Fetch one raw row, bump the result's row counter and wrap the row."""
        owner = self._result
        if owner is None:
            raise OperationalError("Invalid use of fetch iterator")

        limit = self._num_of_rows
        if limit is not None and owner._row_count >= limit:
            raise StopIteration

        raw = taos_fetch_row_raw(owner._result)
        if not raw:
            raise StopIteration
        owner._row_count += 1
        return TaosRow(owner, raw)

    def __next__(self):
        return self._next_row()

    def next(self):
        """Python 2 spelling of ``__next__``."""
        return self._next_row()

    @property
    def row_count(self):
        """Row counter of the underlying result."""
        return self._result._row_count

Instance variables

var row_count

Return the rowcount of the object

Expand source code
@property
def row_count(self):
    """Row counter of the underlying result."""
    owner = self._result
    return owner._row_count

Methods

def next(self)
Expand source code
def next(self):
    """Python 2 iterator hook: return the row produced by ``_next_row``."""
    row = self._next_row()
    return row
class TaosStmt (stmt, conn=None)

TDengine STMT interface

Expand source code
class TaosStmt(object):
    """TDengine STMT interface"""

    def __init__(self, stmt, conn = None):
        # stmt: the statement handle; it is set to None by close().
        self._conn = conn
        self._stmt = stmt

    def set_tbname(self, name):
        """Set table name if needed.

        Note that the set_tbname* method should only be used in insert statement
        """
        if self._stmt is None:
            raise StatementError("Invalid use of set_tbname")
        taos_stmt_set_tbname(self._stmt, name)

    def prepare(self, sql):
        # type: (str) -> None
        """Prepare ``sql`` on this statement.

        Raises StatementError when the statement has been closed.
        """
        # Same closed-handle guard as every other method; without it a None
        # handle would be passed on to the C layer.
        if self._stmt is None:
            raise StatementError("Invalid use of prepare")
        taos_stmt_prepare(self._stmt, sql)

    def set_tbname_tags(self, name, tags):
        # type: (str, Array[TaosBind]) -> None
        """Set table name with tags, tags is array of BindParams"""
        if self._stmt is None:
            raise StatementError("Invalid use of set_tbname_tags")
        taos_stmt_set_tbname_tags(self._stmt, name, tags)

    def bind_param(self, params, add_batch=True):
        # type: (Array[TaosBind], bool) -> None
        """Bind ``params`` and, unless ``add_batch`` is false, add the batch."""
        if self._stmt is None:
            raise StatementError("Invalid use of stmt")
        taos_stmt_bind_param(self._stmt, params)
        if add_batch:
            taos_stmt_add_batch(self._stmt)

    def bind_param_batch(self, binds, add_batch=True):
        # type: (Array[TaosMultiBind], bool) -> None
        """Bind a batch of ``binds`` and, unless ``add_batch`` is false, add the batch."""
        if self._stmt is None:
            raise StatementError("Invalid use of stmt")
        taos_stmt_bind_param_batch(self._stmt, binds)
        if add_batch:
            taos_stmt_add_batch(self._stmt)

    def add_batch(self):
        """Add the currently bound parameters as a batch."""
        if self._stmt is None:
            raise StatementError("Invalid use of stmt")
        taos_stmt_add_batch(self._stmt)

    def execute(self):
        """Execute the statement."""
        if self._stmt is None:
            raise StatementError("Invalid use of execute")
        taos_stmt_execute(self._stmt)

    def use_result(self):
        """NOTE: Don't use a stmt result more than once.

        Raises StatementError when the statement has been closed.
        """
        if self._stmt is None:
            raise StatementError("Invalid use of use_result")
        result = taos_stmt_use_result(self._stmt)
        return TaosResult(result, True)

    @property
    def affected_rows(self):
        # type: () -> int
        return taos_stmt_affected_rows(self._stmt)

    def close(self):
        """Close stmt."""
        if self._stmt is None:
            return
        taos_stmt_close(self._stmt)
        # Clearing the handle makes a second close() a no-op.
        self._stmt = None

    def __del__(self):
        self.close()

Instance variables

var affected_rows
Expand source code
@property
def affected_rows(self):
    # type: () -> int
    return taos_stmt_affected_rows(self._stmt)

Methods

def add_batch(self)
Expand source code
def add_batch(self):
    if self._stmt is None:
        raise StatementError("Invalid use of stmt")
    taos_stmt_add_batch(self._stmt)
def bind_param(self, params, add_batch=True)
Expand source code
def bind_param(self, params, add_batch=True):
    # type: (Array[TaosBind], bool) -> None
    if self._stmt is None:
        raise StatementError("Invalid use of stmt")
    taos_stmt_bind_param(self._stmt, params)
    if add_batch:
        taos_stmt_add_batch(self._stmt)
def bind_param_batch(self, binds, add_batch=True)
Expand source code
def bind_param_batch(self, binds, add_batch=True):
    # type: (Array[TaosMultiBind], bool) -> None
    if self._stmt is None:
        raise StatementError("Invalid use of stmt")
    taos_stmt_bind_param_batch(self._stmt, binds)
    if add_batch:
        taos_stmt_add_batch(self._stmt)
def close(self)

Close stmt.

Expand source code
def close(self):
    """Close stmt."""
    if self._stmt is None:
        return
    taos_stmt_close(self._stmt)
    self._stmt = None
def execute(self)
Expand source code
def execute(self):
    if self._stmt is None:
        raise StatementError("Invalid use of execute")
    taos_stmt_execute(self._stmt)
def prepare(self, sql)
Expand source code
def prepare(self, sql):
    # type: (str) -> None
    taos_stmt_prepare(self._stmt, sql)
def set_tbname(self, name)

Set table name if needed.

Note that the set_tbname* methods should only be used in insert statements.

Expand source code
def set_tbname(self, name):
    """Set table name if needed.

    Note that the set_tbname* method should only used in insert statement
    """
    if self._stmt is None:
        raise StatementError("Invalid use of set_tbname")
    taos_stmt_set_tbname(self._stmt, name)
def set_tbname_tags(self, name, tags)

Set table name with tags; tags is an array of BindParams.

Expand source code
def set_tbname_tags(self, name, tags):
    # type: (str, Array[TaosBind]) -> None
    """Set table name with tags, tags is array of BindParams"""
    if self._stmt is None:
        raise StatementError("Invalid use of set_tbname")
    taos_stmt_set_tbname_tags(self._stmt, name, tags)
def use_result(self)

NOTE: Don't use a stmt result more than once.

Expand source code
def use_result(self):
    """NOTE: Don't use a stmt result more than once."""
    result = taos_stmt_use_result(self._stmt)
    return TaosResult(result, True)