about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author va-kuznecov <va-kuznecov@ydb.tech> 2022-09-13 19:26:11 +0300
committer va-kuznecov <va-kuznecov@ydb.tech> 2022-09-13 19:26:11 +0300
commit 2dbcb34d68c8b32abf7b9190ddaf502fae86938c (patch)
tree 1f84382ee1243fe6a095ef88a7f14a729cba2064
parent 26ac7247b980e2d8366a491aa2807678f3ecf52d (diff)
download ydb-2dbcb34d68c8b32abf7b9190ddaf502fae86938c.tar.gz
Refactor cache, get rid of singleton
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp | 6
-rw-r--r-- ydb/core/kqp/executer/kqp_literal_executer.cpp | 22
-rw-r--r-- ydb/core/kqp/rm/kqp_rm.cpp | 21
-rw-r--r-- ydb/core/kqp/rm/kqp_rm.h | 4
-rw-r--r-- ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 60
-rw-r--r-- ydb/library/yql/dq/runtime/dq_tasks_runner.h | 5
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node.h | 88
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp | 11
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h | 88
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp | 39
-rw-r--r-- ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp | 3
11 files changed, 189 insertions, 158 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index aeb2cafe70a..dcce35616e2 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/protos/tx_datashard.pb.h>
+#include <ydb/core/kqp/rm/kqp_rm.h>
#include <ydb/core/kqp/runtime/kqp_compute.h>
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/core/sys_view/scan.h>
@@ -35,7 +36,7 @@ public:
TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId)
: TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
, ComputeCtx(settings.StatsMode)
@@ -70,6 +71,7 @@ public:
execCtx.ApplyCtx = nullptr;
execCtx.Alloc = nullptr;
execCtx.TypeEnv = nullptr;
+ execCtx.PatternCache = GetKqpResourceManager()->GetComputeActorPatternCache();
TDqTaskRunnerSettings settings;
settings.CollectBasicStats = RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC;
@@ -306,7 +308,7 @@ private:
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId)
{
return new TKqpComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp
index 0c663161f26..32113271184 100644
--- a/ydb/core/kqp/executer/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp
@@ -184,6 +184,7 @@ private:
// task runner settings
NMiniKQL::TKqpComputeContextBase computeCtx;
TDqTaskRunnerContext context = CreateTaskRunnerContext(&computeCtx, &alloc, &typeEnv);
+ context.PatternCache = GetKqpResourceManager()->GetLiteralPatternCache();
TDqTaskRunnerSettings settings = CreateTaskRunnerSettings(Request.StatsMode);
Y_DEFER {
@@ -269,27 +270,8 @@ private:
auto taskRunner = CreateKqpTaskRunner(context, settings, log);
TaskRunners.emplace_back(taskRunner);
- std::shared_ptr<NMiniKQL::TPatternWithEnv> patternEnv;
- bool useCache = AppData()->FeatureFlags.GetEnableKqpPatternCacheLiteral();
- bool foundInCache = false;
- if (useCache) {
- auto *cache = Singleton<NMiniKQL::TComputationPatternLRUCache>();
-
- patternEnv = cache->Find(protoTask.GetProgram().GetRaw());
- if (patternEnv) {
- foundInCache = true;
- } else {
- patternEnv = cache->CreateEnv();
- }
- }
-
taskRunner->Prepare(protoTask, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext(),
- parameterProvider, patternEnv);
-
- if (useCache && !foundInCache) {
- auto *cache = Singleton<NMiniKQL::TComputationPatternLRUCache>();
- cache->EmplacePattern(protoTask.GetProgram().GetRaw(), std::move(patternEnv));
- }
+ parameterProvider);
auto status = taskRunner->Run();
YQL_ENSURE(status == ERunStatus::Finished);
diff --git a/ydb/core/kqp/rm/kqp_rm.cpp b/ydb/core/kqp/rm/kqp_rm.cpp
index dd79ef8ea72..81f950afdd7 100644
--- a/ydb/core/kqp/rm/kqp_rm.cpp
+++ b/ydb/core/kqp/rm/kqp_rm.cpp
@@ -226,7 +226,10 @@ public:
, Counters(counters)
, ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID())
, ExecutionUnitsResource(Config.GetComputeActorsCount())
- , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) {}
+ , ScanQueryMemoryResource(Config.GetQueryMemoryLimit())
+ //, LiteralPatternCache(std::make_shared<NMiniKQL::TComputationPatternLRUCache>())
+ //, ComputeActorPatternCache(std::make_shared<NMiniKQL::TComputationPatternLRUCache>())
+ {}
void Bootstrap() {
ActorSystem = TlsActivationContext->ActorSystem();
@@ -590,6 +593,18 @@ public:
}
}
+ std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetLiteralPatternCache() override { // returns a copy of the LiteralPatternCache shared pointer
+ with_lock (Lock) { // the member is read while holding Lock (declared outside this hunk)
+ return LiteralPatternCache; // NOTE(review): the constructor initializer for this member is commented out in this diff, so this looks empty unless it is assigned elsewhere - confirm
+ }
+ }
+
+ std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetComputeActorPatternCache() override { // returns a copy of the ComputeActorPatternCache shared pointer
+ with_lock (Lock) { // the member is read while holding Lock (declared outside this hunk)
+ return ComputeActorPatternCache; // NOTE(review): the constructor initializer for this member is commented out in this diff, so this looks empty unless it is assigned elsewhere - confirm
+ }
+ }
+
private:
STATEFN(WorkState) {
switch (ev->GetTypeRewrite()) {
@@ -918,6 +933,10 @@ private:
std::optional<TInstant> LastPublishTime;
};
TWhiteBoardState WbState;
+
+ // pattern caches for different actors
+ std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> LiteralPatternCache;
+ std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> ComputeActorPatternCache;
};
} // namespace NRm
diff --git a/ydb/core/kqp/rm/kqp_rm.h b/ydb/core/kqp/rm/kqp_rm.h
index 568e945d803..f52eed96ac7 100644
--- a/ydb/core/kqp/rm/kqp_rm.h
+++ b/ydb/core/kqp/rm/kqp_rm.h
@@ -2,6 +2,7 @@
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h>
#include <library/cpp/actors/core/actor.h>
@@ -86,6 +87,9 @@ public:
virtual TKqpLocalNodeResources GetLocalResources() const = 0;
virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0;
+
+ virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetLiteralPatternCache() = 0;
+ virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetComputeActorPatternCache() = 0;
};
} // namespace NRm
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 1b50ca14a38..0e36cd68d43 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -10,6 +10,7 @@
#include <ydb/library/yql/dq/runtime/dq_transport.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h>
#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
@@ -252,8 +253,7 @@ public:
return TaskId;
}
- void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider,
- const std::shared_ptr<TPatternWithEnv>& compPattern)
+ void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider)
{
LOG(TStringBuilder() << "Build task: " << TaskId);
auto startTime = TInstant::Now();
@@ -264,7 +264,21 @@ public:
YQL_ENSURE(program.GetRuntimeVersion());
YQL_ENSURE(program.GetRuntimeVersion() <= NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
- ProgramParsed.ProgramNode = DeserializeRuntimeNode(program.GetRaw(), typeEnv);
+ std::shared_ptr<TPatternCacheEntry> entry;
+ if (auto& cache = Context.PatternCache) {
+ entry = cache->Find(program.GetRaw());
+ if (!entry) {
+ entry = std::make_shared<TPatternCacheEntry>();
+ }
+ }
+
+ auto& patternAlloc = entry ? entry->Alloc.Ref() : Alloc().Ref();
+ auto& patternEnv = entry ? entry->Env : typeEnv;
+
+ {
+ auto guard = patternEnv.BindAllocator();
+ ProgramParsed.ProgramNode = DeserializeRuntimeNode(program.GetRaw(), patternEnv);
+ }
YQL_ENSURE(ProgramParsed.ProgramNode.IsImmediate() && ProgramParsed.ProgramNode.GetNode()->GetType()->IsStruct());
auto& programStruct = static_cast<TStructLiteral&>(*ProgramParsed.ProgramNode.GetNode());
auto programType = programStruct.GetType();
@@ -275,9 +289,9 @@ public:
TRuntimeNode programRoot = programStruct.GetValue(*programRootIdx);
if (Context.FuncProvider) {
TExploringNodeVisitor explorer;
- explorer.Walk(programRoot.GetNode(), typeEnv);
+ explorer.Walk(programRoot.GetNode(), patternEnv);
bool wereChanges = false;
- programRoot = SinglePassVisitCallables(programRoot, explorer, Context.FuncProvider, typeEnv, true, wereChanges);
+ programRoot = SinglePassVisitCallables(programRoot, explorer, Context.FuncProvider, patternEnv, true, wereChanges);
}
ProgramParsed.OutputItemTypes.resize(task.OutputsSize());
@@ -336,7 +350,7 @@ public:
auto paramsCount = paramsStruct->GetMembersCount();
TExploringNodeVisitor programExplorer;
- programExplorer.Walk(programRoot.GetNode(), typeEnv);
+ programExplorer.Walk(programRoot.GetNode(), patternEnv);
auto programSize = programExplorer.GetNodes().size();
LOG(TStringBuilder() << "task: " << TaskId << ", program size: " << programSize
@@ -348,9 +362,6 @@ public:
auto validatePolicy = Settings.TerminateOnError ? NUdf::EValidatePolicy::Fail : NUdf::EValidatePolicy::Exception;
- auto& compPatternAlloc = compPattern ? compPattern->Alloc.Ref() : Alloc().Ref();
- auto& compPatternEnv = compPattern ? compPattern->Env : typeEnv;
-
auto taskRunnerFactory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
auto& computationFactory = Context.ComputationFactory;
if (auto res = computationFactory(callable, ctx)) {
@@ -362,19 +373,22 @@ public:
return nullptr;
};
- TComputationPatternOpts opts(compPatternAlloc, compPatternEnv, taskRunnerFactory,
+ TComputationPatternOpts opts(patternAlloc, patternEnv, taskRunnerFactory,
Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi,
ProgramParsed.StatsRegistry.Get());
- SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams);
- opts.SecureParamsProvider = SecureParamsProvider.get();
+ SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams);
+ opts.SecureParamsProvider = SecureParamsProvider.get();
- if (compPattern) {
- if (!compPattern->Pattern) {
- auto guard = compPattern->Env.BindAllocator();
- compPattern->Pattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts);
+ bool filledCacheEntry = false;
+ if (entry) {
+ if (!entry->Pattern) {
+ auto guard = entry->Env.BindAllocator();
+ entry->Pattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts);
+ filledCacheEntry = true;
}
- ProgramParsed.CompPattern = compPattern->Pattern;
+ ProgramParsed.CompPattern = entry->Pattern;
+ ProgramParsed.CompPatternEntry = entry;
// clone pattern using alloc from current scope
ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone(
opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &Alloc().Ref()));
@@ -384,6 +398,12 @@ public:
opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider));
}
+ if (filledCacheEntry) {
+ if (auto& cache = Context.PatternCache) {
+ cache->EmplacePattern(task.GetProgram().GetRaw(), entry);
+ }
+ }
+
TBindTerminator term(ProgramParsed.CompGraph->GetTerminator());
auto paramNode = ProgramParsed.CompGraph->GetEntryPoint(programInputsCount, /* require */ false);
@@ -428,11 +448,10 @@ public:
}
void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
- const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider,
- const std::shared_ptr<TPatternWithEnv>& compPattern) override
+ const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider) override
{
TaskId = task.GetId();
- BuildTask(task, parameterProvider, compPattern);
+ BuildTask(task, parameterProvider);
LOG(TStringBuilder() << "Prepare task: " << TaskId);
auto startTime = TInstant::Now();
@@ -836,6 +855,7 @@ private:
};
struct TProgramParsed {
+ std::shared_ptr<TPatternCacheEntry> CompPatternEntry;
TRuntimeNode ProgramNode;
TVector<TType*> InputItemTypes;
TVector<TType*> OutputItemTypes;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index dbeec8cf8d4..710f4e5cf5a 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -9,6 +9,7 @@
#include <ydb/library/yql/dq/runtime/dq_output_consumer.h>
#include <ydb/library/yql/dq/runtime/dq_async_input.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
#include <ydb/library/yql/minikql/mkql_function_registry.h>
#include <ydb/library/yql/minikql/mkql_node.h>
@@ -236,6 +237,7 @@ struct TDqTaskRunnerContext {
NKikimr::NMiniKQL::TCallableVisitFuncProvider FuncProvider;
NKikimr::NMiniKQL::TScopedAlloc* Alloc = nullptr;
NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr;
+ std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> PatternCache;
};
class IDqTaskRunnerExecutionContext {
@@ -293,8 +295,7 @@ public:
virtual void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(),
- const TDqTaskRunnerParameterProvider& parameterProvider = {},
- const std::shared_ptr<NKikimr::NMiniKQL::TPatternWithEnv>& compPattern = {}) = 0;
+ const TDqTaskRunnerParameterProvider& parameterProvider = {}) = 0;
virtual ERunStatus Run() = 0;
virtual bool HasEffects() const = 0;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h
index 2d8061c7163..994bfc5c3be 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h
@@ -277,7 +277,7 @@ struct TComputationNodeFactoryContext {
using TComputationNodeFactory = std::function<IComputationNode* (TCallable&, const TComputationNodeFactoryContext&)>;
using TStreamEmitter = std::function<void(NUdf::TUnboxedValue&&)>;
-struct TPatternWithEnv;
+struct TPatternCacheEntry;
struct TComputationPatternOpts {
TComputationPatternOpts(std::shared_ptr<TInjectedAlloc> cacheAlloc, std::shared_ptr<TTypeEnvironment> cacheEnv)
@@ -330,13 +330,8 @@ struct TComputationPatternOpts {
SecureParamsProvider = secureParamsProvider;
}
- void SetPatternEnv(std::shared_ptr<TPatternWithEnv> cacheEnv) {
- PatternEnv = std::move(cacheEnv);
- }
-
- mutable std::shared_ptr<TInjectedAlloc> CacheAlloc;
- mutable std::shared_ptr<TTypeEnvironment> CacheTypeEnv;
- mutable std::shared_ptr<TPatternWithEnv> PatternEnv;
+ mutable std::shared_ptr<TInjectedAlloc> CacheAlloc; // obsolete
+ mutable std::shared_ptr<TTypeEnvironment> CacheTypeEnv; // obsolete
TAllocState& AllocState;
const TTypeEnvironment& Env;
@@ -385,83 +380,6 @@ public:
virtual size_t GetCacheHits() const = 0;
};
-using TPrepareFunc = std::function<IComputationPattern::TPtr(TScopedAlloc &, TTypeEnvironment &)>;
-
-struct TPatternWithEnv {
- TScopedAlloc Alloc;
- TTypeEnvironment Env;
- IComputationPattern::TPtr Pattern;
-
- TPatternWithEnv() : Env(Alloc) {
- Alloc.Release();
- }
-
- ~TPatternWithEnv() {
- Alloc.Acquire();
- }
-};
-
-class TComputationPatternLRUCache {
- mutable std::mutex Mutex;
-
- TLRUCache<TString, std::shared_ptr<TPatternWithEnv>> Cache;
- std::atomic<size_t> Hits = 0;
- std::atomic<size_t> TotalKeysSize = 0;
- std::atomic<size_t> TotalValuesSize = 0;
-public:
- TComputationPatternLRUCache(size_t size = 100)
- : Cache(size)
- {}
-
- static std::shared_ptr<TPatternWithEnv> CreateEnv() {
- return std::make_shared<TPatternWithEnv>();
- }
-
- std::shared_ptr<TPatternWithEnv> Find(const TString& serialized) {
- auto guard = std::unique_lock<std::mutex>(Mutex);
- if (auto it = Cache.Find(serialized); it != Cache.End()) {
- ++Hits;
- return *it;
- }
- return {};
- }
-
- void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternWithEnv> patternWithEnv) {
- auto guard = std::unique_lock<std::mutex>(Mutex);
- Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern);
- TotalKeysSize += serialized.Size();
- TotalValuesSize += patternWithEnv->Alloc.GetAllocated();
-
- if (Cache.TotalSize() == Cache.GetMaxSize()) {
- auto oldest = Cache.FindOldest();
- Y_VERIFY(oldest != Cache.End());
- TotalKeysSize -= oldest.Key().Size();
- TotalValuesSize -= oldest.Value()->Alloc.GetAllocated();
- Cache.Erase(oldest);
- }
-
- Cache.Insert(serialized, std::move(patternWithEnv));
- }
-
- void CleanCache() {
- auto guard = std::unique_lock<std::mutex>(Mutex);
- Cache.Clear();
- }
-
- size_t GetSize() const {
- auto guard = std::unique_lock<std::mutex>(Mutex);
- return Cache.TotalSize();
- }
-
- size_t GetCacheHits() const {
- return Hits.load();
- }
-
- ~TComputationPatternLRUCache() {
- Mutex.lock();
- }
-};
-
std::unique_ptr<NUdf::ISecureParamsProvider> MakeSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams);
} // namespace NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
index e44832123d1..3adedf5628c 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
@@ -2,6 +2,7 @@
#include "mkql_value_builder.h"
#include "mkql_computation_node_codegen.h"
#include <ydb/library/yql/minikql/arrow/mkql_memory_pool.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h>
#include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
#include <ydb/library/yql/minikql/mkql_terminator.h>
@@ -221,7 +222,7 @@ public:
, ValidatePolicy(opts.ValidatePolicy)
, GraphPerProcess(opts.GraphPerProcess)
, PatternNodes(MakeIntrusive<TPatternNodes>(opts.AllocState))
- , ExternalAlloc(opts.CacheAlloc || opts.PatternEnv)
+ , ExternalAlloc(opts.CacheAlloc)
{
PatternNodes->HolderFactory = MakeHolder<THolderFactory>(opts.AllocState, *PatternNodes->MemInfo, &FunctionRegistry);
PatternNodes->ValueBuilder = MakeHolder<TDefaultValueBuilder>(*PatternNodes->HolderFactory, ValidatePolicy);
@@ -525,7 +526,7 @@ private:
NUdf::EValidatePolicy ValidatePolicy;
EGraphPerProcess GraphPerProcess;
TPatternNodes::TPtr PatternNodes;
- const bool ExternalAlloc;
+ const bool ExternalAlloc; // obsolete, will be removed after YQL-13977
};
class TComputationGraph : public IComputationGraph {
@@ -843,11 +844,7 @@ TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNode
IComputationPattern::TPtr MakeComputationPattern(TExploringNodeVisitor& explorer, const TRuntimeNode& root,
const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) {
- auto pattern = MakeComputationPatternImpl(explorer, root, entryPoints, opts);
- if (opts.PatternEnv) {
- pattern->SetTypeEnv(&opts.PatternEnv->Env);
- }
- return pattern;
+ return MakeComputationPatternImpl(explorer, root, entryPoints, opts);
}
class TComputationPatternCache: public IComputationPatternCache {
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
new file mode 100644
index 00000000000..10deab906b2
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
@@ -0,0 +1,88 @@
+#pragma once
+
+#include "mkql_computation_node.h"
+
+#include <ydb/library/yql/minikql/mkql_node.h>
+
+#include <memory>
+
+namespace NKikimr::NMiniKQL {
+
+struct TPatternCacheEntry { // one cache value: a computation pattern together with the allocator and type environment it is built in
+ TScopedAlloc Alloc; // declared first: Env is constructed from it below
+ TTypeEnvironment Env; // initialized as Env(Alloc) in the constructor
+ IComputationPattern::TPtr Pattern; // not set by the constructor; assigned by whoever builds the pattern
+
+ TPatternCacheEntry()
+ : Env(Alloc)
+ {
+ Alloc.Release(); // paired with Acquire() in the destructor; presumably detaches the allocator from the constructing thread - confirm
+ }
+
+ ~TPatternCacheEntry() {
+ Alloc.Acquire(); // runs before Pattern, Env and Alloc are destroyed; presumably so their teardown happens with the allocator held - confirm
+ }
+};
+
+class TComputationPatternLRUCache { // mutex-guarded TLRUCache mapping a TString key to a shared TPatternCacheEntry
+ mutable std::mutex Mutex; // locked by every method that touches Cache; mutable so the const GetSize() can lock it
+
+ TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache;
+ std::atomic<size_t> Hits = 0; // incremented on every successful Find()
+ std::atomic<size_t> TotalKeysSize = 0; // running sum of key sizes, maintained only in EmplacePattern()
+ std::atomic<size_t> TotalValuesSize = 0; // running sum of Alloc.GetAllocated(), maintained only in EmplacePattern()
+public:
+ TComputationPatternLRUCache(size_t size = 100) // size is passed straight to the TLRUCache constructor; presumably the maximum entry count - confirm
+ : Cache(size)
+ {}
+
+ static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry() { // convenience factory: a fresh entry that is not yet in any cache
+ return std::make_shared<TPatternCacheEntry>();
+ }
+
+ std::shared_ptr<TPatternCacheEntry> Find(const TString& serialized) { // returns the entry stored under the key, or an empty pointer on a miss
+ auto guard = std::scoped_lock<std::mutex>(Mutex);
+ if (auto it = Cache.Find(serialized); it != Cache.End()) {
+ ++Hits; // only hits are counted; misses are not tracked
+ return *it;
+ }
+ return {};
+ }
+
+ void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) { // stores an entry under the key, erasing the FindOldest() entry first when the cache is full
+ auto guard = std::scoped_lock<std::mutex>(Mutex);
+ Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern); // the entry is expected to carry a built pattern already
+ TotalKeysSize += serialized.Size(); // NOTE(review): totals are bumped before Insert() regardless of whether the key already exists - confirm TLRUCache::Insert semantics
+ TotalValuesSize += patternWithEnv->Alloc.GetAllocated();
+
+ if (Cache.TotalSize() == Cache.GetMaxSize()) { // cache is full: make room before inserting
+ auto oldest = Cache.FindOldest();
+ Y_VERIFY(oldest != Cache.End());
+ TotalKeysSize -= oldest.Key().Size(); // take the evicted entry out of the running totals
+ TotalValuesSize -= oldest.Value()->Alloc.GetAllocated();
+ Cache.Erase(oldest);
+ }
+
+ Cache.Insert(serialized, std::move(patternWithEnv));
+ }
+
+ void CleanCache() { // NOTE(review): clears Cache but leaves TotalKeysSize / TotalValuesSize untouched - confirm that is intended
+ auto guard = std::scoped_lock<std::mutex>(Mutex);
+ Cache.Clear();
+ }
+
+ size_t GetSize() const { // returns Cache.TotalSize(), read under the lock
+ auto guard = std::scoped_lock<std::mutex>(Mutex);
+ return Cache.TotalSize();
+ }
+
+ size_t GetCacheHits() const { // atomic read; does not take Mutex
+ return Hits.load();
+ }
+
+ ~TComputationPatternLRUCache() {
+ Mutex.lock(); // NOTE(review): never unlocked, so the std::mutex member is destroyed while owned, which the C++ standard makes undefined behavior - confirm intent
+ }
+};
+
+} // namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
index fee939dae74..81d6901daff 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
@@ -1,6 +1,7 @@
#include "library/cpp/threading/local_executor/local_executor.h"
#include "ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h"
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
#include <ydb/library/yql/utils/yql_panic.h>
@@ -439,9 +440,9 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
TComputationPatternLRUCache cache(cacheSize);
auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
- std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv();
- TScopedAlloc &alloc = patternEnv->Alloc;
- TTypeEnvironment &typeEnv = patternEnv->Env;
+ auto entry = std::make_shared<TPatternCacheEntry>();
+ TScopedAlloc &alloc = entry->Alloc;
+ TTypeEnvironment &typeEnv = entry->Env;
TProgramBuilder pb(typeEnv, *functionRegistry);
@@ -456,10 +457,10 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
{
- auto guard = patternEnv->Env.BindAllocator();
- patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
+ auto guard = entry->Env.BindAllocator();
+ entry->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
}
- cache.EmplacePattern("a", patternEnv);
+ cache.EmplacePattern("a", entry);
auto genData = [&]() {
std::vector<ui64> data;
data.reserve(vecSize);
@@ -482,13 +483,13 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
auto timeProvider = CreateDeterministicTimeProvider(10000000);
TScopedAlloc graphAlloc;
- auto patternEnv = cache.Find(key);
+ auto entry = cache.Find(key);
- TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetListTestFactory(),
+ TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetListTestFactory(),
functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
- auto graph = patternEnv->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
+ auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
TUnboxedValue* items = nullptr;
graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
@@ -572,9 +573,9 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
for (ui32 i = 0; i < cacheSize; ++i) {
- std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv();
- TScopedAlloc& alloc = patternEnv->Alloc;
- TTypeEnvironment& typeEnv = patternEnv->Env;
+ auto entry = std::make_shared<TPatternCacheEntry>();
+ TScopedAlloc& alloc = entry->Alloc;
+ TTypeEnvironment& typeEnv = entry->Env;
TProgramBuilder pb(typeEnv, *functionRegistry);
@@ -588,10 +589,10 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
"OFF", EGraphPerProcess::Multi);
{
- auto guard = patternEnv->Env.BindAllocator();
- patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts);
+ auto guard = entry->Env.BindAllocator();
+ entry->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts);
}
- cache.EmplacePattern(TString((char)('a' + i)), patternEnv);
+ cache.EmplacePattern(TString((char)('a' + i)), entry);
}
for (ui32 i = 0; i < cacheSize; ++i) {
@@ -600,13 +601,13 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
auto randomProvider = CreateDeterministicRandomProvider(1);
auto timeProvider = CreateDeterministicTimeProvider(10000000);
TScopedAlloc graphAlloc;
- auto patternEnv = cache.Find(key);
- UNIT_ASSERT(patternEnv);
- TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetBuiltinFactory(),
+ auto entry = cache.Find(key);
+ UNIT_ASSERT(entry);
+ TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetBuiltinFactory(),
functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
"OFF", EGraphPerProcess::Multi);
- auto graph = patternEnv->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
+ auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
auto value = graph->GetValue();
UNIT_ASSERT_EQUAL(value.AsStringRef(), NYql::NUdf::TStringRef("qwerty"));
}
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 2eb74891ae7..23a9eca9ff4 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1450,8 +1450,7 @@ public:
}
void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
- const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&,
- const std::shared_ptr<NKikimr::NMiniKQL::TPatternWithEnv>&) override
+ const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&) override
{
Y_UNUSED(memoryLimits);
Y_UNUSED(execCtx);