aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_spiller_adapter.h
blob: 8ddcfe46be4ac354166bc27c97851bee45a5c93c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#pragma once
#include "mkql_spiller.h"
#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>


namespace NKikimr::NMiniKQL {

///Stores and loads very long sequences of TMultiType UVs
///Can split sequences into chunks
///Sends chunks to ISplitter and keeps assigned keys
///When all data is written switches to read mode. Switching back to writing mode is not supported
///Provides an interface for sequential read (like forward iterator)
///When interaction with ISpiller is required, Write and Read operations return a Future
class TWideUnboxedValuesSpillerAdapter {
public:
    TWideUnboxedValuesSpillerAdapter(ISpiller::TPtr spiller, const TMultiType* type, size_t sizeLimit)
        : Spiller(spiller)
        , ItemType(type)
        , SizeLimit(sizeLimit)
        , Packer(type)
    {
    }

    /// Write wide UV item
    /// \returns
    ///  - nullopt, if thee values are accumulated
    ///  - TFeature, if the values are being stored asynchronously and a caller must wait until async operation ends
    ///    In this case a caller must wait operation completion and call StoreCompleted.
    ///    Design note: not using Subscribe on a Future here to avoid possible race condition
    std::optional<NThreading::TFuture<ISpiller::TKey>> WriteWideItem(const TArrayRef<NUdf::TUnboxedValuePod>& wideItem) {
        Packer.AddWideItem(wideItem.data(), wideItem.size());
        if (Packer.PackedSizeEstimate() > SizeLimit) {
            return Spiller->Put(std::move(Packer.Finish()));
        } else {
           return std::nullopt;
        }
    }

    std::optional<NThreading::TFuture<ISpiller::TKey>> FinishWriting() {
        if (Packer.IsEmpty())
            return std::nullopt;
        return Spiller->Put(std::move(Packer.Finish()));
    }

    void AsyncWriteCompleted(ISpiller::TKey key) {
        StoredChunks.push_back(key);
    }

    //Extracting interface
    bool Empty() const {
       return StoredChunks.empty() && !CurrentBatch;
    }
    std::optional<NThreading::TFuture<std::optional<NYql::TChunkedBuffer>>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) {
        MKQL_ENSURE(!Empty(), "Internal logic error");
        if (CurrentBatch) {
            auto row = CurrentBatch->Head();
            for (size_t i = 0; i != wideItem.size(); ++i) {
                wideItem[i] = row[i];
            }
            CurrentBatch->Pop();
            if (CurrentBatch->empty()) {
                CurrentBatch = std::nullopt;
            }
            return std::nullopt;
        } else {
            auto r = Spiller->Get(StoredChunks.front());
            StoredChunks.pop_front();
            return r;
        }
    }

    void AsyncReadCompleted(NYql::TChunkedBuffer&& rope,const THolderFactory& holderFactory ) {
        //Implementation detail: deserialization is performed in a processing thread
        TUnboxedValueBatch batch(ItemType);
        Packer.UnpackBatch(std::move(rope), holderFactory, batch);
        CurrentBatch = std::move(batch);
    }

private:
    ISpiller::TPtr Spiller;
    const TMultiType* const ItemType;
    const size_t SizeLimit;
    TValuePackerTransport<false> Packer;
    std::deque<ISpiller::TKey> StoredChunks;
    std::optional<TUnboxedValueBatch> CurrentBatch;
};

}//namespace NKikimr::NMiniKQL