diff options
author | Maksim Kita <kitaetoya@gmail.com> | 2023-08-23 19:24:24 +0300 |
---|---|---|
committer | maksim-kita <maksim-kita@yandex-team.com> | 2023-08-23 20:15:10 +0300 |
commit | ba0e871a8c90ed7abba88cc4036f541f5bee4e4f (patch) | |
tree | 9627d08335aa59ddd75e6139ad3b06f8a05f8d75 | |
parent | 996b0dd4bcdc66f79875f5e91ff9eb1d21d8ad75 (diff) | |
download | ydb-ba0e871a8c90ed7abba88cc4036f541f5bee4e4f.tar.gz |
Pattern cache restrict compiled code size
Pattern cache restrict compiled code size
Pull Request resolved: #333
9 files changed, 356 insertions, 114 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp index b41d4bf6d6..463bdd09ce 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp @@ -54,18 +54,18 @@ private: size_t patternsToCompileSize = PatternsToCompile.size(); for (; PatternToCompileIndex < patternsToCompileSize && compilationIntervalMs > 0; ++PatternToCompileIndex) { - auto & entry = PatternsToCompile[PatternToCompileIndex]; - if (!entry->IsInCache.load()) { + auto& patternToCompile = PatternsToCompile[PatternToCompileIndex]; + if (!patternToCompile.Entry->IsInCache.load()) { continue; } timer.Reset(); - entry->Pattern->Compile({}, nullptr); - Counters->CompiledComputationPatterns->Inc(); - - entry = nullptr; + patternToCompile.Entry->Pattern->Compile({}, nullptr); + patternCache->NotifyPatternCompiled(patternToCompile.SerializedProgram, patternToCompile.Entry); + patternToCompile.Entry = nullptr; + Counters->CompiledComputationPatterns->Inc(); compilationIntervalMs -= static_cast<i64>(timer.Get().MilliSeconds()); } @@ -73,6 +73,8 @@ private: PatternsToCompile.clear(); } + Counters->CompileComputationPatternsQueueSize->Set(PatternsToCompile.size()); + ScheduleWakeup(ctx); } @@ -85,21 +87,21 @@ private: THashMap<TString, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile; patternCache->GetPatternsToCompile(patternsToCompile); - TVector<std::pair<std::shared_ptr<NMiniKQL::TPatternCacheEntry>, size_t>> patternsToCompileWithAccessSize; - for (auto & [_, entry] : patternsToCompile) { - patternsToCompileWithAccessSize.emplace_back(entry, entry->AccessTimes.load()); + TVector<std::pair<TPatternToCompile, size_t>> patternsToCompileWithAccessTimes; + for (auto& [serializedProgram, entry] : patternsToCompile) { + patternsToCompileWithAccessTimes.emplace_back(TPatternToCompile{serializedProgram, entry}, entry->AccessTimes.load()); } - std::sort(patternsToCompileWithAccessSize.begin(), patternsToCompileWithAccessSize.end(), [](auto & lhs, auto & rhs) { + std::sort(patternsToCompileWithAccessTimes.begin(), patternsToCompileWithAccessTimes.end(), [](auto & lhs, auto & rhs) { return lhs.second > rhs.second; }); - PatternsToCompile.reserve(patternsToCompileWithAccessSize.size()); - for (auto & [entry, _] : patternsToCompileWithAccessSize) { - PatternsToCompile.push_back(entry); + PatternsToCompile.reserve(patternsToCompileWithAccessTimes.size()); + for (auto& [patternToCompile, _] : patternsToCompileWithAccessTimes) { + PatternsToCompile.push_back(patternToCompile); } - *Counters->CompileQueueSize = PatternsToCompile.size(); + Counters->CompileComputationPatternsQueueSize->Set(PatternsToCompile.size()); PatternToCompileIndex = 0; } @@ -112,9 +114,13 @@ private: TDuration WakeupInterval; TIntrusivePtr<TKqpCounters> Counters; - using PatternsToCompileContainer = TVector<std::shared_ptr<NMiniKQL::TPatternCacheEntry>>; - using PatternsToCompileContainerIterator = PatternsToCompileContainer::iterator; - PatternsToCompileContainer PatternsToCompile; + struct TPatternToCompile { + TString SerializedProgram; + std::shared_ptr<NMiniKQL::TPatternCacheEntry> Entry; + }; + + using TPatternsToCompileContainer = TVector<TPatternToCompile>; + TPatternsToCompileContainer PatternsToCompile; size_t PatternToCompileIndex = 0; }; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 14bb7e20a8..874a4b1721 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -150,7 +150,9 @@ public: } ActorSystem = actorSystem; SelfId = selfId; - UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes(), Config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); + UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes(), + Config.GetKqpPatternCacheCompiledCapacityBytes(), + Config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); if (PublishResourcesByExchanger) { CreateResourceInfoExchanger(Config.GetInfoExchangerSettings()); @@ -619,13 +621,13 @@ public: ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources); } - void UpdatePatternCache(ui64 maxSizeBytes, ui64 patternAccessTimesBeforeTryToCompile) { + void UpdatePatternCache(ui64 maxSizeBytes, ui64 maxCompiledSizeBytes, ui64 patternAccessTimesBeforeTryToCompile) { if (maxSizeBytes == 0) { PatternCache.reset(); return; } - NMiniKQL::TComputationPatternLRUCache::Config config{maxSizeBytes, patternAccessTimesBeforeTryToCompile}; + NMiniKQL::TComputationPatternLRUCache::Config config{maxSizeBytes, maxCompiledSizeBytes, patternAccessTimesBeforeTryToCompile}; if (!PatternCache || PatternCache->GetConfiguration() != config) { PatternCache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(config, Counters->GetKqpCounters()); } @@ -888,7 +890,9 @@ private: Send(ev->Sender, new NConsole::TEvConsole::TEvConfigNotificationResponse(event), IEventHandle::FlagTrackDelivery, ev->Cookie); auto& config = *event.MutableConfig()->MutableTableServiceConfig()->MutableResourceManager(); - ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(), config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); + ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(), + config.GetKqpPatternCacheCompiledCapacityBytes(), + config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); bool enablePublishResourcesByExchanger = config.GetEnablePublishResourcesByExchanger(); if (enablePublishResourcesByExchanger != PublishResourcesByExchanger) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index c06cb45f37..ca7118320f 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1165,6 +1165,7 @@ message TTableServiceConfig { optional bool EnablePublishResourcesByExchanger = 18 [default = true]; optional TInfoExchangerSettings InfoExchangerSettings = 19; optional uint64 KqpPatternCachePatternAccessTimesBeforeTryToCompile = 20 [default = 5]; + optional uint64 KqpPatternCacheCompiledCapacityBytes = 21 [default = 104857600]; // 100 MiB } message TSpillingServiceConfig { diff --git a/ydb/library/yql/minikql/codegen/codegen.cpp b/ydb/library/yql/minikql/codegen/codegen.cpp index 6e4096af22..7508858c1c 100644 --- a/ydb/library/yql/minikql/codegen/codegen.cpp +++ b/ydb/library/yql/minikql/codegen/codegen.cpp @@ -487,6 +487,7 @@ public: passManagerBuilder.populateModulePassManager(*modulePassManager); passManagerBuilder.populateFunctionPassManager(*functionPassManager); + auto functionPassStart = Now(); functionPassManager->doInitialization(); for (auto it = Module_->begin(), jt = Module_->end(); it != jt; ++it) { if (!it->isDeclaration()) { @@ -495,6 +496,10 @@ public: } functionPassManager->doFinalization(); + if (compileStats) { + compileStats->FunctionPassTime = (Now() - functionPassStart).MilliSeconds(); + } + auto modulePassStart = Now(); modulePassManager->run(*Module_); if (compileStats) { @@ -531,6 +536,10 @@ public: llvm::TimerGroup::printAll(llvm::errs()); llvm::TimePassesIsEnabled = false; } + + if (compileStats) { + compileStats->TotalObjectSize = TotalObjectSize; + } } void* GetPointerToFunction(llvm::Function* function) override { @@ -660,6 +669,8 @@ public: const llvm::RuntimeDyld::LoadedObjectInfo &loi) override { Y_UNUSED(key); + TotalObjectSize += obj.getData().size(); + for (const auto& section : obj.sections()) { //auto nameExp = section.getName(); //auto name = nameExp.get(); @@ -732,6 +743,7 @@ private: llvm::JITEventListener* PerfListener_ = nullptr; std::unique_ptr<llvm::ExecutionEngine> Engine_; std::vector<std::pair<llvm::object::SectionRef, ui64>> CodeSections_; + ui64 TotalObjectSize = 0; std::vector<std::pair<ui64, llvm::Function*>> SortedFuncs_; TMaybe<THashSet<TString>> ExportedSymbols; THashMap<const void*, TString> ReverseGlobalMapping_; @@ -745,5 +757,10 @@ ICodegen::Make(ETarget target, ESanitize sanitize) { return std::make_unique<TCodegen>(target, sanitize); } +ICodegen::TSharedPtr +ICodegen::MakeShared(ETarget target, ESanitize sanitize) { + return std::make_shared<TCodegen>(target, sanitize); +} + } } diff --git a/ydb/library/yql/minikql/codegen/codegen.h b/ydb/library/yql/minikql/codegen/codegen.h index 4b252c30e6..6bfac89317 100644 --- a/ydb/library/yql/minikql/codegen/codegen.h +++ b/ydb/library/yql/minikql/codegen/codegen.h @@ -38,8 +38,10 @@ struct TCodegenStats { }; struct TCompileStats { + ui32 FunctionPassTime = 0; ui32 ModulePassTime = 0; ui32 FinalizeTime = 0; + ui64 TotalObjectSize = 0; }; class ICodegen { @@ -62,6 +64,9 @@ public: using TPtr = std::unique_ptr<ICodegen>; static TPtr Make(ETarget target, ESanitize sanitize = ESanitize::Auto); + + using TSharedPtr = std::shared_ptr<ICodegen>; + static TSharedPtr MakeShared(ETarget target, ESanitize sanitize = ESanitize::Auto); }; } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h index 25d2c08185..92de6674b0 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h @@ -407,6 +407,8 @@ public: virtual ~IComputationPattern() = default; virtual void Compile(TString optLLVM, IStatsRegistry* stats) = 0; virtual bool IsCompiled() const = 0; + virtual size_t CompiledCodeSize() const = 0; + virtual void RemoveCompiledCode() = 0; virtual THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) = 0; virtual bool GetSuitableForCache() const = 0; }; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp index 89ca1b4a95..f50083cbc5 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp @@ -45,6 +45,7 @@ const static TStatKey CodeGen_CompileTime("CodeGen_CompileTime", true); const static TStatKey CodeGen_TotalFunctions("CodeGen_TotalFunctions", true); const static TStatKey CodeGen_TotalInstructions("CodeGen_TotalInstructions", true); const static TStatKey CodeGen_MaxFunctionInstructions("CodeGen_MaxFunctionInstructions", false); +const static TStatKey CodeGen_FunctionPassTime("CodeGen_FunctionPassTime", true); const static TStatKey CodeGen_ModulePassTime("CodeGen_ModulePassTime", true); const static TStatKey CodeGen_FinalizeTime("CodeGen_FinalizeTime", true); @@ -563,11 +564,11 @@ private: class TComputationGraph final : public IComputationGraph { public: - TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts, bool executeLLVM = false) + TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts, NYql::NCodegen::ICodegen::TSharedPtr codegen) : PatternNodes(patternNodes) , MemInfo(MakeIntrusive<TMemoryUsageInfo>("ComputationGraph")) , CompOpts(compOpts) - , ExecuteLLVM(executeLLVM) + , Codegen(std::move(codegen)) { #ifndef NDEBUG CompOpts.AllocState.ActiveMemInfo.emplace(MemInfo.Get(), MemInfo); @@ -599,7 +600,7 @@ public: PatternNodes->GetMutables(), //*ArrowMemoryPool *arrow::default_memory_pool())); - Ctx->ExecuteLLVM = ExecuteLLVM; + Ctx->ExecuteLLVM = Codegen.get() != nullptr; ValueBuilder->SetCalleePositionHolder(Ctx->CalleePosition); for (auto& node : PatternNodes->GetNodes()) { node->InitNode(*Ctx); @@ -712,7 +713,6 @@ public: bool SetExecuteLLVM(bool value) override { const bool old = Ctx->ExecuteLLVM; Ctx->ExecuteLLVM = value; - ExecuteLLVM = value; return old; } @@ -762,8 +762,8 @@ private: std::unique_ptr<arrow::MemoryPool> ArrowMemoryPool; THolder<TComputationContext> Ctx; TComputationOptsFull CompOpts; + NYql::NCodegen::ICodegen::TSharedPtr Codegen; bool IsPrepared = false; - bool ExecuteLLVM = false; std::optional<TArrowKernelsTopology> KernelsTopology; }; @@ -773,9 +773,9 @@ public: #if defined(MKQL_DISABLE_CODEGEN) : Codegen() #elif defined(MKQL_FORCE_USE_CODEGEN) - : Codegen(NYql::NCodegen::ICodegen::Make(NYql::NCodegen::ETarget::Native)) + : Codegen(NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native)) #else - : Codegen(opts.OptLLVM != "OFF" || GetEnv(TString("MKQL_FORCE_USE_LLVM")) ? NYql::NCodegen::ICodegen::Make(NYql::NCodegen::ETarget::Native) : NYql::NCodegen::ICodegen::TPtr()) + : Codegen(opts.OptLLVM != "OFF" || GetEnv(TString("MKQL_FORCE_USE_LLVM")) ? NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native) : NYql::NCodegen::ICodegen::TPtr()) #endif { const auto& nodes = builder->GetNodes(); @@ -861,11 +861,11 @@ public: Codegen.reset(); } else { Codegen->Verify(); - NYql::NCodegen::TCompileStats compileStats; - Codegen->Compile(GetCompileOptions(optLLVM), &compileStats); + Codegen->Compile(GetCompileOptions(optLLVM), &CompileStats); - MKQL_ADD_STAT(stats, CodeGen_ModulePassTime, compileStats.ModulePassTime); - MKQL_ADD_STAT(stats, CodeGen_FinalizeTime, compileStats.FinalizeTime); + MKQL_ADD_STAT(stats, CodeGen_FunctionPassTime, CompileStats.FunctionPassTime); + MKQL_ADD_STAT(stats, CodeGen_ModulePassTime, CompileStats.ModulePassTime); + MKQL_ADD_STAT(stats, CodeGen_FinalizeTime, CompileStats.FinalizeTime); } timerComp.Release(); @@ -900,17 +900,31 @@ public: timerFull.Release(); timerFull.Report(stats); +#endif IsPatternCompiled.store(true); -#endif } bool IsCompiled() const { return IsPatternCompiled.load(); } + size_t CompiledCodeSize() const { + return CompileStats.TotalObjectSize; + } + + void RemoveCompiledCode() { + IsPatternCompiled.store(false); + CompileStats = {}; + Codegen.reset(); + } + THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) { - return MakeHolder<TComputationGraph>(PatternNodes, compOpts, IsPatternCompiled.load()); + if (IsPatternCompiled.load()) { + return MakeHolder<TComputationGraph>(PatternNodes, compOpts, Codegen); + } + + return MakeHolder<TComputationGraph>(PatternNodes, compOpts, nullptr); } bool GetSuitableForCache() const { @@ -933,8 +947,9 @@ private: TTypeEnvironment* TypeEnv = nullptr; TPatternNodes::TPtr PatternNodes; - NYql::NCodegen::ICodegen::TPtr Codegen; + NYql::NCodegen::ICodegen::TSharedPtr Codegen; std::atomic<bool> IsPatternCompiled = false; + NYql::NCodegen::TCompileStats CompileStats; }; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp index a50c72e686..d2fe8dfdd6 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp @@ -1,24 +1,226 @@ #include "mkql_computation_pattern_cache.h" +#include <util/generic/intrlist.h> + namespace NKikimr::NMiniKQL { +class TComputationPatternLRUCache::TLRUPatternCacheImpl +{ +public: + TLRUPatternCacheImpl(size_t maxPatternsSize, + size_t maxPatternsSizeBytes, + size_t maxCompiledPatternsSize, + size_t maxCompiledPatternsSizeBytes) + : MaxPatternsSize(maxPatternsSize) + , MaxPatternsSizeBytes(maxPatternsSizeBytes) + , MaxCompiledPatternsSize(maxCompiledPatternsSize) + , MaxCompiledPatternsSizeBytes(maxCompiledPatternsSizeBytes) + {} + + size_t PatternsSize() const { + return SerializedProgramToPatternCacheHolder.size(); + } + + size_t PatternsSizeInBytes() const { + return CurrentPatternsSizeBytes; + } + + size_t CompiledPatternsSize() const { + return CurrentCompiledPatternsSize; + } + + size_t PatternsCompiledCodeSizeInBytes() const { + return CurrentPatternsCompiledCodeSizeInBytes; + } + + std::shared_ptr<TPatternCacheEntry>* Find(const TString& serializedProgram) { + auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram); + if (it == SerializedProgramToPatternCacheHolder.end()) { + return nullptr; + } + + PromoteEntry(&it->second); + + return &it->second.Entry; + } + + void Insert(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) { + auto [it, inserted] = SerializedProgramToPatternCacheHolder.emplace(std::piecewise_construct, + std::forward_as_tuple(serializedProgram), + std::forward_as_tuple(serializedProgram, entry)); + + if (!inserted) { + RemoveEntryFromLists(&it->second); + } + + entry->UpdateSizeForCache(); + + /// New item is inserted, insert it in the back of both LRU lists and recalculate sizes + CurrentPatternsSizeBytes += entry->SizeForCache; + LRUPatternList.PushBack(&it->second); + + if (entry->Pattern->IsCompiled()) { + ++CurrentCompiledPatternsSize; + CurrentPatternsCompiledCodeSizeInBytes += entry->Pattern->CompiledCodeSize(); + LRUCompiledPatternList.PushBack(&it->second); + } + + entry->IsInCache.store(true); + ClearIfNeeded(); + } + + void NotifyPatternCompiled(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) { + auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram); + if (it == SerializedProgramToPatternCacheHolder.end()) { + return; + } + + Y_ASSERT(entry->Pattern->IsCompiled()); + + Y_ASSERT(!it->second.LinkedInCompiledPatternLRUList()); + PromoteEntry(&it->second); + + ++CurrentCompiledPatternsSize; + CurrentPatternsCompiledCodeSizeInBytes += entry->Pattern->CompiledCodeSize(); + LRUCompiledPatternList.PushBack(&it->second); + + ClearIfNeeded(); + } + + void Clear() { + SerializedProgramToPatternCacheHolder.clear(); + for (auto & holder : LRUPatternList) { + holder.Entry->IsInCache.store(false); + } + + LRUPatternList.Clear(); + LRUCompiledPatternList.Clear(); + } +private: + struct TPatternLRUListTag {}; + struct TCompiledPatternLRUListTag {}; + + /** Cache holder is used to store serialized program and pattern cache entry in intrusive LRU lists. + * Most recently accessed items are in back of the lists, least recently accessed items are in front of the lists. + */ + struct TPatternCacheHolder : public TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>, TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag> { + TPatternCacheHolder(TString serializedProgram, std::shared_ptr<TPatternCacheEntry> entry) + : SerializedProgram(std::move(serializedProgram)) + , Entry(std::move(entry)) + {} + + bool LinkedInPatternLRUList() const { + return !TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>::Empty(); + } + + bool LinkedInCompiledPatternLRUList() const { + return !TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag>::Empty(); + } + + TString SerializedProgram; + std::shared_ptr<TPatternCacheEntry> Entry; + }; + + void PromoteEntry(TPatternCacheHolder* holder) { + Y_ASSERT(holder->LinkedInPatternLRUList()); + LRUPatternList.Remove(holder); + LRUPatternList.PushBack(holder); + + if (!holder->LinkedInCompiledPatternLRUList()) { + return; + } + + LRUCompiledPatternList.Remove(holder); + LRUCompiledPatternList.PushBack(holder); + } + + void RemoveEntryFromLists(TPatternCacheHolder* holder) { + Y_ASSERT(holder->LinkedInPatternLRUList()); + LRUPatternList.Remove(holder); + + Y_ASSERT(holder->Entry->SizeForCache <= CurrentPatternsSizeBytes); + CurrentPatternsSizeBytes -= holder->Entry->SizeForCache; + + if (!holder->LinkedInCompiledPatternLRUList()) { + return; + } + + Y_ASSERT(CurrentCompiledPatternsSize > 0); + --CurrentCompiledPatternsSize; + + size_t patternCompiledCodeSize = holder->Entry->Pattern->CompiledCodeSize(); + Y_ASSERT(patternCompiledCodeSize <= CurrentPatternsCompiledCodeSizeInBytes); + CurrentPatternsCompiledCodeSizeInBytes -= patternCompiledCodeSize; + + LRUCompiledPatternList.Remove(holder); + + holder->Entry->IsInCache.store(false); + } + + void ClearIfNeeded() { + /// Remove from pattern LRU list and compiled pattern LRU list + while (SerializedProgramToPatternCacheHolder.size() > MaxPatternsSize || CurrentPatternsSizeBytes > MaxPatternsSizeBytes) { + TPatternCacheHolder* holder = LRUPatternList.Front(); + RemoveEntryFromLists(holder); + SerializedProgramToPatternCacheHolder.erase(holder->SerializedProgram); + } + + /// Only remove from compiled pattern LRU list + while (CurrentCompiledPatternsSize > MaxCompiledPatternsSize || CurrentPatternsCompiledCodeSizeInBytes > MaxCompiledPatternsSizeBytes) { + TPatternCacheHolder* holder = LRUCompiledPatternList.PopFront(); + + Y_ASSERT(CurrentCompiledPatternsSize > 0); + --CurrentCompiledPatternsSize; + + auto & pattern = holder->Entry->Pattern; + size_t patternCompiledSize = pattern->CompiledCodeSize(); + Y_ASSERT(patternCompiledSize <= CurrentPatternsCompiledCodeSizeInBytes); + CurrentPatternsCompiledCodeSizeInBytes -= patternCompiledSize; + + pattern->RemoveCompiledCode(); + holder->Entry->AccessTimes.store(0); + } + } + + const size_t MaxPatternsSize = 0; + const size_t MaxPatternsSizeBytes = 0; + const size_t MaxCompiledPatternsSize = 0; + const size_t MaxCompiledPatternsSizeBytes = 0; + + size_t CurrentPatternsSizeBytes = 0; + size_t CurrentCompiledPatternsSize = 0; + size_t CurrentPatternsCompiledCodeSizeInBytes = 0; + + THashMap<TString, TPatternCacheHolder> SerializedProgramToPatternCacheHolder; + TIntrusiveList<TPatternCacheHolder, TPatternLRUListTag> LRUPatternList; + TIntrusiveList<TPatternCacheHolder, TCompiledPatternLRUListTag> LRUCompiledPatternList; +}; + TComputationPatternLRUCache::TComputationPatternLRUCache(TComputationPatternLRUCache::Config configuration, NMonitoring::TDynamicCounterPtr counters) - : Cache(CacheMaxElementsSize) + : Cache(std::make_unique<TLRUPatternCacheImpl>(CacheMaxElementsSize, configuration.MaxSizeBytes, CacheMaxElementsSize, configuration.MaxCompiledSizeBytes)) , Configuration(configuration) , Hits(counters->GetCounter("PatternCache/Hits", true)) , Waits(counters->GetCounter("PatternCache/Waits", true)) , Misses(counters->GetCounter("PatternCache/Misses", true)) , NotSuitablePattern(counters->GetCounter("PatternCache/NotSuitablePattern", true)) , SizeItems(counters->GetCounter("PatternCache/SizeItems", false)) + , SizeCompiledItems(counters->GetCounter("PatternCache/SizeCompiledItems", false)) , SizeBytes(counters->GetCounter("PatternCache/SizeBytes", false)) + , SizeCompiledBytes(counters->GetCounter("PatternCache/SizeCompiledBytes", false)) , MaxSizeBytesCounter(counters->GetCounter("PatternCache/MaxSizeBytes", false)) + , MaxCompiledSizeBytesCounter(counters->GetCounter("PatternCache/MaxCompiledSizeBytes", false)) { *MaxSizeBytesCounter = Configuration.MaxSizeBytes; + *MaxCompiledSizeBytesCounter = Configuration.MaxCompiledSizeBytes; } -std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serialized) { +TComputationPatternLRUCache::~TComputationPatternLRUCache() { + CleanCache(); +} + +std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serializedProgram) { std::lock_guard<std::mutex> lock(Mutex); - if (auto it = Cache.Find(serialized); it != Cache.End()) { + if (auto it = Cache->Find(serializedProgram)) { ++*Hits; return *it; } @@ -27,18 +229,18 @@ std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TStr return {}; } -TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serialized) { +TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) { std::lock_guard lock(Mutex); - if (auto it = Cache.Find(serialized); it != Cache.End()) { + if (auto it = Cache->Find(serializedProgram)) { ++*Hits; - AccessPattern(serialized, *it); - return TTicket(serialized, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr); + AccessPattern(serializedProgram, *it); + return TTicket(serializedProgram, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr); } - auto [notifyIt, isNew] = Notify.emplace(serialized, Nothing()); + auto [notifyIt, isNew] = Notify.emplace(serializedProgram, Nothing()); if (isNew) { ++*Misses; - return TTicket(serialized, true, {}, this); + return TTicket(serializedProgram, true, {}, this); } ++*Waits; @@ -49,54 +251,27 @@ TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscrib } subscribers->push_back(promise); - return TTicket(serialized, false, promise, nullptr); -} - -void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) { - TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; - { - std::lock_guard<std::mutex> lock(Mutex); - auto notifyIt = Notify.find(serialized); - if (notifyIt != Notify.end()) { - subscribers.swap(notifyIt->second); - Notify.erase(notifyIt); - } - } - - if (subscribers) { - for (auto& subscriber : *subscribers) { - subscriber.SetValue(nullptr); - } - } + return TTicket(serializedProgram, false, promise, nullptr); } -void TComputationPatternLRUCache::EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) { +void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) { Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern); TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; + { std::lock_guard<std::mutex> lock(Mutex); - // normally remove only one old cache entry by iteration to prevent bursts - if (CurrentSizeBytes > Configuration.MaxSizeBytes) { - RemoveOldest(); - } - // to prevent huge memory overusage remove as much as needed - while (CurrentSizeBytes > 2 * Configuration.MaxSizeBytes) { - RemoveOldest(); - } - - patternWithEnv->UpdateSizeForCache(); - CurrentSizeBytes += patternWithEnv->SizeForCache; + Cache->Insert(serializedProgram, patternWithEnv); - Cache.Insert(serialized, patternWithEnv); - patternWithEnv->IsInCache.store(true); - auto notifyIt = Notify.find(serialized); + auto notifyIt = Notify.find(serializedProgram); if (notifyIt != Notify.end()) { subscribers.swap(notifyIt->second); Notify.erase(notifyIt); } - *SizeItems = Cache.Size(); - *SizeBytes = CurrentSizeBytes; + *SizeItems = Cache->PatternsSize(); + *SizeBytes = Cache->PatternsSizeInBytes(); + *SizeCompiledItems = Cache->CompiledPatternsSize(); + *SizeCompiledBytes = Cache->PatternsCompiledCodeSizeInBytes(); } if (subscribers) { @@ -106,27 +281,23 @@ void TComputationPatternLRUCache::EmplacePattern(const TString& serialized, std: } } +void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) { + std::lock_guard lock(Mutex); + Cache->NotifyPatternCompiled(serializedProgram, patternWithEnv); +} + +size_t TComputationPatternLRUCache::GetSize() const { + std::lock_guard lock(Mutex); + return Cache->PatternsSize(); +} + void TComputationPatternLRUCache::CleanCache() { *SizeItems = 0; *SizeBytes = 0; *MaxSizeBytesCounter = 0; std::lock_guard lock(Mutex); - CurrentSizeBytes = 0; PatternsToCompile.clear(); - auto CacheEnd = Cache.End(); - for (auto PatternIt = Cache.Begin(); PatternIt != CacheEnd; ++PatternIt) { - const_cast<std::shared_ptr<TPatternCacheEntry> &>(PatternIt.Value())->IsInCache.store(false); - } - Cache.Clear(); -} - -void TComputationPatternLRUCache::RemoveOldest() { - auto oldest = Cache.FindOldest(); - Y_VERIFY_DEBUG(oldest != Cache.End()); - CurrentSizeBytes -= oldest.Value()->SizeForCache; - oldest.Value()->IsInCache.store(false); - PatternsToCompile.erase(oldest.Key()); - Cache.Erase(oldest); + Cache->Clear(); } void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) { @@ -141,5 +312,22 @@ void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgra } } +void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) { + TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; + { + std::lock_guard<std::mutex> lock(Mutex); + auto notifyIt = Notify.find(serialized); + if (notifyIt != Notify.end()) { + subscribers.swap(notifyIt->second); + Notify.erase(notifyIt); + } + } + + if (subscribers) { + for (auto& subscriber : *subscribers) { + subscriber.SetValue(nullptr); + } + } +} } // namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h index 86ffa65963..170ff09040 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h @@ -95,17 +95,19 @@ public: : MaxSizeBytes(maxSizeBytes) {} - Config(size_t maxSizeBytes, size_t patternAccessTimesBeforeTryToCompile) + Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes, size_t patternAccessTimesBeforeTryToCompile) : MaxSizeBytes(maxSizeBytes) + , MaxCompiledSizeBytes(maxCompiledSizeBytes) , PatternAccessTimesBeforeTryToCompile(patternAccessTimesBeforeTryToCompile) {} size_t MaxSizeBytes = 0; + size_t MaxCompiledSizeBytes = 0; std::optional<size_t> PatternAccessTimesBeforeTryToCompile; bool operator==(const Config & rhs) { - return std::tie(MaxSizeBytes, PatternAccessTimesBeforeTryToCompile) == - std::tie(rhs.MaxSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile); + return std::tie(MaxSizeBytes, MaxCompiledSizeBytes, PatternAccessTimesBeforeTryToCompile) == + std::tie(rhs.MaxSizeBytes, rhs.MaxCompiledSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile); } bool operator!=(const Config & rhs) { @@ -115,28 +117,23 @@ public: TComputationPatternLRUCache(Config configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>()); - ~TComputationPatternLRUCache() { - CleanCache(); - } + ~TComputationPatternLRUCache(); static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) { return std::make_shared<TPatternCacheEntry>(useAlloc); } - std::shared_ptr<TPatternCacheEntry> Find(const TString& serialized); + std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram); - TTicket FindOrSubscribe(const TString& serialized); + TTicket FindOrSubscribe(const TString& serializedProgram); - void NotifyMissing(const TString& serialized); + void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv); - void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv); + void NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv); - void CleanCache(); + size_t GetSize() const; - size_t GetSize() const { - std::lock_guard lock(Mutex); - return Cache.Size(); - } + void CleanCache(); Config GetConfiguration() const { std::lock_guard lock(Mutex); @@ -167,16 +164,20 @@ public: } private: - void RemoveOldest(); - void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry); + void NotifyMissing(const TString& serialized); + static constexpr size_t CacheMaxElementsSize = 10000; + friend class TTicket; + mutable std::mutex Mutex; THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify; - TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache; - size_t CurrentSizeBytes = 0; + + class TLRUPatternCacheImpl; + std::unique_ptr<TLRUPatternCacheImpl> Cache; + THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile; const Config Configuration; @@ -186,8 +187,11 @@ private: NMonitoring::TDynamicCounters::TCounterPtr Misses; NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern; NMonitoring::TDynamicCounters::TCounterPtr SizeItems; + NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledItems; NMonitoring::TDynamicCounters::TCounterPtr SizeBytes; + NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledBytes; NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter; + NMonitoring::TDynamicCounters::TCounterPtr MaxCompiledSizeBytesCounter; }; } // namespace NKikimr::NMiniKQL |