Module taos.connection

Expand source code
# encoding:UTF-8
from types import FunctionType
from .cinterface import *
from .cursor import TaosCursor
from .subscription import TaosSubscription
from .statement import TaosStmt
from .result import *


class TaosConnection(object):
    """TDengine connection object.

    Wraps a native connection handle obtained through ``CTaosInterface`` and
    exposes query, subscription, prepared-statement and schemaless-insert
    helpers that all operate on that handle.
    """

    # Supported keyword options paired with the attribute that stores each one.
    _OPTION_ATTRIBUTES = (
        ("host", "_host"),
        ("user", "_user"),
        ("password", "_password"),
        ("database", "_database"),
        ("port", "_port"),
        ("config", "_config"),
        ("timezone", "_tz"),
    )

    def __init__(self, *args, **kwargs):
        """Open a connection.

        Recognised keyword arguments: ``host``, ``user`` (default ``"root"``),
        ``password`` (default ``"taosdata"``), ``database``, ``port``
        (default ``0``), ``config`` and ``timezone``. Options that are not
        given keep their defaults; unrecognised keywords and all positional
        arguments are ignored.
        """
        # Set first so close()/__del__ always find the attribute, even when
        # the connection attempt below raises.
        self._conn = None
        self._host = None
        self._user = "root"
        self._password = "taosdata"
        self._database = None
        self._port = 0
        self._config = None
        self._tz = None
        self._init_config(**kwargs)
        self._chandle = CTaosInterface(self._config, self._tz)
        self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)

    def _init_config(self, **kwargs):
        """Copy every recognised option present in ``kwargs`` onto the instance."""
        for option, attribute in self._OPTION_ATTRIBUTES:
            if option in kwargs:
                setattr(self, attribute, kwargs[option])

    def close(self):
        """Close current connection.

        Calling it again is harmless: the handle is reset to ``None`` after
        the first close, so later calls do nothing.
        """
        if self._conn:
            taos_close(self._conn)
            self._conn = None

    @property
    def client_info(self):
        # type: () -> str
        """Client information as returned by ``taos_get_client_info()``."""
        return taos_get_client_info()

    @property
    def server_info(self):
        # type: () -> str
        """Server information as returned by ``taos_get_server_info()`` for this connection."""
        return taos_get_server_info(self._conn)

    def select_db(self, database):
        # type: (str) -> None
        """Select ``database`` on this connection via ``taos_select_db``."""
        taos_select_db(self._conn, database)

    def execute(self, sql):
        # type: (str) -> int
        """Simply execute SQL, ignoring the results.

        Returns the ``affected_rows`` value of the query result.
        """
        return self.query(sql).affected_rows

    def query(self, sql):
        # type: (str) -> TaosResult
        """Run ``sql`` through ``taos_query`` and wrap the outcome in a ``TaosResult``."""
        result = taos_query(self._conn, sql)
        # NOTE(review): the meaning of the ``True`` flag is defined by
        # TaosResult, which is not visible here - confirm before changing it.
        return TaosResult(result, True, self)

    def query_a(self, sql, callback, param):
        # type: (str, async_query_callback_type, c_void_p) -> None
        """Asynchronously query a sql with callback function"""
        taos_query_a(self._conn, sql, callback, param)

    def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
        # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
        """Create a subscription.

        Returns ``None`` when the connection has no handle. Raises ``Error``
        (message, error number) when ``taos_subscribe`` yields no handle.
        """
        if self._conn is None:
            return None
        sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
        if not sub:
            # No subscription handle came back; fetch the error number and
            # message using a null handle, as the original code does.
            errno = taos_errno(c_void_p(None))
            msg = taos_errstr(c_void_p(None))
            raise Error(msg, errno)
        # The second argument tells TaosSubscription whether a callback was given.
        return TaosSubscription(sub, callback is not None)

    def statement(self, sql=None):
        # type: (str | None) -> TaosStmt
        """Create a statement object, preparing ``sql`` on it when one is given.

        Returns ``None`` when the connection has no handle.
        """
        if self._conn is None:
            return None
        stmt = taos_stmt_init(self._conn)
        if sql is not None:
            taos_stmt_prepare(stmt, sql)

        return TaosStmt(stmt)

    def load_table_info(self, tables):
        # type: (str) -> None
        """Pass ``tables`` to ``taos_load_table_info`` for this connection."""
        taos_load_table_info(self._conn, tables)

    def schemaless_insert(self, lines, protocol, precision):
        # type: (list[str], SmlProtocol, SmlPrecision) -> int
        """Insert ``lines`` through ``taos_schemaless_insert`` and return its result.

        1.Line protocol and schemaless support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
        ]
        conn.schemaless_insert(lines, 0, "ns")
        ```

        2.OpenTSDB telnet style API format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
        ]
        conn.schemaless_insert(lines, 1, None)
        ```

        3.OpenTSDB HTTP JSON format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        payload = ['''
        {
            "metric": "cpu_load_0",
            "timestamp": 1626006833610123,
            "value": 55.5,
            "tags":
                {
                    "host": "ubuntu",
                    "interface": "eth0",
                    "Id": "tb0"
                }
        }
        ''']
        conn.schemaless_insert(payload, 2, None)
        ```
        """
        return taos_schemaless_insert(self._conn, lines, protocol, precision)

    def cursor(self):
        # type: () -> TaosCursor
        """Return a new Cursor object using the connection."""
        return TaosCursor(self)

    def commit(self):
        """Commit any pending transaction to the database.

        Since TDengine does not support transactions, this method does nothing.
        """
        pass

    def rollback(self):
        """Do nothing; present only for interface completeness."""
        pass

    def clear_result_set(self):
        """Clear unused result set on this connection (currently a no-op)."""
        pass

    def __del__(self):
        # Release the native handle when the object is garbage collected.
        self.close()


