aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/clickhouse-connect/clickhouse_connect/driver/npquery.py
blob: 513c81a572885de5717b581f2c87fd3b0f28e5f7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import logging
import itertools
from typing import Generator, Sequence, Tuple

from clickhouse_connect.driver.common import empty_gen, StreamContext
from clickhouse_connect.driver.exceptions import StreamClosedError
from clickhouse_connect.driver.types import Closable
from clickhouse_connect.driver.options import np, pd

logger = logging.getLogger(__name__)


# pylint: disable=too-many-instance-attributes
class NumpyResult(Closable):
    def __init__(self,
                 block_gen: Generator[Sequence, None, None] = None,
                 column_names: Tuple = (),
                 column_types: Tuple = (),
                 d_types: Sequence = (),
                 source: Closable = None):
        self.column_names = column_names
        self.column_types = column_types
        self.np_types = d_types
        self.source = source
        self.query_id = ''
        self.summary = {}
        self._block_gen = block_gen or empty_gen()
        self._numpy_result = None
        self._df_result = None

    def _np_stream(self) -> Generator:
        if self._block_gen is None:
            raise StreamClosedError

        block_gen = self._block_gen
        self._block_gen = None
        if not self.np_types:
            return block_gen

        d_types = self.np_types
        first_type = d_types[0]
        if first_type != np.object_ and all(np.dtype(np_type) == first_type for np_type in d_types):
            self.np_types = first_type

            def numpy_blocks():
                for block in block_gen:
                    yield np.array(block, first_type).transpose()
        else:
            if any(x == np.object_ for x in d_types):
                self.np_types = [np.object_] * len(self.np_types)
            self.np_types = np.dtype(list(zip(self.column_names, d_types)))

            def numpy_blocks():
                for block in block_gen:
                    np_array = np.empty(len(block[0]), dtype=self.np_types)
                    for col_name, data in zip(self.column_names, block):
                        np_array[col_name] = data
                    yield np_array

        return numpy_blocks()

    def _df_stream(self) -> Generator:
        if self._block_gen is None:
            raise StreamClosedError
        block_gen = self._block_gen

        def pd_blocks():
            for block in block_gen:
                yield pd.DataFrame(dict(zip(self.column_names, block)))

        self._block_gen = None
        return pd_blocks()

    def close_numpy(self):
        if not self._block_gen:
            raise StreamClosedError
        chunk_size = 4
        pieces = []
        blocks = []
        for block in self._np_stream():
            blocks.append(block)
            if len(blocks) == chunk_size:
                pieces.append(np.concatenate(blocks, dtype=self.np_types))
                chunk_size *= 2
                blocks = []
        pieces.extend(blocks)
        if len(pieces) > 1:
            self._numpy_result = np.concatenate(pieces, dtype=self.np_types)
        elif len(pieces) == 1:
            self._numpy_result = pieces[0]
        else:
            self._numpy_result = np.empty((0,))
        self.close()
        return self

    def close_df(self):
        if self._block_gen is None:
            raise StreamClosedError
        chains = [itertools.chain(b) for b in zip(*self._block_gen)]
        new_df_series = []
        for c in chains:
            new_df_series.append(pd.concat([pd.Series(piece, copy=False) for piece in c], copy=False))
        self._df_result = pd.DataFrame(dict(zip(self.column_names, new_df_series)))
        self.close()
        return self

    @property
    def np_result(self):
        if self._numpy_result is None:
            self.close_numpy()
        return self._numpy_result

    @property
    def df_result(self):
        if self._df_result is None:
            self.close_df()
        return self._df_result

    @property
    def np_stream(self) -> StreamContext:
        return StreamContext(self, self._np_stream())

    @property
    def df_stream(self) -> StreamContext:
        return StreamContext(self, self._df_stream())

    def close(self):
        if self._block_gen is not None:
            self._block_gen.close()
            self._block_gen = None
        if self.source:
            self.source.close()
            self.source = None