diff options
| author | vvvv <[email protected]> | 2025-10-09 12:25:18 +0300 |
|---|---|---|
| committer | vvvv <[email protected]> | 2025-10-09 12:57:17 +0300 |
| commit | cb77d014972b2cdb27d2e6d979fc3a2772b27ad4 (patch) | |
| tree | 7f3bcd8ce71c6bd0f3ccc11e31b9f665475b819e /yql/essentials/minikql/computation/mkql_computation_node_impl.cpp | |
| parent | d58a8990d353b051c27e1069141117fdfde64358 (diff) | |
YQL-20086 minikql
commit_hash:e96f7390db5fcbe7e9f64f898141a263ad522daa
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_impl.cpp')
| -rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_node_impl.cpp | 229 |
1 files changed, 152 insertions, 77 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp index a5c3bf13623..7ce89c1a688 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp @@ -5,7 +5,7 @@ namespace NKikimr { namespace NMiniKQL { -void ThrowNotSupportedImplForClass(const TString& className, const char *func) { +void ThrowNotSupportedImplForClass(const TString& className, const char* func) { THROW yexception() << "Unsupported access to '" << func << "' method of: " << className; } @@ -33,7 +33,13 @@ template class TRefCountedComputationNode<IComputationWideFlowProxyNode>; TUnboxedImmutableComputationNode::TUnboxedImmutableComputationNode(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& value) : MemInfo_(memInfo) , UnboxedValue_(std::move(value)) - , RepresentationKind_(UnboxedValue_.HasValue() ? (UnboxedValue_.IsBoxed() ? EValueRepresentation::Boxed : (UnboxedValue_.IsString() ? EValueRepresentation::String : EValueRepresentation::Embedded)) : EValueRepresentation::Embedded) + , RepresentationKind_(UnboxedValue_.HasValue() + ? (UnboxedValue_.IsBoxed() + ? EValueRepresentation::Boxed + : (UnboxedValue_.IsString() + ? EValueRepresentation::String + : EValueRepresentation::Embedded)) + : EValueRepresentation::Embedded) { MKQL_MEM_TAKE(MemInfo_, this, sizeof(*this), __MKQL_LOCATION__); TlsAllocState->LockObject(UnboxedValue_); @@ -53,11 +59,16 @@ NUdf::TUnboxedValue TUnboxedImmutableComputationNode::GetValue(TComputationConte return UnboxedValue_; } -const IComputationNode* TUnboxedImmutableComputationNode::GetSource() const { return nullptr; } +const IComputationNode* TUnboxedImmutableComputationNode::GetSource() const { + return nullptr; +} -IComputationNode* TUnboxedImmutableComputationNode::AddDependence(const IComputationNode*) { return nullptr; } +IComputationNode* TUnboxedImmutableComputationNode::AddDependence(const IComputationNode*) { + return nullptr; +} -void TUnboxedImmutableComputationNode::RegisterDependencies() const {} +void TUnboxedImmutableComputationNode::RegisterDependencies() const { +} ui32 TUnboxedImmutableComputationNode::GetIndex() const { THROW yexception() << "Failed to get index."; @@ -75,10 +86,14 @@ ui32 TUnboxedImmutableComputationNode::GetDependencesCount() const { THROW yexception() << "Can't get dependences count from const node."; } -bool TUnboxedImmutableComputationNode::IsTemporaryValue() const { return false; } +bool TUnboxedImmutableComputationNode::IsTemporaryValue() const { + return false; +} -void TUnboxedImmutableComputationNode::PrepareStageOne() {} -void TUnboxedImmutableComputationNode::PrepareStageTwo() {} +void TUnboxedImmutableComputationNode::PrepareStageOne() { +} +void TUnboxedImmutableComputationNode::PrepareStageTwo() { +} TString TUnboxedImmutableComputationNode::DebugString() const { return UnboxedValue_ ? (UnboxedValue_.IsBoxed() ? "Boxed" : "Literal") : "Empty"; @@ -91,22 +106,27 @@ EValueRepresentation TUnboxedImmutableComputationNode::GetRepresentation() const Y_NO_INLINE TStatefulComputationNodeBase::TStatefulComputationNodeBase(ui32 valueIndex, EValueRepresentation kind) : ValueIndex_(valueIndex) , RepresentationKind_(kind) -{} +{ +} Y_NO_INLINE TStatefulComputationNodeBase::~TStatefulComputationNodeBase() -{} +{ +} Y_NO_INLINE void TStatefulComputationNodeBase::AddDependenceImpl(const IComputationNode* node) { Dependencies_.emplace_back(node); } -Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, +Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl( + const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, bool stateless) const { - if (self == owner) + if (self == owner) { return; + } if (const auto ins = dependencies.emplace(ValueIndex_, RepresentationKind_); ins.second) { - std::for_each(Dependencies_.cbegin(), Dependencies_.cend(), std::bind(&IComputationNode::CollectDependentIndexes, std::placeholders::_1, owner, std::ref(dependencies))); + std::for_each(Dependencies_.cbegin(), Dependencies_.cend(), + std::bind(&IComputationNode::CollectDependentIndexes, std::placeholders::_1, owner, std::ref(dependencies))); if (stateless) { dependencies.erase(ins.first); @@ -114,17 +134,20 @@ Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const } } - Y_NO_INLINE TStatefulSourceComputationNodeBase::TStatefulSourceComputationNodeBase() -{} +{ +} Y_NO_INLINE TStatefulSourceComputationNodeBase::~TStatefulSourceComputationNodeBase() -{} +{ +} Y_NO_INLINE void TStatefulSourceComputationNodeBase::PrepareStageOneImpl(const TConstComputationNodePtrVector& dependencies) { if (!Stateless_) { - Stateless_ = std::accumulate(dependencies.cbegin(), dependencies.cend(), 0, - std::bind(std::plus<i32>(), std::placeholders::_1, std::bind(&IComputationNode::GetDependencyWeight, std::placeholders::_2))) <= 1; + Stateless_ = std::accumulate( + dependencies.cbegin(), dependencies.cend(), 0, + std::bind(std::plus<i32>(), std::placeholders::_1, + std::bind(&IComputationNode::GetDependencyWeight, std::placeholders::_2))) <= 1; } } @@ -133,7 +156,8 @@ Y_NO_INLINE void TStatefulSourceComputationNodeBase::AddSource(IComputationNode* } template <class IComputationNodeInterface, bool SerializableState> -TStatefulComputationNode<IComputationNodeInterface, SerializableState>::TStatefulComputationNode(TComputationMutables& mutables, EValueRepresentation kind) +TStatefulComputationNode<IComputationNodeInterface, SerializableState>::TStatefulComputationNode( + TComputationMutables& mutables, EValueRepresentation kind) : TStatefulComputationNodeBase(mutables.CurValueIndex++, kind) { if constexpr (SerializableState) { @@ -142,7 +166,8 @@ TStatefulComputationNode<IComputationNodeInterface, SerializableState>::TStatefu } template <class IComputationNodeInterface, bool SerializableState> -IComputationNode* TStatefulComputationNode<IComputationNodeInterface, SerializableState>::AddDependence(const IComputationNode* node) { +IComputationNode* TStatefulComputationNode<IComputationNodeInterface, SerializableState>::AddDependence( + const IComputationNode* node) { AddDependenceImpl(node); return this; } @@ -153,13 +178,18 @@ EValueRepresentation TStatefulComputationNode<IComputationNodeInterface, Seriali } template <class IComputationNodeInterface, bool SerializableState> -void TStatefulComputationNode<IComputationNodeInterface, SerializableState>::InitNode(TComputationContext&) const {} +void TStatefulComputationNode<IComputationNodeInterface, SerializableState>::InitNode(TComputationContext&) const { +} template <class IComputationNodeInterface, bool SerializableState> -ui32 TStatefulComputationNode<IComputationNodeInterface, SerializableState>::GetIndex() const { return ValueIndex_; } +ui32 TStatefulComputationNode<IComputationNodeInterface, SerializableState>::GetIndex() const { + return ValueIndex_; +} template <class IComputationNodeInterface, bool SerializableState> -ui32 TStatefulComputationNode<IComputationNodeInterface, SerializableState>::GetDependencesCount() const { return Dependencies_.size(); } +ui32 TStatefulComputationNode<IComputationNodeInterface, SerializableState>::GetDependencesCount() const { + return Dependencies_.size(); +} template class TStatefulComputationNode<IComputationNode, false>; template class TStatefulComputationNode<IComputationWideFlowNode, false>; @@ -172,11 +202,14 @@ Y_NO_INLINE ui32 TStatelessFlowComputationNodeBase::GetIndexImpl() const { THROW yexception() << "Failed to get stateless node index."; } -Y_NO_INLINE void TStatelessFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, - const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, +Y_NO_INLINE void TStatelessFlowComputationNodeBase::CollectDependentIndexesImpl( + const IComputationNode* self, + const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { - if (self == owner) + if (self == owner) { return; + } for (auto& dependence : dependences) { dependence->CollectDependentIndexes(owner, dependencies); @@ -186,12 +219,17 @@ Y_NO_INLINE void TStatelessFlowComputationNodeBase::CollectDependentIndexesImpl( Y_NO_INLINE TStatefulFlowComputationNodeBase::TStatefulFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind) : StateIndex_(stateIndex) , StateKind_(stateKind) -{} +{ +} -Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { - if (self == owner) +Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl( + const IComputationNode* self, + const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, + const TConstComputationNodePtrVector& dependences) const { + if (self == owner) { return; + } const auto ins = dependencies.emplace(StateIndex_, StateKind_); if (ins.second) { @@ -201,16 +239,22 @@ Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(c } } -Y_NO_INLINE TPairStateFlowComputationNodeBase::TPairStateFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind) +Y_NO_INLINE TPairStateFlowComputationNodeBase::TPairStateFlowComputationNodeBase( + ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind) : StateIndex_(stateIndex) , FirstKind_(firstKind) , SecondKind_(secondKind) -{} +{ +} -Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { - if (self == owner) +Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl( + const IComputationNode* self, + const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, + const TConstComputationNodePtrVector& dependences) const { + if (self == owner) { return; + } const auto ins1 = dependencies.emplace(StateIndex_, FirstKind_); const auto ins2 = dependencies.emplace(StateIndex_ + 1U, SecondKind_); @@ -225,10 +269,14 @@ Y_NO_INLINE ui32 TStatelessWideFlowComputationNodeBase::GetIndexImpl() const { THROW yexception() << "Failed to get stateless node index."; } -Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, - IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { - if (self == owner) +Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl( + const IComputationNode* self, + const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, + const TConstComputationNodePtrVector& dependences) const { + if (self == owner) { return; + } for (auto& dependence : dependences) { dependence->CollectDependentIndexes(owner, dependencies); @@ -243,15 +291,21 @@ Y_NO_INLINE NUdf::TUnboxedValue TWideFlowBaseComputationNodeBase::GetValueImpl(T THROW yexception() << "Failed to get value from wide flow node."; } -Y_NO_INLINE TStatefulWideFlowComputationNodeBase::TStatefulWideFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind) +Y_NO_INLINE TStatefulWideFlowComputationNodeBase::TStatefulWideFlowComputationNodeBase( + ui32 stateIndex, EValueRepresentation stateKind) : StateIndex_(stateIndex) , StateKind_(stateKind) -{} +{ +} -Y_NO_INLINE void TStatefulWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, - const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { - if (self == owner) +Y_NO_INLINE void TStatefulWideFlowComputationNodeBase::CollectDependentIndexesImpl( + const IComputationNode* self, + const IComputationNode* owner, + IComputationNode::TIndexesMap& dependencies, + const TConstComputationNodePtrVector& dependences) const { + if (self == owner) { return; + } const auto ins = dependencies.emplace(StateIndex_, StateKind_); if (ins.second) { @@ -266,13 +320,15 @@ Y_NO_INLINE TPairStateWideFlowComputationNodeBase::TPairStateWideFlowComputation : StateIndex_(stateIndex) , FirstKind_(firstKind) , SecondKind_(secondKind) -{} +{ +} Y_NO_INLINE void TPairStateWideFlowComputationNodeBase::CollectDependentIndexesImpl( const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const TConstComputationNodePtrVector& dependences) const { - if (self == owner) + if (self == owner) { return; + } const auto ins1 = dependencies.emplace(StateIndex_, FirstKind_); const auto ins2 = dependencies.emplace(StateIndex_ + 1U, SecondKind_); @@ -286,7 +342,8 @@ Y_NO_INLINE void TPairStateWideFlowComputationNodeBase::CollectDependentIndexesI Y_NO_INLINE TDecoratorComputationNodeBase::TDecoratorComputationNodeBase(IComputationNode* node, EValueRepresentation kind) : Node_(node) , Kind_(kind) -{} +{ +} Y_NO_INLINE ui32 TDecoratorComputationNodeBase::GetIndexImpl() const { THROW yexception() << "Can't get index from decorator node."; @@ -296,11 +353,13 @@ Y_NO_INLINE TString TDecoratorComputationNodeBase::DebugStringImpl(const TString return typeName + "(" + Node_->DebugString() + ")"; } -Y_NO_INLINE TBinaryComputationNodeBase::TBinaryComputationNodeBase(IComputationNode* left, IComputationNode* right, EValueRepresentation kind) +Y_NO_INLINE TBinaryComputationNodeBase::TBinaryComputationNodeBase( + IComputationNode* left, IComputationNode* right, EValueRepresentation kind) : Left_(left) , Right_(right) , Kind_(kind) -{} +{ +} Y_NO_INLINE ui32 TBinaryComputationNodeBase::GetIndexImpl() const { THROW yexception() << "Can't get index from decorator node."; @@ -338,7 +397,8 @@ TString TExternalComputationNode::DebugString() const { return "External"; } -void TExternalComputationNode::RegisterDependencies() const {} +void TExternalComputationNode::RegisterDependencies() const { +} void TExternalComputationNode::SetOwner(const IComputationNode* owner) { Y_DEBUG_ABORT_UNLESS(!Owner_); @@ -348,20 +408,25 @@ void TExternalComputationNode::SetOwner(const IComputationNode* owner) { void TExternalComputationNode::PrepareStageOne() { std::sort(Dependencies_.begin(), Dependencies_.end()); Dependencies_.erase(std::unique(Dependencies_.begin(), Dependencies_.end()), Dependencies_.cend()); - if (const auto it = std::find(Dependencies_.cbegin(), Dependencies_.cend(), Owner_); Dependencies_.cend() != it) + if (const auto it = std::find(Dependencies_.cbegin(), Dependencies_.cend(), Owner_); Dependencies_.cend() != it) { Dependencies_.erase(it); + } } void TExternalComputationNode::PrepareStageTwo() { TIndexesMap dependencies; std::for_each(Dependencies_.cbegin(), Dependencies_.cend(), - std::bind(&IComputationNode::CollectDependentIndexes, std::placeholders::_1, Owner_, std::ref(dependencies))); + std::bind(&IComputationNode::CollectDependentIndexes, std::placeholders::_1, Owner_, std::ref(dependencies))); InvalidationSet_.assign(dependencies.cbegin(), dependencies.cend()); } -const IComputationNode* TExternalComputationNode::GetSource() const { return nullptr; } +const IComputationNode* TExternalComputationNode::GetSource() const { + return nullptr; +} -ui32 TExternalComputationNode::GetDependencyWeight() const { return 0U; } +ui32 TExternalComputationNode::GetDependencyWeight() const { + return 0U; +} bool TExternalComputationNode::IsTemporaryValue() const { return bool(Getter_); @@ -490,9 +555,8 @@ const NUdf::TUnboxedValue* TComputationValueBaseNotSupportedStub::GetElements() } NUdf::TUnboxedValue TComputationValueBaseNotSupportedStub::Run( - const NUdf::IValueBuilder* valueBuilder, - const NUdf::TUnboxedValuePod* args) const -{ + const NUdf::IValueBuilder* valueBuilder, + const NUdf::TUnboxedValuePod* args) const { Y_UNUSED(valueBuilder); Y_UNUSED(args); ThrowNotSupported(__func__); @@ -598,7 +662,9 @@ NUdf::EFetchStatus TComputationValueBaseNotSupportedStub::WideFetch(NUdf::TUnbox return NUdf::EFetchStatus::Finish; } -TString TWideFlowProxyComputationNode::DebugString() const { return "WideFlowArg"; } +TString TWideFlowProxyComputationNode::DebugString() const { + return "WideFlowArg"; +} EValueRepresentation TWideFlowProxyComputationNode::GetRepresentation() const { THROW yexception() << "Failed to get representation kind."; @@ -625,23 +691,29 @@ IComputationNode* TWideFlowProxyComputationNode::AddDependence(const IComputatio return this; } -const IComputationNode* TWideFlowProxyComputationNode::GetSource() const { return nullptr; } +const IComputationNode* TWideFlowProxyComputationNode::GetSource() const { + return nullptr; +} -bool TWideFlowProxyComputationNode::IsTemporaryValue() const { return true; } +bool TWideFlowProxyComputationNode::IsTemporaryValue() const { + return true; +} -void TWideFlowProxyComputationNode::RegisterDependencies() const {} +void TWideFlowProxyComputationNode::RegisterDependencies() const { +} void TWideFlowProxyComputationNode::PrepareStageOne() { std::sort(Dependences_.begin(), Dependences_.end()); Dependences_.erase(std::unique(Dependences_.begin(), Dependences_.end()), Dependences_.cend()); - if (const auto it = std::find(Dependences_.cbegin(), Dependences_.cend(), Owner_); Dependences_.cend() != it) + if (const auto it = std::find(Dependences_.cbegin(), Dependences_.cend(), Owner_); Dependences_.cend() != it) { Dependences_.erase(it); + } } void TWideFlowProxyComputationNode::PrepareStageTwo() { TIndexesMap dependencies; std::for_each(Dependences_.cbegin(), Dependences_.cend(), - std::bind(&IComputationNode::CollectDependentIndexes, std::placeholders::_1, Owner_, std::ref(dependencies))); + std::bind(&IComputationNode::CollectDependentIndexes, std::placeholders::_1, Owner_, std::ref(dependencies))); InvalidationSet_.assign(dependencies.cbegin(), dependencies.cend()); } @@ -664,7 +736,7 @@ void TWideFlowProxyComputationNode::SetFetcher(TFetcher&& fetcher) { Fetcher_ = std::move(fetcher); } -EFetchResult TWideFlowProxyComputationNode::FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* values) const { +EFetchResult TWideFlowProxyComputationNode::FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue* const* values) const { return Fetcher_(ctx, values); } @@ -680,7 +752,7 @@ IComputationExternalNode* LocateExternalNode(const TNodeLocator& nodeLocator, TC return dynamic_cast<IComputationExternalNode*>(LocateNode(nodeLocator, callable, index, pop)); } -template<class TContainerOne, class TContainerTwo> +template <class TContainerOne, class TContainerTwo> TPasstroughtMap GetPasstroughtMap(const TContainerOne& from, const TContainerTwo& to) { TPasstroughtMap map(from.size()); for (size_t i = 0U; i < map.size(); ++i) { @@ -694,7 +766,7 @@ TPasstroughtMap GetPasstroughtMap(const TContainerOne& from, const TContainerTwo return map; } -template<class TContainerOne, class TContainerTwo> +template <class TContainerOne, class TContainerTwo> TPasstroughtMap GetPasstroughtMapOneToOne(const TContainerOne& from, const TContainerTwo& to) { TPasstroughtMap map(from.size()); std::unordered_map<typename TContainerOne::value_type, size_t> unique(map.size()); @@ -705,13 +777,14 @@ TPasstroughtMap GetPasstroughtMapOneToOne(const TContainerOne& from, const TCont if (auto& item = map[i]) { item.reset(); break; - } else + } else { item.emplace(j); - + } } } - } else + } else { map[ins.first->second].reset(); + } } return map; } @@ -722,9 +795,11 @@ template TPasstroughtMap GetPasstroughtMapOneToOne(const TComputationExternalNod template TPasstroughtMap GetPasstroughtMapOneToOne(const TComputationNodePtrVector& from, const TComputationExternalNodePtrVector& to); std::optional<size_t> IsPasstrought(const IComputationNode* root, const TComputationExternalNodePtrVector& args) { - for (size_t i = 0U; i < args.size(); ++i) - if (root == args[i]) + for (size_t i = 0U; i < args.size(); ++i) { + if (root == args[i]) { return {i}; + } + } return std::nullopt; } @@ -742,11 +817,11 @@ TPasstroughtMap MergePasstroughtMaps(const TPasstroughtMap& lhs, const TPasstrou void ApplyChanges(const NUdf::TUnboxedValue& list, NUdf::IApplyContext& applyCtx) { TThresher<false>::DoForEachItem(list, - [&applyCtx] (const NUdf::TUnboxedValue& item) { - if (item.IsBoxed()) - item.Apply(applyCtx); - } - ); + [&applyCtx](const NUdf::TUnboxedValue& item) { + if (item.IsBoxed()) { + item.Apply(applyCtx); + } + }); } const IComputationNode* GetCommonSource(const IComputationNode* first, const IComputationNode* second, const IComputationNode* common) { @@ -769,5 +844,5 @@ void CleanupCurrentContext() { } } -} -} +} // namespace NMiniKQL +} // namespace NKikimr |