if __name__ == "__main__":
    # Smoke test: open a connection with the default settings, then release it.
    connection = TaosConnection()
    connection.close()

Classes

class TaosConnection (*args, **kwargs)

TDengine connection object

Expand source code
class TaosConnection(object):
    """TDengine connection object.

    Wraps a native connection handle obtained through ``CTaosInterface`` and
    exposes query, subscription, prepared-statement and schemaless-insert
    helpers that all operate on that handle.
    """

    # Supported keyword options paired with the attribute that stores each one.
    _OPTION_ATTRIBUTES = (
        ("host", "_host"),
        ("user", "_user"),
        ("password", "_password"),
        ("database", "_database"),
        ("port", "_port"),
        ("config", "_config"),
        ("timezone", "_tz"),
    )

    def __init__(self, *args, **kwargs):
        """Open a connection.

        Recognised keyword arguments: ``host``, ``user`` (default ``"root"``),
        ``password`` (default ``"taosdata"``), ``database``, ``port``
        (default ``0``), ``config`` and ``timezone``. Options that are not
        given keep their defaults; unrecognised keywords and all positional
        arguments are ignored.
        """
        # Set first so close()/__del__ always find the attribute, even when
        # the connection attempt below raises.
        self._conn = None
        self._host = None
        self._user = "root"
        self._password = "taosdata"
        self._database = None
        self._port = 0
        self._config = None
        self._tz = None
        self._init_config(**kwargs)
        self._chandle = CTaosInterface(self._config, self._tz)
        self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)

    def _init_config(self, **kwargs):
        """Copy every recognised option present in ``kwargs`` onto the instance."""
        for option, attribute in self._OPTION_ATTRIBUTES:
            if option in kwargs:
                setattr(self, attribute, kwargs[option])

    def close(self):
        """Close current connection.

        Calling it again is harmless: the handle is reset to ``None`` after
        the first close, so later calls do nothing.
        """
        if self._conn:
            taos_close(self._conn)
            self._conn = None

    @property
    def client_info(self):
        # type: () -> str
        """Client information as returned by ``taos_get_client_info()``."""
        return taos_get_client_info()

    @property
    def server_info(self):
        # type: () -> str
        """Server information as returned by ``taos_get_server_info()`` for this connection."""
        return taos_get_server_info(self._conn)

    def select_db(self, database):
        # type: (str) -> None
        """Select ``database`` on this connection via ``taos_select_db``."""
        taos_select_db(self._conn, database)

    def execute(self, sql):
        # type: (str) -> int
        """Simply execute SQL, ignoring the results.

        Returns the ``affected_rows`` value of the query result.
        """
        return self.query(sql).affected_rows

    def query(self, sql):
        # type: (str) -> TaosResult
        """Run ``sql`` through ``taos_query`` and wrap the outcome in a ``TaosResult``."""
        result = taos_query(self._conn, sql)
        # NOTE(review): the meaning of the ``True`` flag is defined by
        # TaosResult, which is not visible here - confirm before changing it.
        return TaosResult(result, True, self)

    def query_a(self, sql, callback, param):
        # type: (str, async_query_callback_type, c_void_p) -> None
        """Asynchronously query a sql with callback function"""
        taos_query_a(self._conn, sql, callback, param)

    def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
        # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
        """Create a subscription.

        Returns ``None`` when the connection has no handle. Raises ``Error``
        (message, error number) when ``taos_subscribe`` yields no handle.
        """
        if self._conn is None:
            return None
        sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
        if not sub:
            # No subscription handle came back; fetch the error number and
            # message using a null handle, as the original code does.
            errno = taos_errno(c_void_p(None))
            msg = taos_errstr(c_void_p(None))
            raise Error(msg, errno)
        # The second argument tells TaosSubscription whether a callback was given.
        return TaosSubscription(sub, callback is not None)

    def statement(self, sql=None):
        # type: (str | None) -> TaosStmt
        """Create a statement object, preparing ``sql`` on it when one is given.

        Returns ``None`` when the connection has no handle.
        """
        if self._conn is None:
            return None
        stmt = taos_stmt_init(self._conn)
        if sql is not None:
            taos_stmt_prepare(stmt, sql)

        return TaosStmt(stmt)

    def load_table_info(self, tables):
        # type: (str) -> None
        """Pass ``tables`` to ``taos_load_table_info`` for this connection."""
        taos_load_table_info(self._conn, tables)

    def schemaless_insert(self, lines, protocol, precision):
        # type: (list[str], SmlProtocol, SmlPrecision) -> int
        """Insert ``lines`` through ``taos_schemaless_insert`` and return its result.

        1.Line protocol and schemaless support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
        ]
        conn.schemaless_insert(lines, 0, "ns")
        ```

        2.OpenTSDB telnet style API format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
        ]
        conn.schemaless_insert(lines, 1, None)
        ```

        3.OpenTSDB HTTP JSON format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        payload = ['''
        {
            "metric": "cpu_load_0",
            "timestamp": 1626006833610123,
            "value": 55.5,
            "tags":
                {
                    "host": "ubuntu",
                    "interface": "eth0",
                    "Id": "tb0"
                }
        }
        ''']
        conn.schemaless_insert(payload, 2, None)
        ```
        """
        return taos_schemaless_insert(self._conn, lines, protocol, precision)

    def cursor(self):
        # type: () -> TaosCursor
        """Return a new Cursor object using the connection."""
        return TaosCursor(self)

    def commit(self):
        """Commit any pending transaction to the database.

        Since TDengine does not support transactions, this method does nothing.
        """
        pass

    def rollback(self):
        """Do nothing; present only for interface completeness."""
        pass

    def clear_result_set(self):
        """Clear unused result set on this connection (currently a no-op)."""
        pass

    def __del__(self):
        # Release the native handle when the object is garbage collected.
        self.close()

