aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/ClickHouse/ch-go/proto/reader.go
blob: 4aa8f48db8a0c8f0ea71568246f63d9c20a60c71 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
package proto

import (
	"bufio"
	"encoding/binary"
	"io"
	"math"

	"github.com/go-faster/errors"

	"github.com/ClickHouse/ch-go/compress"
)

// Decoder is implemented by values that can decode themselves from a Reader.
type Decoder interface {
	// Decode reads the value from r and reports any decoding failure.
	Decode(r *Reader) error
}

// AwareDecoder is implemented by values whose decoding from a Reader depends
// on a version number.
type AwareDecoder interface {
	// DecodeAware reads the value from r, taking version into account.
	// NOTE(review): version is presumably the negotiated protocol revision —
	// confirm against callers.
	DecodeAware(r *Reader, version int) error
}

// Reader implements ClickHouse protocol decoding from buffered reader.
// Not goroutine-safe.
//
// Fixed-size values and strings are read into the shared internal buffer b,
// so slices handed out by methods such as ReadRaw and StrRaw alias that
// buffer and should not be kept across subsequent reads.
type Reader struct {
	raw  *bufio.Reader // raw bytes, e.g. on the wire
	data io.Reader     // current source for reads: decompressed or same as raw
	b    *Buffer       // internal buffer

	decompressed io.Reader // decompressed data stream, from raw
}

// ReadByte reads a single byte from the current data source.
//
// It lets Reader be passed where a byte-at-a-time reader is expected, as
// UVarInt does with binary.ReadUvarint. The error, if any, is the one
// produced by readFull and is returned as is.
func (r *Reader) ReadByte() (byte, error) {
	err := r.readFull(1)
	if err != nil {
		return 0, err
	}
	first := r.b.Buf[0]
	return first, nil
}

// EnableCompression makes next reads use decompressed source of data.
//
// It only switches the active source to r.decompressed; nothing is read.
func (r *Reader) EnableCompression() {
	r.data = r.decompressed
}

// DisableCompression makes next reads use raw source of data.
//
// It only switches the active source back to r.raw; nothing is read.
func (r *Reader) DisableCompression() {
	r.data = r.raw
}

// Read implements io.Reader by delegating to the currently selected data
// source (raw or decompressed). Like any io.Reader it may return fewer than
// len(p) bytes; use ReadFull to fill a buffer completely.
func (r *Reader) Read(p []byte) (n int, err error) {
	return r.data.Read(p)
}

// Decode decodes v from the reader by calling v.Decode(r) and returns its
// error unchanged.
func (r *Reader) Decode(v Decoder) error {
	return v.Decode(r)
}

// ReadFull fills buf completely from the current data source.
//
// Any error reported by io.ReadFull, including the ones it returns for a
// short read, is wrapped with the "read" context.
func (r *Reader) ReadFull(buf []byte) error {
	_, err := io.ReadFull(r, buf)
	if err != nil {
		return errors.Wrap(err, "read")
	}
	return nil
}

// readFull reads into the internal buffer: it calls r.b.Ensure(n) and then
// fills r.b.Buf completely via ReadFull.
//
// NOTE(review): Ensure presumably sizes r.b.Buf to exactly n bytes, so that
// n bytes are consumed from the source — confirm against Buffer.Ensure.
func (r *Reader) readFull(n int) error {
	r.b.Ensure(n)
	return r.ReadFull(r.b.Buf)
}

// ReadRaw reads n bytes into the internal buffer and returns that buffer
// without copying.
//
// The returned slice aliases the Reader's internal buffer, which other read
// methods also write into; copy it if it has to outlive the next read.
func (r *Reader) ReadRaw(n int) ([]byte, error) {
	err := r.readFull(n)
	if err != nil {
		return nil, errors.Wrap(err, "read full")
	}
	return r.b.Buf, nil
}

// UVarInt decodes an unsigned varint using binary.ReadUvarint, which pulls
// bytes one at a time through ReadByte.
//
// A decoding failure is returned wrapped with the "read" context.
func (r *Reader) UVarInt() (uint64, error) {
	value, err := binary.ReadUvarint(r)
	if err != nil {
		return 0, errors.Wrap(err, "read")
	}
	return value, nil
}

// maxStrSize is the largest string length, in bytes, that StrLen accepts.
const maxStrSize = 10 << 20 // 10 MiB

