// Source: yql/essentials/minikql/computation/mkql_computation_node_pack.h
// (blob 6e79b1a1a33274f2a8e72df1e184fe7c3f4ad12e)
#pragma once

#include "mkql_computation_node.h"
#include "mkql_computation_node_holders.h"
#include "mkql_optional_usage_mask.h"
#include "mkql_block_transport.h"
#include "mkql_block_reader.h"

#include <yql/essentials/minikql/mkql_buffer.h>
#include <yql/essentials/public/udf/udf_value.h>

#include <library/cpp/enumbitset/enumbitset.h>
#include <yql/essentials/utils/chunked_buffer.h>

#include <util/stream/output.h>
#include <util/generic/buffer.h>
#include <util/generic/strbuf.h>

#include <utility>

namespace NKikimr {
namespace NMiniKQL {

namespace NDetails {

// Flags describing how values are laid out by the packers declared below.
// Each enumerator is used as a bit index of TPackProperties (a TEnumBitSet over this
// enum), so the enumerator ORDER defines the bit layout — do not reorder or insert
// new flags in the middle.
enum EPackProps {
  Begin,
  // First real flag (shares the value of Begin). When set, TPackerState initializes
  // OptionalMaskReserve to 1 instead of 0.
  UseOptionalMask = Begin,
  // NOTE(review): semantics live in the packer implementation (not visible here);
  // the name suggests the top-level value is length-prefixed — confirm.
  UseTopLength,
  // NOTE(review): presumably marks a type that is a single top-level optional — confirm.
  SingleOptional,
  // Sentinel, not a flag: upper bound of the TPackProperties range.
  End
};

// Set of EPackProps flags over the enumerator range bounded by Begin and End
// (presumably half-open, [Begin, End) — confirm against TEnumBitSet).
using TPackProperties = TEnumBitSet<EPackProps, EPackProps::Begin, EPackProps::End>;

struct TPackerState {
    explicit TPackerState(TPackProperties&& properties)
        : Properties(std::move(properties))
        , OptionalMaskReserve(Properties.Test(EPackProps::UseOptionalMask) ? 1 : 0)
    {
    }

    const TPackProperties Properties;

    TPlainContainerCache TopStruct;
    TVector<TVector<std::pair<NUdf::TUnboxedValue, NUdf::TUnboxedValue>>> DictBuffers;
    TVector<TVector<std::tuple<NUdf::TUnboxedValue, NUdf::TUnboxedValue, NUdf::TUnboxedValue>>> EncodedDictBuffers;
    size_t OptionalMaskReserve;
    NDetails::TOptionalUsageMask OptionalUsageMask;
};

} // namespace NDetails

// Packs a single MiniKQL value into a flat byte buffer and unpacks it back.
//
// NOTE(review): the effect of `Fast` (choice of encoding variant) is defined in the
// implementation file, not here — confirm before relying on a particular format.
template<bool Fast>
class TValuePackerGeneric {
public:
    using TSelf = TValuePackerGeneric<Fast>;

    // `type`: type of the values being packed (presumably stored in Type_).
    // `stable`: stored in Stable_; presumably requests a deterministic encoding — confirm.
    TValuePackerGeneric(bool stable, const TType *type);

    // reference is valid till the next call to Pack()
    // (the view presumably points into the internal Buffer_, which the next Pack() reuses).
    TStringBuf Pack(const NUdf::TUnboxedValuePod& value) const;
    // Decodes `buf` back into a value; `holderFactory` is presumably used to allocate
    // any boxed containers of the result.
    NUdf::TUnboxedValue Unpack(TStringBuf buf, const THolderFactory& holderFactory) const;

private:
    const bool Stable_;
    const TType* const Type_;
    // TODO: real thread safety with external state
    // Pack()/Unpack() are const yet have these mutable members available as working
    // state, so an instance should presumably not be used from several threads at once.
    mutable TBuffer Buffer_;
    mutable NDetails::TPackerState State_;
};

// Packer whose output and input are NYql::TChunkedBuffer.
//
// Usage modes:
//  * incremental: call AddItem()/AddWideItem() repeatedly, then Finish(). Type T is
//    treated as the list item type, so the result has a List<T> layout; read it back
//    with UnpackBatch().
//  * single value: Pack()/Unpack() handle one value of type T.
// Block (arrow) data is handled by the *Blocks helpers; IsBlock() reports that mode.
//
// NOTE(review): Pack()/Unpack()/UnpackBatch() are const but have mutable state
// (State_, IncrementalState_), so an instance is presumably not safe to share
// across threads — confirm.
template<bool Fast>
class TValuePackerTransport {
public:
    using TSelf = TValuePackerTransport<Fast>;