Instance variables

var client_info
Expand source code
@property
def client_info(self):
    # type: () -> str
    """Client information as returned by ``taos_get_client_info()``."""
    info = taos_get_client_info()
    return info
var server_info
Expand source code
@property
def server_info(self):
    # type: () -> str
    """Server information as returned by ``taos_get_server_info()`` for this connection."""
    handle = self._conn
    return taos_get_server_info(handle)

Methods

def clear_result_set(self)

Clear unused result set on this connection.

Expand source code
def clear_result_set(self):
    """Clear unused result set on this connection (currently a no-op)."""
    return None
def close(self)

Close current connection.

Expand source code
def close(self):
    """Close the current connection; do nothing when there is no open handle."""
    if not self._conn:
        return
    taos_close(self._conn)
    # Drop the handle so a repeated close() is a no-op.
    self._conn = None
def commit(self)

Commit any pending transaction to the database.

Since TDengine does not support transactions, this method does nothing.

Expand source code
def commit(self):
    """Commit any pending transaction to the database.

    TDengine does not support transactions, so this method does nothing.
    """
    return None
def cursor(self)

Return a new Cursor object using the connection.

Expand source code
def cursor(self):
    # type: () -> TaosCursor
    """Create and return a new ``TaosCursor`` bound to this connection."""
    new_cursor = TaosCursor(self)
    return new_cursor
def execute(self, sql)

Simply execute SQL, ignoring the results.

Expand source code
def execute(self, sql):
    # type: (str) -> int
    """Run ``sql`` via ``query`` and return only the result's ``affected_rows``."""
    outcome = self.query(sql)
    return outcome.affected_rows
def load_table_info(self, tables)
Expand source code
def load_table_info(self, tables):
    # type: (str) -> None
    """Pass ``tables`` to ``taos_load_table_info`` for this connection."""
    handle = self._conn
    taos_load_table_info(handle, tables)
def query(self, sql)
Expand source code
def query(self, sql):
    # type: (str) -> TaosResult
    """Run ``sql`` through ``taos_query`` and wrap the outcome in a ``TaosResult``."""
    raw = taos_query(self._conn, sql)
    wrapped = TaosResult(raw, True, self)
    return wrapped
