aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/clickhouse-connect/clickhouse_connect/driver/common.py
blob: 71adb00321410379bf0640ab736ed59ac2ab117b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import array
import struct
import sys

from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator

from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError
from clickhouse_connect.driver.types import Closable

# pylint: disable=invalid-name
# True when the host CPU is big-endian
must_swap = sys.byteorder == 'big'
# Size in bytes of the array module's 'i' type code on this platform
int_size = array.array('i').itemsize
# NOTE(review): presumably the LowCardinality serialization version used elsewhere in the driver -- confirm
low_card_version = 1

# Byte size -> signed array.array type code (array_type() upper-cases the code for unsigned values)
array_map = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
# Decimal bit size -> largest precision (base 10 digits) for that size, matching the thresholds in decimal_size()
decimal_prec = {32: 9, 64: 18, 128: 38, 256: 79}

# Where 'i' is only 2 bytes wide, use 'l' as the type code for 4 byte values instead
if int_size == 2:
    array_map[4] = 'l'

# Reverse lookup: signed type code -> byte size, extended with the 'f' and 'd' floating point codes
array_sizes = {v: k for k, v in array_map.items()}
array_sizes['f'] = 4
array_sizes['d'] = 8
# NOTE(review): looks like numpy datetime64 unit suffixes keyed by the number of sub-second digits -- confirm
np_date_types = {0: '[s]', 3: '[ms]', 6: '[us]', 9: '[ns]'}


def array_type(size: int, signed: bool):
    """
    Look up the Python array.array type code for an integer of the given width
    :param size: width of the integer in bytes
    :param signed: True for the signed (lower case) code, False for the unsigned (upper case) code
    :return: Python array.array code, or None if there is no code for that byte size
    """
    signed_code = array_map.get(size)
    if signed_code is None:
        return None
    if signed:
        return signed_code
    return signed_code.upper()


def write_array(code: str, column: Sequence, dest: MutableSequence):
    """
    Pack a column of native Python values as little-endian binary data and append it to a buffer
    :param code: Python array.array code matching the column data type
    :param column: Column of native Python values
    :param dest: Destination byte buffer
    """
    # Only the first value is inspected; if it is not a plain number the whole column is converted
    if len(column) and not isinstance(column[0], (int, float)):
        convert = float if code in ('f', 'F', 'd', 'D') else int
        column = [convert(item) for item in column]
    try:
        packer = struct.Struct(f'<{len(column)}{code}')
        dest += packer.pack(*column)
    except (TypeError, OverflowError, struct.error) as ex:
        raise DataError('Unable to create Python array.  This is usually caused by trying to insert None '
                        'values into a ClickHouse column that is not Nullable') from ex


def write_uint64(value: int, dest: MutableSequence):
    """
    Append one UInt64 value, encoded as 8 little-endian bytes, to a binary write buffer
    :param value: UInt64 value to write
    :param dest: Destination byte buffer
    """
    encoded = value.to_bytes(length=8, byteorder='little')
    dest.extend(encoded)


def write_leb128(value: int, dest: MutableSequence):
    """
    Write an unsigned LEB128 encoded integer to a target binary buffer
    :param value: Integer value (zero or positive only)
    :param dest: Target buffer
    :raises ValueError: if value is negative
    """
    # A negative Python int never becomes 0 under >>, so without this guard the loop would never end
    if value < 0:
        raise ValueError(f'Cannot LEB128 encode negative value {value}')
    # Emit 7 bits at a time, low bits first; the high bit marks that more bytes follow
    while value > 0x7f:
        dest.append(0x80 | (value & 0x7f))
        value >>= 7
    dest.append(value)


def decimal_size(prec: int):
    """
    Find the smallest Decimal bit size (32, 64, 128 or 256) able to hold the requested precision
    :param prec: Precision of the Decimal in total number of base 10 digits
    :return: Required bit size
    """
    if prec < 1 or prec > 79:
        raise ArithmeticError(f'Invalid precision {prec} for ClickHouse Decimal type')
    # (exclusive precision limit, bit size) pairs in ascending order
    for limit, bits in ((10, 32), (19, 64), (39, 128)):
        if prec < limit:
            return bits
    return 256


def unescape_identifier(x: str) -> str:
    """
    Remove the surrounding backticks from an identifier, if it both starts and ends with one
    :param x: Identifier, possibly wrapped in backticks
    :return: Identifier without the enclosing backticks, or the value unchanged if it is not wrapped
    """
    wrapped = x.startswith('`') and x.endswith('`')
    return x[1:-1] if wrapped else x