    // `pool`: arrow memory pool for block data. ArrowPool_ is a reference, so a null
    // `pool` presumably falls back to a default pool in the implementation — confirm.
    explicit TValuePackerTransport(const TType* type, arrow::MemoryPool* pool = nullptr);
    // for compatibility with TValuePackerGeneric - stable packing is not supported
    TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool = nullptr);

    // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout
    TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
    // Wide variant: `values` points to `count` values that form one item.
    TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count);
    // Current size estimate of the incremental result: the block buffer size in block
    // mode, otherwise the paged buffer size plus its reserved header size (0 while no
    // paged buffer exists).
    size_t PackedSizeEstimate() const {
        return IsBlock_ ? BlockBuffer_.Size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0);
    }

    // True while ItemCount_ is zero.
    bool IsEmpty() const {
        return !ItemCount_;
    }

    // True when the packer works in block (arrow) mode.
    bool IsBlock() const {
        return IsBlock_;
    }

    // Presumably discards incrementally packed data so packing can restart — confirm.
    void Clear();
    // Returns the incrementally packed List<T> built by AddItem()/AddWideItem().
    NYql::TChunkedBuffer Finish();

    // Pack()/Unpack() will pack/unpack single value of type T
    NYql::TChunkedBuffer Pack(const NUdf::TUnboxedValuePod& value) const;
    NUdf::TUnboxedValue Unpack(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory) const;
    // Unpacks a List<T> produced by Finish() and appends its items to `result`.
    void UnpackBatch(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
private:
    void BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const;
    void StartPack();

    // Block-mode counterparts of the incremental API.
    void InitBlocks();
    TSelf& AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 count);
    NYql::TChunkedBuffer FinishBlocks();
    void UnpackBatchBlocks(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;

    // NOTE: member order is relied upon by the constructors' mem-initializer lists in
    // the implementation file; do not reorder.
    const TType* const Type_;
    ui64 ItemCount_ = 0;
    TPagedBuffer::TPtr Buffer_;
    // State_ presumably serves single-value Pack()/Unpack(); IncrementalState_ serves
    // AddItem()/Finish()/UnpackBatch() — confirm.
    mutable NDetails::TPackerState State_;
    mutable NDetails::TPackerState IncrementalState_;

    arrow::MemoryPool& ArrowPool_;
    bool IsBlock_ = false;
    bool IsLegacyBlock_ = false;
    ui32 BlockLenIndex_ = 0;

    TVector<std::unique_ptr<IBlockSerializer>> BlockSerializers_;
    TVector<std::unique_ptr<IBlockReader>> BlockReaders_;
    TVector<std::shared_ptr<arrow::ArrayData>> ConvertedScalars_;
    NYql::TChunkedBuffer BlockBuffer_;

    TVector<std::unique_ptr<IBlockDeserializer>> BlockDeserializers_;
};

// Default single-value packer (the non-Fast variant of TValuePackerGeneric).
using TValuePacker = TValuePackerGeneric<false>;

// TValuePacker that is also a TComputationValue, so a packer instance can be held as
// a boxed computation value. `memInfo` is presumably forwarded to the TComputationValue
// base for memory accounting; `stable`/`type` go to TValuePacker — confirm.
class TValuePackerBoxed : public TComputationValue<TValuePackerBoxed>, public TValuePacker {
    // Alias for the boxed-value base. It is kept under the same name because the
    // implementation may refer to it.
    using TBase = TComputationValue<TValuePackerBoxed>;
public:
    TValuePackerBoxed(TMemoryUsageInfo* memInfo, bool stable, const TType* type);
};

}
}