diff options
author | grigoriypisar <grigoriypisar@yandex-team.com> | 2024-12-17 00:12:19 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@yandex-team.com> | 2024-12-17 00:35:46 +0300 |
commit | 37b9c88deefbfee76b8f4a1fcd917b028fea3c30 (patch) | |
tree | e20796127dc23ff4e7c4f439263974ea54d59917 /yql/essentials/minikql | |
parent | a0ed0141c57419c25ed8b04129d18443261a8ad0 (diff) | |
download | ydb-37b9c88deefbfee76b8f4a1fcd917b028fea3c30.tar.gz |
added cache invalidation for computation graph
Fixed purecalc cache invalidation
commit_hash:088ac16b3481db39893f4fab0cf80c3809a5dcdd
Diffstat (limited to 'yql/essentials/minikql')
4 files changed, 19 insertions, 8 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node.h b/yql/essentials/minikql/computation/mkql_computation_node.h index da109d4234..85462824fd 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node.h +++ b/yql/essentials/minikql/computation/mkql_computation_node.h @@ -77,6 +77,7 @@ struct TComputationMutables { std::vector<ui32> SerializableValues; // Indices of values that need to be saved in IComputationGraph::SaveGraphState() and restored in IComputationGraph::LoadGraphState(). ui32 CurWideFieldsIndex = 0U; std::vector<TWideFieldsInitInfo> WideFieldInitialize; + std::vector<ui32> CachedValues; // Indices of values that holds temporary cached data and unreachable by dependencies void DeferWideFieldsInit(ui32 count, std::set<ui32> used) { Y_DEBUG_ABORT_UNLESS(AllOf(used, [count](ui32 i) { return i < count; })); @@ -251,7 +252,8 @@ public: virtual IComputationExternalNode* GetEntryPoint(size_t index, bool require) = 0; virtual const TArrowKernelsTopology* GetKernelsTopology() = 0; virtual const TComputationNodePtrDeque& GetNodes() const = 0; - virtual void Invalidate() = 0; + virtual void Invalidate() = 0; // Invalidate all mutable values in graph (may lead to udf recreation) + virtual void InvalidateCaches() = 0; // Invalidate only cached values virtual TMemoryUsageInfo& GetMemInfo() const = 0; virtual const THolderFactory& GetHolderFactory() const = 0; virtual ITerminator* GetTerminator() const = 0; diff --git a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp index 67e3546a1c..f9209ad239 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp @@ -696,6 +696,12 @@ public: std::fill_n(Ctx->MutableValues.get(), PatternNodes->GetMutables().CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid())); } + void InvalidateCaches() override { + for (const auto cachedIndex : Ctx->Mutables.CachedValues) { + Ctx->MutableValues[cachedIndex] = NUdf::TUnboxedValuePod::Invalid(); + } + } + const TComputationNodePtrDeque& GetNodes() const override { return PatternNodes->GetNodes(); } diff --git a/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp b/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp index 2d8ea64c10..3fe7772ba9 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp @@ -10,6 +10,7 @@ TContainerCacheOnContext::TContainerCacheOnContext(TComputationMutables& mutable : Index(mutables.CurValueIndex++) { ++++mutables.CurValueIndex; + mutables.CachedValues.insert(mutables.CachedValues.end(), {Index, Index + 1, Index + 2}); } NUdf::TUnboxedValuePod TContainerCacheOnContext::NewArray(TComputationContext& ctx, ui64 size, NUdf::TUnboxedValue*& items) const { diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp index 8a0cbf65ba..49f686dd9e 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp @@ -7,7 +7,7 @@ namespace NMiniKQL { void ThrowNotSupportedImplForClass(const TString& className, const char *func) { THROW yexception() << "Unsupported access to '" << func << "' method of: " << className; -} +} template <class IComputationNodeInterface> void TRefCountedComputationNode<IComputationNodeInterface>::Ref() { @@ -100,7 +100,7 @@ Y_NO_INLINE void TStatefulComputationNodeBase::AddDependenceImpl(const IComputat Dependencies.emplace_back(node); } -Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, +Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, bool stateless) const { if (self == owner) return; @@ -188,7 +188,7 @@ Y_NO_INLINE TStatefulFlowComputationNodeBase::TStatefulFlowComputationNodeBase(u , StateKind(stateKind) {} -Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, +Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { if (self == owner) return; @@ -205,7 +205,7 @@ Y_NO_INLINE TPairStateFlowComputationNodeBase::TPairStateFlowComputationNodeBase , SecondKind(secondKind) {} -Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, +Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { if (self == owner) return; @@ -221,7 +221,7 @@ Y_NO_INLINE ui32 TStatelessWideFlowComputationNodeBase::GetIndexImpl() const { THROW yexception() << "Failed to get stateless node index."; } -Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, +Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { if (self == owner) return; @@ -263,7 +263,7 @@ Y_NO_INLINE TPairStateWideFlowComputationNodeBase::TPairStateWideFlowComputation {} Y_NO_INLINE void TPairStateWideFlowComputationNodeBase::CollectDependentIndexesImpl( - const IComputationNode* self, const IComputationNode* owner, + const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { if (self == owner) return; @@ -308,7 +308,9 @@ void TExternalComputationNode::CollectDependentIndexes(const IComputationNode*, TExternalComputationNode::TExternalComputationNode(TComputationMutables& mutables, EValueRepresentation kind) : TStatefulComputationNode(mutables, kind) -{} +{ + mutables.CachedValues.push_back(ValueIndex); +} NUdf::TUnboxedValue TExternalComputationNode::GetValue(TComputationContext& ctx) const { return Getter ? Getter(ctx) : ValueRef(ctx); |