diff options
author | va-kuznecov <[email protected]> | 2022-07-11 20:02:18 +0300 |
---|---|---|
committer | va-kuznecov <[email protected]> | 2022-07-11 20:02:18 +0300 |
commit | 1e6a044b49116e4a3e72eafd9efb6ef679830562 (patch) | |
tree | 35d82e9564f3c4fcf9bd6fd5d958badc6762860f | |
parent | 8bdf39d3f18717d5b0d0ce8325fdf428bd3ae728 (diff) |
Per node computation pattern cache (LiteralExecuter)
use Singleton
Implement ComputationPattern cache in literal executer
-rw-r--r-- | ydb/core/kqp/executer/kqp_literal_executer.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 56 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_service_ut.cpp | 50 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 43 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/minikql/aligned_page_pool.cpp | 9 | ||||
-rw-r--r-- | ydb/library/yql/minikql/computation/mkql_computation_node.h | 91 | ||||
-rw-r--r-- | ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp | 23 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_alloc.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp | 3 |
11 files changed, 249 insertions, 56 deletions
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp index b5c1a34feaf..090a6381b58 100644 --- a/ydb/core/kqp/executer/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp @@ -7,6 +7,7 @@ #include <ydb/core/kqp/runtime/kqp_tasks_runner.h> #include <ydb/core/kqp/runtime/kqp_transport.h> #include <ydb/core/kqp/prepare/kqp_query_plan.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> namespace NKikimr { namespace NKqp { @@ -250,8 +251,28 @@ private: auto taskRunner = CreateKqpTaskRunner(context, settings, log); TaskRunners.emplace_back(taskRunner); + + std::shared_ptr<NMiniKQL::TPatternWithEnv> patternEnv; + bool useCache = AppData()->FeatureFlags.GetEnableKqpPatternCacheLiteral(); + bool foundInCache = false; + if (useCache) { + auto *cache = Singleton<NMiniKQL::TComputationPatternLRUCache>(); + + patternEnv = cache->Find(protoTask.GetProgram().GetRaw()); + if (patternEnv) { + foundInCache = true; + } else { + patternEnv = cache->CreateEnv(); + } + } + taskRunner->Prepare(protoTask, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext(), - parameterProvider); + parameterProvider, patternEnv); + + if (useCache && !foundInCache) { + auto *cache = Singleton<NMiniKQL::TComputationPatternLRUCache>(); + cache->EmplacePattern(protoTask.GetProgram().GetRaw(), std::move(patternEnv)); + } auto status = taskRunner->Run(); YQL_ENSURE(status == ERunStatus::Finished); diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index 3a18e1e79f1..1e5856790ee 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -8,27 +8,65 @@ using namespace NYdb; using namespace NYdb::NTable; Y_UNIT_TEST_SUITE(KqpNewEngine) { - Y_UNIT_TEST_TWIN(SimpleSelect, UseSessionActor) { - auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); + Y_UNIT_TEST_TWIN(Select1, UseSessionActor) { + auto settings = TKikimrSettings() + .SetEnableKqpSessionActor(UseSessionActor) + .SetWithSampleTables(false); + auto kikimr = TKikimrRunner{settings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); auto result = session.ExecuteDataQuery(R"( PRAGMA kikimr.UseNewEngine = "true"; - SELECT Value1, Value2, Key FROM `/Root/TwoShard` WHERE Value2 != 0 ORDER BY Key DESC; + SELECT 1; )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); AssertSuccessResult(result); CompareYson(R"( - [ - [["BigThree"];[1];[4000000003u]]; - [["BigOne"];[-1];[4000000001u]]; - [["Three"];[1];[3u]]; - [["One"];[-1];[1u]] - ] + [ [1]; ] )", FormatResultSetYson(result.GetResultSet(0))); } + Y_UNIT_TEST_TWIN(SimpleUpsertSelect, UseSessionActor) { + auto settings = TKikimrSettings() + .SetEnableKqpSessionActor(UseSessionActor) + .SetWithSampleTables(false); + auto kikimr = TKikimrRunner{settings}; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE `KeyValue` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync()); + + AssertSuccessResult(session.ExecuteDataQuery(R"( + --!syntax_v1 + REPLACE INTO `KeyValue` (Key, Value) VALUES + (1u, "One"), + (2u, "Two"), + (3u, "Three"); + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync()); + + auto selectResult = session.ExecuteDataQuery(R"( + --!syntax_v1 + SELECT * FROM `KeyValue`; + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + + AssertSuccessResult(selectResult); + + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]]; + [[3u];["Three"]] + ])", FormatResultSetYson(selectResult.GetResultSet(0))); + } + Y_UNIT_TEST_TWIN(PkSelect1, UseSessionActor) { auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); auto db = kikimr.GetTableClient(); diff --git a/ydb/core/kqp/ut/kqp_service_ut.cpp b/ydb/core/kqp/ut/kqp_service_ut.cpp index 0754085e114..ce77ad2a461 100644 --- a/ydb/core/kqp/ut/kqp_service_ut.cpp +++ b/ydb/core/kqp/ut/kqp_service_ut.cpp @@ -228,37 +228,65 @@ Y_UNIT_TEST_SUITE(KqpService) { UNIT_ASSERT_VALUES_EQUAL_C(status.GetStatus(), EStatus::SUCCESS, status.GetIssues().ToString()); } - Y_UNIT_TEST_TWIN(PatternCache, UseLLVM) { + Y_UNIT_TEST_TWIN(PatternCache, UseCache) { auto settings = TKikimrSettings() .SetWithSampleTables(false); + settings.FeatureFlags.SetEnableKqpPatternCacheLiteral(UseCache); auto kikimr = TKikimrRunner{settings}; auto driver = kikimr.GetDriver(); - size_t InFlight = 3; + size_t InFlight = 10; NPar::LocalExecutor().RunAdditionalThreads(InFlight); NPar::LocalExecutor().ExecRange([&driver](int /*id*/) { + TTimer t; NYdb::NTable::TTableClient db(driver); auto session = db.CreateSession().GetValueSync().GetSession(); - for (ui32 i = 0; i < 100; ++i) { + for (ui32 i = 0; i < 500; ++i) { ui64 total = 100500; TString request = (TStringBuilder() << R"_( $data = AsList( AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("aaa" AS Key,)_" << total - 2 * (i / 5) << R"_(u AS Value) + AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), + + AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), + + AsStruct("aaa" AS Key,)_" << total - 10 * (i / 5) << R"_(u AS Value), + + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), + + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value) ); - SELECT Key, SUM(Value) as Sum FROM AS_TABLE($data) GROUP BY Key; + SELECT * FROM ( + SELECT Key, SUM(Value) as Sum FROM ( + SELECT * FROM AS_TABLE($data) + ) GROUP BY Key + ) WHERE Key == "aaa"; )_"); - auto result = session.ExecuteDataQuery(request, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.KeepInQueryCache(true); + + auto result = session.ExecuteDataQuery(request, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), execSettings).ExtractValueSync(); AssertSuccessResult(result); - CompareYson(R"( - [ - ["aaa";100500u] - ] - )", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"( [ ["aaa";100500u] ])", FormatResultSetYson(result.GetResultSet(0))); } }, 0, InFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 28ffdfb058b..4df06352d4d 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -702,6 +702,7 @@ message TFeatureFlags { optional bool EnableKqpScanQueryStreamLookup = 66 [default = false]; optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false]; optional bool EnablePredicateExtractForDataQueries = 68 [default = false]; + optional bool EnableKqpPatternCacheLiteral = 69 [default = false]; } diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 9a1f02fbc18..92bbbb0557d 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -249,13 +249,15 @@ public: return TaskId; } - void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider) { + void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider, + const std::shared_ptr<TPatternWithEnv>& compPattern) + { LOG(TStringBuilder() << "Build task: " << TaskId); auto startTime = TInstant::Now(); auto& typeEnv = TypeEnv(); - auto& program = task.GetProgram(); + const NDqProto::TProgram &program = task.GetProgram(); YQL_ENSURE(program.GetRuntimeVersion()); YQL_ENSURE(program.GetRuntimeVersion() <= NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0); @@ -342,15 +344,31 @@ public: } auto validatePolicy = Settings.TerminateOnError ? NUdf::EValidatePolicy::Fail : NUdf::EValidatePolicy::Exception; - TComputationPatternOpts opts(Alloc().Ref(), typeEnv, Context.ComputationFactory, Context.FuncRegistry, - NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, - EGraphPerProcess::Multi, ProgramParsed.StatsRegistry.Get()); - SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams); - opts.SecureParamsProvider = SecureParamsProvider.get(); - ProgramParsed.CompPattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts); - ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone( - opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider)); + auto& compPatternAlloc = compPattern ? compPattern->Alloc.Ref() : Alloc().Ref(); + auto& compPatternEnv = compPattern ? compPattern->Env : typeEnv; + + TComputationPatternOpts opts(compPatternAlloc, compPatternEnv, Context.ComputationFactory, + Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi, + ProgramParsed.StatsRegistry.Get()); + + SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams); + opts.SecureParamsProvider = SecureParamsProvider.get(); + + if (compPattern) { + if (!compPattern->Pattern) { + auto guard = compPattern->Env.BindAllocator(); + compPattern->Pattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts); + } + ProgramParsed.CompPattern = compPattern->Pattern; + // clone pattern using alloc from current scope + ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone( + opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &Alloc().Ref())); + } else { + ProgramParsed.CompPattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts); + ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone( + opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider)); + } TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); @@ -396,10 +414,11 @@ public: } void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider) override + const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider, + const std::shared_ptr<TPatternWithEnv>& compPattern) override { TaskId = task.GetId(); - BuildTask(task, parameterProvider); + BuildTask(task, parameterProvider, compPattern); LOG(TStringBuilder() << "Prepare task: " << TaskId); auto startTime = TInstant::Now(); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index aa7c32458dc..4fbefc752b9 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -293,7 +293,8 @@ public: virtual void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(), - const TDqTaskRunnerParameterProvider& parameterProvider = {}) = 0; + const TDqTaskRunnerParameterProvider& parameterProvider = {}, + const std::shared_ptr<NKikimr::NMiniKQL::TPatternWithEnv>& compPattern = {}) = 0; virtual ERunStatus Run() = 0; virtual bool HasEffects() const = 0; diff --git a/ydb/library/yql/minikql/aligned_page_pool.cpp b/ydb/library/yql/minikql/aligned_page_pool.cpp index d63d6a693b3..b603ed8c1e7 100644 --- a/ydb/library/yql/minikql/aligned_page_pool.cpp +++ b/ydb/library/yql/minikql/aligned_page_pool.cpp @@ -1,4 +1,5 @@ #include "aligned_page_pool.h" +#include "util/string/builder.h" #include <library/cpp/actors/util/intrinsics.h> #include <util/generic/yexception.h> @@ -122,8 +123,9 @@ TAlignedPagePoolCounters::TAlignedPagePoolCounters(::NMonitoring::TDynamicCounte TAlignedPagePool::~TAlignedPagePool() { if (CheckLostMem && !UncaughtException()) { Y_VERIFY_DEBUG(TotalAllocated == FreePages.size() * POOL_PAGE_SIZE, - "Expected %ld, actual %ld (%ld page(s), %ld offloaded)", TotalAllocated, - FreePages.size() * POOL_PAGE_SIZE, FreePages.size(), OffloadedActiveBytes); + "Expected %ld, actual %ld (%ld page(s), %ld offloaded)", + TotalAllocated, FreePages.size() * POOL_PAGE_SIZE, + FreePages.size(), OffloadedActiveBytes); Y_VERIFY_DEBUG(OffloadedActiveBytes == 0, "offloaded: %ld", OffloadedActiveBytes); } @@ -251,7 +253,8 @@ void* TAlignedPagePool::GetPage() { } void TAlignedPagePool::ReturnPage(void* addr) noexcept { - Y_VERIFY_DEBUG(AllPages.find(addr) != AllPages.end()); + auto str = TStringBuilder() << __func__ << " this: " << (void*)this << " addr: " << addr << Endl; + Y_VERIFY_DEBUG(AllPages.find(addr) != AllPages.end(), "%s", str.Data()); #ifdef PROFILE_MEMORY_ALLOCATIONS ReturnBlock(addr, POOL_PAGE_SIZE); #else diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h index a5d11b180b0..c93a13c3512 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h @@ -15,6 +15,7 @@ #include <ydb/library/yql/public/udf/udf_validate.h> #include <ydb/library/yql/public/udf/udf_value_builder.h> +#include <library/cpp/cache/cache.h> #include <library/cpp/random_provider/random_provider.h> #include <library/cpp/time_provider/time_provider.h> @@ -254,12 +255,14 @@ struct TComputationNodeFactoryContext { using TComputationNodeFactory = std::function<IComputationNode* (TCallable&, const TComputationNodeFactoryContext&)>; using TStreamEmitter = std::function<void(NUdf::TUnboxedValue&&)>; +struct TPatternWithEnv; + struct TComputationPatternOpts { TComputationPatternOpts(const std::shared_ptr<TInjectedAlloc>& cacheAlloc, const std::shared_ptr<TTypeEnvironment>& cacheEnv) : CacheAlloc(std::move(cacheAlloc)) - , CacheEnv(std::move(cacheEnv)) + , CacheTypeEnv(std::move(cacheEnv)) , AllocState(CacheAlloc->InjectedState()) - , Env(*CacheEnv) + , Env(*CacheTypeEnv) {} TComputationPatternOpts(TAllocState& allocState, const TTypeEnvironment& env) @@ -305,8 +308,13 @@ struct TComputationPatternOpts { SecureParamsProvider = secureParamsProvider; } + void SetPatternEnv(std::shared_ptr<TPatternWithEnv> cacheEnv) { + PatternEnv = std::move(cacheEnv); + } + mutable std::shared_ptr<TInjectedAlloc> CacheAlloc; - mutable std::shared_ptr<TTypeEnvironment> CacheEnv; + mutable std::shared_ptr<TTypeEnvironment> CacheTypeEnv; + mutable std::shared_ptr<TPatternWithEnv> PatternEnv; TAllocState& AllocState; const TTypeEnvironment& Env; @@ -355,6 +363,83 @@ public: virtual size_t GetCacheHits() const = 0; }; +using TPrepareFunc = std::function<IComputationPattern::TPtr(TScopedAlloc &, TTypeEnvironment &)>; + +struct TPatternWithEnv { + TScopedAlloc Alloc; + TTypeEnvironment Env; + IComputationPattern::TPtr Pattern; + + TPatternWithEnv() : Env(Alloc) { + Alloc.Release(); + } + + ~TPatternWithEnv() { + Alloc.Acquire(); + } +}; + +class TComputationPatternLRUCache { + mutable std::mutex Mutex; + + TLRUCache<TString, std::shared_ptr<TPatternWithEnv>> Cache; + std::atomic<size_t> Hits = 0; + std::atomic<size_t> TotalKeysSize = 0; + std::atomic<size_t> TotalValuesSize = 0; +public: + TComputationPatternLRUCache(size_t size = 100) + : Cache(size) + {} + + static std::shared_ptr<TPatternWithEnv> CreateEnv() { + return std::make_shared<TPatternWithEnv>(); + } + + std::shared_ptr<TPatternWithEnv> Find(const TString& serialized) { + auto guard = std::unique_lock<std::mutex>(Mutex); + if (auto it = Cache.Find(serialized); it != Cache.End()) { + ++Hits; + return *it; + } + return {}; + } + + void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternWithEnv> patternWithEnv) { + auto guard = std::unique_lock<std::mutex>(Mutex); + Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern); + TotalKeysSize += serialized.Size(); + TotalValuesSize += patternWithEnv->Alloc.GetAllocated(); + + if (Cache.TotalSize() == Cache.GetMaxSize()) { + auto oldest = Cache.FindOldest(); + Y_VERIFY(oldest != Cache.End()); + TotalKeysSize -= oldest.Key().Size(); + TotalValuesSize -= oldest.Value()->Alloc.GetAllocated(); + Cache.Erase(oldest); + } + + Cache.Insert(serialized, std::move(patternWithEnv)); + } + + void CleanCache() { + auto guard = std::unique_lock<std::mutex>(Mutex); + Cache.Clear(); + } + + size_t GetSize() const { + auto guard = std::unique_lock<std::mutex>(Mutex); + return Cache.TotalSize(); + } + + size_t GetCacheHits() const { + return Hits.load(); + } + + ~TComputationPatternLRUCache() { + Mutex.lock(); + } +}; + std::unique_ptr<NUdf::ISecureParamsProvider> MakeSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams); } // namespace NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp index 7b916cecba2..e44832123d1 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp @@ -221,7 +221,7 @@ public: , ValidatePolicy(opts.ValidatePolicy) , GraphPerProcess(opts.GraphPerProcess) , PatternNodes(MakeIntrusive<TPatternNodes>(opts.AllocState)) - , ExternalAlloc(opts.CacheAlloc) + , ExternalAlloc(opts.CacheAlloc || opts.PatternEnv) { PatternNodes->HolderFactory = MakeHolder<THolderFactory>(opts.AllocState, *PatternNodes->MemInfo, &FunctionRegistry); PatternNodes->ValueBuilder = MakeHolder<TDefaultValueBuilder>(*PatternNodes->HolderFactory, ValidatePolicy); @@ -778,18 +778,13 @@ public: } ~TComputationPatternImpl() { - if (Alloc) { - Alloc->Acquire(); - } - PatternNodes.Reset(); - TypeEnv.reset(); - if (Alloc) { - Alloc->Release(); + if (TypeEnv) { + auto guard = TypeEnv->BindAllocator(); + PatternNodes.Reset(); } } - void HoldInternals(const std::shared_ptr<TScopedAlloc>& alloc, const std::shared_ptr<TTypeEnvironment>& typeEnv) { - Alloc = alloc; + void SetTypeEnv(TTypeEnvironment* typeEnv) { TypeEnv = typeEnv; } @@ -809,9 +804,9 @@ public: THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) final { return MakeHolder<TComputationGraph>(PatternNodes, compOpts); } + private: - std::shared_ptr<TScopedAlloc> Alloc; - std::shared_ptr<TTypeEnvironment> TypeEnv; + TTypeEnvironment* TypeEnv = nullptr; TPatternNodes::TPtr PatternNodes; NYql::NCodegen::ICodegen::TPtr Codegen; }; @@ -849,8 +844,8 @@ TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNode IComputationPattern::TPtr MakeComputationPattern(TExploringNodeVisitor& explorer, const TRuntimeNode& root, const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) { auto pattern = MakeComputationPatternImpl(explorer, root, entryPoints, opts); - if (opts.CacheAlloc) { - pattern->HoldInternals(opts.CacheAlloc->InjectedScopeAlloc(), opts.CacheEnv); + if (opts.PatternEnv) { + pattern->SetTypeEnv(&opts.PatternEnv->Env); } return pattern; } diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h index fed14ef943c..7df4adc3e95 100644 --- a/ydb/library/yql/minikql/mkql_alloc.h +++ b/ydb/library/yql/minikql/mkql_alloc.h @@ -416,7 +416,8 @@ struct TWithMiniKQLAlloc { template <typename T, typename... Args> T* AllocateOn(TAllocState* state, Args&&... args) { - return ::new(MKQLAllocFastWithSize(sizeof(T), state)) T(std::forward<Args>(args)...); + void* addr = MKQLAllocFastWithSize(sizeof(T), state); + return ::new(addr) T(std::forward<Args>(args)...); static_assert(std::is_base_of<TWithMiniKQLAlloc, T>::value, "Class must inherit TWithMiniKQLAlloc."); } diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 45e980b21f0..eda0fa7fd17 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1421,7 +1421,8 @@ public: } void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&) override + const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&, + const std::shared_ptr<NKikimr::NMiniKQL::TPatternWithEnv>&) override { Y_UNUSED(memoryLimits); Y_UNUSED(execCtx); |