Module taos.connection
Expand source code
# encoding:UTF-8
from types import FunctionType
from .cinterface import *
from .cursor import TaosCursor
from .subscription import TaosSubscription
from .statement import TaosStmt
from .result import *
class TaosConnection(object):
    """TDengine connection object.

    Holds a native connection handle created through ``CTaosInterface`` and
    exposes query, subscription, prepared-statement and schemaless-insert
    helpers that operate on that handle.
    """

    def __init__(self, *args, **kwargs):
        """Open a connection.

        Positional arguments are accepted but not used.  The keyword
        arguments ``host``, ``user``, ``password``, ``database``, ``port``,
        ``config`` and ``timezone`` override the defaults set below.
        """
        self._conn = None
        self._host = None
        self._user = "root"
        self._password = "taosdata"
        self._database = None
        self._port = 0
        self._config = None
        self._tz = None

        self._init_config(**kwargs)

        self._chandle = CTaosInterface(self._config, self._tz)
        self._conn = self._chandle.connect(
            self._host, self._user, self._password, self._database, self._port
        )

    def _init_config(self, **kwargs):
        """Copy recognised connection options from ``kwargs`` onto the instance.

        Options that are not supplied keep their current value; keys that are
        not listed below are ignored.
        """
        options = (
            ("host", "_host"),
            ("user", "_user"),
            ("password", "_password"),
            ("database", "_database"),
            ("port", "_port"),
            ("config", "_config"),
            ("timezone", "_tz"),
        )
        for key, attr in options:
            if key in kwargs:
                setattr(self, attr, kwargs[key])

    def close(self):
        """Close current connection."""
        # The handle is reset after closing, so a second call is a no-op.
        if self._conn:
            taos_close(self._conn)
            self._conn = None

    @property
    def client_info(self):
        # type: () -> str
        """Client information as returned by ``taos_get_client_info``."""
        return taos_get_client_info()

    @property
    def server_info(self):
        # type: () -> str
        """Server information as returned by ``taos_get_server_info``."""
        return taos_get_server_info(self._conn)

    def select_db(self, database):
        # type: (str) -> None
        """Select ``database`` on this connection via ``taos_select_db``."""
        taos_select_db(self._conn, database)

    def execute(self, sql):
        # type: (str) -> int
        """Simply execute sql, ignoring the results.

        Returns the ``affected_rows`` value of the query result.
        """
        return self.query(sql).affected_rows

    def query(self, sql):
        # type: (str) -> TaosResult
        """Run ``sql`` and return the outcome wrapped in a ``TaosResult``."""
        result = taos_query(self._conn, sql)
        return TaosResult(result, True, self)

    def query_a(self, sql, callback, param):
        # type: (str, async_query_callback_type, c_void_p) -> None
        """Asynchronously query a sql with callback function"""
        taos_query_a(self._conn, sql, callback, param)

    def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
        # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
        """Create a subscription.

        All arguments are passed through to ``taos_subscribe``.

        Returns ``None`` when the connection handle is ``None``; otherwise a
        ``TaosSubscription`` wrapping the new subscription handle.

        Raises ``Error`` (built from ``taos_errstr`` / ``taos_errno``) when
        ``taos_subscribe`` returns a falsy handle.
        """
        if self._conn is None:
            return None
        sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
        if not sub:
            errno = taos_errno(c_void_p(None))
            msg = taos_errstr(c_void_p(None))
            raise Error(msg, errno)
        # Identity test against None (PEP 8) instead of ``!=``.
        return TaosSubscription(sub, callback is not None)

    def statement(self, sql=None):
        # type: (str | None) -> TaosStmt
        """Create a statement object, preparing ``sql`` when it is given.

        Returns ``None`` when the connection handle is ``None``.
        """
        if self._conn is None:
            return None
        stmt = taos_stmt_init(self._conn)
        if sql is not None:
            taos_stmt_prepare(stmt, sql)
        return TaosStmt(stmt)

    def load_table_info(self, tables):
        # type: (str) -> None
        """Pass ``tables`` to ``taos_load_table_info`` for this connection."""
        taos_load_table_info(self._conn, tables)

    def schemaless_insert(self, lines, protocol, precision):
        # type: (list[str], SmlProtocol, SmlPrecision) -> int
        """
        1. Line protocol and schemaless support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
        ]
        conn.schemaless_insert(lines, 0, "ns")
        ```

        2. OpenTSDB telnet style API format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
        ]
        conn.schemaless_insert(lines, 1, None)
        ```

        3. OpenTSDB HTTP JSON format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        payload = ['''
        {
            "metric": "cpu_load_0",
            "timestamp": 1626006833610123,
            "value": 55.5,
            "tags":
                {
                    "host": "ubuntu",
                    "interface": "eth0",
                    "Id": "tb0"
                }
        }
        ''']
        conn.schemaless_insert(payload, 2, None)
        ```
        """
        return taos_schemaless_insert(self._conn, lines, protocol, precision)

    def cursor(self):
        # type: () -> TaosCursor
        """Return a new Cursor object using the connection."""
        return TaosCursor(self)

    def commit(self):
        """Commit any pending transaction to the database.

        Since TDengine does not support transactions, this method does nothing.
        """
        pass

    def rollback(self):
        """Do nothing (void functionality)."""
        pass

    def clear_result_set(self):
        """Clear unused result set on this connection (currently does nothing)."""
        pass

    def __del__(self):
        # Release the native handle when the object is garbage-collected.
        self.close()
