Module taosrest.sqlalchemy
Expand source code
from sqlalchemy.engine import default, reflection
import taosrest
class AlchemyRestConnection:
    """Connection factory handed to SQLAlchemy in place of a DB-API module.

    The class attributes use the names of the DB-API 2.0 module globals
    (``threadsafety``, ``paramstyle``, ``Error``), and ``connect`` opens a
    connection through ``taosrest.connect``.
    """

    threadsafety = 0
    paramstyle = "pyformat"
    Error = taosrest.Error

    def connect(self, **kwargs):
        """Open a taosrest connection built from keyword arguments.

        Recognised keys and their fallbacks when a key is absent:
        ``host`` ("localhost"), ``port`` ("6041"), ``user`` ("root") and
        ``password`` ("taosdata"). Any other keyword is ignored.

        Returns whatever ``taosrest.connect`` returns for the URL
        ``http://{host}:{port}``.
        """
        host = kwargs.get("host", "localhost")
        port = kwargs.get("port", "6041")
        endpoint = f"http://{host}:{port}"
        return taosrest.connect(
            url=endpoint,
            user=kwargs.get("user", "root"),
            password=kwargs.get("password", "taosdata"),
        )
class TaosRestDialect(default.DefaultDialect):
    """SQLAlchemy dialect that talks to TDengine through the taosrest client."""

    name = "taosrest"
    driver = "taosrest"
    supports_native_boolean = True
    implicit_returning = True

    def do_rollback(self, connection):
        """Do nothing when SQLAlchemy asks for a rollback."""
        # NOTE(review): presumably a no-op because the backend has no
        # transactions to roll back -- confirm against TDengine docs.
        pass

    def _get_server_version_info(self, connection):
        """Return the ``server_info`` of the underlying connection as a tuple."""
        return tuple(connection.connection.server_info)

    @classmethod
    def dbapi(cls):
        """Return a new ``AlchemyRestConnection`` to act as the DB-API module."""
        return AlchemyRestConnection()

    def has_schema(self, connection, schema):
        """Report that no schema exists; always returns ``False``."""
        return False

    def has_table(self, connection, table_name, schema=None):
        """Return ``True`` if ``describe <table_name>`` executes without error.

        ``schema`` is accepted for interface compatibility but not used.
        Any ``Exception`` raised while obtaining the cursor or executing the
        statement is treated as "table does not exist".
        """
        # NOTE(review): table_name is interpolated verbatim into the SQL
        # text; callers must not pass untrusted input here.
        try:
            connection.cursor().execute(f"describe {table_name}")
        except Exception:
            # Best-effort probe: failure means "not found", but do not
            # swallow KeyboardInterrupt/SystemExit as a bare except would.
            return False
        return True

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """
        Gets all indexes
        """
        # no index is supported by TDengine
        return []

    def get_columns(self, connection, table_name, schema=None, **kw):
        """Return the first field of every row produced by ``describe <table_name>``.

        ``schema`` and ``**kw`` are accepted for interface compatibility but
        not used. On any ``Exception`` an empty list is returned.

        NOTE(review): SQLAlchemy's reflection API documents this hook as
        returning a list of dicts (``name``, ``type``, ...); this method
        returns bare values instead -- confirm what callers rely on.
        """
        try:
            cursor = connection.cursor()
            # The original used '"describe {}" % table_name', which always
            # raised TypeError (no %-placeholder) and was silently swallowed,
            # so the method returned [] for every table.
            cursor.execute(f"describe {table_name}")
            return [row[0] for row in cursor.fetchall()]
        except Exception:
            return []