def query_a(self, sql, callback, param)

Asynchronously query a sql with callback function

Expand source code
def query_a(self, sql, callback, param):
    # type: (str, async_query_callback_type, c_void_p) -> None
    """Start an asynchronous query of ``sql``, handing ``callback`` and ``param`` to ``taos_query_a``."""
    handle = self._conn
    taos_query_a(handle, sql, callback, param)
def rollback(self)

Void functionality

Expand source code
def rollback(self):
    """Do nothing; this method has no effect."""
    return None
def schemaless_insert(self, lines, protocol, precision)

1.Line protocol and schemaless support

Example

import taos
conn = taos.connect()
conn.execute("drop database if exists test")
conn.execute("create database if not exists test")
conn.select_db("test")
lines = [
    'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
]
conn.schemaless_insert(lines, 0, "ns")

2.OpenTSDB telnet style API format support

Example

import taos
conn = taos.connect()
conn.execute("drop database if exists test")
conn.execute("create database if not exists test")
conn.select_db("test")
lines = [
    'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
]
conn.schemaless_insert(lines, 1, None)

3.OpenTSDB HTTP JSON format support

Example

import taos
conn = taos.connect()
conn.execute("drop database if exists test")
conn.execute("create database if not exists test")
conn.select_db("test")
payload = ['''
{
    "metric": "cpu_load_0",
    "timestamp": 1626006833610123,
    "value": 55.5,
    "tags":
        {
            "host": "ubuntu",
            "interface": "eth0",
            "Id": "tb0"
        }
}
''']
conn.schemaless_insert(payload, 2, None)
Expand source code
def schemaless_insert(self, lines, protocol, precision):
    # type: (list[str], SmlProtocol, SmlPrecision) -> int
    """Insert ``lines`` through ``taos_schemaless_insert`` and return its result.

    1.Line protocol and schemaless support

    ## Example

    ```python
    import taos
    conn = taos.connect()
    conn.execute("drop database if exists test")
    conn.execute("create database if not exists test")
    conn.select_db("test")
    lines = [
        'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
    ]
    conn.schemaless_insert(lines, 0, "ns")
    ```

    2.OpenTSDB telnet style API format support

    ## Example

    ```python
    import taos
    conn = taos.connect()
    conn.execute("drop database if exists test")
    conn.execute("create database if not exists test")
    conn.select_db("test")
    lines = [
        'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
    ]
    conn.schemaless_insert(lines, 1, None)
    ```

    3.OpenTSDB HTTP JSON format support

    ## Example

    ```python
    import taos
    conn = taos.connect()
    conn.execute("drop database if exists test")
    conn.execute("create database if not exists test")
    conn.select_db("test")
    payload = ['''
    {
        "metric": "cpu_load_0",
        "timestamp": 1626006833610123,
        "value": 55.5,
        "tags":
            {
                "host": "ubuntu",
                "interface": "eth0",
                "Id": "tb0"
            }
    }
    ''']
    conn.schemaless_insert(payload, 2, None)
    ```
    """
    return taos_schemaless_insert(self._conn, lines, protocol, precision)
def select_db(self, database)
Expand source code
def select_db(self, database):
    # type: (str) -> None
    """Select ``database`` on this connection via ``taos_select_db``."""
    handle = self._conn
    taos_select_db(handle, database)
def statement(self, sql=None)
Expand source code
def statement(self, sql=None):
    # type: (str | None) -> TaosStmt
    """Create a statement object, preparing ``sql`` on it when one is given.

    Returns ``None`` when the connection has no handle.
    """
    if self._conn is None:
        return None
    stmt = taos_stmt_init(self._conn)
    # Identity test: only a missing sql skips preparation.
    if sql is not None:
        taos_stmt_prepare(stmt, sql)

    return TaosStmt(stmt)
def subscribe(self, restart, topic, sql, interval, callback=None, param=None)

Create a subscription.

Expand source code
def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
    # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
    """Create a subscription.

    Returns ``None`` when the connection has no handle. Raises ``Error``
    (message, error number) when ``taos_subscribe`` yields no handle.
    """
    if self._conn is None:
        return None
    sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
    if not sub:
        # No subscription handle came back; fetch the error number and
        # message using a null handle, as the original code does.
        errno = taos_errno(c_void_p(None))
        msg = taos_errstr(c_void_p(None))
        raise Error(msg, errno)
    # The second argument tells TaosSubscription whether a callback was given.
    return TaosSubscription(sub, callback is not None)