1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
import array
from datetime import datetime, date, tzinfo
from ipaddress import IPv4Address
from typing import Sequence, Optional, Any
from uuid import UUID, SafeUUID
from clickhouse_connect.driver.common import int_size
from clickhouse_connect.driver.errors import NONE_IN_NULLABLE_COLUMN
from clickhouse_connect.driver.types import ByteSource
from clickhouse_connect.driver.options import np
MONTH_DAYS = (0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365)
MONTH_DAYS_LEAP = (0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 366)
def read_ipv4_col(source: ByteSource, num_rows: int):
column = source.read_array('I', num_rows)
fast_ip_v4 = IPv4Address.__new__
new_col = []
app = new_col.append
for x in column:
ipv4 = fast_ip_v4(IPv4Address)
ipv4._ip = x # pylint: disable=protected-access
app(ipv4)
return new_col
def read_datetime_col(source: ByteSource, num_rows: int, tz_info: Optional[tzinfo]):
src_array = source.read_array('I', num_rows)
fts = datetime.fromtimestamp
return [fts(ts, tz_info) for ts in src_array]
def epoch_days_to_date(days: int) -> date:
cycles400, rem = divmod(days + 134774, 146097)
cycles100, rem = divmod(rem, 36524)
cycles, rem = divmod(rem, 1461)
years, rem = divmod(rem, 365)
year = (cycles << 2) + cycles400 * 400 + cycles100 * 100 + years + 1601
if years == 4 or cycles100 == 4:
return date(year - 1, 12, 31)
m_list = MONTH_DAYS_LEAP if years == 3 and (year == 2000 or year % 100 != 0) else MONTH_DAYS
month = (rem + 24) >> 5
while rem < m_list[month]:
month -= 1
return date(year, month + 1, rem + 1 - m_list[month])
def read_date_col(source: ByteSource, num_rows: int):
column = source.read_array('H', num_rows)
return [epoch_days_to_date(x) for x in column]
def read_date32_col(source: ByteSource, num_rows: int):
column = source.read_array('l' if int_size == 2 else 'i', num_rows)
return [epoch_days_to_date(x) for x in column]
def read_uuid_col(source: ByteSource, num_rows: int):
v = source.read_array('Q', num_rows * 2)
empty_uuid = UUID(int=0)
new_uuid = UUID.__new__
unsafe = SafeUUID.unsafe
oset = object.__setattr__
column = []
app = column.append
for i in range(num_rows):
ix = i << 1
int_value = v[ix] << 64 | v[ix + 1]
if int_value == 0:
app(empty_uuid)
else:
fast_uuid = new_uuid(UUID)
oset(fast_uuid, 'int', int_value)
oset(fast_uuid, 'is_safe', unsafe)
app(fast_uuid)
return column
def read_nullable_array(source: ByteSource, array_type: str, num_rows: int, null_obj: Any):
null_map = source.read_bytes(num_rows)
column = source.read_array(array_type, num_rows)
return [null_obj if null_map[ix] else column[ix] for ix in range(num_rows)]
def build_nullable_column(source: Sequence, null_map: bytes, null_obj: Any):
return [source[ix] if null_map[ix] == 0 else null_obj for ix in range(len(source))]
def build_lc_nullable_column(index: Sequence, keys: array.array, null_obj: Any):
column = []
for key in keys:
if key == 0:
column.append(null_obj)
else:
column.append(index[key])
return column
def to_numpy_array(column: Sequence):
arr = np.empty((len(column),), dtype=np.object)
arr[:] = column
return arr
def pivot(data: Sequence[Sequence], start_row: int, end_row: int) -> Sequence[Sequence]:
return tuple(zip(*data[start_row: end_row]))
def write_str_col(column: Sequence, nullable: bool, encoding: Optional[str], dest: bytearray) -> int:
app = dest.append
for x in column:
if not x:
if not nullable and x is None:
return NONE_IN_NULLABLE_COLUMN
app(0)
else:
if encoding:
x = x.encode(encoding)
else:
x = bytes(x)
sz = len(x)
while True:
b = sz & 0x7f
sz >>= 7
if sz == 0:
app(b)
break
app(0x80 | b)
dest += x
return 0
|