about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ilezhankin <ilezhankin@yandex-team.com> 2025-03-12 20:51:12 +0300
committer ilezhankin <ilezhankin@yandex-team.com> 2025-03-12 21:13:00 +0300
commit 4e561baf685308dfb3a319787d8b0aad0ba66bbc (patch)
tree 0b4193e6dc314a206ae34fb3b5a6e177f425b96e
parent 0225cf80af14a18b60c5f52e353d238c671a5fab (diff)
download ydb-4e561baf685308dfb3a319787d8b0aad0ba66bbc.tar.gz
Handle potential race in computation pattern cache
commit_hash:57e4f4066d30b24b967f1c4cb57bfafbd7948d34
-rw-r--r-- yql/essentials/minikql/computation/mkql_computation_node.h 1
-rw-r--r-- yql/essentials/minikql/computation/mkql_computation_pattern_cache.cpp 108
-rw-r--r-- yql/essentials/minikql/computation/mkql_computation_pattern_cache.h 68
3 files changed, 69 insertions, 108 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node.h b/yql/essentials/minikql/computation/mkql_computation_node.h
index 85462824fd..d300b7168f 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node.h
+++ b/yql/essentials/minikql/computation/mkql_computation_node.h
@@ -20,7 +20,6 @@
#include <library/cpp/time_provider/time_provider.h>
#include <map>
-#include <unordered_set>
#include <unordered_map>
#include <vector>
diff --git a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.cpp b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.cpp
index 57164430cc..49d3476631 100644
--- a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.cpp
@@ -33,43 +33,43 @@ public:
return CurrentPatternsCompiledCodeSizeInBytes;
}
- std::shared_ptr<TPatternCacheEntry>* Find(const TString& serializedProgram) {
+ std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram) {
auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
if (it == SerializedProgramToPatternCacheHolder.end()) {
- return nullptr;
+ return {};
}
PromoteEntry(&it->second);
- return &it->second.Entry;
+ return it->second.Entry;
}
- void Insert(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
+ void Insert(const TString& serializedProgram, TPatternCacheEntryPtr entry) {
auto [it, inserted] = SerializedProgramToPatternCacheHolder.emplace(std::piecewise_construct,
std::forward_as_tuple(serializedProgram),
std::forward_as_tuple(serializedProgram, entry));
if (!inserted) {
RemoveEntryFromLists(&it->second);
+ } else {
+ it->second.Entry->UpdateSizeForCache();
}
- entry->UpdateSizeForCache();
-
/// New item is inserted, insert it in the back of both LRU lists and recalculate sizes
- CurrentPatternsSizeBytes += entry->SizeForCache;
+ CurrentPatternsSizeBytes += it->second.Entry->SizeForCache;
LRUPatternList.PushBack(&it->second);
- if (entry->Pattern->IsCompiled()) {
+ if (it->second.Entry->Pattern->IsCompiled()) {
++CurrentCompiledPatternsSize;
- CurrentPatternsCompiledCodeSizeInBytes += entry->Pattern->CompiledCodeSize();
+ CurrentPatternsCompiledCodeSizeInBytes += it->second.Entry->Pattern->CompiledCodeSize();
LRUCompiledPatternList.PushBack(&it->second);
}
- entry->IsInCache.store(true);
+ it->second.Entry->IsInCache.store(true);
ClearIfNeeded();
}
- void NotifyPatternCompiled(const TString & serializedProgram) {
+ void NotifyPatternCompiled(const TString& serializedProgram) {
auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
if (it == SerializedProgramToPatternCacheHolder.end()) {
return;
@@ -77,9 +77,10 @@ public:
const auto& entry = it->second.Entry;
- // TODO(ilezhankin): wait until migration of yql to arcadia is complete and merge the proper fix from here:
- // https://github.com/ydb-platform/ydb/pull/11129
if (!entry->Pattern->IsCompiled()) {
+ // This is possible if the old entry got removed from cache while being compiled - and the new entry got in.
+ // TODO: add metrics for this inefficient cache usage.
+ // TODO: make this scenario more consistent - don't waste compilation result.
return;
}
@@ -130,8 +131,8 @@ private:
return !TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag>::Empty();
}
- TString SerializedProgram;
- std::shared_ptr<TPatternCacheEntry> Entry;
+ const TString SerializedProgram;
+ TPatternCacheEntryPtr Entry;
};
void PromoteEntry(TPatternCacheHolder* holder) {
@@ -232,52 +233,51 @@ TComputationPatternLRUCache::~TComputationPatternLRUCache() {
CleanCache();
}
-std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serializedProgram) {
+TPatternCacheEntryPtr TComputationPatternLRUCache::Find(const TString& serializedProgram) {
std::lock_guard<std::mutex> lock(Mutex);
if (auto it = Cache->Find(serializedProgram)) {
++*Hits;
- if ((*it)->Pattern->IsCompiled())
+ if (it->Pattern->IsCompiled())
++*HitsCompiled;
- return *it;
+ return it;
}
++*Misses;
return {};
}
-TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) {
+TPatternCacheEntryFuture TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) {
std::lock_guard lock(Mutex);
if (auto it = Cache->Find(serializedProgram)) {
++*Hits;
- AccessPattern(serializedProgram, *it);
- return TTicket(serializedProgram, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr);
+ AccessPattern(serializedProgram, it);
+ return NThreading::MakeFuture<TPatternCacheEntryPtr>(it);
}
- auto [notifyIt, isNew] = Notify.emplace(serializedProgram, Nothing());
+ auto [notifyIt, isNew] = Notify.emplace(std::piecewise_construct, std::forward_as_tuple(serializedProgram), std::forward_as_tuple());
if (isNew) {
++*Misses;
- return TTicket(serializedProgram, true, {}, this);
+ // First future is empty - so the subscriber can initiate the entry creation.
+ return {};
}
++*Waits;
- auto promise = NThreading::NewPromise<std::shared_ptr<TPatternCacheEntry>>();
+ auto promise = NThreading::NewPromise<TPatternCacheEntryPtr>();
auto& subscribers = notifyIt->second;
- if (!subscribers) {
- subscribers.ConstructInPlace();
- }
+ subscribers.push_back(promise);
- subscribers->push_back(promise);
- return TTicket(serializedProgram, false, promise, nullptr);
+ // Second and next futures are not empty - so subscribers can wait while first one creates the entry.
+ return promise;
}
-void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
+void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, TPatternCacheEntryPtr patternWithEnv) {
Y_DEBUG_ABORT_UNLESS(patternWithEnv && patternWithEnv->Pattern);
- TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
+ TVector<NThreading::TPromise<TPatternCacheEntryPtr>> subscribers;
{
- std::lock_guard<std::mutex> lock(Mutex);
+ std::lock_guard lock(Mutex);
Cache->Insert(serializedProgram, patternWithEnv);
auto notifyIt = Notify.find(serializedProgram);
@@ -292,10 +292,8 @@ void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgra
*SizeCompiledBytes = Cache->PatternsCompiledCodeSizeInBytes();
}
- if (subscribers) {
- for (auto& subscriber : *subscribers) {
- subscriber.SetValue(patternWithEnv);
- }
+ for (auto& subscriber : subscribers) {
+ subscriber.SetValue(patternWithEnv);
}
}
@@ -304,6 +302,24 @@ void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serialize
Cache->NotifyPatternCompiled(serializedProgram);
}
+void TComputationPatternLRUCache::NotifyPatternMissing(const TString& serializedProgram) {
+ TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>> subscribers;
+ {
+ std::lock_guard lock(Mutex);
+
+ auto notifyIt = Notify.find(serializedProgram);
+ if (notifyIt != Notify.end()) {
+ subscribers.swap(notifyIt->second);
+ Notify.erase(notifyIt);
+ }
+ }
+
+ for (auto& subscriber : subscribers) {
+ // It's part of API - to set nullptr as broken promise.
+ subscriber.SetValue(nullptr);
+ }
+}
+
size_t TComputationPatternLRUCache::GetSize() const {
std::lock_guard lock(Mutex);
return Cache->PatternsSize();
@@ -318,7 +334,7 @@ void TComputationPatternLRUCache::CleanCache() {
Cache->Clear();
}
-void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
+void TComputationPatternLRUCache::AccessPattern(const TString& serializedProgram, TPatternCacheEntryPtr entry) {
if (!Configuration.PatternAccessTimesBeforeTryToCompile || entry->Pattern->IsCompiled()) {
return;
}
@@ -330,22 +346,4 @@ void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgra
}
}
-void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) {
- TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
- {
- std::lock_guard<std::mutex> lock(Mutex);
- auto notifyIt = Notify.find(serialized);
- if (notifyIt != Notify.end()) {
- subscribers.swap(notifyIt->second);
- Notify.erase(notifyIt);
- }
- }
-
- if (subscribers) {
- for (auto& subscriber : *subscribers) {
- subscriber.SetValue(nullptr);
- }
- }
-}
-
} // namespace NKikimr::NMiniKQL
diff --git a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h
index 3d54d14714..1eac735d1b 100644
--- a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h
+++ b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h
@@ -8,6 +8,8 @@
#include <memory>
#include <mutex>
+#define NEW_PATTERN_CACHE_IN_MKQL
+
namespace NKikimr::NMiniKQL {
struct TPatternCacheEntry {
@@ -54,43 +56,11 @@ struct TPatternCacheEntry {
}
};
+using TPatternCacheEntryPtr = std::shared_ptr<TPatternCacheEntry>;
+using TPatternCacheEntryFuture = NThreading::TFuture<TPatternCacheEntryPtr>;
+
class TComputationPatternLRUCache {
public:
- class TTicket : private TNonCopyable {
- public:
- TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
- : Serialized(serialized)
- , IsOwned(isOwned)
- , Future(future)
- , Cache(cache)
- {}
-
- ~TTicket() {
- if (Cache) {
- Cache->NotifyMissing(Serialized);
- }
- }
-
- bool HasFuture() const {
- return !IsOwned;
- }
-
- std::shared_ptr<TPatternCacheEntry> GetValueSync() const {
- Y_ABORT_UNLESS(HasFuture());
- return Future.GetValueSync();
- }
-
- void Close() {
- Cache = nullptr;
- }
-
- private:
- const TString Serialized;
- const bool IsOwned;
- const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future;
- TComputationPatternLRUCache* Cache;
- };
-
struct Config {
Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes)
: MaxSizeBytes(maxSizeBytes)
@@ -118,20 +88,19 @@ public:
};
TComputationPatternLRUCache(const Config& configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>());
-
~TComputationPatternLRUCache();
- static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) {
+ static TPatternCacheEntryPtr CreateCacheEntry(bool useAlloc = true) {
return std::make_shared<TPatternCacheEntry>(useAlloc);
}
- std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram);
-
- TTicket FindOrSubscribe(const TString& serializedProgram);
+ TPatternCacheEntryPtr Find(const TString& serializedProgram);
+ TPatternCacheEntryFuture FindOrSubscribe(const TString& serializedProgram);
- void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
+ void EmplacePattern(const TString& serializedProgram, TPatternCacheEntryPtr patternWithEnv);
void NotifyPatternCompiled(const TString& serializedProgram);
+ void NotifyPatternMissing(const TString& serializedProgram);
size_t GetSize() const;
@@ -160,27 +129,22 @@ public:
return PatternsToCompile.size();
}
- void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) {
+ void GetPatternsToCompile(THashMap<TString, TPatternCacheEntryPtr> & result) {
std::lock_guard lock(Mutex);
result.swap(PatternsToCompile);
}
private:
- void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
-
- void NotifyMissing(const TString& serialized);
+ class TLRUPatternCacheImpl;
static constexpr size_t CacheMaxElementsSize = 10000;
- friend class TTicket;
+ void AccessPattern(const TString& serializedProgram, TPatternCacheEntryPtr entry);
mutable std::mutex Mutex;
- THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
-
- class TLRUPatternCacheImpl;
- std::unique_ptr<TLRUPatternCacheImpl> Cache;
-
- THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
+ THashMap<TString, TVector<NThreading::TPromise<TPatternCacheEntryPtr>>> Notify; // protected by Mutex
+ std::unique_ptr<TLRUPatternCacheImpl> Cache; // protected by Mutex
+ THashMap<TString, TPatternCacheEntryPtr> PatternsToCompile; // protected by Mutex
const Config Configuration;