1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
import logging
import itertools
from typing import Generator, Sequence, Tuple
from clickhouse_connect.driver.common import empty_gen, StreamContext
from clickhouse_connect.driver.exceptions import StreamClosedError
from clickhouse_connect.driver.types import Closable
from clickhouse_connect.driver.options import np, pd
# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)
# pylint: disable=too-many-instance-attributes
class NumpyResult(Closable):
    """
    Result wrapper that exposes a generator of column-oriented data blocks either as a stream or as a
    single materialized object, in NumPy array or pandas DataFrame form.

    The block generator can be consumed only once: whichever of np_stream, df_stream, np_result or
    df_result is used first takes ownership of it, and later attempts to read the stream again raise
    StreamClosedError.
    """

    def __init__(self,
                 block_gen: Generator[Sequence, None, None] = None,
                 column_names: Tuple = (),
                 column_types: Tuple = (),
                 d_types: Sequence = (),
                 source: Closable = None):
        """
        :param block_gen: Generator of data blocks.  Each block is a sequence of columns in the same order
          as column_names.  An empty generator is substituted when this is None (or otherwise falsy)
        :param column_names: Column names, in block order
        :param column_types: Column type objects; only stored on the instance by this class
        :param d_types: NumPy dtype (or dtype-like value) for each column, in block order
        :param source: Optional Closable associated with the block generator; closed by close()
        """
        self.column_names = column_names
        self.column_types = column_types
        self.np_types = d_types
        self.source = source
        self.query_id = ''
        self.summary = {}
        self._block_gen = block_gen or empty_gen()
        self._numpy_result = None
        self._df_result = None

    def _np_stream(self) -> Generator:
        """
        Take ownership of the block generator and wrap it so that every block is yielded as a NumPy array.

        As a side effect self.np_types is replaced by the dtype actually used for the arrays: the single
        shared dtype when all columns agree on a non-object dtype, otherwise a structured dtype with one
        field per column.

        :return: Generator of NumPy arrays, or the raw block generator when no dtypes were supplied
        :raises StreamClosedError: if the block generator has already been handed out or closed
        """
        if self._block_gen is None:
            raise StreamClosedError
        block_gen = self._block_gen
        self._block_gen = None  # The stream can be handed out only once
        if not self.np_types:
            return block_gen

        d_types = self.np_types
        first_type = d_types[0]
        if first_type != np.object_ and all(np.dtype(np_type) == first_type for np_type in d_types):
            # Every column shares one non-object dtype, so each block becomes a plain two dimensional array
            self.np_types = first_type

            def numpy_blocks():
                for block in block_gen:
                    # Blocks are column oriented; transpose so that the first axis runs over rows
                    yield np.array(block, first_type).transpose()
        else:
            if any(x == np.object_ for x in d_types):
                # NOTE(review): this value is overwritten by the statement that follows, so it currently has
                # no effect -- the structured dtype below is built from the original d_types.  Confirm
                # whether forcing every column to object was intended before removing or changing it.
                self.np_types = [np.object_] * len(self.np_types)
            # Mixed (or object) columns: use a structured dtype with one named field per column
            record_type = np.dtype(list(zip(self.column_names, d_types)))
            self.np_types = record_type

            def numpy_blocks():
                for block in block_gen:
                    # The length of the first column is the number of rows in this block
                    np_array = np.empty(len(block[0]), dtype=record_type)
                    for col_name, data in zip(self.column_names, block):
                        np_array[col_name] = data
                    yield np_array

        return numpy_blocks()

    def _df_stream(self) -> Generator:
        """
        Take ownership of the block generator and wrap it so that every block is yielded as a DataFrame.

        :return: Generator of pandas DataFrames, one per block, with columns named from column_names
        :raises StreamClosedError: if the block generator has already been handed out or closed
        """
        if self._block_gen is None:
            raise StreamClosedError
        block_gen = self._block_gen

        def pd_blocks():
            for block in block_gen:
                yield pd.DataFrame(dict(zip(self.column_names, block)))

        self._block_gen = None  # The stream can be handed out only once
        return pd_blocks()

    def close_numpy(self):
        """
        Consume the whole stream into one NumPy array, store it for np_result, and close this result.

        The result is closed (including its source) even if reading or concatenating the blocks fails, so
        the underlying source is not leaked on error.

        :return: This NumpyResult
        :raises StreamClosedError: if the block generator has already been handed out or closed
        """
        if self._block_gen is None:
            raise StreamClosedError
        chunk_size = 4
        pieces = []
        blocks = []
        try:
            for block in self._np_stream():
                blocks.append(block)
                if len(blocks) == chunk_size:
                    # Merge the pending blocks into one piece; the batch size doubles after every merge,
                    # which keeps the number of arrays held for the final concatenate small
                    pieces.append(np.concatenate(blocks, dtype=self.np_types))
                    chunk_size *= 2
                    blocks = []
            pieces.extend(blocks)  # Blocks left over from the last, incomplete batch
            if len(pieces) > 1:
                self._numpy_result = np.concatenate(pieces, dtype=self.np_types)
            elif len(pieces) == 1:
                self._numpy_result = pieces[0]
            else:
                # No blocks at all: an empty one dimensional array with NumPy's default dtype
                self._numpy_result = np.empty((0,))
        finally:
            self.close()
        return self

    def close_df(self):
        """
        Consume the whole stream into one pandas DataFrame, store it for df_result, and close this result.

        Empty column pieces are skipped, so a stream that yields no rows produces a DataFrame without
        columns.  The result is closed (including its source) even if building the DataFrame fails.

        :return: This NumpyResult
        :raises StreamClosedError: if the block generator has already been handed out or closed
        """
        if self._block_gen is None:
            raise StreamClosedError
        try:
            # zip(*blocks) regroups the data by column: each tuple holds one column's piece from every block
            chains = [itertools.chain(b) for b in zip(*self._block_gen)]
            new_df_series = []
            for column_pieces in chains:
                series = [pd.Series(piece, copy=False) for piece in column_pieces if len(piece) > 0]
                if len(series) > 0:
                    new_df_series.append(pd.concat(series, copy=False, ignore_index=True))
            self._df_result = pd.DataFrame(dict(zip(self.column_names, new_df_series)))
        finally:
            self.close()
        return self

    @property
    def np_result(self):
        """
        :return: The complete result as a NumPy array, consuming the stream on first access
        :raises StreamClosedError: if nothing is cached yet and the stream was already consumed or closed
        """
        if self._numpy_result is None:
            self.close_numpy()
        return self._numpy_result

    @property
    def df_result(self):
        """
        :return: The complete result as a pandas DataFrame, consuming the stream on first access
        :raises StreamClosedError: if nothing is cached yet and the stream was already consumed or closed
        """
        if self._df_result is None:
            self.close_df()
        return self._df_result

    @property
    def np_stream(self) -> StreamContext:
        """
        Accessing this property takes ownership of the block generator immediately.

        :return: StreamContext built from this result and the generator of NumPy blocks
        :raises StreamClosedError: if the block generator has already been handed out or closed
        """
        return StreamContext(self, self._np_stream())

    @property
    def df_stream(self) -> StreamContext:
        """
        Accessing this property takes ownership of the block generator immediately.

        :return: StreamContext built from this result and the generator of DataFrame blocks
        :raises StreamClosedError: if the block generator has already been handed out or closed
        """
        return StreamContext(self, self._df_stream())

    def close(self):
        """
        Close the block generator (if this result still owns it) and the source.  Safe to call repeatedly.

        The source is closed even when closing the generator raises; that exception still propagates.
        """
        try:
            if self._block_gen is not None:
                self._block_gen.close()
        finally:
            self._block_gen = None
            if self.source:
                self.source.close()
                self.source = None
|