Module taosrest.sqlalchemy

Expand source code
from sqlalchemy.engine import default, reflection

import taosrest


class AlchemyRestConnection:
    """DBAPI-style entry point that opens ``taosrest`` connections.

    An instance of this class is what ``TaosRestDialect.dbapi()`` hands to
    SQLAlchemy; it carries the module-level attributes a DBAPI module is
    expected to expose plus a ``connect`` callable.
    """

    # PEP 249 style module attributes.
    threadsafety = 0
    paramstyle = "pyformat"
    Error = taosrest.Error

    def connect(self, **kwargs):
        """Open a connection to a TDengine REST endpoint.

        Only the ``host``, ``port``, ``user`` and ``password`` keyword
        arguments are read; they default to ``"localhost"``, ``"6041"``,
        ``"root"`` and ``"taosdata"`` respectively. Any other keyword is
        ignored.

        Returns:
            Whatever ``taosrest.connect`` returns for the assembled URL.
        """
        host = kwargs.get("host", "localhost")
        port = kwargs.get("port", "6041")
        endpoint = f"http://{host}:{port}"
        return taosrest.connect(
            url=endpoint,
            user=kwargs.get("user", "root"),
            password=kwargs.get("password", "taosdata"),
        )


class TaosRestDialect(default.DefaultDialect):
    """SQLAlchemy dialect that talks to TDengine through the ``taosrest`` driver."""

    name = "taosrest"
    driver = "taosrest"
    supports_native_boolean = True
    implicit_returning = True

    def do_rollback(self, connection):
        """Ignore rollback requests; nothing is sent to the server."""
        # NOTE(review): presumably a no-op because the REST driver has no
        # transaction to roll back — confirm.
        pass

    def _get_server_version_info(self, connection):
        """Return the underlying connection's ``server_info`` as a tuple."""
        return tuple(connection.connection.server_info)

    @classmethod
    def dbapi(cls):
        """Return the DBAPI-style object used to open connections."""
        return AlchemyRestConnection()

    def has_schema(self, connection, schema):
        """Report that ``schema`` does not exist; always returns ``False``."""
        return False

    def has_table(self, connection, table_name, schema=None):
        """Return ``True`` if ``describe <table_name>`` executes without error.

        Args:
            connection: object exposing ``cursor()`` whose cursor supports
                ``execute``.
            table_name: table to probe; interpolated into the statement
                verbatim.
            schema: accepted for interface compatibility; unused.
        """
        try:
            connection.cursor().execute(f"describe {table_name}")
            return True
        except Exception:
            # Any driver/server failure is read as "table not found".
            # ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt and SystemExit propagate.
            return False

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """
        Gets all indexes
        """
        # no index is supported by TDengine
        return []

    def get_columns(self, connection, table_name, schema=None, **kw):
        """Return the column names of ``table_name`` as reported by ``describe``.

        Args:
            connection: object exposing ``cursor()``; the cursor must support
                ``execute`` and ``fetchall``.
            table_name: table to describe; interpolated into the statement
                verbatim.
            schema: accepted for interface compatibility; unused.
            **kw: accepted for interface compatibility; unused.

        Returns:
            A list with the first field of every row returned by
            ``describe <table_name>``, or ``[]`` if the lookup fails.
        """
        # NOTE(review): SQLAlchemy documents get_columns as returning a list
        # of dicts (name/type/nullable/...), not bare names — confirm whether
        # callers need that richer shape.
        try:
            cursor = connection.cursor()
            # Previously ``"describe {}" % table_name``: applying ``%`` to a
            # ``{}`` template raises TypeError for any string name, which the
            # handler swallowed, so this method always returned [].
            cursor.execute(f"describe {table_name}")
            return [row[0] for row in cursor.fetchall()]
        except Exception:
            # Best effort: a failed describe yields no columns.
            return []

Classes

class AlchemyRestConnection
Expand source code
class AlchemyRestConnection:
    """DBAPI-style entry point that opens ``taosrest`` connections.

    It carries the module-level attributes a DBAPI module is expected to
    expose plus a ``connect`` callable.
    """

    # PEP 249 style module attributes.
    threadsafety = 0
    paramstyle = "pyformat"
    Error = taosrest.Error

    def connect(self, **kwargs):
        """Open a connection to a TDengine REST endpoint.

        Only the ``host``, ``port``, ``user`` and ``password`` keyword
        arguments are read; they default to ``"localhost"``, ``"6041"``,
        ``"root"`` and ``"taosdata"`` respectively. Any other keyword is
        ignored.

        Returns:
            Whatever ``taosrest.connect`` returns for the assembled URL.
        """
        host = kwargs.get("host", "localhost")
        port = kwargs.get("port", "6041")
        endpoint = f"http://{host}:{port}"
        return taosrest.connect(
            url=endpoint,
            user=kwargs.get("user", "root"),
            password=kwargs.get("password", "taosdata"),
        )

