aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaksim Kita <kitaetoya@gmail.com>2023-08-23 19:24:24 +0300
committermaksim-kita <maksim-kita@yandex-team.com>2023-08-23 20:15:10 +0300
commitba0e871a8c90ed7abba88cc4036f541f5bee4e4f (patch)
tree9627d08335aa59ddd75e6139ad3b06f8a05f8d75
parent996b0dd4bcdc66f79875f5e91ff9eb1d21d8ad75 (diff)
downloadydb-ba0e871a8c90ed7abba88cc4036f541f5bee4e4f.tar.gz
Pattern cache restrict compiled code size
Pattern cache restrict compiled code size Pull Request resolved: #333
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp40
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.cpp12
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/library/yql/minikql/codegen/codegen.cpp17
-rw-r--r--ydb/library/yql/minikql/codegen/codegen.h5
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node.h2
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp43
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp308
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h42
9 files changed, 356 insertions, 114 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp
index b41d4bf6d6..463bdd09ce 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp
@@ -54,18 +54,18 @@ private:
size_t patternsToCompileSize = PatternsToCompile.size();
for (; PatternToCompileIndex < patternsToCompileSize && compilationIntervalMs > 0; ++PatternToCompileIndex) {
- auto & entry = PatternsToCompile[PatternToCompileIndex];
- if (!entry->IsInCache.load()) {
+ auto& patternToCompile = PatternsToCompile[PatternToCompileIndex];
+ if (!patternToCompile.Entry->IsInCache.load()) {
continue;
}
timer.Reset();
- entry->Pattern->Compile({}, nullptr);
- Counters->CompiledComputationPatterns->Inc();
-
- entry = nullptr;
+ patternToCompile.Entry->Pattern->Compile({}, nullptr);
+ patternCache->NotifyPatternCompiled(patternToCompile.SerializedProgram, patternToCompile.Entry);
+ patternToCompile.Entry = nullptr;
+ Counters->CompiledComputationPatterns->Inc();
compilationIntervalMs -= static_cast<i64>(timer.Get().MilliSeconds());
}
@@ -73,6 +73,8 @@ private:
PatternsToCompile.clear();
}
+ Counters->CompileComputationPatternsQueueSize->Set(PatternsToCompile.size());
+
ScheduleWakeup(ctx);
}
@@ -85,21 +87,21 @@ private:
THashMap<TString, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile;
patternCache->GetPatternsToCompile(patternsToCompile);
- TVector<std::pair<std::shared_ptr<NMiniKQL::TPatternCacheEntry>, size_t>> patternsToCompileWithAccessSize;
- for (auto & [_, entry] : patternsToCompile) {
- patternsToCompileWithAccessSize.emplace_back(entry, entry->AccessTimes.load());
+ TVector<std::pair<TPatternToCompile, size_t>> patternsToCompileWithAccessTimes;
+ for (auto& [serializedProgram, entry] : patternsToCompile) {
+ patternsToCompileWithAccessTimes.emplace_back(TPatternToCompile{serializedProgram, entry}, entry->AccessTimes.load());
}
- std::sort(patternsToCompileWithAccessSize.begin(), patternsToCompileWithAccessSize.end(), [](auto & lhs, auto & rhs) {
+ std::sort(patternsToCompileWithAccessTimes.begin(), patternsToCompileWithAccessTimes.end(), [](auto & lhs, auto & rhs) {
return lhs.second > rhs.second;
});
- PatternsToCompile.reserve(patternsToCompileWithAccessSize.size());
- for (auto & [entry, _] : patternsToCompileWithAccessSize) {
- PatternsToCompile.push_back(entry);
+ PatternsToCompile.reserve(patternsToCompileWithAccessTimes.size());
+ for (auto& [patternToCompile, _] : patternsToCompileWithAccessTimes) {
+ PatternsToCompile.push_back(patternToCompile);
}
- *Counters->CompileQueueSize = PatternsToCompile.size();
+ Counters->CompileComputationPatternsQueueSize->Set(PatternsToCompile.size());
PatternToCompileIndex = 0;
}
@@ -112,9 +114,13 @@ private:
TDuration WakeupInterval;
TIntrusivePtr<TKqpCounters> Counters;
- using PatternsToCompileContainer = TVector<std::shared_ptr<NMiniKQL::TPatternCacheEntry>>;
- using PatternsToCompileContainerIterator = PatternsToCompileContainer::iterator;
- PatternsToCompileContainer PatternsToCompile;
+ struct TPatternToCompile {
+ TString SerializedProgram;
+ std::shared_ptr<NMiniKQL::TPatternCacheEntry> Entry;
+ };
+
+ using TPatternsToCompileContainer = TVector<TPatternToCompile>;
+ TPatternsToCompileContainer PatternsToCompile;
size_t PatternToCompileIndex = 0;
};
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index 14bb7e20a8..874a4b1721 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -150,7 +150,9 @@ public:
}
ActorSystem = actorSystem;
SelfId = selfId;
- UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes(), Config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
+ UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes(),
+ Config.GetKqpPatternCacheCompiledCapacityBytes(),
+ Config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
if (PublishResourcesByExchanger) {
CreateResourceInfoExchanger(Config.GetInfoExchangerSettings());
@@ -619,13 +621,13 @@ public:
ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources);
}
- void UpdatePatternCache(ui64 maxSizeBytes, ui64 patternAccessTimesBeforeTryToCompile) {
+ void UpdatePatternCache(ui64 maxSizeBytes, ui64 maxCompiledSizeBytes, ui64 patternAccessTimesBeforeTryToCompile) {
if (maxSizeBytes == 0) {
PatternCache.reset();
return;
}
- NMiniKQL::TComputationPatternLRUCache::Config config{maxSizeBytes, patternAccessTimesBeforeTryToCompile};
+ NMiniKQL::TComputationPatternLRUCache::Config config{maxSizeBytes, maxCompiledSizeBytes, patternAccessTimesBeforeTryToCompile};
if (!PatternCache || PatternCache->GetConfiguration() != config) {
PatternCache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(config, Counters->GetKqpCounters());
}
@@ -888,7 +890,9 @@ private:
Send(ev->Sender, new NConsole::TEvConsole::TEvConfigNotificationResponse(event), IEventHandle::FlagTrackDelivery, ev->Cookie);
auto& config = *event.MutableConfig()->MutableTableServiceConfig()->MutableResourceManager();
- ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(), config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
+ ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(),
+ config.GetKqpPatternCacheCompiledCapacityBytes(),
+ config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
bool enablePublishResourcesByExchanger = config.GetEnablePublishResourcesByExchanger();
if (enablePublishResourcesByExchanger != PublishResourcesByExchanger) {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index c06cb45f37..ca7118320f 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1165,6 +1165,7 @@ message TTableServiceConfig {
optional bool EnablePublishResourcesByExchanger = 18 [default = true];
optional TInfoExchangerSettings InfoExchangerSettings = 19;
optional uint64 KqpPatternCachePatternAccessTimesBeforeTryToCompile = 20 [default = 5];
+ optional uint64 KqpPatternCacheCompiledCapacityBytes = 21 [default = 104857600]; // 100 MiB
}
message TSpillingServiceConfig {
diff --git a/ydb/library/yql/minikql/codegen/codegen.cpp b/ydb/library/yql/minikql/codegen/codegen.cpp
index 6e4096af22..7508858c1c 100644
--- a/ydb/library/yql/minikql/codegen/codegen.cpp
+++ b/ydb/library/yql/minikql/codegen/codegen.cpp
@@ -487,6 +487,7 @@ public:
passManagerBuilder.populateModulePassManager(*modulePassManager);
passManagerBuilder.populateFunctionPassManager(*functionPassManager);
+ auto functionPassStart = Now();
functionPassManager->doInitialization();
for (auto it = Module_->begin(), jt = Module_->end(); it != jt; ++it) {
if (!it->isDeclaration()) {
@@ -495,6 +496,10 @@ public:
}
functionPassManager->doFinalization();
+ if (compileStats) {
+ compileStats->FunctionPassTime = (Now() - functionPassStart).MilliSeconds();
+ }
+
auto modulePassStart = Now();
modulePassManager->run(*Module_);
if (compileStats) {
@@ -531,6 +536,10 @@ public:
llvm::TimerGroup::printAll(llvm::errs());
llvm::TimePassesIsEnabled = false;
}
+
+ if (compileStats) {
+ compileStats->TotalObjectSize = TotalObjectSize;
+ }
}
void* GetPointerToFunction(llvm::Function* function) override {
@@ -660,6 +669,8 @@ public:
const llvm::RuntimeDyld::LoadedObjectInfo &loi) override
{
Y_UNUSED(key);
+ TotalObjectSize += obj.getData().size();
+
for (const auto& section : obj.sections()) {
//auto nameExp = section.getName();
//auto name = nameExp.get();
@@ -732,6 +743,7 @@ private:
llvm::JITEventListener* PerfListener_ = nullptr;
std::unique_ptr<llvm::ExecutionEngine> Engine_;
std::vector<std::pair<llvm::object::SectionRef, ui64>> CodeSections_;
+ ui64 TotalObjectSize = 0;
std::vector<std::pair<ui64, llvm::Function*>> SortedFuncs_;
TMaybe<THashSet<TString>> ExportedSymbols;
THashMap<const void*, TString> ReverseGlobalMapping_;
@@ -745,5 +757,10 @@ ICodegen::Make(ETarget target, ESanitize sanitize) {
return std::make_unique<TCodegen>(target, sanitize);
}
+ICodegen::TSharedPtr
+ICodegen::MakeShared(ETarget target, ESanitize sanitize) {
+ return std::make_shared<TCodegen>(target, sanitize);
+}
+
}
}
diff --git a/ydb/library/yql/minikql/codegen/codegen.h b/ydb/library/yql/minikql/codegen/codegen.h
index 4b252c30e6..6bfac89317 100644
--- a/ydb/library/yql/minikql/codegen/codegen.h
+++ b/ydb/library/yql/minikql/codegen/codegen.h
@@ -38,8 +38,10 @@ struct TCodegenStats {
};
struct TCompileStats {
+ ui32 FunctionPassTime = 0;
ui32 ModulePassTime = 0;
ui32 FinalizeTime = 0;
+ ui64 TotalObjectSize = 0;
};
class ICodegen {
@@ -62,6 +64,9 @@ public:
using TPtr = std::unique_ptr<ICodegen>;
static TPtr Make(ETarget target, ESanitize sanitize = ESanitize::Auto);
+
+ using TSharedPtr = std::shared_ptr<ICodegen>;
+ static TSharedPtr MakeShared(ETarget target, ESanitize sanitize = ESanitize::Auto);
};
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h
index 25d2c08185..92de6674b0 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h
@@ -407,6 +407,8 @@ public:
virtual ~IComputationPattern() = default;
virtual void Compile(TString optLLVM, IStatsRegistry* stats) = 0;
virtual bool IsCompiled() const = 0;
+ virtual size_t CompiledCodeSize() const = 0;
+ virtual void RemoveCompiledCode() = 0;
virtual THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) = 0;
virtual bool GetSuitableForCache() const = 0;
};
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
index 89ca1b4a95..f50083cbc5 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
@@ -45,6 +45,7 @@ const static TStatKey CodeGen_CompileTime("CodeGen_CompileTime", true);
const static TStatKey CodeGen_TotalFunctions("CodeGen_TotalFunctions", true);
const static TStatKey CodeGen_TotalInstructions("CodeGen_TotalInstructions", true);
const static TStatKey CodeGen_MaxFunctionInstructions("CodeGen_MaxFunctionInstructions", false);
+const static TStatKey CodeGen_FunctionPassTime("CodeGen_FunctionPassTime", true);
const static TStatKey CodeGen_ModulePassTime("CodeGen_ModulePassTime", true);
const static TStatKey CodeGen_FinalizeTime("CodeGen_FinalizeTime", true);
@@ -563,11 +564,11 @@ private:
class TComputationGraph final : public IComputationGraph {
public:
- TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts, bool executeLLVM = false)
+ TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts, NYql::NCodegen::ICodegen::TSharedPtr codegen)
: PatternNodes(patternNodes)
, MemInfo(MakeIntrusive<TMemoryUsageInfo>("ComputationGraph"))
, CompOpts(compOpts)
- , ExecuteLLVM(executeLLVM)
+ , Codegen(std::move(codegen))
{
#ifndef NDEBUG
CompOpts.AllocState.ActiveMemInfo.emplace(MemInfo.Get(), MemInfo);
@@ -599,7 +600,7 @@ public:
PatternNodes->GetMutables(),
//*ArrowMemoryPool
*arrow::default_memory_pool()));
- Ctx->ExecuteLLVM = ExecuteLLVM;
+ Ctx->ExecuteLLVM = Codegen.get() != nullptr;
ValueBuilder->SetCalleePositionHolder(Ctx->CalleePosition);
for (auto& node : PatternNodes->GetNodes()) {
node->InitNode(*Ctx);
@@ -712,7 +713,6 @@ public:
bool SetExecuteLLVM(bool value) override {
const bool old = Ctx->ExecuteLLVM;
Ctx->ExecuteLLVM = value;
- ExecuteLLVM = value;
return old;
}
@@ -762,8 +762,8 @@ private:
std::unique_ptr<arrow::MemoryPool> ArrowMemoryPool;
THolder<TComputationContext> Ctx;
TComputationOptsFull CompOpts;
+ NYql::NCodegen::ICodegen::TSharedPtr Codegen;
bool IsPrepared = false;
- bool ExecuteLLVM = false;
std::optional<TArrowKernelsTopology> KernelsTopology;
};
@@ -773,9 +773,9 @@ public:
#if defined(MKQL_DISABLE_CODEGEN)
: Codegen()
#elif defined(MKQL_FORCE_USE_CODEGEN)
- : Codegen(NYql::NCodegen::ICodegen::Make(NYql::NCodegen::ETarget::Native))
+ : Codegen(NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native))
#else
- : Codegen(opts.OptLLVM != "OFF" || GetEnv(TString("MKQL_FORCE_USE_LLVM")) ? NYql::NCodegen::ICodegen::Make(NYql::NCodegen::ETarget::Native) : NYql::NCodegen::ICodegen::TPtr())
+ : Codegen(opts.OptLLVM != "OFF" || GetEnv(TString("MKQL_FORCE_USE_LLVM")) ? NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native) : NYql::NCodegen::ICodegen::TPtr())
#endif
{
const auto& nodes = builder->GetNodes();
@@ -861,11 +861,11 @@ public:
Codegen.reset();
} else {
Codegen->Verify();
- NYql::NCodegen::TCompileStats compileStats;
- Codegen->Compile(GetCompileOptions(optLLVM), &compileStats);
+ Codegen->Compile(GetCompileOptions(optLLVM), &CompileStats);
- MKQL_ADD_STAT(stats, CodeGen_ModulePassTime, compileStats.ModulePassTime);
- MKQL_ADD_STAT(stats, CodeGen_FinalizeTime, compileStats.FinalizeTime);
+ MKQL_ADD_STAT(stats, CodeGen_FunctionPassTime, CompileStats.FunctionPassTime);
+ MKQL_ADD_STAT(stats, CodeGen_ModulePassTime, CompileStats.ModulePassTime);
+ MKQL_ADD_STAT(stats, CodeGen_FinalizeTime, CompileStats.FinalizeTime);
}
timerComp.Release();
@@ -900,17 +900,31 @@ public:
timerFull.Release();
timerFull.Report(stats);
+#endif
IsPatternCompiled.store(true);
-#endif
}
bool IsCompiled() const {
return IsPatternCompiled.load();
}
+ size_t CompiledCodeSize() const {
+ return CompileStats.TotalObjectSize;
+ }
+
+ void RemoveCompiledCode() {
+ IsPatternCompiled.store(false);
+ CompileStats = {};
+ Codegen.reset();
+ }
+
THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) {
- return MakeHolder<TComputationGraph>(PatternNodes, compOpts, IsPatternCompiled.load());
+ if (IsPatternCompiled.load()) {
+ return MakeHolder<TComputationGraph>(PatternNodes, compOpts, Codegen);
+ }
+
+ return MakeHolder<TComputationGraph>(PatternNodes, compOpts, nullptr);
}
bool GetSuitableForCache() const {
@@ -933,8 +947,9 @@ private:
TTypeEnvironment* TypeEnv = nullptr;
TPatternNodes::TPtr PatternNodes;
- NYql::NCodegen::ICodegen::TPtr Codegen;
+ NYql::NCodegen::ICodegen::TSharedPtr Codegen;
std::atomic<bool> IsPatternCompiled = false;
+ NYql::NCodegen::TCompileStats CompileStats;
};
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp
index a50c72e686..d2fe8dfdd6 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp
@@ -1,24 +1,226 @@
#include "mkql_computation_pattern_cache.h"
+#include <util/generic/intrlist.h>
+
namespace NKikimr::NMiniKQL {
+class TComputationPatternLRUCache::TLRUPatternCacheImpl
+{
+public:
+ TLRUPatternCacheImpl(size_t maxPatternsSize,
+ size_t maxPatternsSizeBytes,
+ size_t maxCompiledPatternsSize,
+ size_t maxCompiledPatternsSizeBytes)
+ : MaxPatternsSize(maxPatternsSize)
+ , MaxPatternsSizeBytes(maxPatternsSizeBytes)
+ , MaxCompiledPatternsSize(maxCompiledPatternsSize)
+ , MaxCompiledPatternsSizeBytes(maxCompiledPatternsSizeBytes)
+ {}
+
+ size_t PatternsSize() const {
+ return SerializedProgramToPatternCacheHolder.size();
+ }
+
+ size_t PatternsSizeInBytes() const {
+ return CurrentPatternsSizeBytes;
+ }
+
+ size_t CompiledPatternsSize() const {
+ return CurrentCompiledPatternsSize;
+ }
+
+ size_t PatternsCompiledCodeSizeInBytes() const {
+ return CurrentPatternsCompiledCodeSizeInBytes;
+ }
+
+ std::shared_ptr<TPatternCacheEntry>* Find(const TString& serializedProgram) {
+ auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
+ if (it == SerializedProgramToPatternCacheHolder.end()) {
+ return nullptr;
+ }
+
+ PromoteEntry(&it->second);
+
+ return &it->second.Entry;
+ }
+
+ void Insert(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
+ auto [it, inserted] = SerializedProgramToPatternCacheHolder.emplace(std::piecewise_construct,
+ std::forward_as_tuple(serializedProgram),
+ std::forward_as_tuple(serializedProgram, entry));
+
+ if (!inserted) {
+ RemoveEntryFromLists(&it->second);
+ }
+
+ entry->UpdateSizeForCache();
+
+ /// New item is inserted, insert it in the back of both LRU lists and recalculate sizes
+ CurrentPatternsSizeBytes += entry->SizeForCache;
+ LRUPatternList.PushBack(&it->second);
+
+ if (entry->Pattern->IsCompiled()) {
+ ++CurrentCompiledPatternsSize;
+ CurrentPatternsCompiledCodeSizeInBytes += entry->Pattern->CompiledCodeSize();
+ LRUCompiledPatternList.PushBack(&it->second);
+ }
+
+ entry->IsInCache.store(true);
+ ClearIfNeeded();
+ }
+
+ void NotifyPatternCompiled(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
+ auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
+ if (it == SerializedProgramToPatternCacheHolder.end()) {
+ return;
+ }
+
+ Y_ASSERT(entry->Pattern->IsCompiled());
+
+ Y_ASSERT(!it->second.LinkedInCompiledPatternLRUList());
+ PromoteEntry(&it->second);
+
+ ++CurrentCompiledPatternsSize;
+ CurrentPatternsCompiledCodeSizeInBytes += entry->Pattern->CompiledCodeSize();
+ LRUCompiledPatternList.PushBack(&it->second);
+
+ ClearIfNeeded();
+ }
+
+ void Clear() {
+ SerializedProgramToPatternCacheHolder.clear();
+ for (auto & holder : LRUPatternList) {
+ holder.Entry->IsInCache.store(false);
+ }
+
+ LRUPatternList.Clear();
+ LRUCompiledPatternList.Clear();
+ }
+private:
+ struct TPatternLRUListTag {};
+ struct TCompiledPatternLRUListTag {};
+
+ /** Cache holder is used to store serialized program and pattern cache entry in intrusive LRU lists.
+ * Most recently accessed items are in back of the lists, least recently accessed items are in front of the lists.
+ */
+ struct TPatternCacheHolder : public TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>, TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag> {
+ TPatternCacheHolder(TString serializedProgram, std::shared_ptr<TPatternCacheEntry> entry)
+ : SerializedProgram(std::move(serializedProgram))
+ , Entry(std::move(entry))
+ {}
+
+ bool LinkedInPatternLRUList() const {
+ return !TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>::Empty();
+ }
+
+ bool LinkedInCompiledPatternLRUList() const {
+ return !TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag>::Empty();
+ }
+
+ TString SerializedProgram;
+ std::shared_ptr<TPatternCacheEntry> Entry;
+ };
+
+ void PromoteEntry(TPatternCacheHolder* holder) {
+ Y_ASSERT(holder->LinkedInPatternLRUList());
+ LRUPatternList.Remove(holder);
+ LRUPatternList.PushBack(holder);
+
+ if (!holder->LinkedInCompiledPatternLRUList()) {
+ return;
+ }
+
+ LRUCompiledPatternList.Remove(holder);
+ LRUCompiledPatternList.PushBack(holder);
+ }
+
+ void RemoveEntryFromLists(TPatternCacheHolder* holder) {
+ Y_ASSERT(holder->LinkedInPatternLRUList());
+ LRUPatternList.Remove(holder);
+
+ Y_ASSERT(holder->Entry->SizeForCache <= CurrentPatternsSizeBytes);
+ CurrentPatternsSizeBytes -= holder->Entry->SizeForCache;
+
+ if (!holder->LinkedInCompiledPatternLRUList()) {
+ return;
+ }
+
+ Y_ASSERT(CurrentCompiledPatternsSize > 0);
+ --CurrentCompiledPatternsSize;
+
+ size_t patternCompiledCodeSize = holder->Entry->Pattern->CompiledCodeSize();
+ Y_ASSERT(patternCompiledCodeSize <= CurrentPatternsCompiledCodeSizeInBytes);
+ CurrentPatternsCompiledCodeSizeInBytes -= patternCompiledCodeSize;
+
+ LRUCompiledPatternList.Remove(holder);
+
+ holder->Entry->IsInCache.store(false);
+ }
+
+ void ClearIfNeeded() {
+ /// Remove from pattern LRU list and compiled pattern LRU list
+ while (SerializedProgramToPatternCacheHolder.size() > MaxPatternsSize || CurrentPatternsSizeBytes > MaxPatternsSizeBytes) {
+ TPatternCacheHolder* holder = LRUPatternList.Front();
+ RemoveEntryFromLists(holder);
+ SerializedProgramToPatternCacheHolder.erase(holder->SerializedProgram);
+ }
+
+ /// Only remove from compiled pattern LRU list
+ while (CurrentCompiledPatternsSize > MaxCompiledPatternsSize || CurrentPatternsCompiledCodeSizeInBytes > MaxCompiledPatternsSizeBytes) {
+ TPatternCacheHolder* holder = LRUCompiledPatternList.PopFront();
+
+ Y_ASSERT(CurrentCompiledPatternsSize > 0);
+ --CurrentCompiledPatternsSize;
+
+ auto & pattern = holder->Entry->Pattern;
+ size_t patternCompiledSize = pattern->CompiledCodeSize();
+ Y_ASSERT(patternCompiledSize <= CurrentPatternsCompiledCodeSizeInBytes);
+ CurrentPatternsCompiledCodeSizeInBytes -= patternCompiledSize;
+
+ pattern->RemoveCompiledCode();
+ holder->Entry->AccessTimes.store(0);
+ }
+ }
+
+ const size_t MaxPatternsSize = 0;
+ const size_t MaxPatternsSizeBytes = 0;
+ const size_t MaxCompiledPatternsSize = 0;
+ const size_t MaxCompiledPatternsSizeBytes = 0;
+
+ size_t CurrentPatternsSizeBytes = 0;
+ size_t CurrentCompiledPatternsSize = 0;
+ size_t CurrentPatternsCompiledCodeSizeInBytes = 0;
+
+ THashMap<TString, TPatternCacheHolder> SerializedProgramToPatternCacheHolder;
+ TIntrusiveList<TPatternCacheHolder, TPatternLRUListTag> LRUPatternList;
+ TIntrusiveList<TPatternCacheHolder, TCompiledPatternLRUListTag> LRUCompiledPatternList;
+};
+
TComputationPatternLRUCache::TComputationPatternLRUCache(TComputationPatternLRUCache::Config configuration, NMonitoring::TDynamicCounterPtr counters)
- : Cache(CacheMaxElementsSize)
+ : Cache(std::make_unique<TLRUPatternCacheImpl>(CacheMaxElementsSize, configuration.MaxSizeBytes, CacheMaxElementsSize, configuration.MaxCompiledSizeBytes))
, Configuration(configuration)
, Hits(counters->GetCounter("PatternCache/Hits", true))
, Waits(counters->GetCounter("PatternCache/Waits", true))
, Misses(counters->GetCounter("PatternCache/Misses", true))
, NotSuitablePattern(counters->GetCounter("PatternCache/NotSuitablePattern", true))
, SizeItems(counters->GetCounter("PatternCache/SizeItems", false))
+ , SizeCompiledItems(counters->GetCounter("PatternCache/SizeCompiledItems", false))
, SizeBytes(counters->GetCounter("PatternCache/SizeBytes", false))
+ , SizeCompiledBytes(counters->GetCounter("PatternCache/SizeCompiledBytes", false))
, MaxSizeBytesCounter(counters->GetCounter("PatternCache/MaxSizeBytes", false))
+ , MaxCompiledSizeBytesCounter(counters->GetCounter("PatternCache/MaxCompiledSizeBytes", false))
{
*MaxSizeBytesCounter = Configuration.MaxSizeBytes;
+ *MaxCompiledSizeBytesCounter = Configuration.MaxCompiledSizeBytes;
}
-std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serialized) {
+TComputationPatternLRUCache::~TComputationPatternLRUCache() {
+ CleanCache();
+}
+
+std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serializedProgram) {
std::lock_guard<std::mutex> lock(Mutex);
- if (auto it = Cache.Find(serialized); it != Cache.End()) {
+ if (auto it = Cache->Find(serializedProgram)) {
++*Hits;
return *it;
}
@@ -27,18 +229,18 @@ std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TStr
return {};
}
-TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serialized) {
+TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) {
std::lock_guard lock(Mutex);
- if (auto it = Cache.Find(serialized); it != Cache.End()) {
+ if (auto it = Cache->Find(serializedProgram)) {
++*Hits;
- AccessPattern(serialized, *it);
- return TTicket(serialized, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr);
+ AccessPattern(serializedProgram, *it);
+ return TTicket(serializedProgram, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr);
}
- auto [notifyIt, isNew] = Notify.emplace(serialized, Nothing());
+ auto [notifyIt, isNew] = Notify.emplace(serializedProgram, Nothing());
if (isNew) {
++*Misses;
- return TTicket(serialized, true, {}, this);
+ return TTicket(serializedProgram, true, {}, this);
}
++*Waits;
@@ -49,54 +251,27 @@ TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscrib
}
subscribers->push_back(promise);
- return TTicket(serialized, false, promise, nullptr);
-}
-
-void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) {
- TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
- {
- std::lock_guard<std::mutex> lock(Mutex);
- auto notifyIt = Notify.find(serialized);
- if (notifyIt != Notify.end()) {
- subscribers.swap(notifyIt->second);
- Notify.erase(notifyIt);
- }
- }
-
- if (subscribers) {
- for (auto& subscriber : *subscribers) {
- subscriber.SetValue(nullptr);
- }
- }
+ return TTicket(serializedProgram, false, promise, nullptr);
}
-void TComputationPatternLRUCache::EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
+void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern);
TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
+
{
std::lock_guard<std::mutex> lock(Mutex);
- // normally remove only one old cache entry by iteration to prevent bursts
- if (CurrentSizeBytes > Configuration.MaxSizeBytes) {
- RemoveOldest();
- }
- // to prevent huge memory overusage remove as much as needed
- while (CurrentSizeBytes > 2 * Configuration.MaxSizeBytes) {
- RemoveOldest();
- }
-
- patternWithEnv->UpdateSizeForCache();
- CurrentSizeBytes += patternWithEnv->SizeForCache;
+ Cache->Insert(serializedProgram, patternWithEnv);
- Cache.Insert(serialized, patternWithEnv);
- patternWithEnv->IsInCache.store(true);
- auto notifyIt = Notify.find(serialized);
+ auto notifyIt = Notify.find(serializedProgram);
if (notifyIt != Notify.end()) {
subscribers.swap(notifyIt->second);
Notify.erase(notifyIt);
}
- *SizeItems = Cache.Size();
- *SizeBytes = CurrentSizeBytes;
+ *SizeItems = Cache->PatternsSize();
+ *SizeBytes = Cache->PatternsSizeInBytes();
+ *SizeCompiledItems = Cache->CompiledPatternsSize();
+ *SizeCompiledBytes = Cache->PatternsCompiledCodeSizeInBytes();
}
if (subscribers) {
@@ -106,27 +281,23 @@ void TComputationPatternLRUCache::EmplacePattern(const TString& serialized, std:
}
}
+void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
+ std::lock_guard lock(Mutex);
+ Cache->NotifyPatternCompiled(serializedProgram, patternWithEnv);
+}
+
+size_t TComputationPatternLRUCache::GetSize() const {
+ std::lock_guard lock(Mutex);
+ return Cache->PatternsSize();
+}
+
void TComputationPatternLRUCache::CleanCache() {
*SizeItems = 0;
*SizeBytes = 0;
*MaxSizeBytesCounter = 0;
std::lock_guard lock(Mutex);
- CurrentSizeBytes = 0;
PatternsToCompile.clear();
- auto CacheEnd = Cache.End();
- for (auto PatternIt = Cache.Begin(); PatternIt != CacheEnd; ++PatternIt) {
- const_cast<std::shared_ptr<TPatternCacheEntry> &>(PatternIt.Value())->IsInCache.store(false);
- }
- Cache.Clear();
-}
-
-void TComputationPatternLRUCache::RemoveOldest() {
- auto oldest = Cache.FindOldest();
- Y_VERIFY_DEBUG(oldest != Cache.End());
- CurrentSizeBytes -= oldest.Value()->SizeForCache;
- oldest.Value()->IsInCache.store(false);
- PatternsToCompile.erase(oldest.Key());
- Cache.Erase(oldest);
+ Cache->Clear();
}
void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
@@ -141,5 +312,22 @@ void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgra
}
}
+void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) {
+ TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
+ {
+ std::lock_guard<std::mutex> lock(Mutex);
+ auto notifyIt = Notify.find(serialized);
+ if (notifyIt != Notify.end()) {
+ subscribers.swap(notifyIt->second);
+ Notify.erase(notifyIt);
+ }
+ }
+
+ if (subscribers) {
+ for (auto& subscriber : *subscribers) {
+ subscriber.SetValue(nullptr);
+ }
+ }
+}
} // namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
index 86ffa65963..170ff09040 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
@@ -95,17 +95,19 @@ public:
: MaxSizeBytes(maxSizeBytes)
{}
- Config(size_t maxSizeBytes, size_t patternAccessTimesBeforeTryToCompile)
+ Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes, size_t patternAccessTimesBeforeTryToCompile)
: MaxSizeBytes(maxSizeBytes)
+ , MaxCompiledSizeBytes(maxCompiledSizeBytes)
, PatternAccessTimesBeforeTryToCompile(patternAccessTimesBeforeTryToCompile)
{}
size_t MaxSizeBytes = 0;
+ size_t MaxCompiledSizeBytes = 0;
std::optional<size_t> PatternAccessTimesBeforeTryToCompile;
bool operator==(const Config & rhs) {
- return std::tie(MaxSizeBytes, PatternAccessTimesBeforeTryToCompile) ==
- std::tie(rhs.MaxSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile);
+ return std::tie(MaxSizeBytes, MaxCompiledSizeBytes, PatternAccessTimesBeforeTryToCompile) ==
+ std::tie(rhs.MaxSizeBytes, rhs.MaxCompiledSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile);
}
bool operator!=(const Config & rhs) {
@@ -115,28 +117,23 @@ public:
TComputationPatternLRUCache(Config configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>());
- ~TComputationPatternLRUCache() {
- CleanCache();
- }
+ ~TComputationPatternLRUCache();
static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) {
return std::make_shared<TPatternCacheEntry>(useAlloc);
}
- std::shared_ptr<TPatternCacheEntry> Find(const TString& serialized);
+ std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram);
- TTicket FindOrSubscribe(const TString& serialized);
+ TTicket FindOrSubscribe(const TString& serializedProgram);
- void NotifyMissing(const TString& serialized);
+ void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
- void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
+ void NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
- void CleanCache();
+ size_t GetSize() const;
- size_t GetSize() const {
- std::lock_guard lock(Mutex);
- return Cache.Size();
- }
+ void CleanCache();
Config GetConfiguration() const {
std::lock_guard lock(Mutex);
@@ -167,16 +164,20 @@ public:
}
private:
- void RemoveOldest();
-
void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
+ void NotifyMissing(const TString& serialized);
+
static constexpr size_t CacheMaxElementsSize = 10000;
+ friend class TTicket;
+
mutable std::mutex Mutex;
THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
- TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache;
- size_t CurrentSizeBytes = 0;
+
+ class TLRUPatternCacheImpl;
+ std::unique_ptr<TLRUPatternCacheImpl> Cache;
+
THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
const Config Configuration;
@@ -186,8 +187,11 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr Misses;
NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern;
NMonitoring::TDynamicCounters::TCounterPtr SizeItems;
+ NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledItems;
NMonitoring::TDynamicCounters::TCounterPtr SizeBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledBytes;
NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxCompiledSizeBytesCounter;
};
} // namespace NKikimr::NMiniKQL