Module taos.field

Expand source code
# encoding:UTF-8
import ctypes
import math
import pytz

from datetime import datetime, timedelta
from ctypes import *

from .constants import FieldType
from .error import *

_priv_tz = None
_utc_tz = pytz.timezone("UTC")
_datetime_epoch = datetime.fromtimestamp(0)
_utc_datetime_epoch = _utc_tz.localize(datetime.utcfromtimestamp(0))


def set_tz(tz):
    # type: (str) -> None
    global _priv_tz
    _priv_tz = tz


def _convert_millisecond_to_datetime(milli):
    try:
        if _priv_tz is None:
            return _datetime_epoch + timedelta(seconds=milli / 1000.0)
        return (_utc_datetime_epoch + timedelta(seconds=milli / 1000.0)).astimezone(_priv_tz)
    except OverflowError:
        # catch OverflowError and pass
        print("WARN: datetime overflow!")
        pass


def _convert_microsecond_to_datetime(micro):
    try:
        if _priv_tz is None:
            return _datetime_epoch + timedelta(seconds=micro / 1000000.0)
        return (_utc_datetime_epoch + timedelta(seconds=micro / 1000000.0)).astimezone(_priv_tz)
    except OverflowError:
        # catch OverflowError and pass
        print("WARN: datetime overflow!")
        pass


def _convert_nanosecond_to_datetime(nanosec):
    return nanosec


def _crow_timestamp_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C bool row to python row."""
    _timestamp_converter = _convert_millisecond_to_datetime
    if precision == FieldType.C_TIMESTAMP_MILLI:
        _timestamp_converter = _convert_millisecond_to_datetime
    elif precision == FieldType.C_TIMESTAMP_MICRO:
        _timestamp_converter = _convert_microsecond_to_datetime
    elif precision == FieldType.C_TIMESTAMP_NANO:
        _timestamp_converter = _convert_nanosecond_to_datetime
    else:
        raise DatabaseError("Unknown precision returned from database")

    return [
        None if is_null[i] else _timestamp_converter(ele)
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)])
    ]


def _crow_bool_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C bool row to python row."""
    return [
        None if is_null[i] else bool(ele)
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)])
    ]


def _crow_tinyint_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C tinyint row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)])
    ]


def _crow_tinyint_unsigned_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C unsigned tinyint row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_ubyte))[: abs(num_of_rows)])
    ]


def _crow_smallint_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C smallint row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[: abs(num_of_rows)])
    ]


def _crow_smallint_unsigned_to_python(
    data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN
):
    """Function to convert C unsigned smallint row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_ushort))[: abs(num_of_rows)])
    ]


def _crow_int_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C int row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[: abs(num_of_rows)])
    ]


def _crow_int_unsigned_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C unsigned int row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_uint))[: abs(num_of_rows)])
    ]


def _crow_bigint_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C bigint row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)])
    ]


def _crow_bigint_unsigned_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C unsigned bigint row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_uint64))[: abs(num_of_rows)])
    ]


def _crow_float_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C float row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[: abs(num_of_rows)])
    ]


def _crow_double_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C double row to python row."""
    return [
        None if is_null[i] else ele
        for i, ele in enumerate(ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[: abs(num_of_rows)])
    ]


