1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
|
import array
import struct
import sys
from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator
from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError
from clickhouse_connect.driver.types import Closable
# pylint: disable=invalid-name
# True when the host is big-endian (sys.byteorder reports 'big')
must_swap = sys.byteorder == 'big'
# Width in bytes of the platform C int, as reported by array.array
int_size = array.array('i').itemsize
low_card_version = 1

# Signed array.array type codes keyed by integer byte width.  Where the C int is only 2 bytes wide,
# 4 byte values use the 'l' code instead of 'i'
array_map = {1: 'b', 2: 'h', 4: 'l' if int_size == 2 else 'i', 8: 'q'}

# Keyed by Decimal bit size; each value equals the largest precision decimal_size() maps to that size
decimal_prec = {32: 9, 64: 18, 128: 38, 256: 79}

# Reverse lookup (type code -> byte width) of array_map, extended with the float and double codes
array_sizes = {type_code: width for width, type_code in array_map.items()}
array_sizes.update(f=4, d=8)

# NOTE(review): presumably numpy datetime64 unit suffixes keyed by fractional second digits -- confirm with callers
np_date_types = {0: '[s]', 3: '[ms]', 6: '[us]', 9: '[ns]'}
def array_type(size: int, signed: bool):
    """
    Look up the Python array.array type code for an integer of the given byte size
    :param size: byte size of the integer type
    :param signed: True for the signed (lower case) code, False for the unsigned (upper case) code
    :return: Python array.array code, or None if array_map has no entry for the byte size
    """
    code = array_map.get(size)
    if code is None:
        return None
    # array.array uses the upper case form of a signed code for its unsigned counterpart
    if signed:
        return code
    return code.upper()
def write_array(code: str, column: Sequence, dest: MutableSequence):
    """
    Pack a column of native Python numbers as little endian binary values and append them to a buffer
    :param code: Python array.array/struct code matching the column data type
    :param column: Column of native Python values.  If the first value is not an int or float, every value
      is converted first (with float() for the 'f'/'F'/'d'/'D' codes, with int() otherwise)
    :param dest: Destination byte buffer
    :raises DataError: if packing fails with a TypeError, OverflowError or struct.error
    """
    # Only the first element is inspected to decide whether the whole column needs conversion
    if len(column) and not isinstance(column[0], (int, float)):
        convert = float if code in ('f', 'F', 'd', 'D') else int
        column = [convert(item) for item in column]
    try:
        # One struct format for the whole column, e.g. '<3i'; '<' forces little endian, standard sizes
        packer = struct.Struct(f'<{len(column)}{code}')
        dest += packer.pack(*column)
    except (TypeError, OverflowError, struct.error) as ex:
        raise DataError('Unable to create Python array. This is usually caused by trying to insert None ' +
                        'values into a ClickHouse column that is not Nullable') from ex
def write_uint64(value: int, dest: MutableSequence):
    """
    Append a single UInt64 value to a binary write buffer as 8 little endian bytes
    :param value: UInt64 value to write
    :param dest: Destination byte buffer
    """
    encoded = value.to_bytes(length=8, byteorder='little')
    dest.extend(encoded)
def write_leb128(value: int, dest: MutableSequence):
    """
    Write a LEB128 encoded integer to a target binary buffer.  The value is emitted 7 bits at a time, least
    significant group first, with the high bit of each byte set when more bytes follow
    :param value: Integer value (must not be negative)
    :param dest: Target buffer; the encoded bytes are appended to it
    :raises ValueError: if value is negative
    """
    if value < 0:
        # Right shifting a negative int converges on -1, never 0, so the loop below would never terminate
        raise ValueError(f'Cannot LEB128 encode negative value {value}')
    while True:
        b = value & 0x7f
        value >>= 7
        if value == 0:
            # Final group: high bit clear marks the end of the encoded integer
            dest.append(b)
            return
        dest.append(0x80 | b)
def decimal_size(prec: int):
    """
    Find the bit size of the ClickHouse or Python Decimal needed to hold a value of the requested precision
    :param prec: Precision of the Decimal in total number of base 10 digits (1 through 79)
    :return: Required bit size: 32, 64, 128 or 256
    :raises ArithmeticError: if the precision is less than 1 or greater than 79
    """
    if prec < 1 or prec > 79:
        raise ArithmeticError(f'Invalid precision {prec} for ClickHouse Decimal type')
    # (exclusive precision limit, bit size) pairs checked from the smallest size up
    for limit, bits in ((10, 32), (19, 64), (39, 128)):
        if prec < limit:
            return bits
    return 256
def unescape_identifier(x: str) -> str:
    """
    Strip one pair of enclosing backticks from an identifier
    :param x: Identifier, possibly wrapped in backticks
    :return: The text between the backticks if x both starts and ends with one, otherwise x unchanged
    """
    is_quoted = x.startswith('`') and x.endswith('`')
    return x[1:-1] if is_quoted else x