// StrLen reads a uvarint-encoded string length and validates it.
//
// The length is compared against maxStrSize while it is still an unsigned
// 64-bit value, i.e. before it is narrowed to int. Narrowing first would let
// an oversized length wrap around: where int is 32 bits wide, a length such
// as 1<<32 + 5 would truncate to 5 and pass validation.
//
// It returns an error if the length cannot be read or exceeds maxStrSize.
func (r *Reader) StrLen() (int, error) {
	n, err := r.UVarInt()
	if err != nil {
		return 0, errors.Wrap(err, "read length")
	}

	if n > maxStrSize {
		// Protecting from possible OOM.
		return 0, errors.Errorf("size %d too big (%d is maximum)", n, maxStrSize)
	}

	// The conversion cannot overflow: n <= maxStrSize, which fits in int.
	return int(n), nil
}

// StrRaw reads a length-prefixed string into the internal buffer and returns
// that buffer directly, without copying.
//
// The returned slice aliases the Reader's internal buffer, so it must not be
// retained; use StrAppend, StrBytes or Str to obtain a copy.
func (r *Reader) StrRaw() ([]byte, error) {
	size, err := r.StrLen()
	if err != nil {
		return nil, errors.Wrap(err, "read length")
	}

	// Read the payload straight from the active source into the buffer.
	r.b.Ensure(size)
	_, err = io.ReadFull(r.data, r.b.Buf)
	if err != nil {
		return nil, errors.Wrap(err, "read str")
	}
	return r.b.Buf, nil
}

// StrAppend reads a length-prefixed string and appends its bytes to buf,
// returning the extended slice.
//
// The string bytes are copied out of the internal buffer, and the internal
// buffer is Reset when the call returns. On failure the returned slice is nil.
func (r *Reader) StrAppend(buf []byte) ([]byte, error) {
	defer r.b.Reset()

	raw, err := r.StrRaw()
	if err != nil {
		return nil, errors.Wrap(err, "raw")
	}

	// append copies raw before the deferred Reset runs.
	buf = append(buf, raw...)
	return buf, nil
}

// StrBytes decodes string and allocates new byte slice with result.
//
// It is StrAppend with a nil destination, so the returned slice does not
// alias the Reader's internal buffer.
func (r *Reader) StrBytes() ([]byte, error) {
	return r.StrAppend(nil)
}

// Str reads a length-prefixed string and returns it as a Go string.
//
// A failure from StrBytes is returned wrapped with the "bytes" context.
func (r *Reader) Str() (string, error) {
	raw, err := r.StrBytes()
	if err != nil {
		return "", errors.Wrap(err, "bytes")
	}
	return string(raw), nil
}

// Int reads an unsigned varint and returns it converted to int.
//
// The conversion is a plain int(n) with no range check, so a decoded value
// outside the range of int is not reported as an error here; callers that
// need a bound must validate the result themselves.
func (r *Reader) Int() (int, error) {
	u, err := r.UVarInt()
	if err != nil {
		return 0, errors.Wrap(err, "uvarint")
	}
	return int(u), nil
}

// Int8 reads one byte via UInt8 and returns it converted to int8.
// The error from UInt8 is returned unchanged.
func (r *Reader) Int8() (int8, error) {
	u, err := r.UInt8()
	if err != nil {
		return 0, err
	}
	return int8(u), nil
}

// Int16 reads a 16-bit value via UInt16 and returns it converted to int16.
// The error from UInt16 is returned unchanged.
func (r *Reader) Int16() (int16, error) {
	u, err := r.UInt16()
	if err != nil {
		return 0, err
	}
	return int16(u), nil
}

// Int32 reads a 32-bit value via UInt32 and returns it converted to int32.
// The error from UInt32 is returned unchanged.
func (r *Reader) Int32() (int32, error) {
	u, err := r.UInt32()
	if err != nil {
		return 0, err
	}
	return int32(u), nil
}

// Int64 reads a 64-bit value via UInt64 and returns it converted to int64.
// The error from UInt64 is returned unchanged.
func (r *Reader) Int64() (int64, error) {
	u, err := r.UInt64()
	if err != nil {
		return 0, err
	}
	return int64(u), nil
}

