diff options
author | grigoriypisar <grigoriypisar@yandex-team.com> | 2024-12-17 00:12:19 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@yandex-team.com> | 2024-12-17 00:35:46 +0300 |
commit | 37b9c88deefbfee76b8f4a1fcd917b028fea3c30 (patch) | |
tree | e20796127dc23ff4e7c4f439263974ea54d59917 | |
parent | a0ed0141c57419c25ed8b04129d18443261a8ad0 (diff) | |
download | ydb-37b9c88deefbfee76b8f4a1fcd917b028fea3c30.tar.gz |
added cache invalidation for computation graph
Fixed purecalc cache invalidation
commit_hash:088ac16b3481db39893f4fab0cf80c3809a5dcdd
9 files changed, 40 insertions, 9 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node.h b/yql/essentials/minikql/computation/mkql_computation_node.h index da109d4234..85462824fd 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node.h +++ b/yql/essentials/minikql/computation/mkql_computation_node.h @@ -77,6 +77,7 @@ struct TComputationMutables { std::vector<ui32> SerializableValues; // Indices of values that need to be saved in IComputationGraph::SaveGraphState() and restored in IComputationGraph::LoadGraphState(). ui32 CurWideFieldsIndex = 0U; std::vector<TWideFieldsInitInfo> WideFieldInitialize; + std::vector<ui32> CachedValues; // Indices of values that holds temporary cached data and unreachable by dependencies void DeferWideFieldsInit(ui32 count, std::set<ui32> used) { Y_DEBUG_ABORT_UNLESS(AllOf(used, [count](ui32 i) { return i < count; })); @@ -251,7 +252,8 @@ public: virtual IComputationExternalNode* GetEntryPoint(size_t index, bool require) = 0; virtual const TArrowKernelsTopology* GetKernelsTopology() = 0; virtual const TComputationNodePtrDeque& GetNodes() const = 0; - virtual void Invalidate() = 0; + virtual void Invalidate() = 0; // Invalidate all mutable values in graph (may lead to udf recreation) + virtual void InvalidateCaches() = 0; // Invalidate only cached values virtual TMemoryUsageInfo& GetMemInfo() const = 0; virtual const THolderFactory& GetHolderFactory() const = 0; virtual ITerminator* GetTerminator() const = 0; diff --git a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp index 67e3546a1c..f9209ad239 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp @@ -696,6 +696,12 @@ public: std::fill_n(Ctx->MutableValues.get(), PatternNodes->GetMutables().CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid())); } + void InvalidateCaches() override { + for (const auto cachedIndex : Ctx->Mutables.CachedValues) { + Ctx->MutableValues[cachedIndex] = NUdf::TUnboxedValuePod::Invalid(); + } + } + const TComputationNodePtrDeque& GetNodes() const override { return PatternNodes->GetNodes(); } diff --git a/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp b/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp index 2d8ea64c10..3fe7772ba9 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp @@ -10,6 +10,7 @@ TContainerCacheOnContext::TContainerCacheOnContext(TComputationMutables& mutable : Index(mutables.CurValueIndex++) { ++++mutables.CurValueIndex; + mutables.CachedValues.insert(mutables.CachedValues.end(), {Index, Index + 1, Index + 2}); } NUdf::TUnboxedValuePod TContainerCacheOnContext::NewArray(TComputationContext& ctx, ui64 size, NUdf::TUnboxedValue*& items) const { diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp index 8a0cbf65ba..49f686dd9e 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp @@ -7,7 +7,7 @@ namespace NMiniKQL { void ThrowNotSupportedImplForClass(const TString& className, const char *func) { THROW yexception() << "Unsupported access to '" << func << "' method of: " << className; -} +} template <class IComputationNodeInterface> void TRefCountedComputationNode<IComputationNodeInterface>::Ref() { @@ -100,7 +100,7 @@ Y_NO_INLINE void TStatefulComputationNodeBase::AddDependenceImpl(const IComputat Dependencies.emplace_back(node); } -Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, +Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, bool stateless) const { if (self == owner) return; @@ -188,7 +188,7 @@ Y_NO_INLINE TStatefulFlowComputationNodeBase::TStatefulFlowComputationNodeBase(u , StateKind(stateKind) {} -Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, +Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { if (self == owner) return; @@ -205,7 +205,7 @@ Y_NO_INLINE TPairStateFlowComputationNodeBase::TPairStateFlowComputationNodeBase , SecondKind(secondKind) {} -Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, +Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { if (self == owner) return; @@ -221,7 +221,7 @@ Y_NO_INLINE ui32 TStatelessWideFlowComputationNodeBase::GetIndexImpl() const { THROW yexception() << "Failed to get stateless node index."; } -Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, +Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { if (self == owner) return; @@ -263,7 +263,7 @@ Y_NO_INLINE TPairStateWideFlowComputationNodeBase::TPairStateWideFlowComputation {} Y_NO_INLINE void TPairStateWideFlowComputationNodeBase::CollectDependentIndexesImpl( - const IComputationNode* self, const IComputationNode* owner, + const IComputationNode* self, const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const { if (self == owner) return; @@ -308,7 +308,9 @@ void TExternalComputationNode::CollectDependentIndexes(const IComputationNode*, TExternalComputationNode::TExternalComputationNode(TComputationMutables& mutables, EValueRepresentation kind) : TStatefulComputationNode(mutables, kind) -{} +{ + mutables.CachedValues.push_back(ValueIndex); +} NUdf::TUnboxedValue TExternalComputationNode::GetValue(TComputationContext& ctx) const { return Getter ? Getter(ctx) : ValueRef(ctx); diff --git a/yql/essentials/public/purecalc/common/interface.h b/yql/essentials/public/purecalc/common/interface.h index 6e56c9aa3f..4096195bd3 100644 --- a/yql/essentials/public/purecalc/common/interface.h +++ b/yql/essentials/public/purecalc/common/interface.h @@ -684,6 +684,11 @@ namespace NYql { * Get time provider */ virtual ITimeProvider* GetTimeProvider() const = 0; + + /** + * Release all input data from worker state + */ + virtual void Invalidate() = 0; }; /** diff --git a/yql/essentials/public/purecalc/common/worker.cpp b/yql/essentials/public/purecalc/common/worker.cpp index 58cbf23c92..285b638025 100644 --- a/yql/essentials/public/purecalc/common/worker.cpp +++ b/yql/essentials/public/purecalc/common/worker.cpp @@ -340,6 +340,17 @@ void TWorker<TBase>::Release() { } } +template <typename TBase> +void TWorker<TBase>::Invalidate() { + auto& ctx = Graph_.ComputationGraph_->GetContext(); + for (const auto* selfNode : Graph_.SelfNodes_) { + if (selfNode) { + selfNode->InvalidateValue(ctx); + } + } + Graph_.ComputationGraph_->InvalidateCaches(); +} + TPullStreamWorker::~TPullStreamWorker() { auto guard = Guard(GetScopedAlloc()); Output_.Clear(); diff --git a/yql/essentials/public/purecalc/common/worker.h b/yql/essentials/public/purecalc/common/worker.h index 07b8dfa2e7..39f381c9ea 100644 --- a/yql/essentials/public/purecalc/common/worker.h +++ b/yql/essentials/public/purecalc/common/worker.h @@ -104,6 +104,7 @@ namespace NYql { const TString& GetLLVMSettings() const override; ui64 GetNativeYtTypeFlags() const override; ITimeProvider* GetTimeProvider() const override; + void Invalidate() override; protected: void Release() override; }; diff --git a/yql/essentials/public/purecalc/ut/test_mixed_allocators.cpp b/yql/essentials/public/purecalc/ut/test_mixed_allocators.cpp index 797f3c5b51..2932538f78 100644 --- a/yql/essentials/public/purecalc/ut/test_mixed_allocators.cpp +++ b/yql/essentials/public/purecalc/ut/test_mixed_allocators.cpp @@ -51,7 +51,7 @@ namespace { // Clear graph after each object because // values allocated on another allocator and should be released - Worker_->GetGraph().Invalidate(); + Worker_->Invalidate(); } } diff --git a/yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp b/yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp index dd6a8bbb20..a2f7bf601f 100644 --- a/yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp +++ b/yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp @@ -130,6 +130,9 @@ public: void Invalidate() final { return Graph->Invalidate(); } + void InvalidateCaches() final { + return Graph->InvalidateCaches(); + } TMemoryUsageInfo& GetMemInfo() const final { return Graph->GetMemInfo(); } |