def dict_copy(source: Dict = None, update: Optional[Dict] = None) -> Dict:
    """
    Build a new dictionary from a shallow copy of source with the entries of update applied on top
    :param source: Dictionary to copy; None or empty produces a new empty dictionary
    :param update: Optional entries that are added to (or override) the copied entries
    :return: The new dictionary; source itself is never modified
    """
    if source:
        merged = source.copy()
    else:
        merged = {}
    if update:
        merged.update(update)
    return merged
def dict_add(source: Dict, key: str, value: object) -> Dict:
    """
    Set a key in a dictionary in place, but only when the value is not None
    :param source: Dictionary to modify
    :param key: Key to set
    :param value: Value to store; None leaves the dictionary untouched (other falsy values are stored)
    :return: The same source dictionary that was passed in
    """
    if value is not None:
        source[key] = value
    return source
def empty_gen():
    """
    Create a generator that is exhausted immediately
    :return: Generator that yields no items
    """
    yield from []
def coerce_int(val: Optional[Union[str, int]]) -> int:
    """
    Convert an optional string or integer setting to an int
    :param val: Value to convert
    :return: 0 for any falsy value (None, empty string, 0), otherwise int(val)
    """
    return int(val) if val else 0
def coerce_bool(val: Optional[Union[str, bool]]):
    """
    Convert an optional string or boolean setting to a bool
    :param val: Value to convert
    :return: True only for the True singleton or for one of the strings 'true', '1', 'y', 'yes' compared
      case insensitively; False for everything else, including truthy non-string values such as 1
    """
    if not val:
        return False
    if isinstance(val, str):
        return val.lower() in ('true', '1', 'y', 'yes')
    # Any other truthy object only counts when it is the True singleton itself
    return val is True
class SliceView(Sequence):
    """
    Provides a view into a sequence rather than copying. Borrows liberally from
    https://gist.github.com/mathieucaroff/0cf094325fb5294fb54c6a577f05a2c1
    Also see the discussion on SO: https://stackoverflow.com/questions/3485475/can-i-create-a-view-on-a-python-list

    A view keeps a reference to the underlying sequence plus a range of indexes into it.  Slicing a view, or
    wrapping a view in another view, only derives a new range, so views never nest and elements are not copied.
    """
    __slots__ = ('_source', '_range')

    def __init__(self, source: Sequence, source_slice: Optional[slice] = None):
        """
        :param source: Sequence to view.  If it is itself a SliceView the new view shares its underlying sequence
        :param source_slice: Slice selecting the visible items, relative to source.  None selects all of source
        """
        if isinstance(source, SliceView):
            self._source = source._source
            visible = source._range
        else:
            self._source = source
            visible = range(len(source))
        # Slicing a range yields another range, so the selection stays a cheap index mapping
        self._range = visible if source_slice is None else visible[source_slice]

    def __len__(self):
        return len(self._range)

    def __getitem__(self, i):
        """
        :param i: Integer index or slice, relative to this view
        :return: The item of the underlying sequence for an index, or a new SliceView for a slice
        """
        if isinstance(i, slice):
            # Pass the view itself (not the raw source) so the slice applies to this view's own range
            return SliceView(self, i)
        return self._source[self._range[i]]

    def _as_slice(self) -> slice:
        """
        :return: A slice that selects this view's items when applied to the underlying sequence
        """
        r = self._range
        if not r:
            return slice(0, 0)
        # A descending range that reaches index 0 has a negative stop.  Used as a slice bound that would
        # count from the end of the sequence, so express "run to the beginning" with None instead
        stop = r.stop if r.stop >= 0 else None
        return slice(r.start, stop, r.step)

    def __str__(self):
        return str(self._source[self._as_slice()])

    def __repr__(self):
        return f'SliceView({self._source[self._as_slice()]})'

    def __eq__(self, other):
        """
        Element by element comparison with any other sized iterable
        :return: True or False, or NotImplemented when other has no length so Python can try other's comparison
        """
        if self is other:
            return True
        try:
            if len(self) != len(other):
                return False
        except TypeError:
            return NotImplemented
        return not any(v != w for v, w in zip(self, other))
class StreamContext:
    """
    Context manager and iterator that pairs a generator with the closable "source" feeding it.  Leaving the
    context always closes the source, whether or not the generator was fully consumed and whether or not an
    exception was raised while consuming it.  Once exited the stream cannot be entered again
    """
    __slots__ = ('source', 'gen', '_in_context')

    def __init__(self, source: Closable, gen: Generator):
        """
        :param source: Object whose close() method is called when the context exits
        :param gen: Generator that produces the streamed items
        """
        self.source, self.gen = source, gen
        self._in_context = False

    def __enter__(self):
        # gen is reset to None by __exit__, so a falsy gen means this stream was already used and closed
        if self.gen:
            self._in_context = True
            return self
        raise StreamClosedError

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._in_context = False
        self.source.close()
        self.gen = None

    def __iter__(self):
        return self

    def __next__(self):
        # Iteration is only permitted between __enter__ and __exit__
        if self._in_context:
            return next(self.gen)
        raise ProgrammingError('Stream should be used within a context')
|