aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/public/purecalc/common/worker.h
blob: 07b8dfa2e79e362cf497d72358254b7715e4d525 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#pragma once

#include <yql/essentials/public/purecalc/common/interface.h>

#include <yql/essentials/public/udf/udf_value.h>
#include <yql/essentials/ast/yql_expr.h>
#include <yql/essentials/core/yql_user_data.h>
#include <yql/essentials/minikql/mkql_alloc.h>
#include <yql/essentials/minikql/mkql_node.h>
#include <yql/essentials/minikql/mkql_node_visitor.h>
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
#include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>

#include <memory>

namespace NYql {
    namespace NPureCalc {
        struct TWorkerGraph {
            TWorkerGraph(
                const TExprNode::TPtr& exprRoot,
                TExprContext& exprCtx,
                const TString& serializedProgram,
                const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
                const TUserDataTable& userData,
                const TVector<const TStructExprType*>& inputTypes,
                const TVector<const TStructExprType*>& originalInputTypes,
                const TVector<const TStructExprType*>& rawInputTypes,
                const TTypeAnnotationNode* outputType,
                const TTypeAnnotationNode* rawOutputType,
                const TString& LLVMSettings,
                NKikimr::NUdf::ICountersProvider* countersProvider,
                ui64 nativeYtTypeFlags,
                TMaybe<ui64> deterministicTimeProviderSeed
            );

            ~TWorkerGraph();

            NKikimr::NMiniKQL::TScopedAlloc ScopedAlloc_;
            NKikimr::NMiniKQL::TTypeEnvironment Env_;
            const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry_;
            TIntrusivePtr<IRandomProvider> RandomProvider_;
            TIntrusivePtr<ITimeProvider> TimeProvider_;
            NKikimr::NMiniKQL::IComputationPattern::TPtr ComputationPattern_;
            THolder<NKikimr::NMiniKQL::IComputationGraph> ComputationGraph_;
            TString LLVMSettings_;
            ui64 NativeYtTypeFlags_;
            TMaybe<TString> TimestampColumn_;
            const NKikimr::NMiniKQL::TType* OutputType_;
            const NKikimr::NMiniKQL::TType* RawOutputType_;
            TVector<NKikimr::NMiniKQL::IComputationExternalNode*> SelfNodes_;
            TVector<const NKikimr::NMiniKQL::TStructType*> InputTypes_;
            TVector<const NKikimr::NMiniKQL::TStructType*> OriginalInputTypes_;
            TVector<const NKikimr::NMiniKQL::TStructType*> RawInputTypes_;
        };

        template <typename TBase>
        class TWorker: public TBase {
        public:
            using TWorkerFactoryPtr = std::weak_ptr<IWorkerFactory>;
        private:
            // Worker factory implementation should stay alive for this worker to operate correctly.
            TWorkerFactoryPtr WorkerFactory_;

        protected:
            TWorkerGraph Graph_;

        public:
            TWorker(
                TWorkerFactoryPtr factory,
                const TExprNode::TPtr& exprRoot,
                TExprContext& exprCtx,
                const TString& serializedProgram,
                const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
                const TUserDataTable& userData,
                const TVector<const TStructExprType*>& inputTypes,
                const TVector<const TStructExprType*>& originalInputTypes,
                const TVector<const TStructExprType*>& rawInputTypes,
                const TTypeAnnotationNode* outputType,
                const TTypeAnnotationNode* rawOutputType,
                const TString& LLVMSettings,
                NKikimr::NUdf::ICountersProvider* countersProvider,
                ui64 nativeYtTypeFlags,
                TMaybe<ui64> deterministicTimeProviderSeed
            );

        public:
            ui32 GetInputsCount() const override;
            const NKikimr::NMiniKQL::TStructType* GetInputType(ui32, bool) const override;
            const NKikimr::NMiniKQL::TStructType* GetInputType(bool) const override;
            const NKikimr::NMiniKQL::TStructType* GetRawInputType(ui32) const override;
            const NKikimr::NMiniKQL::TStructType* GetRawInputType() const override;
            const NKikimr::NMiniKQL::TType* GetOutputType() const override;
            const NKikimr::NMiniKQL::TType* GetRawOutputType() const override;
            NYT::TNode MakeInputSchema() const override;
            NYT::TNode MakeInputSchema(ui32) const override;
            NYT::TNode MakeOutputSchema() const override;
            NYT::TNode MakeOutputSchema(ui32) const override;
            NYT::TNode MakeOutputSchema(TStringBuf) const override;
            NYT::TNode MakeFullOutputSchema() const override;
            NKikimr::NMiniKQL::TScopedAlloc& GetScopedAlloc() override;
            NKikimr::NMiniKQL::IComputationGraph& GetGraph() override;
            const NKikimr::NMiniKQL::IFunctionRegistry& GetFunctionRegistry() const override;
            NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnvironment() override;
            const TString& GetLLVMSettings() const override;
            ui64 GetNativeYtTypeFlags() const override;
            ITimeProvider* GetTimeProvider() const override;
        protected:
            void Release() override;
        };

        class TPullStreamWorker final: public TWorker<IPullStreamWorker> {
        private:
            NKikimr::NUdf::TUnboxedValue Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
            TVector<bool> HasInput_;

            inline bool CheckAllInputsSet() {
                return AllOf(HasInput_, [](bool x) { return x; });
            }

        public:
            using TWorker::TWorker;
            ~TPullStreamWorker();

        public:
            void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) override;
            NKikimr::NUdf::TUnboxedValue& GetOutput() override;

        protected:
            void Release() override;
        };

        class TPullListWorker final: public TWorker<IPullListWorker> {
        private:
            NKikimr::NUdf::TUnboxedValue Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
            NKikimr::NUdf::TUnboxedValue OutputIterator_ = NKikimr::NUdf::TUnboxedValue::Invalid();
            TVector<bool> HasInput_;

            inline bool CheckAllInputsSet() {
                return AllOf(HasInput_, [](bool x) { return x; });
            }

        public:
            using TWorker::TWorker;
            ~TPullListWorker();

        public:
            void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) override;
            NKikimr::NUdf::TUnboxedValue& GetOutput() override;
            NKikimr::NUdf::TUnboxedValue& GetOutputIterator() override;
            void ResetOutputIterator() override;

        protected:
            void Release() override;
        };

        class TPushStreamWorker final: public TWorker<IPushStreamWorker> {
        private:
            THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>> Consumer_{};
            bool Finished_ = false;
            NKikimr::NMiniKQL::IComputationExternalNode* SelfNode_ = nullptr;

        public:
            using TWorker::TWorker;

        private:
            void FeedToConsumer();
            NYql::NUdf::IBoxedValue* GetPushStream() const;

        public:
            void SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>>) override;
            void Push(NKikimr::NUdf::TUnboxedValue&&) override;
            void OnFinish() override;

        protected:
            void Release() override;
        };
    }
}