summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/comp_nodes/mkql_source.cpp
diff options
context:
space:
mode:
authorvvvv <[email protected]>2024-11-07 04:19:26 +0300
committervvvv <[email protected]>2024-11-07 04:29:50 +0300
commit2661be00f3bc47590fda9218bf0386d6355c8c88 (patch)
tree3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/comp_nodes/mkql_source.cpp
parentcf2a23963ac10add28c50cc114fbf48953eca5aa (diff)
Moved yql/minikql YQL-19206
init [nodiff:caesar] commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/comp_nodes/mkql_source.cpp')
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_source.cpp83
1 files changed, 83 insertions, 0 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_source.cpp b/yql/essentials/minikql/comp_nodes/mkql_source.cpp
new file mode 100644
index 00000000000..825e71b0b21
--- /dev/null
+++ b/yql/essentials/minikql/comp_nodes/mkql_source.cpp
@@ -0,0 +1,83 @@
+#include "mkql_source.h"
+#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
+#include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
+#include <yql/essentials/minikql/mkql_node_cast.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+class TSourceOfWrapper : public TMutableComputationNode<TSourceOfWrapper> {
+ typedef TMutableComputationNode<TSourceOfWrapper> TBaseComputation;
+private:
+ class TValue : public TComputationValue<TValue> {
+ public:
+ TValue(TMemoryUsageInfo* memInfo)
+ : TComputationValue<TValue>(memInfo)
+ {}
+
+ private:
+ ui32 GetTraverseCount() const override { return 0U; }
+
+ NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
+ result = NUdf::TUnboxedValuePod();
+ return NUdf::EFetchStatus::Ok;
+ }
+ };
+
+public:
+ TSourceOfWrapper(TComputationMutables& mutables)
+ : TBaseComputation(mutables)
+ {}
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ return ctx.HolderFactory.Create<TValue>();
+ }
+
+private:
+ void RegisterDependencies() const final {}
+};
+
+class TSourceWrapper : public TStatelessWideFlowCodegeneratorNode<TSourceWrapper> {
+using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TSourceWrapper>;
+public:
+ TSourceWrapper()
+ : TStatelessWideFlowCodegeneratorNode<TSourceWrapper>(nullptr)
+ {}
+
+ EFetchResult DoCalculate(TComputationContext&, NUdf::TUnboxedValue*const*) const {
+ return EFetchResult::One;
+ }
+#ifndef MKQL_DISABLE_CODEGEN
+ TGenerateResult DoGenGetValues(const TCodegenContext& ctx, BasicBlock*&) const {
+ return {ConstantInt::get(Type::getInt32Ty(ctx.Codegen.GetContext()), static_cast<i32>(EFetchResult::One)), {}};
+ }
+#endif
+private:
+ void RegisterDependencies() const final {}
+};
+
+}
+
+IComputationNode* WrapSourceOf(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args.");
+ const auto type = callable.GetType()->GetReturnType();
+ if (type->IsFlow()) {
+ return ctx.NodeFactory.CreateImmutableNode(NUdf::TUnboxedValuePod());
+ } else if (type->IsStream()) {
+ return new TSourceOfWrapper(ctx.Mutables);
+ }
+
+ THROW yexception() << "Expected flow or stream.";
+}
+
+IComputationNode* WrapSource(TCallable& callable, const TComputationNodeFactoryContext&) {
+ MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args.");
+ MKQL_ENSURE(!GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())), "Expected zero width of output flow.");
+ return new TSourceWrapper;
+}
+
+}
+}