summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <[email protected]>2022-07-11 20:02:18 +0300
committerva-kuznecov <[email protected]>2022-07-11 20:02:18 +0300
commit1e6a044b49116e4a3e72eafd9efb6ef679830562 (patch)
tree35d82e9564f3c4fcf9bd6fd5d958badc6762860f
parent8bdf39d3f18717d5b0d0ce8325fdf428bd3ae728 (diff)
Per node computation pattern cache (LiteralExecuter)
use Singleton Implement ComputationPattern cache in literal executer
-rw-r--r--ydb/core/kqp/executer/kqp_literal_executer.cpp23
-rw-r--r--ydb/core/kqp/ut/kqp_ne_ut.cpp56
-rw-r--r--ydb/core/kqp/ut/kqp_service_ut.cpp50
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp43
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h3
-rw-r--r--ydb/library/yql/minikql/aligned_page_pool.cpp9
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node.h91
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp23
-rw-r--r--ydb/library/yql/minikql/mkql_alloc.h3
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp3
11 files changed, 249 insertions, 56 deletions
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp
index b5c1a34feaf..090a6381b58 100644
--- a/ydb/core/kqp/executer/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/kqp/runtime/kqp_tasks_runner.h>
#include <ydb/core/kqp/runtime/kqp_transport.h>
#include <ydb/core/kqp/prepare/kqp_query_plan.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
namespace NKikimr {
namespace NKqp {
@@ -250,8 +251,28 @@ private:
auto taskRunner = CreateKqpTaskRunner(context, settings, log);
TaskRunners.emplace_back(taskRunner);
+
+ std::shared_ptr<NMiniKQL::TPatternWithEnv> patternEnv;
+ bool useCache = AppData()->FeatureFlags.GetEnableKqpPatternCacheLiteral();
+ bool foundInCache = false;
+ if (useCache) {
+ auto *cache = Singleton<NMiniKQL::TComputationPatternLRUCache>();
+
+ patternEnv = cache->Find(protoTask.GetProgram().GetRaw());
+ if (patternEnv) {
+ foundInCache = true;
+ } else {
+ patternEnv = cache->CreateEnv();
+ }
+ }
+
taskRunner->Prepare(protoTask, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext(),
- parameterProvider);
+ parameterProvider, patternEnv);
+
+ if (useCache && !foundInCache) {
+ auto *cache = Singleton<NMiniKQL::TComputationPatternLRUCache>();
+ cache->EmplacePattern(protoTask.GetProgram().GetRaw(), std::move(patternEnv));
+ }
auto status = taskRunner->Run();
YQL_ENSURE(status == ERunStatus::Finished);
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index 3a18e1e79f1..1e5856790ee 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -8,27 +8,65 @@ using namespace NYdb;
using namespace NYdb::NTable;
Y_UNIT_TEST_SUITE(KqpNewEngine) {
- Y_UNIT_TEST_TWIN(SimpleSelect, UseSessionActor) {
- auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
+ Y_UNIT_TEST_TWIN(Select1, UseSessionActor) {
+ auto settings = TKikimrSettings()
+ .SetEnableKqpSessionActor(UseSessionActor)
+ .SetWithSampleTables(false);
+ auto kikimr = TKikimrRunner{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
auto result = session.ExecuteDataQuery(R"(
PRAGMA kikimr.UseNewEngine = "true";
- SELECT Value1, Value2, Key FROM `/Root/TwoShard` WHERE Value2 != 0 ORDER BY Key DESC;
+ SELECT 1;
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
AssertSuccessResult(result);
CompareYson(R"(
- [
- [["BigThree"];[1];[4000000003u]];
- [["BigOne"];[-1];[4000000001u]];
- [["Three"];[1];[3u]];
- [["One"];[-1];[1u]]
- ]
+ [ [1]; ]
)", FormatResultSetYson(result.GetResultSet(0)));
}
+ Y_UNIT_TEST_TWIN(SimpleUpsertSelect, UseSessionActor) {
+ auto settings = TKikimrSettings()
+ .SetEnableKqpSessionActor(UseSessionActor)
+ .SetWithSampleTables(false);
+ auto kikimr = TKikimrRunner{settings};
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+
+ CREATE TABLE `KeyValue` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync());
+
+ AssertSuccessResult(session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ REPLACE INTO `KeyValue` (Key, Value) VALUES
+ (1u, "One"),
+ (2u, "Two"),
+ (3u, "Three");
+ )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync());
+
+ auto selectResult = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ SELECT * FROM `KeyValue`;
+ )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync();
+
+ AssertSuccessResult(selectResult);
+
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]];
+ [[3u];["Three"]]
+ ])", FormatResultSetYson(selectResult.GetResultSet(0)));
+ }
+
Y_UNIT_TEST_TWIN(PkSelect1, UseSessionActor) {
auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
auto db = kikimr.GetTableClient();
diff --git a/ydb/core/kqp/ut/kqp_service_ut.cpp b/ydb/core/kqp/ut/kqp_service_ut.cpp
index 0754085e114..ce77ad2a461 100644
--- a/ydb/core/kqp/ut/kqp_service_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_service_ut.cpp
@@ -228,37 +228,65 @@ Y_UNIT_TEST_SUITE(KqpService) {
UNIT_ASSERT_VALUES_EQUAL_C(status.GetStatus(), EStatus::SUCCESS, status.GetIssues().ToString());
}
- Y_UNIT_TEST_TWIN(PatternCache, UseLLVM) {
+ Y_UNIT_TEST_TWIN(PatternCache, UseCache) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
+ settings.FeatureFlags.SetEnableKqpPatternCacheLiteral(UseCache);
auto kikimr = TKikimrRunner{settings};
auto driver = kikimr.GetDriver();
- size_t InFlight = 3;
+ size_t InFlight = 10;
NPar::LocalExecutor().RunAdditionalThreads(InFlight);
NPar::LocalExecutor().ExecRange([&driver](int /*id*/) {
+ TTimer t;
NYdb::NTable::TTableClient db(driver);
auto session = db.CreateSession().GetValueSync().GetSession();
- for (ui32 i = 0; i < 100; ++i) {
+ for (ui32 i = 0; i < 500; ++i) {
ui64 total = 100500;
TString request = (TStringBuilder() << R"_(
$data = AsList(
AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("aaa" AS Key,)_" << total - 2 * (i / 5) << R"_(u AS Value)
+ AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
+
+ AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
+
+ AsStruct("aaa" AS Key,)_" << total - 10 * (i / 5) << R"_(u AS Value),
+
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
+
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value)
);
- SELECT Key, SUM(Value) as Sum FROM AS_TABLE($data) GROUP BY Key;
+ SELECT * FROM (
+ SELECT Key, SUM(Value) as Sum FROM (
+ SELECT * FROM AS_TABLE($data)
+ ) GROUP BY Key
+ ) WHERE Key == "aaa";
)_");
- auto result = session.ExecuteDataQuery(request, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.KeepInQueryCache(true);
+
+ auto result = session.ExecuteDataQuery(request,
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), execSettings).ExtractValueSync();
AssertSuccessResult(result);
- CompareYson(R"(
- [
- ["aaa";100500u]
- ]
- )", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"( [ ["aaa";100500u] ])", FormatResultSetYson(result.GetResultSet(0)));
}
}, 0, InFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 28ffdfb058b..4df06352d4d 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -702,6 +702,7 @@ message TFeatureFlags {
optional bool EnableKqpScanQueryStreamLookup = 66 [default = false];
optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false];
optional bool EnablePredicateExtractForDataQueries = 68 [default = false];
+ optional bool EnableKqpPatternCacheLiteral = 69 [default = false];
}
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 9a1f02fbc18..92bbbb0557d 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -249,13 +249,15 @@ public:
return TaskId;
}
- void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider) {
+ void BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider,
+ const std::shared_ptr<TPatternWithEnv>& compPattern)
+ {
LOG(TStringBuilder() << "Build task: " << TaskId);
auto startTime = TInstant::Now();
auto& typeEnv = TypeEnv();
- auto& program = task.GetProgram();
+ const NDqProto::TProgram &program = task.GetProgram();
YQL_ENSURE(program.GetRuntimeVersion());
YQL_ENSURE(program.GetRuntimeVersion() <= NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
@@ -342,15 +344,31 @@ public:
}
auto validatePolicy = Settings.TerminateOnError ? NUdf::EValidatePolicy::Fail : NUdf::EValidatePolicy::Exception;
- TComputationPatternOpts opts(Alloc().Ref(), typeEnv, Context.ComputationFactory, Context.FuncRegistry,
- NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM,
- EGraphPerProcess::Multi, ProgramParsed.StatsRegistry.Get());
- SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams);
- opts.SecureParamsProvider = SecureParamsProvider.get();
- ProgramParsed.CompPattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts);
- ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone(
- opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider));
+ auto& compPatternAlloc = compPattern ? compPattern->Alloc.Ref() : Alloc().Ref();
+ auto& compPatternEnv = compPattern ? compPattern->Env : typeEnv;
+
+ TComputationPatternOpts opts(compPatternAlloc, compPatternEnv, Context.ComputationFactory,
+ Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi,
+ ProgramParsed.StatsRegistry.Get());
+
+ SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams);
+ opts.SecureParamsProvider = SecureParamsProvider.get();
+
+ if (compPattern) {
+ if (!compPattern->Pattern) {
+ auto guard = compPattern->Env.BindAllocator();
+ compPattern->Pattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts);
+ }
+ ProgramParsed.CompPattern = compPattern->Pattern;
+ // clone pattern using alloc from current scope
+ ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone(
+ opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &Alloc().Ref()));
+ } else {
+ ProgramParsed.CompPattern = MakeComputationPattern(programExplorer, programRoot, ProgramParsed.EntryPoints, opts);
+ ProgramParsed.CompGraph = ProgramParsed.CompPattern->Clone(
+ opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider));
+ }
TBindTerminator term(ProgramParsed.CompGraph->GetTerminator());
@@ -396,10 +414,11 @@ public:
}
void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
- const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider) override
+ const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider,
+ const std::shared_ptr<TPatternWithEnv>& compPattern) override
{
TaskId = task.GetId();
- BuildTask(task, parameterProvider);
+ BuildTask(task, parameterProvider, compPattern);
LOG(TStringBuilder() << "Prepare task: " << TaskId);
auto startTime = TInstant::Now();
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index aa7c32458dc..4fbefc752b9 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -293,7 +293,8 @@ public:
virtual void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(),
- const TDqTaskRunnerParameterProvider& parameterProvider = {}) = 0;
+ const TDqTaskRunnerParameterProvider& parameterProvider = {},
+ const std::shared_ptr<NKikimr::NMiniKQL::TPatternWithEnv>& compPattern = {}) = 0;
virtual ERunStatus Run() = 0;
virtual bool HasEffects() const = 0;
diff --git a/ydb/library/yql/minikql/aligned_page_pool.cpp b/ydb/library/yql/minikql/aligned_page_pool.cpp
index d63d6a693b3..b603ed8c1e7 100644
--- a/ydb/library/yql/minikql/aligned_page_pool.cpp
+++ b/ydb/library/yql/minikql/aligned_page_pool.cpp
@@ -1,4 +1,5 @@
#include "aligned_page_pool.h"
+#include "util/string/builder.h"
#include <library/cpp/actors/util/intrinsics.h>
#include <util/generic/yexception.h>
@@ -122,8 +123,9 @@ TAlignedPagePoolCounters::TAlignedPagePoolCounters(::NMonitoring::TDynamicCounte
TAlignedPagePool::~TAlignedPagePool() {
if (CheckLostMem && !UncaughtException()) {
Y_VERIFY_DEBUG(TotalAllocated == FreePages.size() * POOL_PAGE_SIZE,
- "Expected %ld, actual %ld (%ld page(s), %ld offloaded)", TotalAllocated,
- FreePages.size() * POOL_PAGE_SIZE, FreePages.size(), OffloadedActiveBytes);
+ "Expected %ld, actual %ld (%ld page(s), %ld offloaded)",
+ TotalAllocated, FreePages.size() * POOL_PAGE_SIZE,
+ FreePages.size(), OffloadedActiveBytes);
Y_VERIFY_DEBUG(OffloadedActiveBytes == 0, "offloaded: %ld", OffloadedActiveBytes);
}
@@ -251,7 +253,8 @@ void* TAlignedPagePool::GetPage() {
}
void TAlignedPagePool::ReturnPage(void* addr) noexcept {
- Y_VERIFY_DEBUG(AllPages.find(addr) != AllPages.end());
+ auto str = TStringBuilder() << __func__ << " this: " << (void*)this << " addr: " << addr << Endl;
+ Y_VERIFY_DEBUG(AllPages.find(addr) != AllPages.end(), "%s", str.Data());
#ifdef PROFILE_MEMORY_ALLOCATIONS
ReturnBlock(addr, POOL_PAGE_SIZE);
#else
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h
index a5d11b180b0..c93a13c3512 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h
@@ -15,6 +15,7 @@
#include <ydb/library/yql/public/udf/udf_validate.h>
#include <ydb/library/yql/public/udf/udf_value_builder.h>
+#include <library/cpp/cache/cache.h>
#include <library/cpp/random_provider/random_provider.h>
#include <library/cpp/time_provider/time_provider.h>
@@ -254,12 +255,14 @@ struct TComputationNodeFactoryContext {
using TComputationNodeFactory = std::function<IComputationNode* (TCallable&, const TComputationNodeFactoryContext&)>;
using TStreamEmitter = std::function<void(NUdf::TUnboxedValue&&)>;
+struct TPatternWithEnv;
+
struct TComputationPatternOpts {
TComputationPatternOpts(const std::shared_ptr<TInjectedAlloc>& cacheAlloc, const std::shared_ptr<TTypeEnvironment>& cacheEnv)
: CacheAlloc(std::move(cacheAlloc))
- , CacheEnv(std::move(cacheEnv))
+ , CacheTypeEnv(std::move(cacheEnv))
, AllocState(CacheAlloc->InjectedState())
- , Env(*CacheEnv)
+ , Env(*CacheTypeEnv)
{}
TComputationPatternOpts(TAllocState& allocState, const TTypeEnvironment& env)
@@ -305,8 +308,13 @@ struct TComputationPatternOpts {
SecureParamsProvider = secureParamsProvider;
}
+ void SetPatternEnv(std::shared_ptr<TPatternWithEnv> cacheEnv) {
+ PatternEnv = std::move(cacheEnv);
+ }
+
mutable std::shared_ptr<TInjectedAlloc> CacheAlloc;
- mutable std::shared_ptr<TTypeEnvironment> CacheEnv;
+ mutable std::shared_ptr<TTypeEnvironment> CacheTypeEnv;
+ mutable std::shared_ptr<TPatternWithEnv> PatternEnv;
TAllocState& AllocState;
const TTypeEnvironment& Env;
@@ -355,6 +363,83 @@ public:
virtual size_t GetCacheHits() const = 0;
};
+using TPrepareFunc = std::function<IComputationPattern::TPtr(TScopedAlloc &, TTypeEnvironment &)>;
+
+struct TPatternWithEnv {
+ TScopedAlloc Alloc;
+ TTypeEnvironment Env;
+ IComputationPattern::TPtr Pattern;
+
+ TPatternWithEnv() : Env(Alloc) {
+ Alloc.Release();
+ }
+
+ ~TPatternWithEnv() {
+ Alloc.Acquire();
+ }
+};
+
+class TComputationPatternLRUCache {
+ mutable std::mutex Mutex;
+
+ TLRUCache<TString, std::shared_ptr<TPatternWithEnv>> Cache;
+ std::atomic<size_t> Hits = 0;
+ std::atomic<size_t> TotalKeysSize = 0;
+ std::atomic<size_t> TotalValuesSize = 0;
+public:
+ TComputationPatternLRUCache(size_t size = 100)
+ : Cache(size)
+ {}
+
+ static std::shared_ptr<TPatternWithEnv> CreateEnv() {
+ return std::make_shared<TPatternWithEnv>();
+ }
+
+ std::shared_ptr<TPatternWithEnv> Find(const TString& serialized) {
+ auto guard = std::unique_lock<std::mutex>(Mutex);
+ if (auto it = Cache.Find(serialized); it != Cache.End()) {
+ ++Hits;
+ return *it;
+ }
+ return {};
+ }
+
+ void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternWithEnv> patternWithEnv) {
+ auto guard = std::unique_lock<std::mutex>(Mutex);
+ Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern);
+ TotalKeysSize += serialized.Size();
+ TotalValuesSize += patternWithEnv->Alloc.GetAllocated();
+
+ if (Cache.TotalSize() == Cache.GetMaxSize()) {
+ auto oldest = Cache.FindOldest();
+ Y_VERIFY(oldest != Cache.End());
+ TotalKeysSize -= oldest.Key().Size();
+ TotalValuesSize -= oldest.Value()->Alloc.GetAllocated();
+ Cache.Erase(oldest);
+ }
+
+ Cache.Insert(serialized, std::move(patternWithEnv));
+ }
+
+ void CleanCache() {
+ auto guard = std::unique_lock<std::mutex>(Mutex);
+ Cache.Clear();
+ }
+
+ size_t GetSize() const {
+ auto guard = std::unique_lock<std::mutex>(Mutex);
+ return Cache.TotalSize();
+ }
+
+ size_t GetCacheHits() const {
+ return Hits.load();
+ }
+
+ ~TComputationPatternLRUCache() {
+ Mutex.lock();
+ }
+};
+
std::unique_ptr<NUdf::ISecureParamsProvider> MakeSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams);
} // namespace NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
index 7b916cecba2..e44832123d1 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
@@ -221,7 +221,7 @@ public:
, ValidatePolicy(opts.ValidatePolicy)
, GraphPerProcess(opts.GraphPerProcess)
, PatternNodes(MakeIntrusive<TPatternNodes>(opts.AllocState))
- , ExternalAlloc(opts.CacheAlloc)
+ , ExternalAlloc(opts.CacheAlloc || opts.PatternEnv)
{
PatternNodes->HolderFactory = MakeHolder<THolderFactory>(opts.AllocState, *PatternNodes->MemInfo, &FunctionRegistry);
PatternNodes->ValueBuilder = MakeHolder<TDefaultValueBuilder>(*PatternNodes->HolderFactory, ValidatePolicy);
@@ -778,18 +778,13 @@ public:
}
~TComputationPatternImpl() {
- if (Alloc) {
- Alloc->Acquire();
- }
- PatternNodes.Reset();
- TypeEnv.reset();
- if (Alloc) {
- Alloc->Release();
+ if (TypeEnv) {
+ auto guard = TypeEnv->BindAllocator();
+ PatternNodes.Reset();
}
}
- void HoldInternals(const std::shared_ptr<TScopedAlloc>& alloc, const std::shared_ptr<TTypeEnvironment>& typeEnv) {
- Alloc = alloc;
+ void SetTypeEnv(TTypeEnvironment* typeEnv) {
TypeEnv = typeEnv;
}
@@ -809,9 +804,9 @@ public:
THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) final {
return MakeHolder<TComputationGraph>(PatternNodes, compOpts);
}
+
private:
- std::shared_ptr<TScopedAlloc> Alloc;
- std::shared_ptr<TTypeEnvironment> TypeEnv;
+ TTypeEnvironment* TypeEnv = nullptr;
TPatternNodes::TPtr PatternNodes;
NYql::NCodegen::ICodegen::TPtr Codegen;
};
@@ -849,8 +844,8 @@ TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNode
IComputationPattern::TPtr MakeComputationPattern(TExploringNodeVisitor& explorer, const TRuntimeNode& root,
const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) {
auto pattern = MakeComputationPatternImpl(explorer, root, entryPoints, opts);
- if (opts.CacheAlloc) {
- pattern->HoldInternals(opts.CacheAlloc->InjectedScopeAlloc(), opts.CacheEnv);
+ if (opts.PatternEnv) {
+ pattern->SetTypeEnv(&opts.PatternEnv->Env);
}
return pattern;
}
diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h
index fed14ef943c..7df4adc3e95 100644
--- a/ydb/library/yql/minikql/mkql_alloc.h
+++ b/ydb/library/yql/minikql/mkql_alloc.h
@@ -416,7 +416,8 @@ struct TWithMiniKQLAlloc {
template <typename T, typename... Args>
T* AllocateOn(TAllocState* state, Args&&... args)
{
- return ::new(MKQLAllocFastWithSize(sizeof(T), state)) T(std::forward<Args>(args)...);
+ void* addr = MKQLAllocFastWithSize(sizeof(T), state);
+ return ::new(addr) T(std::forward<Args>(args)...);
static_assert(std::is_base_of<TWithMiniKQLAlloc, T>::value, "Class must inherit TWithMiniKQLAlloc.");
}
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 45e980b21f0..eda0fa7fd17 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1421,7 +1421,8 @@ public:
}
void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
- const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&) override
+ const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&,
+ const std::shared_ptr<NKikimr::NMiniKQL::TPatternWithEnv>&) override
{
Y_UNUSED(memoryLimits);
Y_UNUSED(execCtx);