if __name__ == "__main__":
    # Smoke test: open a connection with the default settings, then close it.
    conn = TaosConnection()
    conn.close()
Classes
class TaosConnection (*args, **kwargs)
-
TDengine connection object
Expand source code
class TaosConnection(object):
    """TDengine connection object.

    Holds a native connection handle created through ``CTaosInterface`` and
    exposes query, subscription, prepared-statement and schemaless-insert
    helpers that operate on that handle.
    """

    def __init__(self, *args, **kwargs):
        """Open a connection.

        Positional arguments are accepted but not used.  The keyword
        arguments ``host``, ``user``, ``password``, ``database``, ``port``,
        ``config`` and ``timezone`` override the defaults set below.
        """
        self._conn = None
        self._host = None
        self._user = "root"
        self._password = "taosdata"
        self._database = None
        self._port = 0
        self._config = None
        self._tz = None

        self._init_config(**kwargs)

        self._chandle = CTaosInterface(self._config, self._tz)
        self._conn = self._chandle.connect(
            self._host, self._user, self._password, self._database, self._port
        )

    def _init_config(self, **kwargs):
        """Copy recognised connection options from ``kwargs`` onto the instance.

        Options that are not supplied keep their current value; keys that are
        not listed below are ignored.
        """
        options = (
            ("host", "_host"),
            ("user", "_user"),
            ("password", "_password"),
            ("database", "_database"),
            ("port", "_port"),
            ("config", "_config"),
            ("timezone", "_tz"),
        )
        for key, attr in options:
            if key in kwargs:
                setattr(self, attr, kwargs[key])

    def close(self):
        """Close current connection."""
        # The handle is reset after closing, so a second call is a no-op.
        if self._conn:
            taos_close(self._conn)
            self._conn = None

    @property
    def client_info(self):
        # type: () -> str
        """Client information as returned by ``taos_get_client_info``."""
        return taos_get_client_info()

    @property
    def server_info(self):
        # type: () -> str
        """Server information as returned by ``taos_get_server_info``."""
        return taos_get_server_info(self._conn)

    def select_db(self, database):
        # type: (str) -> None
        """Select ``database`` on this connection via ``taos_select_db``."""
        taos_select_db(self._conn, database)

    def execute(self, sql):
        # type: (str) -> int
        """Simply execute sql, ignoring the results.

        Returns the ``affected_rows`` value of the query result.
        """
        return self.query(sql).affected_rows

    def query(self, sql):
        # type: (str) -> TaosResult
        """Run ``sql`` and return the outcome wrapped in a ``TaosResult``."""
        result = taos_query(self._conn, sql)
        return TaosResult(result, True, self)

    def query_a(self, sql, callback, param):
        # type: (str, async_query_callback_type, c_void_p) -> None
        """Asynchronously query a sql with callback function"""
        taos_query_a(self._conn, sql, callback, param)

    def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
        # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
        """Create a subscription.

        All arguments are passed through to ``taos_subscribe``.

        Returns ``None`` when the connection handle is ``None``; otherwise a
        ``TaosSubscription`` wrapping the new subscription handle.

        Raises ``Error`` (built from ``taos_errstr`` / ``taos_errno``) when
        ``taos_subscribe`` returns a falsy handle.
        """
        if self._conn is None:
            return None
        sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
        if not sub:
            errno = taos_errno(c_void_p(None))
            msg = taos_errstr(c_void_p(None))
            raise Error(msg, errno)
        # Identity test against None (PEP 8) instead of ``!=``.
        return TaosSubscription(sub, callback is not None)

    def statement(self, sql=None):
        # type: (str | None) -> TaosStmt
        """Create a statement object, preparing ``sql`` when it is given.

        Returns ``None`` when the connection handle is ``None``.
        """
        if self._conn is None:
            return None
        stmt = taos_stmt_init(self._conn)
        if sql is not None:
            taos_stmt_prepare(stmt, sql)
        return TaosStmt(stmt)

    def load_table_info(self, tables):
        # type: (str) -> None
        """Pass ``tables`` to ``taos_load_table_info`` for this connection."""
        taos_load_table_info(self._conn, tables)

    def schemaless_insert(self, lines, protocol, precision):
        # type: (list[str], SmlProtocol, SmlPrecision) -> int
        """
        1. Line protocol and schemaless support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
        ]
        conn.schemaless_insert(lines, 0, "ns")
        ```

        2. OpenTSDB telnet style API format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        lines = [
            'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
        ]
        conn.schemaless_insert(lines, 1, None)
        ```

        3. OpenTSDB HTTP JSON format support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.execute("drop database if exists test")
        conn.execute("create database if not exists test")
        conn.select_db("test")
        payload = ['''
        {
            "metric": "cpu_load_0",
            "timestamp": 1626006833610123,
            "value": 55.5,
            "tags":
                {
                    "host": "ubuntu",
                    "interface": "eth0",
                    "Id": "tb0"
                }
        }
        ''']
        conn.schemaless_insert(payload, 2, None)
        ```
        """
        return taos_schemaless_insert(self._conn, lines, protocol, precision)

    def cursor(self):
        # type: () -> TaosCursor
        """Return a new Cursor object using the connection."""
        return TaosCursor(self)

    def commit(self):
        """Commit any pending transaction to the database.

        Since TDengine does not support transactions, this method does nothing.
        """
        pass

    def rollback(self):
        """Do nothing (void functionality)."""
        pass

    def clear_result_set(self):
        """Clear unused result set on this connection (currently does nothing)."""
        pass

    def __del__(self):
        # Release the native handle when the object is garbage-collected.
        self.close()
