summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation
diff options
context:
space:
mode:
authorziganshinmr <[email protected]>2025-06-30 12:41:06 +0300
committerziganshinmr <[email protected]>2025-06-30 13:14:54 +0300
commit6c08b6c0b77e6597f669f17aff0d20c9743593d1 (patch)
treec25fdb7c70c9953ff3ba67ead2d3adb2c73b94de /yql/essentials/minikql/computation
parent5298ea9f35fff54794203d8fbbec1401df7fa1c8 (diff)
Allow multiple dependent nodes for Flow mkql callables
Allow multiple dependent nodes for Flow mkql callables commit_hash:c895960db965367ca567923755fd29547c2d23f1
Diffstat (limited to 'yql/essentials/minikql/computation')
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_impl.cpp61
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_impl.h34
2 files changed, 52 insertions, 43 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
index 8069acdb7ca..1f3f51eb231 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
@@ -174,11 +174,11 @@ Y_NO_INLINE ui32 TStatelessFlowComputationNodeBase::GetIndexImpl() const {
Y_NO_INLINE void TStatelessFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self,
const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies,
- const IComputationNode* dependence) const {
+ const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
- if (dependence) {
+ for (auto& dependence : dependences) {
dependence->CollectDependentIndexes(owner, dependencies);
}
}
@@ -189,13 +189,15 @@ Y_NO_INLINE TStatefulFlowComputationNodeBase::TStatefulFlowComputationNodeBase(u
{}
Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
const auto ins = dependencies.emplace(StateIndex_, StateKind_);
- if (ins.second && dependence) {
- dependence->CollectDependentIndexes(owner, dependencies);
+ if (ins.second) {
+ for (auto& dependence : dependences) {
+ dependence->CollectDependentIndexes(owner, dependencies);
+ }
}
}
@@ -206,14 +208,16 @@ Y_NO_INLINE TPairStateFlowComputationNodeBase::TPairStateFlowComputationNodeBase
{}
Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
const auto ins1 = dependencies.emplace(StateIndex_, FirstKind_);
const auto ins2 = dependencies.emplace(StateIndex_ + 1U, SecondKind_);
- if (ins1.second && ins2.second && dependence) {
- dependence->CollectDependentIndexes(owner, dependencies);
+ if (ins1.second && ins2.second) {
+ for (auto& dependence : dependences) {
+ dependence->CollectDependentIndexes(owner, dependencies);
+ }
}
}
@@ -222,11 +226,11 @@ Y_NO_INLINE ui32 TStatelessWideFlowComputationNodeBase::GetIndexImpl() const {
}
Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
- if (dependence) {
+ for (auto& dependence : dependences) {
dependence->CollectDependentIndexes(owner, dependencies);
}
}
@@ -245,13 +249,15 @@ Y_NO_INLINE TStatefulWideFlowComputationNodeBase::TStatefulWideFlowComputationNo
{}
Y_NO_INLINE void TStatefulWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self,
- const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
const auto ins = dependencies.emplace(StateIndex_, StateKind_);
- if (ins.second && dependence) {
- dependence->CollectDependentIndexes(owner, dependencies);
+ if (ins.second) {
+ for (auto& dependence : dependences) {
+ dependence->CollectDependentIndexes(owner, dependencies);
+ }
}
}
@@ -264,14 +270,16 @@ Y_NO_INLINE TPairStateWideFlowComputationNodeBase::TPairStateWideFlowComputation
Y_NO_INLINE void TPairStateWideFlowComputationNodeBase::CollectDependentIndexesImpl(
const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
const auto ins1 = dependencies.emplace(StateIndex_, FirstKind_);
const auto ins2 = dependencies.emplace(StateIndex_ + 1U, SecondKind_);
- if (ins1.second && ins2.second && dependence) {
- dependence->CollectDependentIndexes(owner, dependencies);
+ if (ins1.second && ins2.second) {
+ for (auto& dependence : dependences) {
+ dependence->CollectDependentIndexes(owner, dependencies);
+ }
}
}
@@ -609,12 +617,11 @@ ui32 TWideFlowProxyComputationNode::GetDependencyWeight() const {
}
ui32 TWideFlowProxyComputationNode::GetDependencesCount() const {
- return Dependence_ ? 1U : 0U;
+ return Dependences_.size();
}
IComputationNode* TWideFlowProxyComputationNode::AddDependence(const IComputationNode* node) {
- Y_DEBUG_ABORT_UNLESS(!Dependence_);
- Dependence_ = node;
+ Dependences_.push_back(node);
return this;
}
@@ -624,14 +631,18 @@ bool TWideFlowProxyComputationNode::IsTemporaryValue() const { return true; }
void TWideFlowProxyComputationNode::RegisterDependencies() const {}
-void TWideFlowProxyComputationNode::PrepareStageOne() {}
+void TWideFlowProxyComputationNode::PrepareStageOne() {
+ std::sort(Dependences_.begin(), Dependences_.end());
+ Dependences_.erase(std::unique(Dependences_.begin(), Dependences_.end()), Dependences_.cend());
+ if (const auto it = std::find(Dependences_.cbegin(), Dependences_.cend(), Owner_); Dependences_.cend() != it)
+ Dependences_.erase(it);
+}
void TWideFlowProxyComputationNode::PrepareStageTwo() {
- if (Dependence_) {
- TIndexesMap dependencies;
- Dependence_->CollectDependentIndexes(Owner_, dependencies);
- InvalidationSet_.assign(dependencies.cbegin(), dependencies.cend());
- }
+ TIndexesMap dependencies;
+ std::for_each(Dependences_.cbegin(), Dependences_.cend(),
+ std::bind(&IComputationNode::CollectDependentIndexes, std::placeholders::_1, Owner_, std::ref(dependencies)));
+ InvalidationSet_.assign(dependencies.cbegin(), dependencies.cend());
}
void TWideFlowProxyComputationNode::SetOwner(const IComputationNode* owner) {
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.h b/yql/essentials/minikql/computation/mkql_computation_node_impl.h
index 7093829ed27..2c22ae421b1 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_impl.h
+++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.h
@@ -395,13 +395,11 @@ private:
ui32 GetDependencyWeight() const final { return 42U; }
ui32 GetDependencesCount() const final {
- return Dependence_ ? 1U : 0U;
+ return Dependences_.size();
}
IComputationNode* AddDependence(const IComputationNode* node) final {
- if (!Dependence_) {
- Dependence_ = node;
- }
+ Dependences_.push_back(node);
return this;
}
@@ -420,7 +418,7 @@ private:
}
protected:
const IComputationNode *const Source_;
- const IComputationNode *Dependence_ = nullptr;
+ TConstComputationNodePtrVector Dependences_;
};
template <typename TDerived>
@@ -443,7 +441,7 @@ protected:
ui32 GetIndexImpl() const;
void CollectDependentIndexesImpl(const IComputationNode* self,
const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies,
- const IComputationNode* dependence) const;
+ const TConstComputationNodePtrVector& dependences) const;
};
template <typename TDerived>
@@ -467,7 +465,7 @@ private:
}
void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
- CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_);
+ CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_);
}
};
@@ -475,7 +473,7 @@ class TStatefulFlowComputationNodeBase {
protected:
TStatefulFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind);
void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const;
const ui32 StateIndex_;
const EValueRepresentation StateKind_;
@@ -507,7 +505,7 @@ private:
}
void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
- CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_);
+ CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_);
}
};
@@ -517,7 +515,7 @@ class TPairStateFlowComputationNodeBase {
protected:
TPairStateFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind);
void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const;
const ui32 StateIndex_;
const EValueRepresentation FirstKind_, SecondKind_;
@@ -543,7 +541,7 @@ private:
}
void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
- CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_);
+ CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_);
}
};
@@ -589,7 +587,7 @@ private:
EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* values) const final;
protected:
- const IComputationNode* Dependence_ = nullptr;
+ TConstComputationNodePtrVector Dependences_;
const IComputationNode* Owner_ = nullptr;
std::vector<std::pair<ui32, EValueRepresentation>> InvalidationSet_;
TFetcher Fetcher_;
@@ -623,7 +621,7 @@ class TStatelessWideFlowComputationNodeBase {
protected:
ui32 GetIndexImpl() const;
void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const;
};
template <typename TDerived>
@@ -644,7 +642,7 @@ private:
}
void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
- CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_);
+ CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_);
}
};
@@ -652,7 +650,7 @@ class TStatefulWideFlowComputationNodeBase {
protected:
TStatefulWideFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind);
void CollectDependentIndexesImpl(const IComputationNode* self,
- const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
+ const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const;
const ui32 StateIndex_;
const EValueRepresentation StateKind_;
@@ -683,7 +681,7 @@ private:
}
void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
- CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_);
+ CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_);
}
};
@@ -691,7 +689,7 @@ class TPairStateWideFlowComputationNodeBase {
protected:
TPairStateWideFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind);
void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const;
const ui32 StateIndex_;
const EValueRepresentation FirstKind_, SecondKind_;
@@ -717,7 +715,7 @@ private:
}
void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
- CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence_);
+ CollectDependentIndexesImpl(this, owner, dependencies, this->Dependences_);
}
};