Class variables

var Error

Common base class for all non-exit exceptions.

var paramstyle
var threadsafety

Methods

def connect(self, **kwargs)
Expand source code
def connect(self, **kwargs):
    """Open a connection to a TDengine REST endpoint.

    Only the ``host``, ``port``, ``user`` and ``password`` keyword arguments
    are read; they default to ``"localhost"``, ``"6041"``, ``"root"`` and
    ``"taosdata"`` respectively. Any other keyword is ignored.

    Returns:
        Whatever ``taosrest.connect`` returns for the assembled URL.
    """
    host = kwargs.get("host", "localhost")
    port = kwargs.get("port", "6041")
    endpoint = f"http://{host}:{port}"
    return taosrest.connect(
        url=endpoint,
        user=kwargs.get("user", "root"),
        password=kwargs.get("password", "taosdata"),
    )
class TaosRestDialect (convert_unicode=False, encoding='utf-8', paramstyle=None, dbapi=None, implicit_returning=None, case_sensitive=True, supports_native_boolean=None, max_identifier_length=None, label_length=None, compiler_linting=0, server_side_cursors=False, **kwargs)

Default implementation of Dialect

Expand source code
class TaosRestDialect(default.DefaultDialect):
    """SQLAlchemy dialect that talks to TDengine through the ``taosrest`` driver."""

    name = "taosrest"
    driver = "taosrest"
    supports_native_boolean = True
    implicit_returning = True

    def do_rollback(self, connection):
        """Ignore rollback requests; nothing is sent to the server."""
        # NOTE(review): presumably a no-op because the REST driver has no
        # transaction to roll back — confirm.
        pass

    def _get_server_version_info(self, connection):
        """Return the underlying connection's ``server_info`` as a tuple."""
        return tuple(connection.connection.server_info)

    @classmethod
    def dbapi(cls):
        """Return the DBAPI-style object used to open connections."""
        return AlchemyRestConnection()

    def has_schema(self, connection, schema):
        """Report that ``schema`` does not exist; always returns ``False``."""
        return False

    def has_table(self, connection, table_name, schema=None):
        """Return ``True`` if ``describe <table_name>`` executes without error.

        Args:
            connection: object exposing ``cursor()`` whose cursor supports
                ``execute``.
            table_name: table to probe; interpolated into the statement
                verbatim.
            schema: accepted for interface compatibility; unused.
        """
        try:
            connection.cursor().execute(f"describe {table_name}")
            return True
        except Exception:
            # Any driver/server failure is read as "table not found".
            # ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt and SystemExit propagate.
            return False

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """
        Gets all indexes
        """
        # no index is supported by TDengine
        return []

    def get_columns(self, connection, table_name, schema=None, **kw):
        """Return the column names of ``table_name`` as reported by ``describe``.

        Args:
            connection: object exposing ``cursor()``; the cursor must support
                ``execute`` and ``fetchall``.
            table_name: table to describe; interpolated into the statement
                verbatim.
            schema: accepted for interface compatibility; unused.
            **kw: accepted for interface compatibility; unused.

        Returns:
            A list with the first field of every row returned by
            ``describe <table_name>``, or ``[]`` if the lookup fails.
        """
        # NOTE(review): SQLAlchemy documents get_columns as returning a list
        # of dicts (name/type/nullable/...), not bare names — confirm whether
        # callers need that richer shape.
        try:
            cursor = connection.cursor()
            # Previously ``"describe {}" % table_name``: applying ``%`` to a
            # ``{}`` template raises TypeError for any string name, which the
            # handler swallowed, so this method always returned [].
            cursor.execute(f"describe {table_name}")
            return [row[0] for row in cursor.fetchall()]
        except Exception:
            # Best effort: a failed describe yields no columns.
            return []

Ancestors

  • sqlalchemy.engine.default.DefaultDialect
  • sqlalchemy.engine.interfaces.Dialect

Class variables

var driver
var implicit_returning
var name
var supports_native_boolean

Static methods

