diff options
author | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:19:26 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:29:50 +0300 |
commit | 2661be00f3bc47590fda9218bf0386d6355c8c88 (patch) | |
tree | 3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/computation/mkql_computation_node_impl.h | |
parent | cf2a23963ac10add28c50cc114fbf48953eca5aa (diff) | |
download | ydb-2661be00f3bc47590fda9218bf0386d6355c8c88.tar.gz |
Moved yql/minikql YQL-19206
init
[nodiff:caesar]
commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_impl.h')
-rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_node_impl.h | 1058 |
1 files changed, 1058 insertions, 0 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.h b/yql/essentials/minikql/computation/mkql_computation_node_impl.h new file mode 100644 index 0000000000..92e2c76c6e --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.h @@ -0,0 +1,1058 @@ +#pragma once + +#include "mkql_computation_node.h" + +#include <yql/essentials/minikql/mkql_alloc.h> +#include <yql/essentials/public/udf/udf_value.h> + +#include <util/system/type_name.h> + +namespace NKikimr { +namespace NMiniKQL { + +enum class EDictType { + Sorted, + Hashed +}; + +enum class EContainerOptMode { + NoOpt, + OptNoAdd, + OptAdd +}; + +template <class IComputationNodeInterface> +class TRefCountedComputationNode : public IComputationNodeInterface { +private: + void Ref() final; + + void UnRef() final; + + ui32 RefCount() const final; + +private: + ui32 Refs_ = 0; +}; + +class TUnboxedImmutableComputationNode: public TRefCountedComputationNode<IComputationNode> +{ +public: + TUnboxedImmutableComputationNode(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& value); + + ~TUnboxedImmutableComputationNode(); + +private: + void InitNode(TComputationContext&) const override {} + + NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final; + + const IComputationNode* GetSource() const final; + + IComputationNode* AddDependence(const IComputationNode*) final; + + void RegisterDependencies() const final; + + ui32 GetIndex() const final; + + void CollectDependentIndexes(const IComputationNode* owner, TIndexesMap&) const final; + + ui32 GetDependencyWeight() const final; + + ui32 GetDependencesCount() const final; + + bool IsTemporaryValue() const final; + + void PrepareStageOne() final; + void PrepareStageTwo() final; + + TString DebugString() const final; + + EValueRepresentation GetRepresentation() const final; + + TMemoryUsageInfo *const MemInfo; +protected: + const NUdf::TUnboxedValue UnboxedValue; + const EValueRepresentation RepresentationKind; +}; + +class TStatefulComputationNodeBase { +protected: + TStatefulComputationNodeBase(ui32 valueIndex, EValueRepresentation kind); + ~TStatefulComputationNodeBase(); + void AddDependenceImpl(const IComputationNode* node); + void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, bool stateless) const; + + TConstComputationNodePtrVector Dependencies; + + const ui32 ValueIndex; + const EValueRepresentation RepresentationKind; +}; + +template <class IComputationNodeInterface, bool SerializableState = false> +class TStatefulComputationNode: public TRefCountedComputationNode<IComputationNodeInterface>, protected TStatefulComputationNodeBase +{ +protected: + TStatefulComputationNode(TComputationMutables& mutables, EValueRepresentation kind); + +protected: + void InitNode(TComputationContext&) const override; + + ui32 GetIndex() const final; + + IComputationNode* AddDependence(const IComputationNode* node) final; + + EValueRepresentation GetRepresentation() const override; + + NUdf::TUnboxedValue& ValueRef(TComputationContext& compCtx) const { + return compCtx.MutableValues[ValueIndex]; + } + +private: + ui32 GetDependencesCount() const final; +}; + +class TExternalComputationNode: public TStatefulComputationNode<IComputationExternalNode> +{ +public: + TExternalComputationNode(TComputationMutables& mutables, EValueRepresentation kind = EValueRepresentation::Any); + +protected: + + NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override; + + NUdf::TUnboxedValue& RefValue(TComputationContext& compCtx) const override; + void SetValue(TComputationContext& compCtx, NUdf::TUnboxedValue&& value) const override; + + TString DebugString() const final; +private: + ui32 GetDependencyWeight() const final; + + void RegisterDependencies() const final; + + void SetOwner(const IComputationNode* owner) final; + + void PrepareStageOne() final; + void PrepareStageTwo() final; + + void CollectDependentIndexes(const IComputationNode* owner, TIndexesMap& dependencies) const final; + + bool IsTemporaryValue() const final; + + const IComputationNode* Owner = nullptr; + + void SetGetter(TGetter&& getter) final; + + void InvalidateValue(TComputationContext& compCtx) const final; + + const IComputationNode* GetSource() const final; +protected: + std::vector<std::pair<ui32, EValueRepresentation>> InvalidationSet; + TGetter Getter; +}; + +class TStatefulSourceComputationNodeBase { +protected: + TStatefulSourceComputationNodeBase(); + ~TStatefulSourceComputationNodeBase(); + void PrepareStageOneImpl(const TConstComputationNodePtrVector& dependencies); + void AddSource(IComputationNode* source) const; + + mutable std::unordered_set<const IComputationNode*> Sources; // TODO: remove const and mutable. + std::optional<bool> Stateless; +}; + +template <typename TDerived, bool SerializableState = false> +class TStatefulSourceComputationNode: public TStatefulComputationNode<IComputationNode, SerializableState>, + protected TStatefulSourceComputationNodeBase +{ + using TStatefulComputationNode = TStatefulComputationNode<IComputationNode, SerializableState>; +private: + bool IsTemporaryValue() const final { + return *Stateless; + } + + ui32 GetDependencyWeight() const final { + return Sources.size(); + } + + void PrepareStageOne() final { + PrepareStageOneImpl(this->Dependencies); + } + + void PrepareStageTwo() final {} + + void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { + this->CollectDependentIndexesImpl(this, owner, dependencies, *Stateless); + } + + const IComputationNode* GetSource() const final { return this; } +protected: + TStatefulSourceComputationNode(TComputationMutables& mutables, EValueRepresentation kind = EValueRepresentation::Any) + : TStatefulComputationNode(mutables, kind) + {} + + void DependsOn(IComputationNode* node) const { + if (node) { + if (const auto source = node->AddDependence(this)) { + AddSource(source); + } + } + } + + void Own(IComputationExternalNode* node) const { + if (node) { + node->SetOwner(this); + } + } + + TString DebugString() const override { + return TypeName<TDerived>(); + } +}; + +template <typename TDerived> +class TMutableComputationNode: public TStatefulSourceComputationNode<TDerived> { +protected: + using TStatefulSourceComputationNode<TDerived>::TStatefulSourceComputationNode; + + NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override { + if (*this->Stateless) + return static_cast<const TDerived*>(this)->DoCalculate(compCtx); + NUdf::TUnboxedValue& valueRef = this->ValueRef(compCtx); + if (valueRef.IsInvalid()) { + valueRef = static_cast<const TDerived*>(this)->DoCalculate(compCtx); + } + + return valueRef; + } +}; + +template <typename TDerived, typename IFlowInterface> +class TFlowSourceBaseComputationNode: public TStatefulComputationNode<IFlowInterface> +{ + using TBase = TStatefulComputationNode<IFlowInterface>; +protected: + TFlowSourceBaseComputationNode(TComputationMutables& mutables, EValueRepresentation stateKind) + : TBase(mutables, stateKind) + {} + + TString DebugString() const override { + return TypeName<TDerived>(); + } + + void DependsOn(IComputationNode* node) const { + if (node) { + if (const auto source = node->AddDependence(this)) { + Sources.emplace(source); + } + } + } + + void Own(IComputationExternalNode* node) const { + if (node) { + node->SetOwner(this); + } + } +private: + bool IsTemporaryValue() const final { + return true; + } + + ui32 GetDependencyWeight() const final { + return this->Dependencies.size() + Sources.size(); + } + + void CollectDependentIndexes(const IComputationNode* owner, IComputationExternalNode::TIndexesMap& dependencies) const final { + this->CollectDependentIndexesImpl(this, owner, dependencies, false); + } + + void PrepareStageOne() final {} + void PrepareStageTwo() final {} + + const IComputationNode* GetSource() const final { return this; } + + mutable std::unordered_set<const IComputationNode*> Sources; // TODO: remove const and mutable. +}; + +template <typename TDerived> +class TFlowSourceComputationNode: public TFlowSourceBaseComputationNode<TDerived, IComputationNode> +{ + using TBase = TFlowSourceBaseComputationNode<TDerived, IComputationNode>; +protected: + TFlowSourceComputationNode(TComputationMutables& mutables, EValueRepresentation kind, EValueRepresentation stateKind) + : TBase(mutables, stateKind), RepresentationKind(kind) + {} + +private: + EValueRepresentation GetRepresentation() const final { + return RepresentationKind; + } + + NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final { + return static_cast<const TDerived*>(this)->DoCalculate(this->ValueRef(compCtx), compCtx); + } +private: + const EValueRepresentation RepresentationKind; +}; + +template <typename TDerived> +class TWideFlowSourceComputationNode: public TFlowSourceBaseComputationNode<TDerived, IComputationWideFlowNode> +{ + using TBase = TFlowSourceBaseComputationNode<TDerived, IComputationWideFlowNode>; +protected: + TWideFlowSourceComputationNode(TComputationMutables& mutables, EValueRepresentation stateKind) + : TBase(mutables, stateKind) + {} +private: + EValueRepresentation GetRepresentation() const final { + THROW yexception() << "Failed to get representation kind."; + } + + NUdf::TUnboxedValue GetValue(TComputationContext&) const final { + THROW yexception() << "Failed to get value from wide flow node."; + } + + EFetchResult FetchValues(TComputationContext& compCtx, NUdf::TUnboxedValue*const* values) const final { + return static_cast<const TDerived*>(this)->DoCalculate(this->ValueRef(compCtx), compCtx, values); + } +}; + +template <typename TDerived, typename IFlowInterface> +class TFlowBaseComputationNode: public TRefCountedComputationNode<IFlowInterface> +{ +protected: + TFlowBaseComputationNode(const IComputationNode* source) : Source(source) {} + + void InitNode(TComputationContext&) const override {} + + TString DebugString() const override { + return TypeName<TDerived>(); + } + + IComputationNode* FlowDependsOn(IComputationNode* node) const { + if (node) { + if (const auto source = node->AddDependence(this); dynamic_cast<IComputationExternalNode*>(source) || dynamic_cast<IComputationWideFlowProxyNode*>(source)) + return const_cast<IComputationNode*>(static_cast<const IComputationNode*>(this)); // TODO: remove const in RegisterDependencies. + else + return source; + } + return nullptr; + } + + IComputationNode* FlowDependsOnBoth(IComputationNode* one, IComputationNode* two) const { + const auto flowOne = FlowDependsOn(one); + const auto flowTwo = FlowDependsOn(two); + if (flowOne && flowTwo) { + if (flowOne == flowTwo) + return flowOne; + const auto flow = const_cast<IComputationNode*>(static_cast<const IComputationNode*>(this)); + DependsOn(flow, flowOne); + DependsOn(flow, flowTwo); + return flow; + } else if (flowOne) { + return flowOne; + } else if (flowTwo) { + return flowTwo; + } + + return nullptr; + } + + IComputationNode* FlowDependsOnAll(const std::vector<IFlowInterface*, TMKQLAllocator<IFlowInterface*>>& sources) const { + std::unordered_set<IComputationNode*> flows(sources.size()); + for (const auto& source : sources) + if (const auto flow = FlowDependsOn(source)) + flows.emplace(flow); + + if (flows.size() > 1U) { + const auto flow = const_cast<IComputationNode*>(static_cast<const IComputationNode*>(this)); + std::for_each(flows.cbegin(), flows.cend(), std::bind(&TFlowBaseComputationNode::DependsOn, flow, std::placeholders::_1)); + return flow; + } + + return flows.empty() ? nullptr : *flows.cbegin(); + } + + static void DependsOn(IComputationNode* source, IComputationNode* node) { + if (node && source && node != source) { + node->AddDependence(source); + } + } + + static void Own(IComputationNode* source, IComputationExternalNode* node) { + if (node && source && node != source) { + node->SetOwner(source); + } + } + + static void OwnProxy(IComputationNode* source, IComputationWideFlowProxyNode* node) { + if (node && source && node != source) { + node->SetOwner(source); + } + } +private: + ui32 GetDependencyWeight() const final { return 42U; } + + ui32 GetDependencesCount() const final { + return Dependence ? 1U : 0U; + } + + IComputationNode* AddDependence(const IComputationNode* node) final { + if (!Dependence) { + Dependence = node; + } + return this; + } + + bool IsTemporaryValue() const final { + return true; + } + + void PrepareStageOne() final {} + void PrepareStageTwo() final {} + + const IComputationNode* GetSource() const final { + if (Source && Source != this) + if (const auto s = Source->GetSource()) + return s; + return this; + } +protected: + const IComputationNode *const Source; + const IComputationNode *Dependence = nullptr; +}; + +template <typename TDerived> +class TBaseFlowBaseComputationNode: public TFlowBaseComputationNode<TDerived, IComputationNode> +{ +protected: + TBaseFlowBaseComputationNode(const IComputationNode* source, EValueRepresentation kind) + : TFlowBaseComputationNode<TDerived, IComputationNode>(source), RepresentationKind(kind) + {} +private: + EValueRepresentation GetRepresentation() const final { + return RepresentationKind; + } + + const EValueRepresentation RepresentationKind; +}; + +class TStatelessFlowComputationNodeBase { +protected: + ui32 GetIndexImpl() const; + void CollectDependentIndexesImpl(const IComputationNode* self, + const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, + const IComputationNode* dependence) const; +}; + +template <typename TDerived> +class TStatelessFlowComputationNode: public TBaseFlowBaseComputationNode<TDerived>, protected TStatelessFlowComputationNodeBase +{ +protected: + TStatelessFlowComputationNode(const IComputationNode* source, EValueRepresentation kind) + : TBaseFlowBaseComputationNode<TDerived>(source, kind) + {} + + TStatelessFlowComputationNode(TComputationMutables&, const IComputationNode* source, EValueRepresentation kind) + : TBaseFlowBaseComputationNode<TDerived>(source, kind) + {} + + NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override { + return static_cast<const TDerived*>(this)->DoCalculate(compCtx); + } +private: + ui32 GetIndex() const final { + return GetIndexImpl(); + } + + void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence); + } +}; + +class TStatefulFlowComputationNodeBase { +protected: + TStatefulFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind); + void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; + + const ui32 StateIndex; + const EValueRepresentation StateKind; +}; + +template <typename TDerived, bool SerializableState = false> +class TStatefulFlowComputationNode: public TBaseFlowBaseComputationNode<TDerived>, protected TStatefulFlowComputationNodeBase +{ +protected: + TStatefulFlowComputationNode(TComputationMutables& mutables, const IComputationNode* source, EValueRepresentation kind, EValueRepresentation stateKind = EValueRepresentation::Any) + : TBaseFlowBaseComputationNode<TDerived>(source, kind), TStatefulFlowComputationNodeBase(mutables.CurValueIndex++, stateKind) + { + if constexpr (SerializableState) { + mutables.SerializableValues.push_back(StateIndex); + } + } + + NUdf::TUnboxedValue& RefState(TComputationContext& compCtx) const { + return compCtx.MutableValues[GetIndex()]; + } + +private: + ui32 GetIndex() const final { + return StateIndex; + } + + NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final { + return static_cast<const TDerived*>(this)->DoCalculate(compCtx.MutableValues[StateIndex], compCtx); + } + + void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence); + } +}; + +const IComputationNode* GetCommonSource(const IComputationNode* first, const IComputationNode* second, const IComputationNode* common); + +class TPairStateFlowComputationNodeBase { +protected: + TPairStateFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind); + void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; + + const ui32 StateIndex; + const EValueRepresentation FirstKind, SecondKind; +}; + +template <typename TDerived> +class TPairStateFlowComputationNode: public TBaseFlowBaseComputationNode<TDerived>, protected TPairStateFlowComputationNodeBase +{ +protected: + TPairStateFlowComputationNode(TComputationMutables& mutables, const IComputationNode* source, EValueRepresentation kind, EValueRepresentation firstKind = EValueRepresentation::Any, EValueRepresentation secondKind = EValueRepresentation::Any) + : TBaseFlowBaseComputationNode<TDerived>(source, kind), TPairStateFlowComputationNodeBase(mutables.CurValueIndex++, firstKind, secondKind) + { + ++mutables.CurValueIndex; + } + +private: + NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final { + return static_cast<const TDerived*>(this)->DoCalculate(compCtx.MutableValues[StateIndex], compCtx.MutableValues[StateIndex + 1U], compCtx); + } + + ui32 GetIndex() const final { + return StateIndex; + } + + void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence); + } +}; + +class TWideFlowProxyComputationNode: public TRefCountedComputationNode<IComputationWideFlowProxyNode> +{ +public: + TWideFlowProxyComputationNode() = default; +protected: + TString DebugString() const final; +private: + void InitNode(TComputationContext&) const override {} + + EValueRepresentation GetRepresentation() const final; + + NUdf::TUnboxedValue GetValue(TComputationContext&) const final; + + ui32 GetIndex() const final; + + ui32 GetDependencyWeight() const final; + + ui32 GetDependencesCount() const final; + + const IComputationNode* GetSource() const final; + + IComputationNode* AddDependence(const IComputationNode* node) final; + + bool IsTemporaryValue() const final; + + void RegisterDependencies() const final; + + void PrepareStageOne() final; + + void PrepareStageTwo() final; + + void SetOwner(const IComputationNode* owner) final; + + void CollectDependentIndexes(const IComputationNode*, TIndexesMap&) const final; + + void InvalidateValue(TComputationContext& ctx) const final; + + void SetFetcher(TFetcher&& fetcher) final; + + EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* values) const final; + +protected: + const IComputationNode* Dependence = nullptr; + const IComputationNode* Owner = nullptr; + std::vector<std::pair<ui32, EValueRepresentation>> InvalidationSet; + TFetcher Fetcher; +}; + +class TWideFlowBaseComputationNodeBase { +protected: + EValueRepresentation GetRepresentationImpl() const; + NUdf::TUnboxedValue GetValueImpl(TComputationContext&) const; +}; + +template <typename TDerived> +class TWideFlowBaseComputationNode: public TFlowBaseComputationNode<TDerived, IComputationWideFlowNode>, + protected TWideFlowBaseComputationNodeBase +{ +protected: + TWideFlowBaseComputationNode(const IComputationNode* source) + : TFlowBaseComputationNode<TDerived, IComputationWideFlowNode>(source) + {} +private: + EValueRepresentation GetRepresentation() const final { + return GetRepresentationImpl(); + } + + NUdf::TUnboxedValue GetValue(TComputationContext& ctx) const { + return GetValueImpl(ctx); + } +}; + +class TStatelessWideFlowComputationNodeBase { +protected: + ui32 GetIndexImpl() const; + void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; +}; + +template <typename TDerived> +class TStatelessWideFlowComputationNode: public TWideFlowBaseComputationNode<TDerived>, protected TStatelessWideFlowComputationNodeBase +{ +protected: + TStatelessWideFlowComputationNode(const IComputationNode* source) + :TWideFlowBaseComputationNode<TDerived>(source) + {} + +private: + EFetchResult FetchValues(TComputationContext& compCtx, NUdf::TUnboxedValue*const* values) const final { + return static_cast<const TDerived*>(this)->DoCalculate(compCtx, values); + } + + ui32 GetIndex() const final { + return GetIndexImpl(); + } + + void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence); + } +}; + +class TStatefulWideFlowComputationNodeBase { +protected: + TStatefulWideFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind); + void CollectDependentIndexesImpl(const IComputationNode* self, + const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; + + const ui32 StateIndex; + const EValueRepresentation StateKind; +}; + +template <typename TDerived, bool SerializableState = false> +class TStatefulWideFlowComputationNode: public TWideFlowBaseComputationNode<TDerived>, protected TStatefulWideFlowComputationNodeBase +{ +protected: + TStatefulWideFlowComputationNode(TComputationMutables& mutables, const IComputationNode* source, EValueRepresentation stateKind) + : TWideFlowBaseComputationNode<TDerived>(source), TStatefulWideFlowComputationNodeBase(mutables.CurValueIndex++, stateKind) + { + if constexpr (SerializableState) { + mutables.SerializableValues.push_back(StateIndex); + } + } + + NUdf::TUnboxedValue& RefState(TComputationContext& compCtx) const { + return compCtx.MutableValues[GetIndex()]; + } +private: + EFetchResult FetchValues(TComputationContext& compCtx, NUdf::TUnboxedValue*const* values) const final { + return static_cast<const TDerived*>(this)->DoCalculate(compCtx.MutableValues[StateIndex], compCtx, values); + } + + ui32 GetIndex() const final { + return StateIndex; + } + + void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence); + } +}; + +class TPairStateWideFlowComputationNodeBase { +protected: + TPairStateWideFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind); + void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; + + const ui32 StateIndex; + const EValueRepresentation FirstKind, SecondKind; +}; + +template <typename TDerived> +class TPairStateWideFlowComputationNode: public TWideFlowBaseComputationNode<TDerived>, protected TPairStateWideFlowComputationNodeBase +{ +protected: + TPairStateWideFlowComputationNode(TComputationMutables& mutables, const IComputationNode* source, EValueRepresentation firstKind, EValueRepresentation secondKind) + : TWideFlowBaseComputationNode<TDerived>(source), TPairStateWideFlowComputationNodeBase(mutables.CurValueIndex++, firstKind, secondKind) + { + ++mutables.CurValueIndex; + } + +private: + EFetchResult FetchValues(TComputationContext& compCtx, NUdf::TUnboxedValue*const* values) const final { + return static_cast<const TDerived*>(this)->DoCalculate(compCtx.MutableValues[StateIndex], compCtx.MutableValues[StateIndex + 1U], compCtx, values); + } + + ui32 GetIndex() const final { + return StateIndex; + } + + void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence); + } +}; + +class TDecoratorComputationNodeBase { +protected: + TDecoratorComputationNodeBase(IComputationNode* node, EValueRepresentation kind); + ui32 GetIndexImpl() const; + TString DebugStringImpl(const TString& typeName) const; + + IComputationNode *const Node; + const EValueRepresentation Kind; +}; + +template <typename TDerived> +class TDecoratorComputationNode: public TRefCountedComputationNode<IComputationNode>, protected TDecoratorComputationNodeBase +{ +private: + void InitNode(TComputationContext&) const final {} + + const IComputationNode* GetSource() const final { return Node; } + + IComputationNode* AddDependence(const IComputationNode* node) final { + return Node->AddDependence(node); + } + + EValueRepresentation GetRepresentation() const final { return Kind; } + bool IsTemporaryValue() const final { return true; } + + void PrepareStageOne() final {} + void PrepareStageTwo() final {} + + void RegisterDependencies() const final { Node->AddDependence(this); } + + void CollectDependentIndexes(const IComputationNode*, TIndexesMap&) const final {} + + ui32 GetDependencyWeight() const final { return 0U; } + + ui32 GetDependencesCount() const final { + return Node->GetDependencesCount(); + } + + ui32 GetIndex() const final { + return GetIndexImpl(); + } + + NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final { + return static_cast<const TDerived*>(this)->DoCalculate(compCtx, Node->GetValue(compCtx)); + } + +protected: + TDecoratorComputationNode(IComputationNode* node, EValueRepresentation kind) + : TDecoratorComputationNodeBase(node, kind) + {} + + TDecoratorComputationNode(IComputationNode* node) + : TDecoratorComputationNodeBase(node, node->GetRepresentation()) + {} + + + TString DebugString() const override { + return DebugStringImpl(TypeName<TDerived>()); + } +}; + +class TBinaryComputationNodeBase { +protected: + TBinaryComputationNodeBase(IComputationNode* left, IComputationNode* right, EValueRepresentation kind); + ui32 GetIndexImpl() const; + TString DebugStringImpl(const TString& typeName) const; + + IComputationNode *const Left; + IComputationNode *const Right; + const EValueRepresentation Kind; +}; + +template <typename TDerived> +class TBinaryComputationNode: public TRefCountedComputationNode<IComputationNode>, protected TBinaryComputationNodeBase +{ +private: + NUdf::TUnboxedValue GetValue(TComputationContext& ctx) const final { + return static_cast<const TDerived*>(this)->DoCalculate(ctx); + } + + const IComputationNode* GetSource() const final { + return GetCommonSource(Left, Right, this); + } + + void InitNode(TComputationContext&) const final {} +protected: + TString DebugString() const override { + return DebugStringImpl(TypeName<TDerived>()); + } + + IComputationNode* AddDependence(const IComputationNode* node) final { + const auto l = Left->AddDependence(node); + const auto r = Right->AddDependence(node); + + if (!l) return r; + if (!r) return l; + + return this; + } + + EValueRepresentation GetRepresentation() const final { + return Kind; + } + + bool IsTemporaryValue() const final { return true; } + + void PrepareStageOne() final {} + void PrepareStageTwo() final {} + + void RegisterDependencies() const final { + Left->AddDependence(this); + Right->AddDependence(this); + } + + void CollectDependentIndexes(const IComputationNode*, TIndexesMap&) const final {} + + ui32 GetDependencyWeight() const final { return 0U; } + + ui32 GetDependencesCount() const final { + return Left->GetDependencesCount() + Right->GetDependencesCount(); + } + + ui32 GetIndex() const final { + return GetIndexImpl(); + } + + TBinaryComputationNode(IComputationNode* left, IComputationNode* right, const EValueRepresentation kind) + : TBinaryComputationNodeBase(left, right, kind) + { + } +}; + +[[noreturn]] +void ThrowNotSupportedImplForClass(const TString& className, const char *func); + + +class TComputationValueBaseNotSupportedStub: public NYql::NUdf::IBoxedValue +{ +private: + using TBase = NYql::NUdf::IBoxedValue; +public: + template <typename... Args> + TComputationValueBaseNotSupportedStub(Args&&... args) + : TBase(std::forward<Args>(args)...) + { + } + + ~TComputationValueBaseNotSupportedStub() { + } + +private: + bool HasFastListLength() const override; + ui64 GetListLength() const override; + ui64 GetEstimatedListLength() const override; + bool HasListItems() const override; + const NUdf::TOpaqueListRepresentation* GetListRepresentation() const override; + NUdf::IBoxedValuePtr ReverseListImpl(const NUdf::IValueBuilder& builder) const override; + NUdf::IBoxedValuePtr SkipListImpl(const NUdf::IValueBuilder& builder, ui64 count) const override; + NUdf::IBoxedValuePtr TakeListImpl(const NUdf::IValueBuilder& builder, ui64 count) const override; + NUdf::IBoxedValuePtr ToIndexDictImpl(const NUdf::IValueBuilder& builder) const override; + ui64 GetDictLength() const override; + bool HasDictItems() const override; + NUdf::TStringRef GetResourceTag() const override; + void* GetResource() override; + void Apply(NUdf::IApplyContext& applyCtx) const override; + NUdf::TUnboxedValue GetListIterator() const override ; + NUdf::TUnboxedValue GetDictIterator() const override; + NUdf::TUnboxedValue GetKeysIterator() const override; + NUdf::TUnboxedValue GetPayloadsIterator() const override; + bool Contains(const NUdf::TUnboxedValuePod& key) const override; + NUdf::TUnboxedValue Lookup(const NUdf::TUnboxedValuePod& key) const override; + NUdf::TUnboxedValue GetElement(ui32 index) const override ; + const NUdf::TUnboxedValue* GetElements() const override; + NUdf::TUnboxedValue Run( + const NUdf::IValueBuilder* valueBuilder, + const NUdf::TUnboxedValuePod* args) const override; + bool Skip() override; + bool Next(NUdf::TUnboxedValue&) override; + bool NextPair(NUdf::TUnboxedValue&, NUdf::TUnboxedValue&) override; + ui32 GetVariantIndex() const override; + NUdf::TUnboxedValue GetVariantItem() const override; + NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override; + ui32 GetTraverseCount() const override; + NUdf::TUnboxedValue GetTraverseItem(ui32 index) const override; + NUdf::TUnboxedValue Save() const override; + void Load(const NUdf::TStringRef& state) override; + bool Load2(const NUdf::TUnboxedValue& state) override; + void Push(const NUdf::TUnboxedValuePod& value) override; + bool IsSortedDict() const override; + void Unused1() override; + void Unused2() override; + void Unused3() override; + void Unused4() override; + void Unused5() override; + void Unused6() override; + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) override; + +protected: + [[noreturn]] virtual void ThrowNotSupported(const char* func) const = 0; +}; + + +template <typename TDerived> +class TComputationValueBase: public TComputationValueBaseNotSupportedStub +{ +private: + using TBase = TComputationValueBaseNotSupportedStub; +public: + template <typename... Args> + TComputationValueBase(Args&&... args) + : TBase(std::forward<Args>(args)...) + { + } + + ~TComputationValueBase() { + } + + TString DebugString() const { + return TypeName<TDerived>(); + } + +protected: + [[noreturn]] void ThrowNotSupported(const char* func) const final { + ThrowNotSupportedImplForClass(TypeName(*this), func); + } +}; + +template <typename TDerived, EMemorySubPool MemoryPool> +class TComputationValueImpl: public TComputationValueBase<TDerived>, + public TWithMiniKQLAlloc<MemoryPool> { +private: + using TBase = TComputationValueBase<TDerived>; +protected: + inline TMemoryUsageInfo* GetMemInfo() const { +#ifndef NDEBUG + return static_cast<TMemoryUsageInfo*>(M_.MemInfo); +#else + return nullptr; +#endif + } + using TWithMiniKQLAlloc<MemoryPool>::AllocWithSize; + using TWithMiniKQLAlloc<MemoryPool>::FreeWithSize; +public: + template <typename... Args> + TComputationValueImpl(TMemoryUsageInfo* memInfo, Args&&... args) + : TBase(std::forward<Args>(args)...) { +#ifndef NDEBUG + M_.MemInfo = memInfo; + MKQL_MEM_TAKE(memInfo, this, sizeof(TDerived), __MKQL_LOCATION__); +#else + Y_UNUSED(memInfo); +#endif + } + + ~TComputationValueImpl() { +#ifndef NDEBUG + MKQL_MEM_RETURN(GetMemInfo(), this, sizeof(TDerived)); +#endif + } +private: +#ifndef NDEBUG + struct { + void* MemInfo; // used for tracking memory usage during execution + } M_; +#endif +}; + +template <typename TDerived> +class TTemporaryComputationValue: public TComputationValueImpl<TDerived, EMemorySubPool::Temporary> { +private: + using TBase = TComputationValueImpl<TDerived, EMemorySubPool::Temporary>; +public: + using TBase::TBase; +}; + +template <typename TDerived> +class TComputationValue: public TComputationValueImpl<TDerived, EMemorySubPool::Default> { +private: + using TBase = TComputationValueImpl<TDerived, EMemorySubPool::Default>; +public: + using TBase::TBase; +}; + +template<bool IsStream> +struct TThresher; + +template<> +struct TThresher<true> { + template<class Handler> + static void DoForEachItem(const NUdf::TUnboxedValuePod& stream, const Handler& handler) { + for (NUdf::TUnboxedValue item;; handler(std::move(item))) { + const auto status = stream.Fetch(item); + MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Unexpected stream status!"); + if (status == NUdf::EFetchStatus::Finish) + break; + } + } +}; + +template<> +struct TThresher<false> { + template<class Handler> + static void DoForEachItem(const NUdf::TUnboxedValuePod& list, const Handler& handler) { + if (auto ptr = list.GetElements()) { + if (auto size = list.GetListLength()) do { + handler(NUdf::TUnboxedValue(*ptr++)); + } while (--size); + } else if (const auto& iter = list.GetListIterator()) { + for (NUdf::TUnboxedValue item; iter.Next(item); handler(std::move(item))) + continue; + } + } +}; + +IComputationNode* LocateNode(const TNodeLocator& nodeLocator, TCallable& callable, ui32 index, bool pop = false); +IComputationNode* LocateNode(const TNodeLocator& nodeLocator, TNode& node, bool pop = false); +IComputationExternalNode* LocateExternalNode(const TNodeLocator& nodeLocator, TCallable& callable, ui32 index, bool pop = true); + +using TPasstroughtMap = std::vector<std::optional<size_t>>; + +template<class TContainerOne, class TContainerTwo> +TPasstroughtMap GetPasstroughtMap(const TContainerOne& from, const TContainerTwo& to); + +template<class TContainerOne, class TContainerTwo> +TPasstroughtMap GetPasstroughtMapOneToOne(const TContainerOne& from, const TContainerTwo& to); + +std::optional<size_t> IsPasstrought(const IComputationNode* root, const TComputationExternalNodePtrVector& args); + +TPasstroughtMap MergePasstroughtMaps(const TPasstroughtMap& lhs, const TPasstroughtMap& rhs); + +void ApplyChanges(const NUdf::TUnboxedValue& value, NUdf::IApplyContext& applyCtx); +void CleanupCurrentContext(); + +} +} |