Instance variables
var client_info
-
Expand source code
@property
def client_info(self):
    # type: () -> str
    """Client information as returned by ``taos_get_client_info``."""
    return taos_get_client_info()
var server_info
-
Expand source code
@property
def server_info(self):
    # type: () -> str
    """Server information as returned by ``taos_get_server_info``."""
    return taos_get_server_info(self._conn)
Methods
def clear_result_set(self)
-
Clear unused result set on this connection.
Expand source code
def clear_result_set(self):
    """Clear unused result set on this connection (currently does nothing)."""
    pass
def close(self)
-
Close current connection.
Expand source code
def close(self):
    """Close current connection."""
    # The handle is reset after closing, so a second call is a no-op.
    if self._conn:
        taos_close(self._conn)
        self._conn = None
def commit(self)
-
Commit any pending transaction to the database.
Since TDengine does not support transactions, this method does nothing.
Expand source code
def commit(self):
    """Commit any pending transaction to the database.

    Since TDengine does not support transactions, this method does nothing.
    """
    pass
def cursor(self)
-
Return a new Cursor object using the connection.
Expand source code
def cursor(self):
    # type: () -> TaosCursor
    """Return a new Cursor object using the connection."""
    return TaosCursor(self)
def execute(self, sql)
-
Simply execute SQL, ignoring the results.
Expand source code
def execute(self, sql):
    # type: (str) -> int
    """Simply execute sql, ignoring the results.

    Returns the ``affected_rows`` value of the query result.
    """
    return self.query(sql).affected_rows
def load_table_info(self, tables)
-
Expand source code
def load_table_info(self, tables):
    # type: (str) -> None
    """Pass ``tables`` to ``taos_load_table_info`` for this connection."""
    taos_load_table_info(self._conn, tables)
def query(self, sql)
-
Expand source code
def query(self, sql):
    # type: (str) -> TaosResult
    """Run ``sql`` and return the outcome wrapped in a ``TaosResult``."""
    result = taos_query(self._conn, sql)
    return TaosResult(result, True, self)
def query_a(self, sql, callback, param)
-
Asynchronously run a SQL query with a callback function.
Expand source code
def query_a(self, sql, callback, param):
    # type: (str, async_query_callback_type, c_void_p) -> None
    """Asynchronously query a sql with callback function.

    ``sql``, ``callback`` and ``param`` are handed to ``taos_query_a``
    together with this connection's handle.
    """
    taos_query_a(self._conn, sql, callback, param)
