diff options
| author | ziganshinmr <[email protected]> | 2025-06-30 12:41:06 +0300 |
|---|---|---|
| committer | ziganshinmr <[email protected]> | 2025-06-30 13:14:54 +0300 |
| commit | 6c08b6c0b77e6597f669f17aff0d20c9743593d1 (patch) | |
| tree | c25fdb7c70c9953ff3ba67ead2d3adb2c73b94de /yql/essentials/minikql/computation | |
| parent | 5298ea9f35fff54794203d8fbbec1401df7fa1c8 (diff) | |
Allow multiple dependent nodes for Flow mkql callables
Allow multiple dependent nodes for Flow mkql callables
commit_hash:c895960db965367ca567923755fd29547c2d23f1
Diffstat (limited to 'yql/essentials/minikql/computation')
| -rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_node_impl.cpp | 61 | ||||
| -rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_node_impl.h | 34 |
2 files changed, 52 insertions, 43 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp index 8069acdb7ca..1f3f51eb231 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp @@ -174,11 +174,11 @@ Y_NO_INLINE ui32 TStatelessFlowComputationNodeBase::GetIndexImpl() const { Y_NO_INLINE void TStatelessFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, - const IComputationNode* dependence) const { + const TConstComputationNodePtrVector& dependences) const { if (self == owner) return; - if (dependence) { + for (auto& dependence : dependences) { dependence->CollectDependentIndexes(owner, dependencies); } } @@ -189,13 +189,15 @@ Y_NO_INLINE TStatefulFlowComputationNodeBase::TStatefulFlowComputationNodeBase(u {} Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { + IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { if (self == owner) return; const auto ins = dependencies.emplace(StateIndex_, StateKind_); - if (ins.second && dependence) { - dependence->CollectDependentIndexes(owner, dependencies); + if (ins.second) { + for (auto& dependence : dependences) { + dependence->CollectDependentIndexes(owner, dependencies); + } } } @@ -206,14 +208,16 @@ Y_NO_INLINE TPairStateFlowComputationNodeBase::TPairStateFlowComputationNodeBase {} Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { + IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { if (self == owner) return; const auto ins1 = dependencies.emplace(StateIndex_, FirstKind_); const auto ins2 = dependencies.emplace(StateIndex_ + 1U, SecondKind_); - if (ins1.second && ins2.second && dependence) { - dependence->CollectDependentIndexes(owner, dependencies); + if (ins1.second && ins2.second) { + for (auto& dependence : dependences) { + dependence->CollectDependentIndexes(owner, dependencies); + } } } @@ -222,11 +226,11 @@ Y_NO_INLINE ui32 TStatelessWideFlowComputationNodeBase::GetIndexImpl() const { } Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { + IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { if (self == owner) return; - if (dependence) { + for (auto& dependence : dependences) { dependence->CollectDependentIndexes(owner, dependencies); } } @@ -245,13 +249,15 @@ Y_NO_INLINE TStatefulWideFlowComputationNodeBase::TStatefulWideFlowComputationNo {} Y_NO_INLINE void TStatefulWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, - const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { + const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { if (self == owner) return; const auto ins = dependencies.emplace(StateIndex_, StateKind_); - if (ins.second && dependence) { - dependence->CollectDependentIndexes(owner, dependencies); + if (ins.second) { + for (auto& dependence : dependences) { + dependence->CollectDependentIndexes(owner, dependencies); + } } } @@ -264,14 +270,16 @@ Y_NO_INLINE TPairStateWideFlowComputationNodeBase::TPairStateWideFlowComputation Y_NO_INLINE void TPairStateWideFlowComputationNodeBase::CollectDependentIndexesImpl( const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { + IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { if (self == owner) return; const auto ins1 = dependencies.emplace(StateIndex_, FirstKind_); const auto ins2 = dependencies.emplace(StateIndex_ + 1U, SecondKind_); - if (ins1.second && ins2.second && dependence) { - dependence->CollectDependentIndexes(owner, dependencies); + if (ins1.second && ins2.second) { + for (auto& dependence : dependences) { + dependence->CollectDependentIndexes(owner, dependencies); + } } } @@ -609,12 +617,11 @@ ui32 TWideFlowProxyComputationNode::GetDependencyWeight() const { } ui32 TWideFlowProxyComputationNode::GetDependencesCount() const { - return Dependence_ ? 1U : 0U; + return Dependences_.size(); } IComputationNode* TWideFlowProxyComputationNode::AddDependence(const IComputationNode* node) { - Y_DEBUG_ABORT_UNLESS(!Dependence_); - Dependence_ = node; + Dependences_.push_back(node); return this; } @@ -624,14 +631,18 @@ bool TWideFlowProxyComputationNode::IsTemporaryValue() const { return true; } void TWideFlowProxyComputationNode::RegisterDependencies() const {} -void TWideFlowProxyComputationNode::PrepareStageOne() {} +void TWideFlowProxyComputationNode::PrepareStageOne() { + std::sort(Dependences_.begin(), Dependences_.end()); + Dependences_.erase(std::unique(Dependences_.begin(), Dependences_.end()), Dependences_.cend()); + if (const auto it = std::find(Dependences_.cbegin(), Dependences_.cend(), Owner_); Dependences_.cend() != it) + Dependences_.erase(it); +} void TWideFlowProxyComputationNode::PrepareStageTwo() { - if (Dependence_) { - TIndexesMap dependencies; - Dependence_->CollectDependentIndexes(Owner_, dependencies); - InvalidationSet_.assign(dependencies.cbegin(), dependencies.cend()); - } + TIndexesMap dependencies; + std::for_each(Dependences_.cbegin(), Dependences_.cend(), + std::bind(&IComputationNode::CollectDependentIndexes, std::placeholders::_1, Owner_, std::ref(dependencies))); + InvalidationSet_.assign(dependencies.cbegin(), dependencies.cend()); } void TWideFlowProxyComputationNode::SetOwner(const IComputationNode* owner) { diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.h b/yql/essentials/minikql/computation/mkql_computation_node_impl.h index 7093829ed27..2c22ae421b1 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_impl.h +++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.h @@ -395,13 +395,11 @@ private: ui32 GetDependencyWeight() const final { return 42U; } ui32 GetDependencesCount() const final { - return Dependence_ ? 1U : 0U; + return Dependences_.size(); } IComputationNode* AddDependence(const IComputationNode* node) final { - if (!Dependence_) { - Dependence_ = node; - } + Dependences_.push_back(node); return this; } @@ -420,7 +418,7 @@ private: } protected: const IComputationNode *const Source_; - const IComputationNode *Dependence_ = nullptr; + TConstComputationNodePtrVector Dependences_; }; template <typename TDerived> @@ -443,7 +441,7 @@ protected: ui32 GetIndexImpl() const; void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, - const IComputationNode* dependence) const; + const TConstComputationNodePtrVector& dependences) const; }; template <typename TDerived> @@ -467,7 +465,7 @@ private: } void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { - CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_); + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_); } }; @@ -475,7 +473,7 @@ class TStatefulFlowComputationNodeBase { protected: TStatefulFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind); void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; + IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const; const ui32 StateIndex_; const EValueRepresentation StateKind_; @@ -507,7 +505,7 @@ private: } void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { - CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_); + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_); } }; @@ -517,7 +515,7 @@ class TPairStateFlowComputationNodeBase { protected: TPairStateFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind); void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; + IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const; const ui32 StateIndex_; const EValueRepresentation FirstKind_, SecondKind_; @@ -543,7 +541,7 @@ private: } void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { - CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_); + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_); } }; @@ -589,7 +587,7 @@ private: EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* values) const final; protected: - const IComputationNode* Dependence_ = nullptr; + TConstComputationNodePtrVector Dependences_; const IComputationNode* Owner_ = nullptr; std::vector<std::pair<ui32, EValueRepresentation>> InvalidationSet_; TFetcher Fetcher_; @@ -623,7 +621,7 @@ class TStatelessWideFlowComputationNodeBase { protected: ui32 GetIndexImpl() const; void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; + IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const; }; template <typename TDerived> @@ -644,7 +642,7 @@ private: } void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { - CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_); + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_); } }; @@ -652,7 +650,7 @@ class TStatefulWideFlowComputationNodeBase { protected: TStatefulWideFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind); void CollectDependentIndexesImpl(const IComputationNode* self, - const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; + const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const; const ui32 StateIndex_; const EValueRepresentation StateKind_; @@ -683,7 +681,7 @@ private: } void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { - CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_); + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_); } }; @@ -691,7 +689,7 @@ class TPairStateWideFlowComputationNodeBase { protected: TPairStateWideFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind); void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const; + IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const; const ui32 StateIndex_; const EValueRepresentation FirstKind_, SecondKind_; @@ -717,7 +715,7 @@ private: } void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final { - CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_); + CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_); } }; |
