diff options
author | vvvv <vvvv@ydb.tech> | 2023-02-13 14:18:20 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-02-13 14:18:20 +0300 |
commit | 3e456a1cf49ee640659b22a6799a221b0ebd01ec (patch) | |
tree | d93803ce66a8070d7d256837ab31e56e96524258 | |
parent | b4c5e2b57915c19c000c30a492e13ec0ce87fc49 (diff) | |
download | ydb-3e456a1cf49ee640659b22a6799a221b0ebd01ec.tar.gz |
blocking cache for computation patters, activating cache in local dq gateway
8 files changed, 135 insertions, 23 deletions
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 87b63d46e6..2538d08a05 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -201,7 +201,7 @@ void Init( NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Counters = workerManagerCounters; lwmOptions.DqTaskCounters = protoConfig.GetEnableTaskCounters() ? appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr; - lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false); + lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, nullptr, false); lwmOptions.AsyncIoFactory = asyncIoFactory; lwmOptions.FunctionRegistry = appData->FunctionRegistry; lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory(); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index bfe4a3f8e8..802e67a8c9 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -417,16 +417,21 @@ public: std::shared_ptr<TPatternCacheEntry> entry; if (UseSeparatePatternAlloc() && Context.PatternCache) { auto& cache = Context.PatternCache; - entry = cache->Find(program.GetRaw()); - if (!entry) { + auto ticket = cache->FindOrSubscribe(program.GetRaw()); + if (!ticket.HasFuture()) { entry = CreateComputationPattern(task, program.GetRaw()); if (entry->Pattern->GetSuitableForCache()) { cache->EmplacePattern(task.GetProgram().GetRaw(), entry); + ticket.Close(); } else { cache->IncNotSuitablePattern(); } + } else { + entry = ticket.GetValueSync(); } - } else { + } + + if (!entry) { entry = CreateComputationPattern(task, program.GetRaw()); } diff --git a/ydb/library/yql/minikql/computation/CMakeLists.darwin.txt b/ydb/library/yql/minikql/computation/CMakeLists.darwin.txt index 8ace42eef1..029c23aed2 100644 --- a/ydb/library/yql/minikql/computation/CMakeLists.darwin.txt +++ b/ydb/library/yql/minikql/computation/CMakeLists.darwin.txt @@ -25,6 +25,7 @@ target_link_libraries(yql-minikql-computation PUBLIC parser-pg_wrapper-interface yql-public-udf library-yql-utils + cpp-threading-future yql-minikql-codegen llvm12-lib-IR lib-ExecutionEngine-MCJIT @@ -67,6 +68,7 @@ target_link_libraries(yql-minikql-computation.global PUBLIC parser-pg_wrapper-interface yql-public-udf library-yql-utils + cpp-threading-future yql-minikql-codegen llvm12-lib-IR lib-ExecutionEngine-MCJIT diff --git a/ydb/library/yql/minikql/computation/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/CMakeLists.linux-aarch64.txt index 4173cc4ef2..a32525a2f3 100644 --- a/ydb/library/yql/minikql/computation/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/computation/CMakeLists.linux-aarch64.txt @@ -26,6 +26,7 @@ target_link_libraries(yql-minikql-computation PUBLIC parser-pg_wrapper-interface yql-public-udf library-yql-utils + cpp-threading-future yql-minikql-codegen llvm12-lib-IR lib-ExecutionEngine-MCJIT @@ -69,6 +70,7 @@ target_link_libraries(yql-minikql-computation.global PUBLIC parser-pg_wrapper-interface yql-public-udf library-yql-utils + cpp-threading-future yql-minikql-codegen llvm12-lib-IR lib-ExecutionEngine-MCJIT diff --git a/ydb/library/yql/minikql/computation/CMakeLists.linux.txt b/ydb/library/yql/minikql/computation/CMakeLists.linux.txt index 4173cc4ef2..a32525a2f3 100644 --- a/ydb/library/yql/minikql/computation/CMakeLists.linux.txt +++ b/ydb/library/yql/minikql/computation/CMakeLists.linux.txt @@ -26,6 +26,7 @@ target_link_libraries(yql-minikql-computation PUBLIC parser-pg_wrapper-interface yql-public-udf library-yql-utils + cpp-threading-future yql-minikql-codegen llvm12-lib-IR lib-ExecutionEngine-MCJIT @@ -69,6 +70,7 @@ target_link_libraries(yql-minikql-computation.global PUBLIC parser-pg_wrapper-interface yql-public-udf library-yql-utils + cpp-threading-future yql-minikql-codegen llvm12-lib-IR lib-ExecutionEngine-MCJIT diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h index fdf3429fd8..1cb848bb93 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h @@ -3,6 +3,7 @@ #include "mkql_computation_node.h" #include <ydb/library/yql/minikql/mkql_node.h> +#include <library/cpp/threading/future/future.h> #include <memory> @@ -52,11 +53,13 @@ struct TPatternCacheEntry { class TComputationPatternLRUCache { mutable std::mutex Mutex; + THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify; TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache; size_t CurrentSizeBytes = 0; const size_t MaxSizeBytes = 0; public: NMonitoring::TDynamicCounters::TCounterPtr Hits; + NMonitoring::TDynamicCounters::TCounterPtr Waits; NMonitoring::TDynamicCounters::TCounterPtr Misses; NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern; NMonitoring::TDynamicCounters::TCounterPtr SizeItems; @@ -64,10 +67,46 @@ public: NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter; public: + class TTicket : private TNonCopyable { + public: + TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache) + : Serialized(serialized) + , IsOwned(isOwned) + , Future(future) + , Cache(cache) + {} + + ~TTicket() { + if (Cache) { + Cache->NotifyMissing(Serialized); + } + } + + bool HasFuture() const { + return !IsOwned; + } + + std::shared_ptr<TPatternCacheEntry> GetValueSync() const { + Y_VERIFY(HasFuture()); + return Future.GetValueSync(); + } + + void Close() { + Cache = nullptr; + } + + private: + const TString Serialized; + const bool IsOwned; + const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future; + TComputationPatternLRUCache* Cache; + }; + TComputationPatternLRUCache(size_t sizeBytes, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>()) : Cache(10000) , MaxSizeBytes(sizeBytes) , Hits(counters->GetCounter("PatternCache/Hits", true)) + , Waits(counters->GetCounter("PatternCache/Waits", true)) , Misses(counters->GetCounter("PatternCache/Misses", true)) , NotSuitablePattern(counters->GetCounter("PatternCache/NotSuitablePattern", true)) , SizeItems(counters->GetCounter("PatternCache/SizeItems", false)) @@ -92,6 +131,30 @@ public: } } + TTicket FindOrSubscribe(const TString& serialized) { + auto guard = std::scoped_lock<std::mutex>(Mutex); + if (auto it = Cache.Find(serialized); it != Cache.End()) { + ++*Hits; + return TTicket(serialized, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr); + } + + auto [notifyIt, isNew] = Notify.emplace(serialized, Nothing()); + if (isNew) { + ++*Misses; + return TTicket(serialized, true, {}, this); + } + + ++*Waits; + auto promise = NThreading::NewPromise<std::shared_ptr<TPatternCacheEntry>>(); + auto& subscribers = Notify[serialized]; + if (!subscribers) { + subscribers.ConstructInPlace(); + } + + subscribers->push_back(promise); + return TTicket(serialized, false, promise, nullptr); + } + void RemoveOldest() { auto oldest = Cache.FindOldest(); Y_VERIFY_DEBUG(oldest != Cache.End()); @@ -99,25 +162,57 @@ public: Cache.Erase(oldest); } + void NotifyMissing(const TString& serialized) { + TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; + { + auto guard = std::scoped_lock<std::mutex>(Mutex); + auto notifyIt = Notify.find(serialized); + if (notifyIt != Notify.end()) { + subscribers.swap(notifyIt->second); + Notify.erase(notifyIt); + } + } + + if (subscribers) { + for (auto& s : *subscribers) { + s.SetValue(nullptr); + } + } + } + void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) { Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern); - auto guard = std::scoped_lock<std::mutex>(Mutex); - // normally remove only one old cache entry by iteration to prevent bursts - if (CurrentSizeBytes > MaxSizeBytes) { - RemoveOldest(); - } - // to prevent huge memory overusage remove as much as needed - while (CurrentSizeBytes > 2 * MaxSizeBytes) { - RemoveOldest(); + TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; + { + auto guard = std::scoped_lock<std::mutex>(Mutex); + // normally remove only one old cache entry by iteration to prevent bursts + if (CurrentSizeBytes > MaxSizeBytes) { + RemoveOldest(); + } + // to prevent huge memory overusage remove as much as needed + while (CurrentSizeBytes > 2 * MaxSizeBytes) { + RemoveOldest(); + } + + patternWithEnv->UpdateSizeForCache(); + CurrentSizeBytes += patternWithEnv->SizeForCache; + + Cache.Insert(serialized, patternWithEnv); + auto notifyIt = Notify.find(serialized); + if (notifyIt != Notify.end()) { + subscribers.swap(notifyIt->second); + Notify.erase(notifyIt); + } + + *SizeItems = Cache.Size(); + *SizeBytes = CurrentSizeBytes; } - patternWithEnv->UpdateSizeForCache(); - CurrentSizeBytes += patternWithEnv->SizeForCache; - - Cache.Insert(serialized, std::move(patternWithEnv)); - - *SizeItems = Cache.Size(); - *SizeBytes = CurrentSizeBytes; + if (subscribers) { + for (auto& s : *subscribers) { + s.SetValue(patternWithEnv); + } + } } void CleanCache() { diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp index be139375b3..bcda5b2119 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp @@ -206,7 +206,10 @@ private: class TAbstractFactory: public IProxyFactory { public: - TAbstractFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory) + TAbstractFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + NKikimr::NMiniKQL::TComputationNodeFactory compFactory, + TTaskTransformFactory taskTransformFactory, + std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache) : DeterministicMode(!!GetEnv("YQL_DETERMINISTIC_MODE")) , RandomProvider( DeterministicMode @@ -223,6 +226,7 @@ public: ExecutionContext.ComputationFactory = compFactory; ExecutionContext.RandomProvider = RandomProvider.Get(); ExecutionContext.TimeProvider = TimeProvider.Get(); + ExecutionContext.PatternCache = patternCache; } protected: @@ -242,8 +246,9 @@ public: TLocalFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, + std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache, bool terminateOnError) - : TAbstractFactory(functionRegistry, compFactory, taskTransformFactory) + : TAbstractFactory(functionRegistry, compFactory, taskTransformFactory, patternCache) , TerminateOnError(terminateOnError) { } @@ -284,9 +289,9 @@ private: IProxyFactory::TPtr CreateFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, - TTaskTransformFactory taskTransformFactory, bool terminateOnError) + TTaskTransformFactory taskTransformFactory, std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache, bool terminateOnError) { - return new TLocalFactory(functionRegistry, compFactory, taskTransformFactory, terminateOnError); + return new TLocalFactory(functionRegistry, compFactory, taskTransformFactory, patternCache, terminateOnError); } } // namespace NYql::NTaskRunnerProxy diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h index 23fbb54b32..9640edce2f 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h @@ -8,6 +8,7 @@ namespace NYql::NTaskRunnerProxy { IProxyFactory::TPtr CreateFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, + std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache, bool terminateOnError ); |