Classes
class AlchemyRestConnection-
Expand source code
class AlchemyRestConnection: threadsafety = 0 paramstyle = "pyformat" Error = taosrest.Error def connect(self, **kwargs): host = kwargs["host"] if "host" in kwargs else "localhost" port = kwargs["port"] if "port" in kwargs else "6041" user = kwargs["user"] if "user" in kwargs else "root" password = kwargs["password"] if "password" in kwargs else "taosdata" url = f"http://{host}:{port}" return taosrest.connect(url=url, user=user, password=password)Class variables
var Error-
Common base class for all non-exit exceptions.
var paramstyle
var threadsafety
Methods
def connect(self, **kwargs)-
Expand source code
def connect(self, **kwargs): host = kwargs["host"] if "host" in kwargs else "localhost" port = kwargs["port"] if "port" in kwargs else "6041" user = kwargs["user"] if "user" in kwargs else "root" password = kwargs["password"] if "password" in kwargs else "taosdata" url = f"http://{host}:{port}" return taosrest.connect(url=url, user=user, password=password)
class TaosRestDialect (convert_unicode=False, encoding='utf-8', paramstyle=None, dbapi=None, implicit_returning=None, case_sensitive=True, supports_native_boolean=None, max_identifier_length=None, label_length=None, compiler_linting=0, server_side_cursors=False, **kwargs)-
Default implementation of Dialect
Expand source code
class TaosRestDialect(default.DefaultDialect): name = "taosrest" driver = "taosrest" supports_native_boolean = True implicit_returning = True def do_rollback(self, connection): pass def _get_server_version_info(self, connection): return tuple(connection.connection.server_info) @classmethod def dbapi(cls): return AlchemyRestConnection() def has_schema(self, connection, schema): return False def has_table(self, connection, table_name, schema=None): try: connection.cursor().execute(f"describe {table_name}") return True except: return False @reflection.cache def get_indexes(self, connection, table_name, schema=None, **kw): """ Gets all indexes """ # no index is supported by TDengine return [] def get_columns(self, connection, table_name, schema=None, **kw): try: cursor = connection.cursor() cursor.execute("describe {}" % table_name) return [row[0] for row in cursor.fetchall()] except: return []Ancestors
- sqlalchemy.engine.default.DefaultDialect
- sqlalchemy.engine.interfaces.Dialect
Class variables
var driver
var implicit_returning
var name
var supports_native_boolean
Static methods
def dbapi()-
Expand source code
@classmethod def dbapi(cls): return AlchemyRestConnection()
Methods
def do_rollback(self, connection)-
Provide an implementation of ``connection.rollback()``, given a DB-API connection.
:param dbapi_connection: a DBAPI connection, typically proxied within a :class:`.ConnectionFairy`.
Expand source code
def do_rollback(self, connection): pass def get_columns(self, connection, table_name, schema=None, **kw)-
Return information about columns in ``table_name``.
Given a :class:`_engine.Connection`, a string ``table_name``, and an optional string ``schema``, return column information as a list of dictionaries with these keys:
name the column's name
type [sqlalchemy.types#TypeEngine]
nullable boolean
default the column's default value
autoincrement boolean
sequence a dictionary of the form {'name' : str, 'start' :int, 'increment': int, 'minvalue': int, 'maxvalue': int, 'nominvalue': bool, 'nomaxvalue': bool, 'cycle': bool, 'cache': int, 'order': bool}
Additional column attributes may be present.
Expand source code
def get_columns(self, connection, table_name, schema=None, **kw): try: cursor = connection.cursor() cursor.execute("describe {}" % table_name) return [row[0] for row in cursor.fetchall()] except: return [] def get_indexes(self, connection, table_name, schema=None, **kw)-
Gets all indexes
Expand source code
@reflection.cache def get_indexes(self, connection, table_name, schema=None, **kw): """ Gets all indexes """ # no index is supported by TDengine return [] def has_schema(self, connection, schema)-
Expand source code
def has_schema(self, connection, schema): return False def has_table(self, connection, table_name, schema=None)-
For internal dialect use, check the existence of a particular table in the database.
Given a :class:`_engine.Connection` object, a string table_name and optional schema name, return True if the given table exists in the database, False otherwise.
This method serves as the underlying implementation of the public facing :meth:`.Inspector.has_table` method, and is also used internally to implement the "checkfirst" behavior for methods like :meth:`_schema.Table.create` and :meth:`_schema.MetaData.create_all`.
Note: This method is used internally by SQLAlchemy, and is published so that third-party dialects may provide an implementation. It is not the public API for checking for table presence. Please use the :meth:`.Inspector.has_table` method. Alternatively, for legacy cross-compatibility, the :meth:`_engine.Engine.has_table` method may be used.
Expand source code
def has_table(self, connection, table_name, schema=None): try: connection.cursor().execute(f"describe {table_name}") return True except: return False