summaryrefslogtreecommitdiffstats
path: root/contrib/python/clickhouse-connect/clickhouse_connect/driver/bytesource.py
blob: 3d98b6dd394bb3e8d80de80d3c0efd09fecadf03 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import struct

from clickhouse_connect.driver.types import ByteSource


class ByteArraySource(ByteSource):
    """
    ByteSource implementation for in-memory byte arrays.

    This class wraps a byte array and provides the ByteSource interface,
    allowing ClickHouse type decoders to read from in-memory data instead
    of a network stream.

    Used primarily for decoding variant-encoded values from JSON shared data
    where each value is a complete serialized type instance.
    """

    def __init__(self, data: bytes, encoding: str = "utf-8"):
        self.data = data
        self.pos = 0
        self.encoding = encoding

    def read_byte(self) -> int:
        if self.pos >= len(self.data):
            raise EOFError("Attempted to read past end of byte array")
        b = self.data[self.pos]
        self.pos += 1
        return b

    def read_bytes(self, sz: int) -> bytes:
        if self.pos + sz > len(self.data):
            raise EOFError(f"Attempted to read {sz} bytes, only {len(self.data) - self.pos} available")
        result = self.data[self.pos : self.pos + sz]
        self.pos += sz
        return result

    def read_leb128(self) -> int:
        sz = 0
        shift = 0
        while self.pos < len(self.data):
            b = self.read_byte()
            sz += (b & 0x7F) << shift
            if (b & 0x80) == 0:
                return sz
            shift += 7
        raise EOFError("Unexpected end while reading LEB128")

    def read_leb128_str(self) -> str:
        sz = self.read_leb128()
        return self.read_bytes(sz).decode(self.encoding)

    def read_uint64(self) -> int:
        return int.from_bytes(self.read_bytes(8), "little", signed=False)

    def read_int64(self) -> int:
        return int.from_bytes(self.read_bytes(8), "little", signed=True)

    def read_uint32(self) -> int:
        return int.from_bytes(self.read_bytes(4), "little", signed=False)

    def read_int32(self) -> int:
        return int.from_bytes(self.read_bytes(4), "little", signed=True)

    def read_uint16(self) -> int:
        return int.from_bytes(self.read_bytes(2), "little", signed=False)

    def read_int16(self) -> int:
        return int.from_bytes(self.read_bytes(2), "little", signed=True)

    def read_float32(self) -> float:
        return struct.unpack("<f", self.read_bytes(4))[0]

    def read_float64(self) -> float:
        return struct.unpack("<d", self.read_bytes(8))[0]

    def read_array(self, array_type: str, num_rows: int):  # type: ignore
        if array_type == "B":
            return [self.read_byte() for _ in range(num_rows)]
        if array_type == "H":
            return [self.read_uint16() for _ in range(num_rows)]
        if array_type == "I":
            return [self.read_uint32() for _ in range(num_rows)]
        if array_type == "Q":
            return [self.read_uint64() for _ in range(num_rows)]
        if array_type == "b":
            return [int.from_bytes([self.read_byte()], "little", signed=True) for _ in range(num_rows)]
        if array_type == "h":
            return [self.read_int16() for _ in range(num_rows)]
        if array_type == "i":
            return [self.read_int32() for _ in range(num_rows)]
        if array_type == "q":
            return [self.read_int64() for _ in range(num_rows)]
        if array_type == "f":
            return [self.read_float32() for _ in range(num_rows)]
        if array_type == "d":
            return [self.read_float64() for _ in range(num_rows)]
        raise NotImplementedError(f"Array type {array_type} not implemented for ByteArraySource")

    def read_str_col(self, num_rows, encoding, nullable=False, null_obj=None):  # type: ignore
        if num_rows != 1:
            raise NotImplementedError("read_str_col only supports num_rows=1 for single-value decoding")

        length = self.read_leb128()
        string_bytes = self.read_bytes(length)

        if encoding is None:
            return [string_bytes]

        return [string_bytes.decode(encoding)]

    def read_bytes_col(self, sz, num_rows):
        raise NotImplementedError("read_bytes_col not needed for single-value decoding")

    def read_fixed_str_col(self, sz, num_rows, encoding):
        raise NotImplementedError("read_fixed_str_col not needed for single-value decoding")

    def close(self):
        """No cleanup needed for byte arrays."""