aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-02-13 14:18:20 +0300
committervvvv <vvvv@ydb.tech>2023-02-13 14:18:20 +0300
commit3e456a1cf49ee640659b22a6799a221b0ebd01ec (patch)
treed93803ce66a8070d7d256837ab31e56e96524258
parentb4c5e2b57915c19c000c30a492e13ec0ce87fc49 (diff)
downloadydb-3e456a1cf49ee640659b22a6799a221b0ebd01ec.tar.gz
blocking cache for computation patters, activating cache in local dq gateway
-rw-r--r--ydb/core/yq/libs/init/init.cpp2
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp11
-rw-r--r--ydb/library/yql/minikql/computation/CMakeLists.darwin.txt2
-rw-r--r--ydb/library/yql/minikql/computation/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/minikql/computation/CMakeLists.linux.txt2
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h125
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp13
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h1
8 files changed, 135 insertions, 23 deletions
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 87b63d46e6..2538d08a05 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -201,7 +201,7 @@ void Init(
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Counters = workerManagerCounters;
lwmOptions.DqTaskCounters = protoConfig.GetEnableTaskCounters() ? appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr;
- lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false);
+ lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, nullptr, false);
lwmOptions.AsyncIoFactory = asyncIoFactory;
lwmOptions.FunctionRegistry = appData->FunctionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory();
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index bfe4a3f8e8..802e67a8c9 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -417,16 +417,21 @@ public:
std::shared_ptr<TPatternCacheEntry> entry;
if (UseSeparatePatternAlloc() && Context.PatternCache) {
auto& cache = Context.PatternCache;
- entry = cache->Find(program.GetRaw());
- if (!entry) {
+ auto ticket = cache->FindOrSubscribe(program.GetRaw());
+ if (!ticket.HasFuture()) {
entry = CreateComputationPattern(task, program.GetRaw());
if (entry->Pattern->GetSuitableForCache()) {
cache->EmplacePattern(task.GetProgram().GetRaw(), entry);
+ ticket.Close();
} else {
cache->IncNotSuitablePattern();
}
+ } else {
+ entry = ticket.GetValueSync();
}
- } else {
+ }
+
+ if (!entry) {
entry = CreateComputationPattern(task, program.GetRaw());
}
diff --git a/ydb/library/yql/minikql/computation/CMakeLists.darwin.txt b/ydb/library/yql/minikql/computation/CMakeLists.darwin.txt
index 8ace42eef1..029c23aed2 100644
--- a/ydb/library/yql/minikql/computation/CMakeLists.darwin.txt
+++ b/ydb/library/yql/minikql/computation/CMakeLists.darwin.txt
@@ -25,6 +25,7 @@ target_link_libraries(yql-minikql-computation PUBLIC
parser-pg_wrapper-interface
yql-public-udf
library-yql-utils
+ cpp-threading-future
yql-minikql-codegen
llvm12-lib-IR
lib-ExecutionEngine-MCJIT
@@ -67,6 +68,7 @@ target_link_libraries(yql-minikql-computation.global PUBLIC
parser-pg_wrapper-interface
yql-public-udf
library-yql-utils
+ cpp-threading-future
yql-minikql-codegen
llvm12-lib-IR
lib-ExecutionEngine-MCJIT
diff --git a/ydb/library/yql/minikql/computation/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/CMakeLists.linux-aarch64.txt
index 4173cc4ef2..a32525a2f3 100644
--- a/ydb/library/yql/minikql/computation/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/computation/CMakeLists.linux-aarch64.txt
@@ -26,6 +26,7 @@ target_link_libraries(yql-minikql-computation PUBLIC
parser-pg_wrapper-interface
yql-public-udf
library-yql-utils
+ cpp-threading-future
yql-minikql-codegen
llvm12-lib-IR
lib-ExecutionEngine-MCJIT
@@ -69,6 +70,7 @@ target_link_libraries(yql-minikql-computation.global PUBLIC
parser-pg_wrapper-interface
yql-public-udf
library-yql-utils
+ cpp-threading-future
yql-minikql-codegen
llvm12-lib-IR
lib-ExecutionEngine-MCJIT
diff --git a/ydb/library/yql/minikql/computation/CMakeLists.linux.txt b/ydb/library/yql/minikql/computation/CMakeLists.linux.txt
index 4173cc4ef2..a32525a2f3 100644
--- a/ydb/library/yql/minikql/computation/CMakeLists.linux.txt
+++ b/ydb/library/yql/minikql/computation/CMakeLists.linux.txt
@@ -26,6 +26,7 @@ target_link_libraries(yql-minikql-computation PUBLIC
parser-pg_wrapper-interface
yql-public-udf
library-yql-utils
+ cpp-threading-future
yql-minikql-codegen
llvm12-lib-IR
lib-ExecutionEngine-MCJIT
@@ -69,6 +70,7 @@ target_link_libraries(yql-minikql-computation.global PUBLIC
parser-pg_wrapper-interface
yql-public-udf
library-yql-utils
+ cpp-threading-future
yql-minikql-codegen
llvm12-lib-IR
lib-ExecutionEngine-MCJIT
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
index fdf3429fd8..1cb848bb93 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
@@ -3,6 +3,7 @@
#include "mkql_computation_node.h"
#include <ydb/library/yql/minikql/mkql_node.h>
+#include <library/cpp/threading/future/future.h>
#include <memory>
@@ -52,11 +53,13 @@ struct TPatternCacheEntry {
class TComputationPatternLRUCache {
mutable std::mutex Mutex;
+ THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache;
size_t CurrentSizeBytes = 0;
const size_t MaxSizeBytes = 0;
public:
NMonitoring::TDynamicCounters::TCounterPtr Hits;
+ NMonitoring::TDynamicCounters::TCounterPtr Waits;
NMonitoring::TDynamicCounters::TCounterPtr Misses;
NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern;
NMonitoring::TDynamicCounters::TCounterPtr SizeItems;
@@ -64,10 +67,46 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter;
public:
+ class TTicket : private TNonCopyable {
+ public:
+ TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
+ : Serialized(serialized)
+ , IsOwned(isOwned)
+ , Future(future)
+ , Cache(cache)
+ {}
+
+ ~TTicket() {
+ if (Cache) {
+ Cache->NotifyMissing(Serialized);
+ }
+ }
+
+ bool HasFuture() const {
+ return !IsOwned;
+ }
+
+ std::shared_ptr<TPatternCacheEntry> GetValueSync() const {
+ Y_VERIFY(HasFuture());
+ return Future.GetValueSync();
+ }
+
+ void Close() {
+ Cache = nullptr;
+ }
+
+ private:
+ const TString Serialized;
+ const bool IsOwned;
+ const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future;
+ TComputationPatternLRUCache* Cache;
+ };
+
TComputationPatternLRUCache(size_t sizeBytes, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>())
: Cache(10000)
, MaxSizeBytes(sizeBytes)
, Hits(counters->GetCounter("PatternCache/Hits", true))
+ , Waits(counters->GetCounter("PatternCache/Waits", true))
, Misses(counters->GetCounter("PatternCache/Misses", true))
, NotSuitablePattern(counters->GetCounter("PatternCache/NotSuitablePattern", true))
, SizeItems(counters->GetCounter("PatternCache/SizeItems", false))
@@ -92,6 +131,30 @@ public:
}
}
+ TTicket FindOrSubscribe(const TString& serialized) {
+ auto guard = std::scoped_lock<std::mutex>(Mutex);
+ if (auto it = Cache.Find(serialized); it != Cache.End()) {
+ ++*Hits;
+ return TTicket(serialized, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr);
+ }
+
+ auto [notifyIt, isNew] = Notify.emplace(serialized, Nothing());
+ if (isNew) {
+ ++*Misses;
+ return TTicket(serialized, true, {}, this);
+ }
+
+ ++*Waits;
+ auto promise = NThreading::NewPromise<std::shared_ptr<TPatternCacheEntry>>();
+ auto& subscribers = Notify[serialized];
+ if (!subscribers) {
+ subscribers.ConstructInPlace();
+ }
+
+ subscribers->push_back(promise);
+ return TTicket(serialized, false, promise, nullptr);
+ }
+
void RemoveOldest() {
auto oldest = Cache.FindOldest();
Y_VERIFY_DEBUG(oldest != Cache.End());
@@ -99,25 +162,57 @@ public:
Cache.Erase(oldest);
}
+ void NotifyMissing(const TString& serialized) {
+ TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
+ {
+ auto guard = std::scoped_lock<std::mutex>(Mutex);
+ auto notifyIt = Notify.find(serialized);
+ if (notifyIt != Notify.end()) {
+ subscribers.swap(notifyIt->second);
+ Notify.erase(notifyIt);
+ }
+ }
+
+ if (subscribers) {
+ for (auto& s : *subscribers) {
+ s.SetValue(nullptr);
+ }
+ }
+ }
+
void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern);
- auto guard = std::scoped_lock<std::mutex>(Mutex);
- // normally remove only one old cache entry by iteration to prevent bursts
- if (CurrentSizeBytes > MaxSizeBytes) {
- RemoveOldest();
- }
- // to prevent huge memory overusage remove as much as needed
- while (CurrentSizeBytes > 2 * MaxSizeBytes) {
- RemoveOldest();
+ TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
+ {
+ auto guard = std::scoped_lock<std::mutex>(Mutex);
+ // normally remove only one old cache entry by iteration to prevent bursts
+ if (CurrentSizeBytes > MaxSizeBytes) {
+ RemoveOldest();
+ }
+ // to prevent huge memory overusage remove as much as needed
+ while (CurrentSizeBytes > 2 * MaxSizeBytes) {
+ RemoveOldest();
+ }
+
+ patternWithEnv->UpdateSizeForCache();
+ CurrentSizeBytes += patternWithEnv->SizeForCache;
+
+ Cache.Insert(serialized, patternWithEnv);
+ auto notifyIt = Notify.find(serialized);
+ if (notifyIt != Notify.end()) {
+ subscribers.swap(notifyIt->second);
+ Notify.erase(notifyIt);
+ }
+
+ *SizeItems = Cache.Size();
+ *SizeBytes = CurrentSizeBytes;
}
- patternWithEnv->UpdateSizeForCache();
- CurrentSizeBytes += patternWithEnv->SizeForCache;
-
- Cache.Insert(serialized, std::move(patternWithEnv));
-
- *SizeItems = Cache.Size();
- *SizeBytes = CurrentSizeBytes;
+ if (subscribers) {
+ for (auto& s : *subscribers) {
+ s.SetValue(patternWithEnv);
+ }
+ }
}
void CleanCache() {
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
index be139375b3..bcda5b2119 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
@@ -206,7 +206,10 @@ private:
class TAbstractFactory: public IProxyFactory {
public:
- TAbstractFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory)
+ TAbstractFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
+ TTaskTransformFactory taskTransformFactory,
+ std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache)
: DeterministicMode(!!GetEnv("YQL_DETERMINISTIC_MODE"))
, RandomProvider(
DeterministicMode
@@ -223,6 +226,7 @@ public:
ExecutionContext.ComputationFactory = compFactory;
ExecutionContext.RandomProvider = RandomProvider.Get();
ExecutionContext.TimeProvider = TimeProvider.Get();
+ ExecutionContext.PatternCache = patternCache;
}
protected:
@@ -242,8 +246,9 @@ public:
TLocalFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory,
+ std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache,
bool terminateOnError)
- : TAbstractFactory(functionRegistry, compFactory, taskTransformFactory)
+ : TAbstractFactory(functionRegistry, compFactory, taskTransformFactory, patternCache)
, TerminateOnError(terminateOnError)
{ }
@@ -284,9 +289,9 @@ private:
IProxyFactory::TPtr CreateFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
- TTaskTransformFactory taskTransformFactory, bool terminateOnError)
+ TTaskTransformFactory taskTransformFactory, std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache, bool terminateOnError)
{
- return new TLocalFactory(functionRegistry, compFactory, taskTransformFactory, terminateOnError);
+ return new TLocalFactory(functionRegistry, compFactory, taskTransformFactory, patternCache, terminateOnError);
}
} // namespace NYql::NTaskRunnerProxy
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h
index 23fbb54b32..9640edce2f 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h
@@ -8,6 +8,7 @@ namespace NYql::NTaskRunnerProxy {
IProxyFactory::TPtr CreateFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory,
+ std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache,
bool terminateOnError
);