summary refs log tree commit diff stats
path: root/yql/essentials/public/udf/arrow/block_io_buffer.h
blob: 46bfd4acbb4cefced0ccdbb97901fcdae114c1cb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#pragma once

#include <util/generic/strbuf.h>
#include <util/generic/vector.h>
#include <util/system/unaligned_mem.h>

namespace NYql {
namespace NUdf {

class TInputBuffer {
public:
    TInputBuffer(TStringBuf buf)
        : Buf_(buf)
    {}

    char PopChar() {
        Ensure(1);
        char c = Buf_.data()[Pos_];
        ++Pos_;
        return c;
    }

    template <typename T>
    void PopNumber(T& result) {
        result = PopNumber<T>();
    }

    template <typename T>
    T PopNumber() {
        Ensure(sizeof(T));
        T t = ReadUnaligned<T>(Buf_.data() + Pos_);
        Pos_ += sizeof(T);
        return t;
    }

    std::string_view PopString() {
        ui32 size = PopNumber<ui32>();
        Ensure(size);
        std::string_view result(Buf_.data() + Pos_, size);
        Pos_ += size;
        return result;
    }

private:
    void Ensure(size_t delta) {
        Y_ENSURE(Pos_ + delta <= Buf_.size(), "Unexpected end of buffer");
    }

private:
    size_t Pos_ = 0;
    TStringBuf Buf_;
};

class TOutputBuffer {
public:
    void PushChar(char c) {
        Ensure(1);
        Vec_[Pos_] = c;
        ++Pos_;
    }

    template <typename T>
    void PushNumber(T t) {
        Ensure(sizeof(T));
        WriteUnaligned<T>(Vec_.data() + Pos_, t);
        Pos_ += sizeof(T);
    }

    void PushString(std::string_view data) {
        Ensure(sizeof(ui32) + data.size());
        *(ui32*)&Vec_[Pos_] = data.size();
        Pos_ += sizeof(ui32);
        std::memcpy(Vec_.data() + Pos_, data.data(), data.size());
        Pos_ += data.size();
    }

    // fill with zeros
    void Resize(size_t size) {
        Pos_ = 0;
        Vec_.clear();
        Vec_.resize(size);
    }

    void Rewind() {
        Pos_ = 0;
    }

    TStringBuf Finish() const {
        return TStringBuf(Vec_.data(), Vec_.data() + Pos_);
    }

private:
    void Ensure(size_t delta) {
        if (Pos_ + delta > Vec_.size()) {
            if (Pos_ + delta > Vec_.capacity()) {
                Vec_.reserve(Max(2 * Vec_.capacity(), Pos_ + delta));
            }
            // TODO: replace TVector - resize() performs unneeded zeroing here
            Vec_.resize(Pos_ + delta);
        }
    }

private:
    size_t Pos_ = 0;
    TVector<char> Vec_;
};



}
}