1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
#include "mkql_squeeze_state.h"
#include "mkql_saveload.h"
#include <yql/essentials/minikql/mkql_string_util.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
namespace NKikimr {
namespace NMiniKQL {
// Version tag written into every saved state blob and checked again on load.
constexpr ui32 StateVersion = 1U;
TSqueezeState::TSqueezeState(
IComputationExternalNode* item,
IComputationExternalNode* state,
IComputationNode* outSwitch,
IComputationNode* initState,
IComputationNode* updateState,
IComputationExternalNode* inSave,
IComputationNode* outSave,
IComputationExternalNode* inLoad,
IComputationNode* outLoad,
const TType* stateType
)
: Item(item)
, State(state)
, Switch(outSwitch)
, InitState(initState)
, UpdateState(updateState)
, InSave(inSave)
, OutSave(outSave)
, InLoad(inLoad)
, OutLoad(outLoad)
, StateType(stateType)
{}
TSqueezeState::TSqueezeState(const TSqueezeState& state)
: Item(state.Item)
, State(state.State)
, Switch(state.Switch)
, InitState(state.InitState)
, UpdateState(state.UpdateState)
, InSave(state.InSave)
, OutSave(state.OutSave)
, InLoad(state.InLoad)
, OutLoad(state.OutLoad)
, StateType(state.StateType)
{}
// Serializes the current stage into a SIMPLE_BLOB state tagged with
// StateVersion. When the stage is Work, the current value of the State node
// is fed into InSave and the value produced by OutSave is appended, packed
// with the packer from GetPacker().
NUdf::TUnboxedValue TSqueezeState::Save(TComputationContext& ctx) const {
    TOutputSerializer serializer(EMkqlStateType::SIMPLE_BLOB, StateVersion, ctx);

    // The stage is always stored, as a single byte.
    const auto stageByte = static_cast<ui8>(Stage);
    serializer.Write(stageByte);

    const bool hasAccumulatedState = (Stage == ESqueezeState::Work);
    if (hasAccumulatedState) {
        InSave->SetValue(ctx, State->GetValue(ctx));
        serializer.WriteUnboxedValue(GetPacker(), OutSave->GetValue(ctx));
    }

    return serializer.MakeState();
}
// Restores the stage (and, for the Work stage, the State node value) from a
// blob previously produced by Save().
// Throws yexception when the blob's version differs from StateVersion.
void TSqueezeState::Load(TComputationContext& ctx, const NUdf::TStringRef& state) {
    TInputSerializer reader(state, EMkqlStateType::SIMPLE_BLOB);

    if (const auto loadStateVersion = reader.GetStateVersion(); loadStateVersion != StateVersion) {
        THROW yexception() << "Invalid state version " << loadStateVersion;
    }

    Stage = static_cast<ESqueezeState>(reader.Read<ui8>());
    if (Stage != ESqueezeState::Work) {
        // Only the Work stage has a value stored after the stage byte.
        return;
    }

    // Feed the unpacked value into InLoad, then take what OutLoad yields as State.
    InLoad->SetValue(ctx, reader.ReadUnboxedValue(GetPacker(), ctx));
    State->SetValue(ctx, OutLoad->GetValue(ctx));
}
// Returns the value packer used by Save()/Load(), creating it on first use
// from StateType and caching it in Packer for subsequent calls.
// Throws yexception when no packer exists yet and StateType is null; the
// previous code dereferenced the empty Packer holder in that situation.
const TValuePacker& TSqueezeState::GetPacker() const {
    if (!Packer) {
        if (!StateType) {
            THROW yexception() << "Squeeze state type is not set, unable to create value packer";
        }
        Packer = MakeHolder<TValuePacker>(false, StateType);
    }
    return *Packer;
}
TSqueezeCodegenValue::TSqueezeCodegenValue(TMemoryUsageInfo* memInfo, const TSqueezeState& state, TFetchPtr fetch, TComputationContext& ctx, NUdf::TUnboxedValue&& stream)
: TBase(memInfo)
, FetchFunc(fetch)
, Stream(std::move(stream))
, Ctx(ctx)
, State(state)
{}
// Reports how many items GetTraverseItem() exposes: always one (the Stream).
ui32 TSqueezeCodegenValue::GetTraverseCount() const {
    constexpr ui32 traversableItems = 1U;
    return traversableItems;
}
// Returns the wrapped Stream; the index argument is ignored.
NUdf::TUnboxedValue TSqueezeCodegenValue::GetTraverseItem(ui32 /*index*/) const {
    return Stream;
}
// Delegates to TSqueezeState::Save using the stored computation context.
NUdf::TUnboxedValue TSqueezeCodegenValue::Save() const {
    TComputationContext& context = Ctx;
    return State.Save(context);
}
// Delegates to TSqueezeState::Load using the stored computation context.
void TSqueezeCodegenValue::Load(const NUdf::TStringRef& state) {
    TComputationContext& context = Ctx;
    State.Load(context, state);
}
// Produces the next result via FetchFunc.
// Once the stage is Finished, FetchFunc is no longer called and Finish is
// returned directly. Otherwise FetchFunc receives the context, the stream
// viewed as a TUnboxedValuePod, the output slot and the Stage member.
NUdf::EFetchStatus TSqueezeCodegenValue::Fetch(NUdf::TUnboxedValue& result) {
    const bool alreadyFinished = (State.Stage == ESqueezeState::Finished);
    return alreadyFinished
        ? NUdf::EFetchStatus::Finish
        : FetchFunc(&Ctx, static_cast<const NUdf::TUnboxedValuePod&>(Stream), result, State.Stage);
}
}
}
|