diff options
author | ilezhankin <ilezhankin@yandex-team.com> | 2025-03-12 20:51:12 +0300 |
---|---|---|
committer | ilezhankin <ilezhankin@yandex-team.com> | 2025-03-12 21:13:00 +0300 |
commit | 4e561baf685308dfb3a319787d8b0aad0ba66bbc (patch) | |
tree | 0b4193e6dc314a206ae34fb3b5a6e177f425b96e | |
parent | 0225cf80af14a18b60c5f52e353d238c671a5fab (diff) | |
download | ydb-4e561baf685308dfb3a319787d8b0aad0ba66bbc.tar.gz |
Handle potential race in computation pattern cache
commit_hash:57e4f4066d30b24b967f1c4cb57bfafbd7948d34
3 files changed, 69 insertions, 108 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node.h b/yql/essentials/minikql/computation/mkql_computation_node.h index 85462824fd..d300b7168f 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node.h +++ b/yql/essentials/minikql/computation/mkql_computation_node.h @@ -20,7 +20,6 @@ #include <library/cpp/time_provider/time_provider.h> #include <map> -#include <unordered_set> #include <unordered_map> #include <vector> diff --git a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.cpp b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.cpp index 57164430cc..49d3476631 100644 --- a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.cpp @@ -33,43 +33,43 @@ public: return CurrentPatternsCompiledCodeSizeInBytes; } - std::shared_ptr<TPatternCacheEntry>* Find(const TString& serializedProgram) { + std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram) { auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram); if (it == SerializedProgramToPatternCacheHolder.end()) { - return nullptr; + return {}; } PromoteEntry(&it->second); - return &it->second.Entry; + return it->second.Entry; } - void Insert(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) { + void Insert(const TString& serializedProgram, TPatternCacheEntryPtr entry) { auto [it, inserted] = SerializedProgramToPatternCacheHolder.emplace(std::piecewise_construct, std::forward_as_tuple(serializedProgram), std::forward_as_tuple(serializedProgram, entry)); if (!inserted) { RemoveEntryFromLists(&it->second); + } else { + it->second.Entry->UpdateSizeForCache(); } - entry->UpdateSizeForCache(); - /// New item is inserted, insert it in the back of both LRU lists and recalculate sizes - CurrentPatternsSizeBytes += entry->SizeForCache; + CurrentPatternsSizeBytes += it->second.Entry->SizeForCache; LRUPatternList.PushBack(&it->second); - if (entry->Pattern->IsCompiled()) { + if (it->second.Entry->Pattern->IsCompiled()) { ++CurrentCompiledPatternsSize; - CurrentPatternsCompiledCodeSizeInBytes += entry->Pattern->CompiledCodeSize(); + CurrentPatternsCompiledCodeSizeInBytes += it->second.Entry->Pattern->CompiledCodeSize(); LRUCompiledPatternList.PushBack(&it->second); } - entry->IsInCache.store(true); + it->second.Entry->IsInCache.store(true); ClearIfNeeded(); } - void NotifyPatternCompiled(const TString & serializedProgram) { + void NotifyPatternCompiled(const TString& serializedProgram) { auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram); if (it == SerializedProgramToPatternCacheHolder.end()) { return; @@ -77,9 +77,10 @@ public: const auto& entry = it->second.Entry; - // TODO(ilezhankin): wait until migration of yql to arcadia is complete and merge the proper fix from here: - // https://github.com/ydb-platform/ydb/pull/11129 if (!entry->Pattern->IsCompiled()) { + // This is possible if the old entry got removed from cache while being compiled - and the new entry got in. + // TODO: add metrics for this inefficient cache usage. + // TODO: make this scenario more consistent - don't waste compilation result. return; } @@ -130,8 +131,8 @@ private: return !TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag>::Empty(); } - TString SerializedProgram; - std::shared_ptr<TPatternCacheEntry> Entry; + const TString SerializedProgram; + TPatternCacheEntryPtr Entry; }; void PromoteEntry(TPatternCacheHolder* holder) { @@ -232,52 +233,51 @@ TComputationPatternLRUCache::~TComputationPatternLRUCache() { CleanCache(); } -std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serializedProgram) { +TPatternCacheEntryPtr TComputationPatternLRUCache::Find(const TString& serializedProgram) { std::lock_guard<std::mutex> lock(Mutex); if (auto it = Cache->Find(serializedProgram)) { ++*Hits; - if ((*it)->Pattern->IsCompiled()) + if (it->Pattern->IsCompiled()) ++*HitsCompiled; - return *it; + return it; } ++*Misses; return {}; } -TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) { +TPatternCacheEntryFuture TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) { std::lock_guard lock(Mutex); if (auto it = Cache->Find(serializedProgram)) { ++*Hits; - AccessPattern(serializedProgram, *it); - return TTicket(serializedProgram, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr); + AccessPattern(serializedProgram, it); + return NThreading::MakeFuture<TPatternCacheEntryPtr>(it); } - auto [notifyIt, isNew] = Notify.emplace(serializedProgram, Nothing()); + auto [notifyIt, isNew] = Notify.emplace(std::piecewise_construct, std::forward_as_tuple(serializedProgram), std::forward_as_tuple()); if (isNew) { ++*Misses; - return TTicket(serializedProgram, true, {}, this); + // First future is empty - so the subscriber can initiate the entry creation. + return {}; } ++*Waits; - auto promise = NThreading::NewPromise<std::shared_ptr<TPatternCacheEntry>>(); + auto promise = NThreading::NewPromise<TPatternCacheEntryPtr>(); auto& subscribers = notifyIt->second; - if (!subscribers) { - subscribers.ConstructInPlace(); - } + subscribers.push_back(promise); - subscribers->push_back(promise); - return TTicket(serializedProgram, false, promise, nullptr); + // Second and next futures are not empty - so subscribers can wait while first one creates the entry. + return promise; } -void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) { +void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, TPatternCacheEntryPtr patternWithEnv) { Y_DEBUG_ABORT_UNLESS(patternWithEnv && patternWithEnv->Pattern); - TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; + TVector<NThreading::TPromise<TPatternCacheEntryPtr>> subscribers; { - std::lock_guard<std::mutex> lock(Mutex); + std::lock_guard lock(Mutex); Cache->Insert(serializedProgram, patternWithEnv); auto notifyIt = Notify.find(serializedProgram); @@ -292,10 +292,8 @@ void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgra *SizeCompiledBytes = Cache->PatternsCompiledCodeSizeInBytes(); } - if (subscribers) { - for (auto& subscriber : *subscribers) { - subscriber.SetValue(patternWithEnv); - } + for (auto& subscriber : subscribers) { + subscriber.SetValue(patternWithEnv); } } @@ -304,6 +302,24 @@ void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serialize Cache->NotifyPatternCompiled(serializedProgram); } +void TComputationPatternLRUCache::NotifyPatternMissing(const TString& serializedProgram) { + TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>> subscribers; + { + std::lock_guard lock(Mutex); + + auto notifyIt = Notify.find(serializedProgram); + if (notifyIt != Notify.end()) { + subscribers.swap(notifyIt->second); + Notify.erase(notifyIt); + } + } + + for (auto& subscriber : subscribers) { + // It's part of API - to set nullptr as broken promise. + subscriber.SetValue(nullptr); + } +} + size_t TComputationPatternLRUCache::GetSize() const { std::lock_guard lock(Mutex); return Cache->PatternsSize(); @@ -318,7 +334,7 @@ void TComputationPatternLRUCache::CleanCache() { Cache->Clear(); } -void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) { +void TComputationPatternLRUCache::AccessPattern(const TString& serializedProgram, TPatternCacheEntryPtr entry) { if (!Configuration.PatternAccessTimesBeforeTryToCompile || entry->Pattern->IsCompiled()) { return; } @@ -330,22 +346,4 @@ void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgra } } -void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) { - TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; - { - std::lock_guard<std::mutex> lock(Mutex); - auto notifyIt = Notify.find(serialized); - if (notifyIt != Notify.end()) { - subscribers.swap(notifyIt->second); - Notify.erase(notifyIt); - } - } - - if (subscribers) { - for (auto& subscriber : *subscribers) { - subscriber.SetValue(nullptr); - } - } -} - } // namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h index 3d54d14714..1eac735d1b 100644 --- a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h +++ b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h @@ -8,6 +8,8 @@ #include <memory> #include <mutex> +#define NEW_PATTERN_CACHE_IN_MKQL + namespace NKikimr::NMiniKQL { struct TPatternCacheEntry { @@ -54,43 +56,11 @@ struct TPatternCacheEntry { } }; +using TPatternCacheEntryPtr = std::shared_ptr<TPatternCacheEntry>; +using TPatternCacheEntryFuture = NThreading::TFuture<TPatternCacheEntryPtr>; + class TComputationPatternLRUCache { public: - class TTicket : private TNonCopyable { - public: - TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache) - : Serialized(serialized) - , IsOwned(isOwned) - , Future(future) - , Cache(cache) - {} - - ~TTicket() { - if (Cache) { - Cache->NotifyMissing(Serialized); - } - } - - bool HasFuture() const { - return !IsOwned; - } - - std::shared_ptr<TPatternCacheEntry> GetValueSync() const { - Y_ABORT_UNLESS(HasFuture()); - return Future.GetValueSync(); - } - - void Close() { - Cache = nullptr; - } - - private: - const TString Serialized; - const bool IsOwned; - const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future; - TComputationPatternLRUCache* Cache; - }; - struct Config { Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes) : MaxSizeBytes(maxSizeBytes) @@ -118,20 +88,19 @@ public: }; TComputationPatternLRUCache(const Config& configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>()); - ~TComputationPatternLRUCache(); - static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) { + static TPatternCacheEntryPtr CreateCacheEntry(bool useAlloc = true) { return std::make_shared<TPatternCacheEntry>(useAlloc); } - std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram); - - TTicket FindOrSubscribe(const TString& serializedProgram); + TPatternCacheEntryPtr Find(const TString& serializedProgram); + TPatternCacheEntryFuture FindOrSubscribe(const TString& serializedProgram); - void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv); + void EmplacePattern(const TString& serializedProgram, TPatternCacheEntryPtr patternWithEnv); void NotifyPatternCompiled(const TString& serializedProgram); + void NotifyPatternMissing(const TString& serializedProgram); size_t GetSize() const; @@ -160,27 +129,22 @@ public: return PatternsToCompile.size(); } - void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) { + void GetPatternsToCompile(THashMap<TString, TPatternCacheEntryPtr> & result) { std::lock_guard lock(Mutex); result.swap(PatternsToCompile); } private: - void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry); - - void NotifyMissing(const TString& serialized); + class TLRUPatternCacheImpl; static constexpr size_t CacheMaxElementsSize = 10000; - friend class TTicket; + void AccessPattern(const TString& serializedProgram, TPatternCacheEntryPtr entry); mutable std::mutex Mutex; - THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify; - - class TLRUPatternCacheImpl; - std::unique_ptr<TLRUPatternCacheImpl> Cache; - - THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile; + THashMap<TString, TVector<NThreading::TPromise<TPatternCacheEntryPtr>>> Notify; // protected by Mutex + std::unique_ptr<TLRUPatternCacheImpl> Cache; // protected by Mutex + THashMap<TString, TPatternCacheEntryPtr> PatternsToCompile; // protected by Mutex const Config Configuration; |