aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/comp_nodes/mkql_squeeze_state.cpp
blob: 310b4a63bf77870ce7bfb9977a3c69ff92757ca5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include "mkql_squeeze_state.h"
#include "mkql_saveload.h"

#include <yql/essentials/minikql/mkql_string_util.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>

namespace NKikimr {
namespace NMiniKQL {

constexpr ui32 StateVersion = 1;

TSqueezeState::TSqueezeState(
    IComputationExternalNode* item,
    IComputationExternalNode* state,
    IComputationNode* outSwitch,
    IComputationNode* initState,
    IComputationNode* updateState,
    IComputationExternalNode* inSave,
    IComputationNode* outSave,
    IComputationExternalNode* inLoad,
    IComputationNode* outLoad,
    const TType* stateType
)
    : Item(item)
    , State(state)
    , Switch(outSwitch)
    , InitState(initState)
    , UpdateState(updateState)
    , InSave(inSave)
    , OutSave(outSave)
    , InLoad(inLoad)
    , OutLoad(outLoad)
    , StateType(stateType)
{}

TSqueezeState::TSqueezeState(const TSqueezeState& state)
    : Item(state.Item)
    , State(state.State)
    , Switch(state.Switch)
    , InitState(state.InitState)
    , UpdateState(state.UpdateState)
    , InSave(state.InSave)
    , OutSave(state.OutSave)
    , InLoad(state.InLoad)
    , OutLoad(state.OutLoad)
    , StateType(state.StateType)
{}

NUdf::TUnboxedValue TSqueezeState::Save(TComputationContext& ctx) const {
    TOutputSerializer out(EMkqlStateType::SIMPLE_BLOB, StateVersion, ctx);
    out.Write(static_cast<ui8>(Stage));
    if (ESqueezeState::Work == Stage) {
        InSave->SetValue(ctx, State->GetValue(ctx));
        out.WriteUnboxedValue(GetPacker(), OutSave->GetValue(ctx));
    }
    return out.MakeState();
}

void TSqueezeState::Load(TComputationContext& ctx, const NUdf::TStringRef& state) {
    TInputSerializer in(state, EMkqlStateType::SIMPLE_BLOB);

    const auto loadStateVersion = in.GetStateVersion();
    if (loadStateVersion != StateVersion) {
        THROW yexception() << "Invalid state version " << loadStateVersion;
    }

    Stage = static_cast<ESqueezeState>(in.Read<ui8>());
    if (ESqueezeState::Work == Stage) {
        InLoad->SetValue(ctx, in.ReadUnboxedValue(GetPacker(), ctx));
        State->SetValue(ctx, OutLoad->GetValue(ctx));
    }
}

const TValuePacker& TSqueezeState::GetPacker() const {
    if (!Packer && StateType)
        Packer = MakeHolder<TValuePacker>(false, StateType);
    return *Packer;
}

TSqueezeCodegenValue::TSqueezeCodegenValue(TMemoryUsageInfo* memInfo, const TSqueezeState& state, TFetchPtr fetch, TComputationContext& ctx, NUdf::TUnboxedValue&& stream)
    : TBase(memInfo)
    , FetchFunc(fetch)
    , Stream(std::move(stream))
    , Ctx(ctx)
    , State(state)
{}

ui32 TSqueezeCodegenValue::GetTraverseCount() const {
    return 1U;
}

NUdf::TUnboxedValue TSqueezeCodegenValue::GetTraverseItem(ui32) const {
    return Stream;
}

NUdf::TUnboxedValue TSqueezeCodegenValue::Save() const {
    return State.Save(Ctx);
}

void TSqueezeCodegenValue::Load(const NUdf::TStringRef& state) {
    State.Load(Ctx, state);
}

NUdf::EFetchStatus TSqueezeCodegenValue::Fetch(NUdf::TUnboxedValue& result) {
    if (ESqueezeState::Finished == State.Stage)
        return NUdf::EFetchStatus::Finish;
    return FetchFunc(&Ctx, static_cast<const NUdf::TUnboxedValuePod&>(Stream), result, State.Stage);
}

}
}