aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <grigoriypisar@yandex-team.com>2024-12-17 00:12:19 +0300
committergrigoriypisar <grigoriypisar@yandex-team.com>2024-12-17 00:35:46 +0300
commit37b9c88deefbfee76b8f4a1fcd917b028fea3c30 (patch)
treee20796127dc23ff4e7c4f439263974ea54d59917
parenta0ed0141c57419c25ed8b04129d18443261a8ad0 (diff)
downloadydb-37b9c88deefbfee76b8f4a1fcd917b028fea3c30.tar.gz
added cache invalidation for computation graph
Fixed purecalc cache invalidation commit_hash:088ac16b3481db39893f4fab0cf80c3809a5dcdd
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node.h4
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_graph.cpp6
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp1
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_impl.cpp16
-rw-r--r--yql/essentials/public/purecalc/common/interface.h5
-rw-r--r--yql/essentials/public/purecalc/common/worker.cpp11
-rw-r--r--yql/essentials/public/purecalc/common/worker.h1
-rw-r--r--yql/essentials/public/purecalc/ut/test_mixed_allocators.cpp2
-rw-r--r--yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp3
9 files changed, 40 insertions, 9 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node.h b/yql/essentials/minikql/computation/mkql_computation_node.h
index da109d4234..85462824fd 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node.h
+++ b/yql/essentials/minikql/computation/mkql_computation_node.h
@@ -77,6 +77,7 @@ struct TComputationMutables {
std::vector<ui32> SerializableValues; // Indices of values that need to be saved in IComputationGraph::SaveGraphState() and restored in IComputationGraph::LoadGraphState().
ui32 CurWideFieldsIndex = 0U;
std::vector<TWideFieldsInitInfo> WideFieldInitialize;
+ std::vector<ui32> CachedValues; // Indices of values that holds temporary cached data and unreachable by dependencies
void DeferWideFieldsInit(ui32 count, std::set<ui32> used) {
Y_DEBUG_ABORT_UNLESS(AllOf(used, [count](ui32 i) { return i < count; }));
@@ -251,7 +252,8 @@ public:
virtual IComputationExternalNode* GetEntryPoint(size_t index, bool require) = 0;
virtual const TArrowKernelsTopology* GetKernelsTopology() = 0;
virtual const TComputationNodePtrDeque& GetNodes() const = 0;
- virtual void Invalidate() = 0;
+ virtual void Invalidate() = 0; // Invalidate all mutable values in graph (may lead to udf recreation)
+ virtual void InvalidateCaches() = 0; // Invalidate only cached values
virtual TMemoryUsageInfo& GetMemInfo() const = 0;
virtual const THolderFactory& GetHolderFactory() const = 0;
virtual ITerminator* GetTerminator() const = 0;
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp
index 67e3546a1c..f9209ad239 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp
@@ -696,6 +696,12 @@ public:
std::fill_n(Ctx->MutableValues.get(), PatternNodes->GetMutables().CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid()));
}
+ void InvalidateCaches() override {
+ for (const auto cachedIndex : Ctx->Mutables.CachedValues) {
+ Ctx->MutableValues[cachedIndex] = NUdf::TUnboxedValuePod::Invalid();
+ }
+ }
+
const TComputationNodePtrDeque& GetNodes() const override {
return PatternNodes->GetNodes();
}
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp b/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp
index 2d8ea64c10..3fe7772ba9 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.cpp
@@ -10,6 +10,7 @@ TContainerCacheOnContext::TContainerCacheOnContext(TComputationMutables& mutable
: Index(mutables.CurValueIndex++)
{
++++mutables.CurValueIndex;
+ mutables.CachedValues.insert(mutables.CachedValues.end(), {Index, Index + 1, Index + 2});
}
NUdf::TUnboxedValuePod TContainerCacheOnContext::NewArray(TComputationContext& ctx, ui64 size, NUdf::TUnboxedValue*& items) const {
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
index 8a0cbf65ba..49f686dd9e 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_impl.cpp
@@ -7,7 +7,7 @@ namespace NMiniKQL {
void ThrowNotSupportedImplForClass(const TString& className, const char *func) {
THROW yexception() << "Unsupported access to '" << func << "' method of: " << className;
-}
+}
template <class IComputationNodeInterface>
void TRefCountedComputationNode<IComputationNodeInterface>::Ref() {
@@ -100,7 +100,7 @@ Y_NO_INLINE void TStatefulComputationNodeBase::AddDependenceImpl(const IComputat
Dependencies.emplace_back(node);
}
-Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
+Y_NO_INLINE void TStatefulComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, bool stateless) const {
if (self == owner)
return;
@@ -188,7 +188,7 @@ Y_NO_INLINE TStatefulFlowComputationNodeBase::TStatefulFlowComputationNodeBase(u
, StateKind(stateKind)
{}
-Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
+Y_NO_INLINE void TStatefulFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
if (self == owner)
return;
@@ -205,7 +205,7 @@ Y_NO_INLINE TPairStateFlowComputationNodeBase::TPairStateFlowComputationNodeBase
, SecondKind(secondKind)
{}
-Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
+Y_NO_INLINE void TPairStateFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
if (self == owner)
return;
@@ -221,7 +221,7 @@ Y_NO_INLINE ui32 TStatelessWideFlowComputationNodeBase::GetIndexImpl() const {
THROW yexception() << "Failed to get stateless node index.";
}
-Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
+Y_NO_INLINE void TStatelessWideFlowComputationNodeBase::CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
if (self == owner)
return;
@@ -263,7 +263,7 @@ Y_NO_INLINE TPairStateWideFlowComputationNodeBase::TPairStateWideFlowComputation
{}
Y_NO_INLINE void TPairStateWideFlowComputationNodeBase::CollectDependentIndexesImpl(
- const IComputationNode* self, const IComputationNode* owner,
+ const IComputationNode* self, const IComputationNode* owner,
IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const {
if (self == owner)
return;
@@ -308,7 +308,9 @@ void TExternalComputationNode::CollectDependentIndexes(const IComputationNode*,
TExternalComputationNode::TExternalComputationNode(TComputationMutables& mutables, EValueRepresentation kind)
: TStatefulComputationNode(mutables, kind)
-{}
+{
+ mutables.CachedValues.push_back(ValueIndex);
+}
NUdf::TUnboxedValue TExternalComputationNode::GetValue(TComputationContext& ctx) const {
return Getter ? Getter(ctx) : ValueRef(ctx);
diff --git a/yql/essentials/public/purecalc/common/interface.h b/yql/essentials/public/purecalc/common/interface.h
index 6e56c9aa3f..4096195bd3 100644
--- a/yql/essentials/public/purecalc/common/interface.h
+++ b/yql/essentials/public/purecalc/common/interface.h
@@ -684,6 +684,11 @@ namespace NYql {
* Get time provider
*/
virtual ITimeProvider* GetTimeProvider() const = 0;
+
+ /**
+ * Release all input data from worker state
+ */
+ virtual void Invalidate() = 0;
};
/**
diff --git a/yql/essentials/public/purecalc/common/worker.cpp b/yql/essentials/public/purecalc/common/worker.cpp
index 58cbf23c92..285b638025 100644
--- a/yql/essentials/public/purecalc/common/worker.cpp
+++ b/yql/essentials/public/purecalc/common/worker.cpp
@@ -340,6 +340,17 @@ void TWorker<TBase>::Release() {
}
}
+template <typename TBase>
+void TWorker<TBase>::Invalidate() {
+ auto& ctx = Graph_.ComputationGraph_->GetContext();
+ for (const auto* selfNode : Graph_.SelfNodes_) {
+ if (selfNode) {
+ selfNode->InvalidateValue(ctx);
+ }
+ }
+ Graph_.ComputationGraph_->InvalidateCaches();
+}
+
TPullStreamWorker::~TPullStreamWorker() {
auto guard = Guard(GetScopedAlloc());
Output_.Clear();
diff --git a/yql/essentials/public/purecalc/common/worker.h b/yql/essentials/public/purecalc/common/worker.h
index 07b8dfa2e7..39f381c9ea 100644
--- a/yql/essentials/public/purecalc/common/worker.h
+++ b/yql/essentials/public/purecalc/common/worker.h
@@ -104,6 +104,7 @@ namespace NYql {
const TString& GetLLVMSettings() const override;
ui64 GetNativeYtTypeFlags() const override;
ITimeProvider* GetTimeProvider() const override;
+ void Invalidate() override;
protected:
void Release() override;
};
diff --git a/yql/essentials/public/purecalc/ut/test_mixed_allocators.cpp b/yql/essentials/public/purecalc/ut/test_mixed_allocators.cpp
index 797f3c5b51..2932538f78 100644
--- a/yql/essentials/public/purecalc/ut/test_mixed_allocators.cpp
+++ b/yql/essentials/public/purecalc/ut/test_mixed_allocators.cpp
@@ -51,7 +51,7 @@ namespace {
// Clear graph after each object because
// values allocated on another allocator and should be released
- Worker_->GetGraph().Invalidate();
+ Worker_->Invalidate();
}
}
diff --git a/yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp b/yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp
index dd6a8bbb20..a2f7bf601f 100644
--- a/yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp
+++ b/yt/yql/providers/yt/lib/lambda_builder/lambda_builder.cpp
@@ -130,6 +130,9 @@ public:
void Invalidate() final {
return Graph->Invalidate();
}
+ void InvalidateCaches() final {
+ return Graph->InvalidateCaches();
+ }
TMemoryUsageInfo& GetMemInfo() const final {
return Graph->GetMemInfo();
}