1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
#include "mkql_linear.h"
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
namespace NKikimr {
namespace NMiniKQL {
using namespace NYql::NUdf;
namespace {
class TToDynamicLinearWrapper : public TMutableComputationNode<TToDynamicLinearWrapper> {
using TSelf = TToDynamicLinearWrapper;
using TBase = TMutableComputationNode<TSelf>;
typedef TBase TBaseComputation;
public:
class TValue : public TComputationValue<TValue> {
public:
TValue(TMemoryUsageInfo* memInfo, TUnboxedValue&& value)
: TComputationValue(memInfo)
, Value_(std::move(value))
, Consumed_(false)
{}
private:
bool Next(NUdf::TUnboxedValue& result) override {
if (Consumed_) {
return false;
}
result = std::move(Value_);
Consumed_ = true;
return true;
}
TUnboxedValue Value_;
bool Consumed_;
};
TToDynamicLinearWrapper(TComputationMutables& mutables, IComputationNode* source)
: TBaseComputation(mutables)
, Source_(source)
{
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
TUnboxedValue input = Source_->GetValue(ctx);
return ctx.HolderFactory.Create<TValue>(std::move(input));
}
private:
void RegisterDependencies() const final {
this->DependsOn(Source_);
}
IComputationNode* const Source_;
};
class TFromDynamicLinearWrapper : public TMutableComputationNode<TFromDynamicLinearWrapper> {
using TSelf = TFromDynamicLinearWrapper;
using TBase = TMutableComputationNode<TSelf>;
typedef TBase TBaseComputation;
public:
TFromDynamicLinearWrapper(TComputationMutables& mutables, IComputationNode* source, const TSourcePosition& pos)
: TBaseComputation(mutables)
, Source_(source)
, Pos_(pos)
{
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
TUnboxedValue input = Source_->GetValue(ctx);
TUnboxedValue result;
if (input.Next(result)) {
return result.Release();
}
TStringBuilder res;
res << Pos_ << " The linear value has already been used";
UdfTerminate(res.c_str());
}
private:
void RegisterDependencies() const final {
this->DependsOn(Source_);
}
IComputationNode* const Source_;
const TSourcePosition Pos_;
};
} // namespace
IComputationNode* WrapToDynamicLinear(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expecting exactly one argument");
auto source = LocateNode(ctx.NodeLocator, callable, 0);
return new TToDynamicLinearWrapper(ctx.Mutables, source);
}
IComputationNode* WrapFromDynamicLinear(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 4, "Expecting exactly 4 arguments");
auto source = LocateNode(ctx.NodeLocator, callable, 0);
const TStringBuf file = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().AsStringRef();
const ui32 row = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui32>();
const ui32 column = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui32>();
return new TFromDynamicLinearWrapper(ctx.Mutables, source, NUdf::TSourcePosition(row, column, file));
}
}
}
|