// Int128 reads a 128-bit value via UInt128 and returns it converted to the
// Int128 type. The error from UInt128 is returned unchanged.
func (r *Reader) Int128() (Int128, error) {
	u, err := r.UInt128()
	if err != nil {
		return Int128{}, err
	}
	return Int128(u), nil
}

// Byte decodes byte value.
//
// It is an alias for UInt8, so its error carries the "read" context added
// there, unlike the error returned by ReadByte.
func (r *Reader) Byte() (byte, error) {
	return r.UInt8()
}

// UInt8 reads a single byte and returns it as uint8.
// A read failure is returned wrapped with the "read" context.
func (r *Reader) UInt8() (uint8, error) {
	err := r.readFull(1)
	if err != nil {
		return 0, errors.Wrap(err, "read")
	}
	return r.b.Buf[0], nil
}

// UInt16 reads two bytes and decodes them as a little-endian uint16.
// A read failure is returned wrapped with the "read" context.
func (r *Reader) UInt16() (uint16, error) {
	const size = 16 / 8
	if err := r.readFull(size); err != nil {
		return 0, errors.Wrap(err, "read")
	}
	v := binary.LittleEndian.Uint16(r.b.Buf)
	return v, nil
}

// UInt32 reads four bytes and decodes them as a little-endian uint32.
// A read failure is returned wrapped with the "read" context.
func (r *Reader) UInt32() (uint32, error) {
	const size = 4
	if err := r.readFull(size); err != nil {
		return 0, errors.Wrap(err, "read")
	}
	v := binary.LittleEndian.Uint32(r.b.Buf)
	return v, nil
}

// UInt64 reads eight bytes and decodes them as a little-endian uint64.
// A read failure is returned wrapped with the "read" context.
func (r *Reader) UInt64() (uint64, error) {
	const size = 8
	if err := r.readFull(size); err != nil {
		return 0, errors.Wrap(err, "read")
	}
	v := binary.LittleEndian.Uint64(r.b.Buf)
	return v, nil
}

// UInt128 reads sixteen bytes and converts them to a UInt128 with binUInt128.
// A read failure is returned wrapped with the "read" context.
func (r *Reader) UInt128() (UInt128, error) {
	const size = 16
	if err := r.readFull(size); err != nil {
		return UInt128{}, errors.Wrap(err, "read")
	}
	v := binUInt128(r.b.Buf)
	return v, nil
}

// Float32 reads a 32-bit value via UInt32 and reinterprets its bits as a
// float32 using math.Float32frombits.
// A read failure is returned wrapped with the "bits" context.
func (r *Reader) Float32() (float32, error) {
	bits, err := r.UInt32()
	if err != nil {
		return 0, errors.Wrap(err, "bits")
	}
	return math.Float32frombits(bits), nil
}

// Float64 reads a 64-bit value via UInt64 and reinterprets its bits as a
// float64 using math.Float64frombits.
// A read failure is returned wrapped with the "bits" context.
func (r *Reader) Float64() (float64, error) {
	bits, err := r.UInt64()
	if err != nil {
		return 0, errors.Wrap(err, "bits")
	}
	return math.Float64frombits(bits), nil
}

// Bool reads one byte and maps it to a bool.
//
// Only boolTrue and boolFalse are accepted; any other byte value is rejected
// with an error rather than being coerced. A read failure is returned wrapped
// with the "uint8" context.
func (r *Reader) Bool() (bool, error) {
	b, err := r.UInt8()
	if err != nil {
		return false, errors.Wrap(err, "uint8")
	}
	if b == boolTrue {
		return true, nil
	}
	if b == boolFalse {
		return false, nil
	}
	return false, errors.Errorf("unexpected value %d for boolean", b)
}

// defaultReaderSize is the bufio buffer size NewReader uses for the raw stream.
const defaultReaderSize = 128 << 10 // 128 KiB

// NewReader returns a Reader that decodes from r.
//
// r is wrapped in a bufio.Reader of defaultReaderSize bytes, and a
// decompressing reader is layered on top of that same buffered reader. The
// new Reader starts out reading the raw (uncompressed) stream; call
// EnableCompression to switch to the decompressed one.
func NewReader(r io.Reader) *Reader {
	buffered := bufio.NewReaderSize(r, defaultReaderSize)
	rd := &Reader{
		raw:          buffered,
		b:            new(Buffer),
		decompressed: compress.NewReader(buffered),
	}
	rd.data = buffered
	return rd
}