summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/comp_nodes/mkql_linear.cpp
blob: 776b669f623058ea7dcec4380558c5544bad2bd4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include "mkql_linear.h"
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/minikql/mkql_node_cast.h>

namespace NKikimr {
namespace NMiniKQL {

using namespace NYql::NUdf;

namespace {

class TToDynamicLinearWrapper : public TMutableComputationNode<TToDynamicLinearWrapper> {
    using TSelf = TToDynamicLinearWrapper;
    using TBase = TMutableComputationNode<TSelf>;
    typedef TBase TBaseComputation;
public:
    class TValue : public TComputationValue<TValue> {
    public:
        TValue(TMemoryUsageInfo* memInfo, TUnboxedValue&& value)
            : TComputationValue(memInfo)
            , Value_(std::move(value))
            , Consumed_(false)
        {}

    private:
        bool Next(NUdf::TUnboxedValue& result) override {
            if (Consumed_) {
                return false;
            }

            result = std::move(Value_);
            Consumed_ = true;
            return true;
        }

        TUnboxedValue Value_;
        bool Consumed_;
    };

    TToDynamicLinearWrapper(TComputationMutables& mutables, IComputationNode* source)
        : TBaseComputation(mutables)
        , Source_(source)
    {
    }

    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
        TUnboxedValue input = Source_->GetValue(ctx);
        return ctx.HolderFactory.Create<TValue>(std::move(input));
    }

private:
    void RegisterDependencies() const final {
        this->DependsOn(Source_);
    }

    IComputationNode* const Source_;
};

class TFromDynamicLinearWrapper : public TMutableComputationNode<TFromDynamicLinearWrapper> {
    using TSelf = TFromDynamicLinearWrapper;
    using TBase = TMutableComputationNode<TSelf>;
    typedef TBase TBaseComputation;
public:
    TFromDynamicLinearWrapper(TComputationMutables& mutables, IComputationNode* source, const TSourcePosition& pos)
        : TBaseComputation(mutables)
        , Source_(source)
        , Pos_(pos)
    {
    }

    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
        TUnboxedValue input = Source_->GetValue(ctx);
        TUnboxedValue result;
        if (input.Next(result)) {
            return result.Release();
        }

        TStringBuilder res;
        res << Pos_ << " The linear value has already been used";
        UdfTerminate(res.c_str());
    }

private:
    void RegisterDependencies() const final {
        this->DependsOn(Source_);
    }

    IComputationNode* const Source_;
    const TSourcePosition Pos_;
};

} // namespace

IComputationNode* WrapToDynamicLinear(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    MKQL_ENSURE(callable.GetInputsCount() == 1, "Expecting exactly one argument");
    auto source = LocateNode(ctx.NodeLocator, callable, 0);
    return new TToDynamicLinearWrapper(ctx.Mutables, source);
}

IComputationNode* WrapFromDynamicLinear(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    MKQL_ENSURE(callable.GetInputsCount() == 4, "Expecting exactly 4 arguments");
    auto source = LocateNode(ctx.NodeLocator, callable, 0);
    const TStringBuf file = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().AsStringRef();
    const ui32 row = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui32>();
    const ui32 column = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui32>();
    return new TFromDynamicLinearWrapper(ctx.Mutables, source, NUdf::TSourcePosition(row, column, file));
}

}
}