summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
diff options
context:
space:
mode:
authorziganshinmr <[email protected]>2025-06-30 12:41:06 +0300
committerziganshinmr <[email protected]>2025-06-30 13:14:54 +0300
commit6c08b6c0b77e6597f669f17aff0d20c9743593d1 (patch)
treec25fdb7c70c9953ff3ba67ead2d3adb2c73b94de /yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
parent5298ea9f35fff54794203d8fbbec1401df7fa1c8 (diff)
Allow multiple dependent nodes for Flow mkql callables
Allow multiple dependent nodes for Flow mkql callables commit_hash:c895960db965367ca567923755fd29547c2d23f1
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_impl.cpp')
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_impl.cpp61
1 files changed, 36 insertions, 25 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
index 8069acdb7ca..1f3f51eb231 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
@@ -174,11 +174,11 @@ Y_NO_INLINE ui32 TStatelessFlowComputationNodeBase::GetIndexImpl() const {
Y_NO_INLINE void TStatelessFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self,
const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies,
- const IComputationNode* dependence) const {
+ const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
- if (dependence) {
+ for (auto& dependence : dependences) {
dependence->CollectDependentIndexes(owner, dependencies);
}
}
@@ -189,13 +189,15 @@ Y_NO_INLINE TStatefulFlowComputationNodeBase::TStatefulFlowComputationNodeBase(u
{}
Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
const auto ins = dependencies.emplace(StateIndex_, StateKind_);
- if (ins.second && dependence) {
- dependence->CollectDependentIndexes(owner, dependencies);
+ if (ins.second) {
+ for (auto& dependence : dependences) {
+ dependence->CollectDependentIndexes(owner, dependencies);
+ }
}
}
@@ -206,14 +208,16 @@ Y_NO_INLINE TPairStateFlowComputationNodeBase::TPairStateFlowComputationNodeBase
{}
Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
const auto ins1 = dependencies.emplace(StateIndex_, FirstKind_);
const auto ins2 = dependencies.emplace(StateIndex_ + 1U, SecondKind_);
- if (ins1.second && ins2.second && dependence) {
- dependence->CollectDependentIndexes(owner, dependencies);
+ if (ins1.second && ins2.second) {
+ for (auto& dependence : dependences) {
+ dependence->CollectDependentIndexes(owner, dependencies);
+ }
}
}
@@ -222,11 +226,11 @@ Y_NO_INLINE ui32 TStatelessWideFlowComputationNodeBase::GetIndexImpl() const {
}
Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
- if (dependence) {
+ for (auto& dependence : dependences) {
dependence->CollectDependentIndexes(owner, dependencies);
}
}
@@ -245,13 +249,15 @@ Y_NO_INLINE TStatefulWideFlowComputationNodeBase::TStatefulWideFlowComputationNo
{}
Y_NO_INLINE void TStatefulWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self,
- const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
const auto ins = dependencies.emplace(StateIndex_, StateKind_);
- if (ins.second && dependence) {
- dependence->CollectDependentIndexes(owner, dependencies);
+ if (ins.second) {
+ for (auto& dependence : dependences) {
+ dependence->CollectDependentIndexes(owner, dependencies);
+ }
}
}
@@ -264,14 +270,16 @@ Y_NO_INLINE TPairStateWideFlowComputationNodeBase::TPairStateWideFlowComputation
Y_NO_INLINE void TPairStateWideFlowComputationNodeBase::CollectDependentIndexesImpl(
const IComputationNode* self, const IComputationNode* owner,
- IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
+ IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const {
if (self == owner)
return;
const auto ins1 = dependencies.emplace(StateIndex_, FirstKind_);
const auto ins2 = dependencies.emplace(StateIndex_ + 1U, SecondKind_);
- if (ins1.second && ins2.second && dependence) {
- dependence->CollectDependentIndexes(owner, dependencies);
+ if (ins1.second && ins2.second) {
+ for (auto& dependence : dependences) {
+ dependence->CollectDependentIndexes(owner, dependencies);
+ }
}
}
@@ -609,12 +617,11 @@ ui32 TWideFlowProxyComputationNode::GetDependencyWeight() const {
}
ui32 TWideFlowProxyComputationNode::GetDependencesCount() const {
- return Dependence_ ? 1U : 0U;
+ return Dependences_.size();
}
IComputationNode* TWideFlowProxyComputationNode::AddDependence(const IComputationNode* node) {
- Y_DEBUG_ABORT_UNLESS(!Dependence_);
- Dependence_ = node;
+ Dependences_.push_back(node);
return this;
}
@@ -624,14 +631,18 @@ bool TWideFlowProxyComputationNode::IsTemporaryValue() const { return true; }
void TWideFlowProxyComputationNode::RegisterDependencies() const {}
-void TWideFlowProxyComputationNode::PrepareStageOne() {}
+void TWideFlowProxyComputationNode::PrepareStageOne() {
+ std::sort(Dependences_.begin(), Dependences_.end());
+ Dependences_.erase(std::unique(Dependences_.begin(), Dependences_.end()), Dependences_.cend());
+ if (const auto it = std::find(Dependences_.cbegin(), Dependences_.cend(), Owner_); Dependences_.cend() != it)
+ Dependences_.erase(it);
+}
void TWideFlowProxyComputationNode::PrepareStageTwo() {
- if (Dependence_) {
- TIndexesMap dependencies;
- Dependence_->CollectDependentIndexes(Owner_, dependencies);
- InvalidationSet_.assign(dependencies.cbegin(), dependencies.cend());
- }
+ TIndexesMap dependencies;
+ std::for_each(Dependences_.cbegin(), Dependences_.cend(),
+ std::bind(&IComputationNode::CollectDependentIndexes, std::placeholders::_1, Owner_, std::ref(dependencies)));
+ InvalidationSet_.assign(dependencies.cbegin(), dependencies.cend());
}
void TWideFlowProxyComputationNode::SetOwner(const IComputationNode* owner) {