def dict_copy(source: Dict = None, update: Optional[Dict] = None) -> Dict:
    """
    Build a new dictionary from a shallow copy of source with the entries of update applied on top
    :param source: Dictionary to copy; None or empty gives a new empty dict
    :param update: Optional entries that are added to, or override, the copied entries
    :return: The new dictionary; source itself is not modified
    """
    if source:
        merged = source.copy()
    else:
        merged = {}
    if update:
        merged.update(update)
    return merged


def empty_gen():
    """
    Generator that yields nothing and is exhausted on the first call to next()
    """
    yield from iter(())


def coerce_int(val: Optional[Union[str, int]]) -> int:
    """
    Convert an optional string or int setting to an int
    :param val: Value to convert; any falsy value (None, '', 0) is treated as 0
    :return: Integer value
    """
    return int(val) if val else 0


def coerce_bool(val: Optional[Union[str, bool]]):
    """
    Convert an optional string or bool setting to a bool
    :param val: Value to convert
    :return: True only if the value equals True, 'True', 'true' or '1'; False for anything else
    """
    truthy_values = (True, 'True', 'true', '1')
    return bool(val) and val in truthy_values


class SliceView(Sequence):
    """
    Provides a view into a sequence rather than copying.  Borrows liberally from
    https://gist.github.com/mathieucaroff/0cf094325fb5294fb54c6a577f05a2c1
    Also see the discussion on SO: https://stackoverflow.com/questions/3485475/can-i-create-a-view-on-a-python-list
    """
    __slots__ = ('_source', '_range')

    def __init__(self, source: Sequence, source_slice: Optional[slice] = None):
        """
        :param source: Underlying sequence, or another SliceView to narrow further
        :param source_slice: Optional slice, relative to source, selecting the visible items.  None selects all
        """
        if isinstance(source, SliceView):
            # Compose with the existing view so the slice is relative to that view, and keep a reference
            # to the original sequence rather than stacking views
            self._source = source._source
            base_range = source._range
        else:
            self._source = source
            base_range = range(len(source))
        # A range cannot be indexed with None, so "no slice" has to be handled explicitly
        self._range = base_range if source_slice is None else base_range[source_slice]

    def __len__(self):
        return len(self._range)

    def __getitem__(self, i):
        if isinstance(i, slice):
            # Pass the view itself (not the raw source) so the slice is applied on top of the current range
            return SliceView(self, i)
        return self._source[self._range[i]]

    def _visible(self):
        """
        Slice the underlying sequence down to the items visible through this view
        :return: Result of slicing the source, so the same type the source returns for a slice
        """
        r = self._range
        if not r:
            return self._source[0:0]
        # For a reversed range that runs to the first item the range stop is negative, which a slice
        # would interpret as an index from the end, so use an open-ended slice instead
        stop = r.stop if r.stop >= 0 else None
        return self._source[r.start:stop:r.step]

    def __str__(self):
        return str(self._visible())

    def __repr__(self):
        return f'SliceView({self._visible()})'

    def __eq__(self, other):
        if self is other:
            return True
        try:
            if len(self) != len(other):
                return False
        except TypeError:
            # Objects without a length (None, ints, ...) cannot be compared item by item
            return NotImplemented
        return not any(v != w for v, w in zip(self, other))


class StreamContext:
    """
    Context manager that pairs a generator with the closable "source" feeding it.  Leaving the context always
    closes the source, whether the generator was fully consumed, partially consumed, or an exception was raised
    while consuming it.  Items can only be pulled from the stream while inside the context
    """
    __slots__ = ('source', 'gen', '_in_context')

    def __init__(self, source: Closable, gen: Generator):
        """
        :param source: Object that is closed when the context exits
        :param gen: Generator that produces the items of the stream
        """
        self._in_context = False
        self.gen = gen
        self.source = source

    def __iter__(self):
        return self

    def __next__(self):
        if self._in_context:
            return next(self.gen)
        raise ProgrammingError('Stream should be used within a context')

    def __enter__(self):
        # gen is cleared on exit, so a missing generator means this stream has already been used
        if self.gen:
            self._in_context = True
            return self
        raise StreamClosedError

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._in_context = False
        self.source.close()
        self.gen = None