Module taos.cursor
Source code
# encoding:UTF-8
from .cinterface import taos_fetch_fields
from .error import *
from .constants import FieldType
from .result import *
class TaosCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consisting of 7-item sequences:
> name (mandatory)
> type_code (mandatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount: This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected (for DML statements like INSERT).
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def __next__(self):
return self._taos_next()
def next(self):
return self._taos_next()
def _taos_next(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = taos_fetch_row(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object."""
return self._description
@property
def rowcount(self):
"""
For INSERT statements, rowcount is assigned immediately after the statement is executed.
For SELECT statements, rowcount does not hold the correct value until all data has been fetched.
"""
return self._rowcount
@property
def affected_rows(self):
"""Return the rowcount of insertion"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
A no-op, since stored procedures are not supported.
"""
pass
def log(self, logfile):
self._logfile = logfile
def close(self):
"""Close the cursor."""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
# type: (str, Any) -> int
"""Prepare and execute a database operation (query or command)."""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
# global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid race condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = taos_query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if self._logfile:
with open(self._logfile, "a", encoding="utf-8") as logfile:
logfile.write("%s;\n" % operation)
if taos_field_count(self._result) == 0:
affected_rows = taos_affected_rows(self._result)
self._affected_rows = affected_rows
self._rowcount = affected_rows
return affected_rows
else:
self._fields = taos_fetch_fields(self._result)
return self._handle_result()
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters."""
self.execute(operation)
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available."""
pass
def fetchmany(self):
pass
def istype(self, col, dataType):
if dataType.upper() == "BOOL":
if self._description[col][1] == FieldType.C_BOOL:
return True
if dataType.upper() == "TINYINT":
if self._description[col][1] == FieldType.C_TINYINT:
return True
if dataType.upper() == "TINYINT UNSIGNED":
if self._description[col][1] == FieldType.C_TINYINT_UNSIGNED:
return True
if dataType.upper() == "SMALLINT":
if self._description[col][1] == FieldType.C_SMALLINT:
return True
if dataType.upper() == "SMALLINT UNSIGNED":
if self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED:
return True
if dataType.upper() == "INT":
if self._description[col][1] == FieldType.C_INT:
return True
if dataType.upper() == "INT UNSIGNED":
if self._description[col][1] == FieldType.C_INT_UNSIGNED:
return True
if dataType.upper() == "BIGINT":
if self._description[col][1] == FieldType.C_BIGINT:
return True
if dataType.upper() == "BIGINT UNSIGNED":
if self._description[col][1] == FieldType.C_BIGINT_UNSIGNED:
return True
if dataType.upper() == "FLOAT":
if self._description[col][1] == FieldType.C_FLOAT:
return True
if dataType.upper() == "DOUBLE":
if self._description[col][1] == FieldType.C_DOUBLE:
return True
if dataType.upper() == "BINARY":
if self._description[col][1] == FieldType.C_BINARY:
return True
if dataType.upper() == "TIMESTAMP":
if self._description[col][1] == FieldType.C_TIMESTAMP:
return True
if dataType.upper() == "NCHAR":
if self._description[col][1] == FieldType.C_NCHAR:
return True
if dataType.upper() == "JSON":
if self._description[col][1] == FieldType.C_JSON:
return True
return False
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation."""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_rows = taos_fetch_row(self._result, self._fields)
errno = taos_errno(self._result)
if errno != 0:
raise ProgrammingError(taos_errstr(self._result), errno)
if num_of_rows == 0:
break
self._rowcount += num_of_rows
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None:
raise OperationalError("Invalid use of fetchall")
fields = self._fields if self._fields is not None else taos_fetch_fields(
self._result)
buffer = [[] for i in range(len(fields))]
self._rowcount = 0
while True:
block, num_of_rows = taos_fetch_block(self._result, self._fields)
errno = taos_errno(self._result)
if errno != 0:
raise ProgrammingError(taos_errstr(self._result), errno)
if num_of_rows == 0:
break
self._rowcount += num_of_rows
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def stop_query(self):
if self._result is not None:
taos_stop_query(self._result)
def nextset(self):
""" """
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version."""
self._description = []
self._rowcount = -1
if self._result is not None:
taos_free_result(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query."""
self._description = []
for ele in self._fields:
self._description.append(
(ele["name"], ele["type"], None, None, None, None, False))
return self._result
def __del__(self):
self.close()
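A minimal end-to-end sketch of using this cursor, assuming a reachable TDengine server; the connection parameters, database name and table name are illustrative.

import taos

# Illustrative connection; adjust host/user/password for your deployment.
conn = taos.connect(host="localhost", user="root", password="taosdata")
cursor = conn.cursor()
try:
    cursor.execute("CREATE DATABASE IF NOT EXISTS demo")
    cursor.execute("CREATE TABLE IF NOT EXISTS demo.tb (ts TIMESTAMP, val INT)")
    cursor.execute("INSERT INTO demo.tb VALUES (NOW, 1) (NOW+1s, 2)")

    cursor.execute("SELECT ts, val FROM demo.tb")
    for row in cursor.fetchall():      # a list of tuples, one per row
        print(row)
finally:
    cursor.close()
    conn.close()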
Classes
class TaosCursor (connection=None)
-
Database cursor which is used to manage the context of a fetch operation.
Attributes
.description: Read-only attribute consisting of 7-item sequences:
> name (mandatory)
> type_code (mandatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount: This read-only attribute specifies the number of rows that the last .execute*() produced (for DQL statements like SELECT) or affected (for DML statements like INSERT).
Instance variables
var affected_rows
-
Return the number of rows affected by the last non-query statement (e.g. INSERT).
var description
-
Return the description of the object.
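A short sketch of reading column metadata from description, using a hypothetical table; description is populated by execute() for statements that return rows.

import taos

conn = taos.connect()                      # assumes local defaults; illustrative
cursor = conn.cursor()
cursor.execute("SELECT ts, val FROM demo.tb")   # hypothetical table

# Only the first two items of each 7-item sequence carry real values in this cursor.
for name, type_code, *_ in cursor.description:
    print(name, type_code)
conn.close()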
var rowcount
-
For INSERT statements, rowcount is assigned immediately after the statement is executed. For SELECT statements, rowcount does not hold the correct value until all data has been fetched.
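A sketch of the behaviour described above, with an illustrative table: rowcount is known right after an INSERT, but for a SELECT it only becomes meaningful once the rows have been fetched.

import taos

conn = taos.connect()                      # assumes local defaults; illustrative
cursor = conn.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS demo")
cursor.execute("CREATE TABLE IF NOT EXISTS demo.tb (ts TIMESTAMP, val INT)")

cursor.execute("INSERT INTO demo.tb VALUES (NOW, 1)")
print(cursor.rowcount)                     # 1, set right after execute()

cursor.execute("SELECT * FROM demo.tb")
print(cursor.rowcount)                     # -1, no rows fetched yet
cursor.fetchall()
print(cursor.rowcount)                     # number of rows actually fetched
conn.close()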
Methods
def callproc(self, procname, *args)
-
Call a stored database procedure with the given name.
A no-op, since stored procedures are not supported.
def close(self)
-
Close the cursor.
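A small cleanup sketch (connection parameters illustrative): closing the cursor in a finally block ensures the underlying result is freed even if a statement fails.

import taos

conn = taos.connect()                      # assumes local defaults; illustrative
cursor = conn.cursor()
try:
    cursor.execute("SELECT SERVER_VERSION()")
    print(cursor.fetchall())
finally:
    cursor.close()                         # frees the result and detaches the connection
    conn.close()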
def execute(self, operation, params=None)
-
Prepare and execute a database operation (query or command).
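A sketch of the two return paths shown in the source above, with illustrative statements: commands without a result set return the affected row count, while queries return the underlying result handle and the rows are obtained through the fetch methods.

import taos

conn = taos.connect()                      # assumes local defaults; illustrative
cursor = conn.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS demo")
cursor.execute("CREATE TABLE IF NOT EXISTS demo.tb (ts TIMESTAMP, val INT)")

affected = cursor.execute("INSERT INTO demo.tb VALUES (NOW, 42)")
print(affected)                            # 1: no result set, so the row count is returned

cursor.execute("SELECT * FROM demo.tb")    # returns the result handle for a query
rows = cursor.fetchall()                   # the rows come from the fetch methods
conn.close()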
def executemany(self, operation, seq_of_parameters)
-
Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
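As the source above shows, seq_of_parameters is currently not interpolated into the statement; the operation is executed once as-is. One workaround (a sketch with hypothetical data) is to fold the rows into a single multi-row INSERT, which TDengine accepts, and pass that to execute().

import taos

conn = taos.connect()                      # assumes local defaults; illustrative
cursor = conn.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS demo")
cursor.execute("CREATE TABLE IF NOT EXISTS demo.tb (ts TIMESTAMP, val INT)")

# For illustration only: the values are trusted literals, not user input.
rows = [("2024-01-01 00:00:00.000", 1), ("2024-01-01 00:00:01.000", 2)]
values = " ".join("('%s', %d)" % (ts, v) for ts, v in rows)
cursor.execute("INSERT INTO demo.tb VALUES %s" % values)
conn.close()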
def fetchall(self)
-
Fetch all (remaining) rows of the query result as a list of tuples; rowcount is updated with the number of rows fetched.
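A brief sketch, using a hypothetical table: fetchall() drains the current result set into a list of tuples and updates rowcount accordingly.

import taos

conn = taos.connect()                      # assumes local defaults; illustrative
cursor = conn.cursor()
cursor.execute("SELECT ts, val FROM demo.tb")   # hypothetical table
rows = cursor.fetchall()                   # a list of tuples, one per row
print(len(rows), cursor.rowcount)          # both equal the number of rows returned
conn.close()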
def fetchall_row(self)
-
Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
def fetchmany(self)
-
Not implemented in this cursor; the method body is a no-op.
def fetchone(self)
-
Fetch the next row of a query result set, returning a single sequence, or None when no more data is available. (Not implemented in this cursor; the method body is a no-op.)
def istype(self, col, dataType)
-
Return True if the type_code of column col (taken from description) matches the data type name dataType, e.g. "INT", "BIGINT UNSIGNED" or "NCHAR"; the comparison is case-insensitive. Return False otherwise.
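A sketch of checking column types after a query, using a hypothetical table; FieldType comes from taos.constants, as in the import at the top of this module, and type names are matched case-insensitively.

import taos
from taos.constants import FieldType

conn = taos.connect()                      # assumes local defaults; illustrative
cursor = conn.cursor()
cursor.execute("SELECT ts, val FROM demo.tb")   # hypothetical table

print(cursor.istype(0, "timestamp"))       # True: column 0 is the timestamp
print(cursor.istype(1, "int"))             # True if val was declared as INT
print(cursor.description[1][1] == FieldType.C_INT)   # the equivalent manual check
conn.close()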
def log(self, logfile)
-
Set a log file for the cursor; every statement subsequently passed to execute() is appended to this file, terminated with a semicolon.
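A sketch of statement logging; the file path is hypothetical.

import taos

conn = taos.connect()                      # assumes local defaults; illustrative
cursor = conn.cursor()
cursor.log("/tmp/taos_statements.log")     # hypothetical path

cursor.execute("CREATE DATABASE IF NOT EXISTS demo")
cursor.execute("SELECT SERVER_VERSION()")
# /tmp/taos_statements.log now contains both statements, each ending in ';'
conn.close()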
def next(self)
-
Return the next row of the current result set (the Python 2 counterpart of __next__); raises StopIteration when the result is exhausted.
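Because the cursor implements __iter__ and __next__ (see the source above), rows can also be consumed one at a time; a sketch with a hypothetical table:

import taos

conn = taos.connect()                      # assumes local defaults; illustrative
cursor = conn.cursor()
cursor.execute("SELECT ts, val FROM demo.tb")   # hypothetical table

for row in cursor:                         # each row is a tuple; stops when exhausted
    print(row)
conn.close()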
def nextset(self)
-
Not implemented; present for DB API compatibility.
def setinputsize(self, sizes)
-
Not implemented; present for DB API compatibility.
def setutputsize(self, size, column=None)
-
Not implemented; present for DB API compatibility.
def stop_query(self)
-
Ask the server to stop the query associated with the cursor's current result, if any.