summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h
diff options
context:
space:
mode:
authorilezhankin <[email protected]>2025-03-12 20:51:12 +0300
committerilezhankin <[email protected]>2025-03-12 21:13:00 +0300
commit4e561baf685308dfb3a319787d8b0aad0ba66bbc (patch)
tree0b4193e6dc314a206ae34fb3b5a6e177f425b96e /yql/essentials/minikql/computation/mkql_computation_pattern_cache.h
parent0225cf80af14a18b60c5f52e353d238c671a5fab (diff)
Handle potential race in computation pattern cache
commit_hash:57e4f4066d30b24b967f1c4cb57bfafbd7948d34
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_pattern_cache.h')
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_pattern_cache.h68
1 files changed, 16 insertions, 52 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h
index 3d54d14714b..1eac735d1bf 100644
--- a/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h
+++ b/yql/essentials/minikql/computation/mkql_computation_pattern_cache.h
@@ -8,6 +8,8 @@
#include <memory>
#include <mutex>
+#define NEW_PATTERN_CACHE_IN_MKQL
+
namespace NKikimr::NMiniKQL {
struct TPatternCacheEntry {
@@ -54,43 +56,11 @@ struct TPatternCacheEntry {
}
};
+using TPatternCacheEntryPtr = std::shared_ptr<TPatternCacheEntry>;
+using TPatternCacheEntryFuture = NThreading::TFuture<TPatternCacheEntryPtr>;
+
class TComputationPatternLRUCache {
public:
- class TTicket : private TNonCopyable {
- public:
- TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
- : Serialized(serialized)
- , IsOwned(isOwned)
- , Future(future)
- , Cache(cache)
- {}
-
- ~TTicket() {
- if (Cache) {
- Cache->NotifyMissing(Serialized);
- }
- }
-
- bool HasFuture() const {
- return !IsOwned;
- }
-
- std::shared_ptr<TPatternCacheEntry> GetValueSync() const {
- Y_ABORT_UNLESS(HasFuture());
- return Future.GetValueSync();
- }
-
- void Close() {
- Cache = nullptr;
- }
-
- private:
- const TString Serialized;
- const bool IsOwned;
- const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future;
- TComputationPatternLRUCache* Cache;
- };
-
struct Config {
Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes)
: MaxSizeBytes(maxSizeBytes)
@@ -118,20 +88,19 @@ public:
};
TComputationPatternLRUCache(const Config& configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>());
-
~TComputationPatternLRUCache();
- static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) {
+ static TPatternCacheEntryPtr CreateCacheEntry(bool useAlloc = true) {
return std::make_shared<TPatternCacheEntry>(useAlloc);
}
- std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram);
-
- TTicket FindOrSubscribe(const TString& serializedProgram);
+ TPatternCacheEntryPtr Find(const TString& serializedProgram);
+ TPatternCacheEntryFuture FindOrSubscribe(const TString& serializedProgram);
- void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
+ void EmplacePattern(const TString& serializedProgram, TPatternCacheEntryPtr patternWithEnv);
void NotifyPatternCompiled(const TString& serializedProgram);
+ void NotifyPatternMissing(const TString& serializedProgram);
size_t GetSize() const;
@@ -160,27 +129,22 @@ public:
return PatternsToCompile.size();
}
- void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) {
+ void GetPatternsToCompile(THashMap<TString, TPatternCacheEntryPtr> & result) {
std::lock_guard lock(Mutex);
result.swap(PatternsToCompile);
}
private:
- void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
-
- void NotifyMissing(const TString& serialized);
+ class TLRUPatternCacheImpl;
static constexpr size_t CacheMaxElementsSize = 10000;
- friend class TTicket;
+ void AccessPattern(const TString& serializedProgram, TPatternCacheEntryPtr entry);
mutable std::mutex Mutex;
- THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
-
- class TLRUPatternCacheImpl;
- std::unique_ptr<TLRUPatternCacheImpl> Cache;
-
- THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
+ THashMap<TString, TVector<NThreading::TPromise<TPatternCacheEntryPtr>>> Notify; // protected by Mutex
+ std::unique_ptr<TLRUPatternCacheImpl> Cache; // protected by Mutex
+ THashMap<TString, TPatternCacheEntryPtr> PatternsToCompile; // protected by Mutex
const Config Configuration;