def rollback(self)
-
No-op: this method does nothing (void functionality).
Expand source code
def rollback(self):
    """Do nothing (void functionality)."""
    pass
def schemaless_insert(self, lines, protocol, precision)
-
1.Line protocol and schemaless support
Example
import taos

conn = taos.connect()
# Recreate the database so that select_db() below has something to select.
conn.execute("drop database if exists test")
conn.execute("create database if not exists test")
conn.select_db("test")
lines = [
    'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
]
conn.schemaless_insert(lines, 0, "ns")
2.OpenTSDB telnet style API format support
Example
import taos

conn = taos.connect()
# Recreate the database so that select_db() below has something to select.
conn.execute("drop database if exists test")
conn.execute("create database if not exists test")
conn.select_db("test")
lines = [
    'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
]
conn.schemaless_insert(lines, 1, None)
3.OpenTSDB HTTP JSON format support
Example
import taos

conn = taos.connect()
# Recreate the database so that select_db() below has something to select.
conn.execute("drop database if exists test")
conn.execute("create database if not exists test")
conn.select_db("test")
payload = ['''
{
    "metric": "cpu_load_0",
    "timestamp": 1626006833610123,
    "value": 55.5,
    "tags":
        {
            "host": "ubuntu",
            "interface": "eth0",
            "Id": "tb0"
        }
}
''']
# Pass the JSON payload defined above (the original passed an undefined name).
conn.schemaless_insert(payload, 2, None)
Expand source code
def schemaless_insert(self, lines, protocol, precision):
    # type: (list[str], SmlProtocol, SmlPrecision) -> int
    """
    1. Line protocol and schemaless support

    ## Example

    ```python
    import taos
    conn = taos.connect()
    conn.execute("drop database if exists test")
    conn.execute("create database if not exists test")
    conn.select_db("test")
    lines = [
        'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
    ]
    conn.schemaless_insert(lines, 0, "ns")
    ```

    2. OpenTSDB telnet style API format support

    ## Example

    ```python
    import taos
    conn = taos.connect()
    conn.execute("drop database if exists test")
    conn.execute("create database if not exists test")
    conn.select_db("test")
    lines = [
        'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
    ]
    conn.schemaless_insert(lines, 1, None)
    ```

    3. OpenTSDB HTTP JSON format support

    ## Example

    ```python
    import taos
    conn = taos.connect()
    conn.execute("drop database if exists test")
    conn.execute("create database if not exists test")
    conn.select_db("test")
    payload = ['''
    {
        "metric": "cpu_load_0",
        "timestamp": 1626006833610123,
        "value": 55.5,
        "tags":
            {
                "host": "ubuntu",
                "interface": "eth0",
                "Id": "tb0"
            }
    }
    ''']
    conn.schemaless_insert(payload, 2, None)
    ```
    """
    return taos_schemaless_insert(self._conn, lines, protocol, precision)
def select_db(self, database)
-
Expand source code
def select_db(self, database):
    # type: (str) -> None
    """Select ``database`` on this connection via ``taos_select_db``."""
    taos_select_db(self._conn, database)
def statement(self, sql=None)
-
Expand source code
def statement(self, sql=None):
    # type: (str | None) -> TaosStmt
    """Create a statement object, preparing ``sql`` when it is given.

    Returns ``None`` when the connection handle is ``None``.
    """
    if self._conn is None:
        return None
    stmt = taos_stmt_init(self._conn)
    # Identity test against None (PEP 8) instead of ``!=``.
    if sql is not None:
        taos_stmt_prepare(stmt, sql)
    return TaosStmt(stmt)
def subscribe(self, restart, topic, sql, interval, callback=None, param=None)
-
Create a subscription.
Expand source code
def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
    # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
    """Create a subscription.

    All arguments are passed through to ``taos_subscribe``.

    Returns ``None`` when the connection handle is ``None``; otherwise a
    ``TaosSubscription`` wrapping the new subscription handle.

    Raises ``Error`` (built from ``taos_errstr`` / ``taos_errno``) when
    ``taos_subscribe`` returns a falsy handle.
    """
    if self._conn is None:
        return None
    sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
    if not sub:
        errno = taos_errno(c_void_p(None))
        msg = taos_errstr(c_void_p(None))
        raise Error(msg, errno)
    # Identity test against None (PEP 8) instead of ``!=``.
    return TaosSubscription(sub, callback is not None)