Module taosrest.connection
Expand source code
from typing import List, Dict
from .errors import NotSupportedError
from .cursor import TaosRestCursor
from .restclient import RestClient
class Result:
def __init__(self, resp: dict):
self.status: str = resp["status"]
self.head: List = resp["head"]
self.column_meta: List[list] = resp["column_meta"]
self.data: List[list] = resp["data"]
self.rows: int = resp["rows"]
@property
def field_count(self):
return len(self.head)
@property
def fields(self) -> List[Dict]:
"""
return a list of column meta dict which contains three keys:
- name: column name
- type: column type code
- bytes: data length in bytes
for more information about column meta, refer https://docs.tdengine.com/2.4/reference/rest-api/#http-return-format
"""
return list(map(lambda meta: {"name": meta[0], "type": meta[1], "bytes": meta[2]}, self.column_meta))
def __iter__(self):
return self.data.__iter__()
class TaosRestConnection:
"""
Implement [PEP 249 Connection API](https://peps.python.org/pep-0249/#connection-objects)
"""
def __init__(self, **kwargs):
"""
Keyword Arguments
----------------------------
- url : str, optional.
url to connect
- token : str, optional
cloud service token
- user : str, optional.
username used to log in
- password : str, optional.
password used to log in
- timeout : int, optional.
the optional timeout parameter specifies a timeout in seconds for blocking operations
"""
self._url = kwargs["url"] if "url" in kwargs else "http://localhost:6041"
self._user = kwargs["user"] if "user" in kwargs else "root"
self._password = kwargs["password"] if "password" in kwargs else "taosdata"
self._timeout = kwargs["timeout"] if "timeout" in kwargs else None
self._token = kwargs["token"] if "token" in kwargs else None
self._c = RestClient(self._url, token=self._token, user=self._user, password=self._password, timeout=self._timeout)
def close(self):
pass
def commit(self):
pass
def rollback(self):
raise NotSupportedError()
def cursor(self):
return TaosRestCursor(self._c)
############################################################################
# Methods bellow are not PEP249 specified.
# Add them for giving a similar programming experience as taos.Connection.
############################################################################
@property
def server_info(self):
resp = self._c.sql("select server_version()")
return resp["data"][0][0]
def execute(self, sql):
"""
execute sql usually INSERT statement and return affected row count.
If there is not a column named "affected_rows" in response, then None is returned.
"""
resp = self._c.sql(sql)
if resp["head"] == ['affected_rows']:
return resp["data"][0][0]
else:
return None
def query(self, sql) -> Result:
"""
execute sql and wrap the http response as Result object.
"""
resp = self._c.sql(sql)
return Result(resp)
Classes
class Result (resp: dict)
-
Expand source code
class Result: def __init__(self, resp: dict): self.status: str = resp["status"] self.head: List = resp["head"] self.column_meta: List[list] = resp["column_meta"] self.data: List[list] = resp["data"] self.rows: int = resp["rows"] @property def field_count(self): return len(self.head) @property def fields(self) -> List[Dict]: """ return a list of column meta dict which contains three keys: - name: column name - type: column type code - bytes: data length in bytes for more information about column meta, refer https://docs.tdengine.com/2.4/reference/rest-api/#http-return-format """ return list(map(lambda meta: {"name": meta[0], "type": meta[1], "bytes": meta[2]}, self.column_meta)) def __iter__(self): return self.data.__iter__()
Instance variables
var field_count
-
Expand source code
@property def field_count(self): return len(self.head)
var fields : List[Dict[~KT, ~VT]]
-
return a list of column meta dict which contains three keys: - name: column name - type: column type code - bytes: data length in bytes for more information about column meta, refer https://docs.tdengine.com/2.4/reference/rest-api/#http-return-format
Expand source code
@property def fields(self) -> List[Dict]: """ return a list of column meta dict which contains three keys: - name: column name - type: column type code - bytes: data length in bytes for more information about column meta, refer https://docs.tdengine.com/2.4/reference/rest-api/#http-return-format """ return list(map(lambda meta: {"name": meta[0], "type": meta[1], "bytes": meta[2]}, self.column_meta))
class TaosRestConnection (**kwargs)
-
Implement PEP 249 Connection API
Keyword Arguments
- url : str, optional. url to connect
- token : str, optional cloud service token
- user : str, optional. username used to log in
- password : str, optional. password used to log in
- timeout : int, optional. the optional timeout parameter specifies a timeout in seconds for blocking operations
Expand source code
class TaosRestConnection: """ Implement [PEP 249 Connection API](https://peps.python.org/pep-0249/#connection-objects) """ def __init__(self, **kwargs): """ Keyword Arguments ---------------------------- - url : str, optional. url to connect - token : str, optional cloud service token - user : str, optional. username used to log in - password : str, optional. password used to log in - timeout : int, optional. the optional timeout parameter specifies a timeout in seconds for blocking operations """ self._url = kwargs["url"] if "url" in kwargs else "http://localhost:6041" self._user = kwargs["user"] if "user" in kwargs else "root" self._password = kwargs["password"] if "password" in kwargs else "taosdata" self._timeout = kwargs["timeout"] if "timeout" in kwargs else None self._token = kwargs["token"] if "token" in kwargs else None self._c = RestClient(self._url, token=self._token, user=self._user, password=self._password, timeout=self._timeout) def close(self): pass def commit(self): pass def rollback(self): raise NotSupportedError() def cursor(self): return TaosRestCursor(self._c) ############################################################################ # Methods bellow are not PEP249 specified. # Add them for giving a similar programming experience as taos.Connection. ############################################################################ @property def server_info(self): resp = self._c.sql("select server_version()") return resp["data"][0][0] def execute(self, sql): """ execute sql usually INSERT statement and return affected row count. If there is not a column named "affected_rows" in response, then None is returned. """ resp = self._c.sql(sql) if resp["head"] == ['affected_rows']: return resp["data"][0][0] else: return None def query(self, sql) -> Result: """ execute sql and wrap the http response as Result object. """ resp = self._c.sql(sql) return Result(resp)
Instance variables
var server_info
-
Expand source code
@property def server_info(self): resp = self._c.sql("select server_version()") return resp["data"][0][0]
Methods
def close(self)
-
Expand source code
def close(self): pass
def commit(self)
-
Expand source code
def commit(self): pass
def cursor(self)
-
Expand source code
def cursor(self): return TaosRestCursor(self._c)
def execute(self, sql)
-
execute sql usually INSERT statement and return affected row count. If there is not a column named "affected_rows" in response, then None is returned.
Expand source code
def execute(self, sql): """ execute sql usually INSERT statement and return affected row count. If there is not a column named "affected_rows" in response, then None is returned. """ resp = self._c.sql(sql) if resp["head"] == ['affected_rows']: return resp["data"][0][0] else: return None
def query(self, sql) ‑> Result
-
execute sql and wrap the http response as Result object.
Expand source code
def query(self, sql) -> Result: """ execute sql and wrap the http response as Result object. """ resp = self._c.sql(sql) return Result(resp)
def rollback(self)
-
Expand source code
def rollback(self): raise NotSupportedError()