def _crow_binary_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C binary row to python row."""
    assert nbytes is not None
    return [
        None if is_null[i] else ele.value.decode("utf-8")
        for i, ele in enumerate((ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[: abs(num_of_rows)])
    ]


def _crow_nchar_to_python(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C nchar row to python row."""
    assert nbytes is not None
    res = []
    for i in range(abs(num_of_rows)):
        if is_null[i]:
            res.append(None)
        else:
            try:
                if num_of_rows >= 0:
                    tmpstr = ctypes.c_char_p(data)
                    res.append(tmpstr.value.decode("utf-8"))
                else:
                    res.append(
                        (
                            ctypes.cast(
                                data + nbytes * i,
                                ctypes.POINTER(ctypes.c_wchar * (nbytes // 4)),
                            )
                        )[0].value
                    )
            except ValueError:
                res.append(None)

    return res


def _crow_binary_to_python_block(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C binary row to python row."""
    assert nbytes is not None
    res = []
    for i in range(abs(num_of_rows)):
        if is_null[i]:
            res.append(None)
        else:
            rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_short))[:1].pop()
            chars = ctypes.cast(c_char_p(data + nbytes * i + 2), ctypes.POINTER(c_char * rbyte))
            buffer = create_string_buffer(rbyte + 1)
            buffer[:rbyte] = chars[0][:rbyte]
            res.append(cast(buffer, c_char_p).value.decode("utf-8"))
    return res


def _crow_nchar_to_python_block(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Function to convert C nchar row to python row."""
    assert nbytes is not None
    res = []
    for i in range(abs(num_of_rows)):
        if is_null[i]:
            res.append(None)
        else:
            rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_short))[:1].pop()
            chars = ctypes.cast(c_char_p(data + nbytes * i + 2), ctypes.POINTER(c_char * rbyte))
            buffer = create_string_buffer(rbyte + 1)
            buffer[:rbyte] = chars[0][:rbyte]
            res.append(cast(buffer, c_char_p).value.decode("utf-8"))
    return res


CONVERT_FUNC = {
    FieldType.C_BOOL: _crow_bool_to_python,
    FieldType.C_TINYINT: _crow_tinyint_to_python,
    FieldType.C_SMALLINT: _crow_smallint_to_python,
    FieldType.C_INT: _crow_int_to_python,
    FieldType.C_BIGINT: _crow_bigint_to_python,
    FieldType.C_FLOAT: _crow_float_to_python,
    FieldType.C_DOUBLE: _crow_double_to_python,
    FieldType.C_BINARY: _crow_binary_to_python,
    FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
    FieldType.C_NCHAR: _crow_nchar_to_python,
    FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python,
    FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
    FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
    FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
    FieldType.C_JSON: _crow_nchar_to_python,
}

CONVERT_FUNC_BLOCK = {
    FieldType.C_BOOL: _crow_bool_to_python,
    FieldType.C_TINYINT: _crow_tinyint_to_python,
    FieldType.C_SMALLINT: _crow_smallint_to_python,
    FieldType.C_INT: _crow_int_to_python,
    FieldType.C_BIGINT: _crow_bigint_to_python,
    FieldType.C_FLOAT: _crow_float_to_python,
    FieldType.C_DOUBLE: _crow_double_to_python,
    FieldType.C_BINARY: _crow_binary_to_python_block,
    FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
    FieldType.C_NCHAR: _crow_nchar_to_python_block,
    FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python,
    FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
    FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
    FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
    FieldType.C_JSON: _crow_nchar_to_python_block,
}

# Corresponding TAOS_FIELD structure in C


class TaosField(ctypes.Structure):
    _fields_ = [
        ("_name", ctypes.c_char * 65),
        ("_type", ctypes.c_uint8),
        ("_bytes", ctypes.c_uint16),
    ]

    @property
    def name(self):
        return self._name.decode("utf-8")

    @property
    def length(self):
        """alias to self.bytes"""
        return self._bytes

    @property
    def bytes(self):
        return self._bytes

    @property
    def type(self):
        return self._type

    def __dict__(self):
        return {"name": self.name, "type": self.type, "bytes": self.length}

    def __str__(self):
        return "{name: %s, type: %d, bytes: %d}" % (
            self.name,
            self.type,
            self.length,
        )

    def __getitem__(self, item):
        return getattr(self, item)


class TaosFields(object):
    def __init__(self, fields, count):
        if isinstance(fields, c_void_p):
            self._fields = cast(fields, POINTER(TaosField))
        if isinstance(fields, POINTER(TaosField)):
            self._fields = fields
        self._count = count
        self._iter = 0

    def as_ptr(self):
        return self._fields

    @property
    def count(self):
        return self._count

    @property
    def fields(self):
        return self._fields

    def __next__(self):
        return self._next_field()

    def next(self):
        return self._next_field()

    def _next_field(self):
        if self._iter < self.count:
            field = self._fields[self._iter]
            self._iter += 1
            return field
        else:
            raise StopIteration

    def __getitem__(self, item):
        return self._fields[item]

    def __iter__(self):
        self._iter = 0
        return self

    def __len__(self):
        return self.count

Functions

def set_tz(tz)
Expand source code
def set_tz(tz):
    # type: (str) -> None
    global _priv_tz
    _priv_tz = tz

Classes

class TaosField (*args, **kwargs)

Structure base class

Expand source code
class TaosField(ctypes.Structure):
    _fields_ = [
        ("_name", ctypes.c_char * 65),
        ("_type", ctypes.c_uint8),
        ("_bytes", ctypes.c_uint16),
    ]

    @property
    def name(self):
        return self._name.decode("utf-8")

    @property
    def length(self):
        """alias to self.bytes"""
        return self._bytes

    @property
    def bytes(self):
        return self._bytes

    @property
    def type(self):
        return self._type

    def __dict__(self):
        return {"name": self.name, "type": self.type, "bytes": self.length}

    def __str__(self):
        return "{name: %s, type: %d, bytes: %d}" % (
            self.name,
            self.type,
            self.length,
        )

    def __getitem__(self, item):
        return getattr(self, item)

Ancestors

  • _ctypes.Structure
  • _ctypes._CData

Instance variables

var bytes
Expand source code
@property
def bytes(self):
    return self._bytes
var length

alias to self.bytes

Expand source code
@property
def length(self):
    """alias to self.bytes"""
    return self._bytes
var name
Expand source code
@property
def name(self):
    return self._name.decode("utf-8")
var type
Expand source code
@property
def type(self):
    return self._type
class TaosFields (fields, count)
Expand source code
class TaosFields(object):
    def __init__(self, fields, count):
        if isinstance(fields, c_void_p):
            self._fields = cast(fields, POINTER(TaosField))
        if isinstance(fields, POINTER(TaosField)):
            self._fields = fields
        self._count = count
        self._iter = 0

    def as_ptr(self):
        return self._fields

    @property
    def count(self):
        return self._count

    @property
    def fields(self):
        return self._fields

    def __next__(self):
        return self._next_field()

    def next(self):
        return self._next_field()

    def _next_field(self):
        if self._iter < self.count:
            field = self._fields[self._iter]
            self._iter += 1
            return field
        else:
            raise StopIteration

    def __getitem__(self, item):
        return self._fields[item]

    def __iter__(self):
        self._iter = 0
        return self

    def __len__(self):
        return self.count

Instance variables

var count
Expand source code
@property
def count(self):
    return self._count
var fields
Expand source code
@property
def fields(self):
    return self._fields

Methods

def as_ptr(self)
Expand source code
def as_ptr(self):
    return self._fields
def next(self)
Expand source code
def next(self):
    return self._next_field()