diff options
author | vvvv <[email protected]> | 2024-11-07 04:19:26 +0300 |
---|---|---|
committer | vvvv <[email protected]> | 2024-11-07 04:29:50 +0300 |
commit | 2661be00f3bc47590fda9218bf0386d6355c8c88 (patch) | |
tree | 3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/comp_nodes/mkql_source.cpp | |
parent | cf2a23963ac10add28c50cc114fbf48953eca5aa (diff) |
Moved yql/minikql YQL-19206
init
[nodiff:caesar]
commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/comp_nodes/mkql_source.cpp')
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_source.cpp | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_source.cpp b/yql/essentials/minikql/comp_nodes/mkql_source.cpp new file mode 100644 index 00000000000..825e71b0b21 --- /dev/null +++ b/yql/essentials/minikql/comp_nodes/mkql_source.cpp @@ -0,0 +1,83 @@ +#include "mkql_source.h" +#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> +#include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE +#include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h> +#include <yql/essentials/minikql/mkql_node_cast.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +class TSourceOfWrapper : public TMutableComputationNode<TSourceOfWrapper> { + typedef TMutableComputationNode<TSourceOfWrapper> TBaseComputation; +private: + class TValue : public TComputationValue<TValue> { + public: + TValue(TMemoryUsageInfo* memInfo) + : TComputationValue<TValue>(memInfo) + {} + + private: + ui32 GetTraverseCount() const override { return 0U; } + + NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override { + result = NUdf::TUnboxedValuePod(); + return NUdf::EFetchStatus::Ok; + } + }; + +public: + TSourceOfWrapper(TComputationMutables& mutables) + : TBaseComputation(mutables) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TValue>(); + } + +private: + void RegisterDependencies() const final {} +}; + +class TSourceWrapper : public TStatelessWideFlowCodegeneratorNode<TSourceWrapper> { +using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TSourceWrapper>; +public: + TSourceWrapper() + : TStatelessWideFlowCodegeneratorNode<TSourceWrapper>(nullptr) + {} + + EFetchResult DoCalculate(TComputationContext&, NUdf::TUnboxedValue*const*) const { + return EFetchResult::One; + } +#ifndef MKQL_DISABLE_CODEGEN + TGenerateResult DoGenGetValues(const TCodegenContext& ctx, BasicBlock*&) const { + return {ConstantInt::get(Type::getInt32Ty(ctx.Codegen.GetContext()), static_cast<i32>(EFetchResult::One)), {}}; + } +#endif +private: + void RegisterDependencies() const final {} +}; + +} + +IComputationNode* WrapSourceOf(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args."); + const auto type = callable.GetType()->GetReturnType(); + if (type->IsFlow()) { + return ctx.NodeFactory.CreateImmutableNode(NUdf::TUnboxedValuePod()); + } else if (type->IsStream()) { + return new TSourceOfWrapper(ctx.Mutables); + } + + THROW yexception() << "Expected flow or stream."; +} + +IComputationNode* WrapSource(TCallable& callable, const TComputationNodeFactoryContext&) { + MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args."); + MKQL_ENSURE(!GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())), "Expected zero width of output flow."); + return new TSourceWrapper; +} + +} +} |