diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-09-13 19:26:11 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-09-13 19:26:11 +0300 |
commit | 2dbcb34d68c8b32abf7b9190ddaf502fae86938c (patch) | |
tree | 1f84382ee1243fe6a095ef88a7f14a729cba2064 | |
parent | 26ac7247b980e2d8366a491aa2807678f3ecf52d (diff) | |
download | ydb-2dbcb34d68c8b32abf7b9190ddaf502fae86938c.tar.gz |
Refactor cache, get rid of singleton
11 files changed, 189 insertions, 158 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index aeb2cafe70a..dcce35616e2 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -3,6 +3,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/protos/tx_datashard.pb.h> +#include <ydb/core/kqp/rm/kqp_rm.h> #include <ydb/core/kqp/runtime/kqp_compute.h> #include <ydb/core/kqp/runtime/kqp_scan_data.h> #include <ydb/core/sys_view/scan.h> @@ -35,7 +36,7 @@ public: TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId) : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId)) , ComputeCtx(settings.StatsMode) @@ -70,6 +71,7 @@ public: execCtx.ApplyCtx = nullptr; execCtx.Alloc = nullptr; execCtx.TypeEnv = nullptr; + execCtx.PatternCache = GetKqpResourceManager()->GetComputeActorPatternCache(); TDqTaskRunnerSettings settings; settings.CollectBasicStats = RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC; @@ -306,7 +308,7 @@ private: IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId) { return new TKqpComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory), diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp index 0c663161f26..32113271184 100644 --- a/ydb/core/kqp/executer/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp @@ -184,6 +184,7 @@ private: // task runner settings NMiniKQL::TKqpComputeContextBase computeCtx; TDqTaskRunnerContext context = CreateTaskRunnerContext(&computeCtx, &alloc, &typeEnv); + context.PatternCache = GetKqpResourceManager()->GetLiteralPatternCache(); TDqTaskRunnerSettings settings = CreateTaskRunnerSettings(Request.StatsMode); Y_DEFER { @@ -269,27 +270,8 @@ private: auto taskRunner = CreateKqpTaskRunner(context, settings, log); TaskRunners.emplace_back(taskRunner); - std::shared_ptr<NMiniKQL::TPatternWithEnv> patternEnv; - bool useCache = AppData()->FeatureFlags.GetEnableKqpPatternCacheLiteral(); - bool foundInCache = false; - if (useCache) { - auto *cache = Singleton<NMiniKQL::TComputationPatternLRUCache>(); - - patternEnv = cache->Find(protoTask.GetProgram().GetRaw()); - if (patternEnv) { - foundInCache = true; - } else { - patternEnv = cache->CreateEnv(); - } - } - taskRunner->Prepare(protoTask, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext(), - parameterProvider, patternEnv); - - if (useCache && !foundInCache) { - auto *cache = Singleton<NMiniKQL::TComputationPatternLRUCache>(); - cache->EmplacePattern(protoTask.GetProgram().GetRaw(), std::move(patternEnv)); - } + parameterProvider); auto status = taskRunner->Run(); YQL_ENSURE(status == ERunStatus::Finished); diff --git a/ydb/core/kqp/rm/kqp_rm.cpp b/ydb/core/kqp/rm/kqp_rm.cpp index dd79ef8ea72..81f950afdd7 100644 --- a/ydb/core/kqp/rm/kqp_rm.cpp +++ b/ydb/core/kqp/rm/kqp_rm.cpp @@ -226,7 +226,10 @@ public: , Counters(counters) , ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID()) , ExecutionUnitsResource(Config.GetComputeActorsCount()) - , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) {} + , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) + //, LiteralPatternCache(std::make_shared<NMiniKQL::TComputationPatternLRUCache>()) + //, ComputeActorPatternCache(std::make_shared<NMiniKQL::TComputationPatternLRUCache>()) + {} void Bootstrap() { ActorSystem = TlsActivationContext->ActorSystem(); @@ -590,6 +593,18 @@ public: } } + std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetLiteralPatternCache() override { + with_lock (Lock) { + return LiteralPatternCache; + } + } + + std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetComputeActorPatternCache() override { + with_lock (Lock) { + return ComputeActorPatternCache; + } + } + private: STATEFN(WorkState) { switch (ev->GetTypeRewrite()) { @@ -918,6 +933,10 @@ private: std::optional<TInstant> LastPublishTime; }; TWhiteBoardState WbState; + + // pattern caches for different actors + std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> LiteralPatternCache; + std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> ComputeActorPatternCache; }; } // namespace NRm diff --git a/ydb/core/kqp/rm/kqp_rm.h b/ydb/core/kqp/rm/kqp_rm.h index 568e945d803..f52eed96ac7 100644 --- a/ydb/core/kqp/rm/kqp_rm.h +++ b/ydb/core/kqp/rm/kqp_rm.h @@ -2,6 +2,7 @@ #include <ydb/core/protos/config.pb.h> #include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h> #include <library/cpp/actors/core/actor.h> @@ -86,6 +87,9 @@ public: virtual TKqpLocalNodeResources GetLocalResources() const = 0; virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0; + + virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetLiteralPatternCache() = 0; + virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetComputeActorPatternCache() = 0; }; } // namespace NRm diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 1b50ca14a38..0e36cd68d43 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -10,6 +10,7 @@ #include <ydb/library/yql/dq/runtime/dq_transport.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h> #include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> @@ -252,8 +253,7 @@ public: return TaskId; } - void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider, - const std::shared_ptr<TPatternWithEnv>& compPattern) + void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider) { LOG(TStringBuilder() << "Build task: " << TaskId); auto startTime = TInstant::Now(); @@ -264,7 +264,21 @@ public: YQL_ENSURE(program.GetRuntimeVersion()); YQL_ENSURE(program.GetRuntimeVersion() <= NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0); - ProgramParsed.ProgramNode = DeserializeRuntimeNode(program.GetRaw(), typeEnv); + std::shared_ptr<TPatternCacheEntry> entry; + if (auto& cache = Context.PatternCache) { + entry = cache->Find(program.GetRaw()); + if (!entry) { + entry = std::make_shared<TPatternCacheEntry>(); + } + } + + auto& patternAlloc = entry ? entry->Alloc.Ref() : Alloc().Ref(); + auto& patternEnv = entry ? entry->Env : typeEnv; + + { + auto guard = patternEnv.BindAllocator(); + ProgramParsed.ProgramNode = DeserializeRuntimeNode(program.GetRaw(), patternEnv); + } YQL_ENSURE(ProgramParsed.ProgramNode.IsImmediate() && ProgramParsed.ProgramNode.GetNode()->GetType()->IsStruct()); auto& programStruct = static_cast<TStructLiteral&>(*ProgramParsed.ProgramNode.GetNode()); auto programType = programStruct.GetType(); @@ -275,9 +289,9 @@ public: TRuntimeNode programRoot = programStruct.GetValue(*programRootIdx); if (Context.FuncProvider) { TExploringNodeVisitor explorer; - explorer.Walk(programRoot.GetNode(), typeEnv); + explorer.Walk(programRoot.GetNode(), patternEnv); bool wereChanges = false; - programRoot = SinglePassVisitCallables(programRoot, explorer, Context.FuncProvider, typeEnv, true, wereChanges); + programRoot = SinglePassVisitCallables(programRoot, explorer, Context.FuncProvider, patternEnv, true, wereChanges); } ProgramParsed.OutputItemTypes.resize(task.OutputsSize()); @@ -336,7 +350,7 @@ public: auto paramsCount = paramsStruct->GetMembersCount(); TExploringNodeVisitor programExplorer; - programExplorer.Walk(programRoot.GetNode(), typeEnv); + programExplorer.Walk(programRoot.GetNode(), patternEnv); auto programSize = programExplorer.GetNodes().size(); LOG(TStringBuilder() << "task: " << TaskId << ", program size: " << programSize @@ -348,9 +362,6 @@ public: auto validatePolicy = Settings.TerminateOnError ? NUdf::EValidatePolicy::Fail : NUdf::EValidatePolicy::Exception; - auto& compPatternAlloc = compPattern ? compPattern->Alloc.Ref() : Alloc().Ref(); - auto& compPatternEnv = compPattern ? compPattern->Env : typeEnv; - auto taskRunnerFactory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { auto& computationFactory = Context.ComputationFactory; if (auto res = computationFactory(callable, ctx)) { @@ -362,19 +373,22 @@ public: return nullptr; }; - TComputationPatternOpts opts(compPatternAlloc, compPatternEnv, taskRunnerFactory, + TComputationPatternOpts opts(patternAlloc, patternEnv, taskRunnerFactory, Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi, ProgramParsed.StatsRegistry.Get()); - SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams); - opts.SecureParamsProvider = SecureParamsProvider.get(); + SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams); + opts.SecureParamsProvider = SecureParamsProvider.get(); - if (compPattern) { - if (!compPattern->Pattern) { - auto guard = compPattern->Env.BindAllocator(); - compPattern->Pattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts); + bool filledCacheEntry = false; + if (entry) { + if (!entry->Pattern) { + auto guard = entry->Env.BindAllocator(); + entry->Pattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts); + filledCacheEntry = true; } - ProgramParsed.CompPattern = compPattern->Pattern; + ProgramParsed.CompPattern = entry->Pattern; + ProgramParsed.CompPatternEntry = entry; // clone pattern using alloc from current scope ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone( opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &Alloc().Ref())); @@ -384,6 +398,12 @@ public: opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider)); } + if (filledCacheEntry) { + if (auto& cache = Context.PatternCache) { + cache->EmplacePattern(task.GetProgram().GetRaw(), entry); + } + } + TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); auto paramNode = ProgramParsed.CompGraph->GetEntryPoint(programInputsCount, /* require */ false); @@ -428,11 +448,10 @@ public: } void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider, - const std::shared_ptr<TPatternWithEnv>& compPattern) override + const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider) override { TaskId = task.GetId(); - BuildTask(task, parameterProvider, compPattern); + BuildTask(task, parameterProvider); LOG(TStringBuilder() << "Prepare task: " << TaskId); auto startTime = TInstant::Now(); @@ -836,6 +855,7 @@ private: }; struct TProgramParsed { + std::shared_ptr<TPatternCacheEntry> CompPatternEntry; TRuntimeNode ProgramNode; TVector<TType*> InputItemTypes; TVector<TType*> OutputItemTypes; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index dbeec8cf8d4..710f4e5cf5a 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -9,6 +9,7 @@ #include <ydb/library/yql/dq/runtime/dq_output_consumer.h> #include <ydb/library/yql/dq/runtime/dq_async_input.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h> #include <ydb/library/yql/minikql/mkql_alloc.h> #include <ydb/library/yql/minikql/mkql_function_registry.h> #include <ydb/library/yql/minikql/mkql_node.h> @@ -236,6 +237,7 @@ struct TDqTaskRunnerContext { NKikimr::NMiniKQL::TCallableVisitFuncProvider FuncProvider; NKikimr::NMiniKQL::TScopedAlloc* Alloc = nullptr; NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr; + std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> PatternCache; }; class IDqTaskRunnerExecutionContext { @@ -293,8 +295,7 @@ public: virtual void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(), - const TDqTaskRunnerParameterProvider& parameterProvider = {}, - const std::shared_ptr<NKikimr::NMiniKQL::TPatternWithEnv>& compPattern = {}) = 0; + const TDqTaskRunnerParameterProvider& parameterProvider = {}) = 0; virtual ERunStatus Run() = 0; virtual bool HasEffects() const = 0; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h index 2d8061c7163..994bfc5c3be 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h @@ -277,7 +277,7 @@ struct TComputationNodeFactoryContext { using TComputationNodeFactory = std::function<IComputationNode* (TCallable&, const TComputationNodeFactoryContext&)>; using TStreamEmitter = std::function<void(NUdf::TUnboxedValue&&)>; -struct TPatternWithEnv; +struct TPatternCacheEntry; struct TComputationPatternOpts { TComputationPatternOpts(std::shared_ptr<TInjectedAlloc> cacheAlloc, std::shared_ptr<TTypeEnvironment> cacheEnv) @@ -330,13 +330,8 @@ struct TComputationPatternOpts { SecureParamsProvider = secureParamsProvider; } - void SetPatternEnv(std::shared_ptr<TPatternWithEnv> cacheEnv) { - PatternEnv = std::move(cacheEnv); - } - - mutable std::shared_ptr<TInjectedAlloc> CacheAlloc; - mutable std::shared_ptr<TTypeEnvironment> CacheTypeEnv; - mutable std::shared_ptr<TPatternWithEnv> PatternEnv; + mutable std::shared_ptr<TInjectedAlloc> CacheAlloc; // obsolete + mutable std::shared_ptr<TTypeEnvironment> CacheTypeEnv; // obsolete TAllocState& AllocState; const TTypeEnvironment& Env; @@ -385,83 +380,6 @@ public: virtual size_t GetCacheHits() const = 0; }; -using TPrepareFunc = std::function<IComputationPattern::TPtr(TScopedAlloc &, TTypeEnvironment &)>; - -struct TPatternWithEnv { - TScopedAlloc Alloc; - TTypeEnvironment Env; - IComputationPattern::TPtr Pattern; - - TPatternWithEnv() : Env(Alloc) { - Alloc.Release(); - } - - ~TPatternWithEnv() { - Alloc.Acquire(); - } -}; - -class TComputationPatternLRUCache { - mutable std::mutex Mutex; - - TLRUCache<TString, std::shared_ptr<TPatternWithEnv>> Cache; - std::atomic<size_t> Hits = 0; - std::atomic<size_t> TotalKeysSize = 0; - std::atomic<size_t> TotalValuesSize = 0; -public: - TComputationPatternLRUCache(size_t size = 100) - : Cache(size) - {} - - static std::shared_ptr<TPatternWithEnv> CreateEnv() { - return std::make_shared<TPatternWithEnv>(); - } - - std::shared_ptr<TPatternWithEnv> Find(const TString& serialized) { - auto guard = std::unique_lock<std::mutex>(Mutex); - if (auto it = Cache.Find(serialized); it != Cache.End()) { - ++Hits; - return *it; - } - return {}; - } - - void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternWithEnv> patternWithEnv) { - auto guard = std::unique_lock<std::mutex>(Mutex); - Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern); - TotalKeysSize += serialized.Size(); - TotalValuesSize += patternWithEnv->Alloc.GetAllocated(); - - if (Cache.TotalSize() == Cache.GetMaxSize()) { - auto oldest = Cache.FindOldest(); - Y_VERIFY(oldest != Cache.End()); - TotalKeysSize -= oldest.Key().Size(); - TotalValuesSize -= oldest.Value()->Alloc.GetAllocated(); - Cache.Erase(oldest); - } - - Cache.Insert(serialized, std::move(patternWithEnv)); - } - - void CleanCache() { - auto guard = std::unique_lock<std::mutex>(Mutex); - Cache.Clear(); - } - - size_t GetSize() const { - auto guard = std::unique_lock<std::mutex>(Mutex); - return Cache.TotalSize(); - } - - size_t GetCacheHits() const { - return Hits.load(); - } - - ~TComputationPatternLRUCache() { - Mutex.lock(); - } -}; - std::unique_ptr<NUdf::ISecureParamsProvider> MakeSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams); } // namespace NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp index e44832123d1..3adedf5628c 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp @@ -2,6 +2,7 @@ #include "mkql_value_builder.h" #include "mkql_computation_node_codegen.h" #include <ydb/library/yql/minikql/arrow/mkql_memory_pool.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h> #include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> #include <ydb/library/yql/minikql/mkql_terminator.h> @@ -221,7 +222,7 @@ public: , ValidatePolicy(opts.ValidatePolicy) , GraphPerProcess(opts.GraphPerProcess) , PatternNodes(MakeIntrusive<TPatternNodes>(opts.AllocState)) - , ExternalAlloc(opts.CacheAlloc || opts.PatternEnv) + , ExternalAlloc(opts.CacheAlloc) { PatternNodes->HolderFactory = MakeHolder<THolderFactory>(opts.AllocState, *PatternNodes->MemInfo, &FunctionRegistry); PatternNodes->ValueBuilder = MakeHolder<TDefaultValueBuilder>(*PatternNodes->HolderFactory, ValidatePolicy); @@ -525,7 +526,7 @@ private: NUdf::EValidatePolicy ValidatePolicy; EGraphPerProcess GraphPerProcess; TPatternNodes::TPtr PatternNodes; - const bool ExternalAlloc; + const bool ExternalAlloc; // obsolete, will be removed after YQL-13977 }; class TComputationGraph : public IComputationGraph { @@ -843,11 +844,7 @@ TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNode IComputationPattern::TPtr MakeComputationPattern(TExploringNodeVisitor& explorer, const TRuntimeNode& root, const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) { - auto pattern = MakeComputationPatternImpl(explorer, root, entryPoints, opts); - if (opts.PatternEnv) { - pattern->SetTypeEnv(&opts.PatternEnv->Env); - } - return pattern; + return MakeComputationPatternImpl(explorer, root, entryPoints, opts); } class TComputationPatternCache: public IComputationPatternCache { diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h new file mode 100644 index 00000000000..10deab906b2 --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h @@ -0,0 +1,88 @@ +#pragma once + +#include "mkql_computation_node.h" + +#include <ydb/library/yql/minikql/mkql_node.h> + +#include <memory> + +namespace NKikimr::NMiniKQL { + +struct TPatternCacheEntry { + TScopedAlloc Alloc; + TTypeEnvironment Env; + IComputationPattern::TPtr Pattern; + + TPatternCacheEntry() + : Env(Alloc) + { + Alloc.Release(); + } + + ~TPatternCacheEntry() { + Alloc.Acquire(); + } +}; + +class TComputationPatternLRUCache { + mutable std::mutex Mutex; + + TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache; + std::atomic<size_t> Hits = 0; + std::atomic<size_t> TotalKeysSize = 0; + std::atomic<size_t> TotalValuesSize = 0; +public: + TComputationPatternLRUCache(size_t size = 100) + : Cache(size) + {} + + static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry() { + return std::make_shared<TPatternCacheEntry>(); + } + + std::shared_ptr<TPatternCacheEntry> Find(const TString& serialized) { + auto guard = std::scoped_lock<std::mutex>(Mutex); + if (auto it = Cache.Find(serialized); it != Cache.End()) { + ++Hits; + return *it; + } + return {}; + } + + void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) { + auto guard = std::scoped_lock<std::mutex>(Mutex); + Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern); + TotalKeysSize += serialized.Size(); + TotalValuesSize += patternWithEnv->Alloc.GetAllocated(); + + if (Cache.TotalSize() == Cache.GetMaxSize()) { + auto oldest = Cache.FindOldest(); + Y_VERIFY(oldest != Cache.End()); + TotalKeysSize -= oldest.Key().Size(); + TotalValuesSize -= oldest.Value()->Alloc.GetAllocated(); + Cache.Erase(oldest); + } + + Cache.Insert(serialized, std::move(patternWithEnv)); + } + + void CleanCache() { + auto guard = std::scoped_lock<std::mutex>(Mutex); + Cache.Clear(); + } + + size_t GetSize() const { + auto guard = std::scoped_lock<std::mutex>(Mutex); + return Cache.TotalSize(); + } + + size_t GetCacheHits() const { + return Hits.load(); + } + + ~TComputationPatternLRUCache() { + Mutex.lock(); + } +}; + +} // namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp index fee939dae74..81d6901daff 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp @@ -1,6 +1,7 @@ #include "library/cpp/threading/local_executor/local_executor.h" #include "ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h" #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> #include <ydb/library/yql/minikql/mkql_node_serialization.h> #include <ydb/library/yql/utils/yql_panic.h> @@ -439,9 +440,9 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { TComputationPatternLRUCache cache(cacheSize); auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); - std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv(); - TScopedAlloc &alloc = patternEnv->Alloc; - TTypeEnvironment &typeEnv = patternEnv->Env; + auto entry = std::make_shared<TPatternCacheEntry>(); + TScopedAlloc &alloc = entry->Alloc; + TTypeEnvironment &typeEnv = entry->Env; TProgramBuilder pb(typeEnv, *functionRegistry); @@ -456,10 +457,10 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi); { - auto guard = patternEnv->Env.BindAllocator(); - patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts); + auto guard = entry->Env.BindAllocator(); + entry->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts); } - cache.EmplacePattern("a", patternEnv); + cache.EmplacePattern("a", entry); auto genData = [&]() { std::vector<ui64> data; data.reserve(vecSize); @@ -482,13 +483,13 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { auto timeProvider = CreateDeterministicTimeProvider(10000000); TScopedAlloc graphAlloc; - auto patternEnv = cache.Find(key); + auto entry = cache.Find(key); - TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetListTestFactory(), + TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetListTestFactory(), functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi); - auto graph = patternEnv->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); + auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); TUnboxedValue* items = nullptr; graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items)); @@ -572,9 +573,9 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); for (ui32 i = 0; i < cacheSize; ++i) { - std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv(); - TScopedAlloc& alloc = patternEnv->Alloc; - TTypeEnvironment& typeEnv = patternEnv->Env; + auto entry = std::make_shared<TPatternCacheEntry>(); + TScopedAlloc& alloc = entry->Alloc; + TTypeEnvironment& typeEnv = entry->Env; TProgramBuilder pb(typeEnv, *functionRegistry); @@ -588,10 +589,10 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { "OFF", EGraphPerProcess::Multi); { - auto guard = patternEnv->Env.BindAllocator(); - patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts); + auto guard = entry->Env.BindAllocator(); + entry->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts); } - cache.EmplacePattern(TString((char)('a' + i)), patternEnv); + cache.EmplacePattern(TString((char)('a' + i)), entry); } for (ui32 i = 0; i < cacheSize; ++i) { @@ -600,13 +601,13 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { auto randomProvider = CreateDeterministicRandomProvider(1); auto timeProvider = CreateDeterministicTimeProvider(10000000); TScopedAlloc graphAlloc; - auto patternEnv = cache.Find(key); - UNIT_ASSERT(patternEnv); - TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetBuiltinFactory(), + auto entry = cache.Find(key); + UNIT_ASSERT(entry); + TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetBuiltinFactory(), functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Multi); - auto graph = patternEnv->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); + auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); auto value = graph->GetValue(); UNIT_ASSERT_EQUAL(value.AsStringRef(), NYql::NUdf::TStringRef("qwerty")); } diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 2eb74891ae7..23a9eca9ff4 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1450,8 +1450,7 @@ public: } void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&, - const std::shared_ptr<NKikimr::NMiniKQL::TPatternWithEnv>&) override + const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&) override { Y_UNUSED(memoryLimits); Y_UNUSED(execCtx); |