def dbapi()
Expand source code
@classmethod
def dbapi(cls):
    """Return a fresh ``AlchemyRestConnection`` to act as the DBAPI object."""
    rest_dbapi = AlchemyRestConnection()
    return rest_dbapi

Methods

def do_rollback(self, connection)

Provide an implementation of connection.rollback(), given a DB-API connection.

:param dbapi_connection: a DBAPI connection, typically proxied within a :class:.ConnectionFairy.

Expand source code
def do_rollback(self, connection):
    """Ignore rollback requests; ``connection`` is left untouched."""
    # NOTE(review): presumably a no-op because the REST driver has no
    # transaction to roll back — confirm.
    return None
def get_columns(self, connection, table_name, schema=None, **kw)

Return information about columns in table_name.

Given a :class:_engine.Connection, a string table_name, and an optional string schema, return column information as a list of dictionaries with these keys:

name the column's name

type [sqlalchemy.types#TypeEngine]

nullable boolean

default the column's default value

autoincrement boolean

sequence a dictionary of the form {'name' : str, 'start' :int, 'increment': int, 'minvalue': int, 'maxvalue': int, 'nominvalue': bool, 'nomaxvalue': bool, 'cycle': bool, 'cache': int, 'order': bool}

Additional column attributes may be present.

Expand source code
def get_columns(self, connection, table_name, schema=None, **kw):
    """Return the column names of ``table_name`` as reported by ``describe``.

    Args:
        connection: object exposing ``cursor()``; the cursor must support
            ``execute`` and ``fetchall``.
        table_name: table to describe; interpolated into the statement
            verbatim.
        schema: accepted for interface compatibility; unused.
        **kw: accepted for interface compatibility; unused.

    Returns:
        A list with the first field of every row returned by
        ``describe <table_name>``, or ``[]`` if the lookup fails.
    """
    # NOTE(review): SQLAlchemy documents get_columns as returning a list of
    # dicts (name/type/nullable/...), not bare names — confirm whether
    # callers need that richer shape.
    try:
        cursor = connection.cursor()
        # Previously ``"describe {}" % table_name``: applying ``%`` to a
        # ``{}`` template raises TypeError for any string name, which the
        # handler swallowed, so this function always returned [].
        cursor.execute(f"describe {table_name}")
        return [row[0] for row in cursor.fetchall()]
    except Exception:
        # Best effort: a failed describe yields no columns.
        return []
def get_indexes(self, connection, table_name, schema=None, **kw)

Gets all indexes

Expand source code
@reflection.cache
def get_indexes(self, connection, table_name, schema=None, **kw):
    """Return the indexes of ``table_name``, which is always an empty list.

    All arguments are accepted for interface compatibility and ignored.
    """
    # TDengine supports no indexes, so there is never anything to report.
    no_indexes = []
    return no_indexes
def has_schema(self, connection, schema)
Expand source code
def has_schema(self, connection, schema):
    """Report that ``schema`` does not exist.

    Both arguments are ignored; the answer is always ``False``.
    """
    schema_exists = False
    return schema_exists
def has_table(self, connection, table_name, schema=None)

For internal dialect use, check the existence of a particular table in the database.

Given a :class:_engine.Connection object, a string table_name and optional schema name, return True if the given table exists in the database, False otherwise.

This method serves as the underlying implementation of the public facing :meth:.Inspector.has_table method, and is also used internally to implement the "checkfirst" behavior for methods like :meth:_schema.Table.create and :meth:_schema.MetaData.create_all.

Note: This method is used internally by SQLAlchemy, and is published so that third-party dialects may provide an implementation. It is not the public API for checking for table presence. Please use the :meth:.Inspector.has_table method. Alternatively, for legacy cross-compatibility, the :meth:_engine.Engine.has_table method may be used.

Expand source code
def has_table(self, connection, table_name, schema=None):
    """Return ``True`` if ``describe <table_name>`` executes without error.

    Args:
        connection: object exposing ``cursor()`` whose cursor supports
            ``execute``.
        table_name: table to probe; interpolated into the statement verbatim.
        schema: accepted for interface compatibility; unused.

    Returns:
        ``True`` when the statement runs, ``False`` when it raises.
    """
    try:
        connection.cursor().execute(f"describe {table_name}")
        return True
    except Exception:
        # Any driver/server failure is read as "table not found".
        # ``Exception`` (rather than a bare ``except``) lets
        # KeyboardInterrupt and SystemExit propagate instead of being
        # reported as a missing table.
        return False