aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaksim Kita <kitaetoya@gmail.com>2023-08-14 17:19:17 +0300
committermaksim-kita <maksim-kita@yandex-team.com>2023-08-15 00:17:35 +0300
commit226a03bca496b97f4e309091b41e2f92a47cc17a (patch)
tree6a8928176cbac656734e56385e97ad46d0912801
parent87b28fbd7cfe91a115f8c5998cb30c325f75858b (diff)
downloadydb-226a03bca496b97f4e309091b41e2f92a47cc17a.tar.gz
JIT enable by default
JIT enable by default Pull Request resolved: #311
-rw-r--r--ydb/core/kqp/common/simple/services.h5
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp130
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.h3
-rw-r--r--ydb/core/kqp/compile_service/ya.make1
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp4
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h4
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp16
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.cpp18
-rw-r--r--ydb/core/kqp/ut/service/kqp_service_ut.cpp96
-rw-r--r--ydb/core/protos/config.proto7
-rw-r--r--ydb/library/services/services.proto2
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node.h6
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp220
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp145
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h194
-rw-r--r--ydb/library/yql/minikql/computation/ya.make.inc1
24 files changed, 593 insertions, 267 deletions
diff --git a/ydb/core/kqp/common/simple/services.h b/ydb/core/kqp/common/simple/services.h
index 3d598c95d6..80002d604f 100644
--- a/ydb/core/kqp/common/simple/services.h
+++ b/ydb/core/kqp/common/simple/services.h
@@ -36,4 +36,9 @@ inline NActors::TActorId MakeKqpLocalFileSpillingServiceID(ui32 nodeId) {
return NActors::TActorId(nodeId, TStringBuf(name, 12));
}
+// Builds the actor id under which the computation pattern compile service of
+// the given node is registered as a local service.
+inline NActors::TActorId MakeKqpCompileComputationPatternServiceID(ui32 nodeId) {
+    // 11 characters plus the terminating NUL fill the 12-byte buffer; all 12 bytes are passed on.
+    const char serviceName[12] = "kqp_comp_cp";
+    const TStringBuf serviceNameBuf(serviceName, 12);
+    return NActors::TActorId(nodeId, serviceNameBuf);
+}
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt
index b3ac2e0be9..647bd7bc53 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt
@@ -22,4 +22,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp
)
diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt
index e786b9cc55..d02df338a9 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt
@@ -23,4 +23,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp
)
diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt
index e786b9cc55..d02df338a9 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt
@@ -23,4 +23,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp
)
diff --git a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt
index b3ac2e0be9..647bd7bc53 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt
@@ -22,4 +22,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp
)
diff --git a/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp
new file mode 100644
index 0000000000..b41d4bf6d6
--- /dev/null
+++ b/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp
@@ -0,0 +1,130 @@
+#include "kqp_compile_service.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/events.h>
+
+#include <ydb/library/aclib/aclib.h>
+
+#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+namespace {
+
+// Background actor that compiles computation patterns taken from the resource
+// manager's pattern cache.
+//
+// On every wakeup it refills its work queue from the cache when the queue is
+// empty, ordering it by access count (most accessed first), and compiles entries
+// until the queue is exhausted or the per-wakeup time budget
+// (MaxCompilationIntervalMs) is spent. It then reschedules itself after
+// WakeupInterval.
+class TKqpCompileComputationPatternService : public TActorBootstrapped<TKqpCompileComputationPatternService> {
+public:
+    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+        return NKikimrServices::TActivity::KQP_COMPILE_COMPUTATION_PATTERN_SERVICE;
+    }
+
+    // config   - supplies the wakeup period (WakeupIntervalMs).
+    // counters - KQP counters; this actor updates CompiledComputationPatterns and
+    //            CompileComputationPatternsQueueSize.
+    TKqpCompileComputationPatternService(const NKikimrConfig::TTableServiceConfig_TCompileComputationPatternServiceConfig& config,
+        TIntrusivePtr<TKqpCounters> counters)
+        : WakeupInterval(TDuration::MilliSeconds(config.GetWakeupIntervalMs()))
+        , Counters(std::move(counters))
+    {}
+
+    // Switches to the main state and arms the first wakeup.
+    void Bootstrap(const TActorContext& ctx) {
+        Become(&TKqpCompileComputationPatternService::MainState);
+        ScheduleWakeup(ctx);
+    }
+private:
+    // Only wakeup and poison events are expected; anything else aborts the process.
+    STFUNC(MainState) {
+        switch (ev->GetTypeRewrite()) {
+            CFunc(TEvents::TSystem::Wakeup, HandleWakeup);
+            cFunc(TEvents::TEvPoison::EventType, PassAway);
+        default:
+            Y_FAIL("TKqpCompileComputationPatternService: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
+        }
+    }
+
+    // Performs one compilation round and schedules the next wakeup.
+    void HandleWakeup(const TActorContext& ctx) {
+        LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_COMPUTATION_PATTERN_SERVICE, "Received wakeup");
+        auto patternCache = GetKqpResourceManager()->GetPatternCache();
+        if (!patternCache) {
+            // No pattern cache is available right now; try again on the next tick.
+            ScheduleWakeup(ctx);
+            return;
+        }
+
+        LoadPatternsToCompileIfNeeded(patternCache);
+
+        TSimpleTimer timer;
+        i64 compilationIntervalMs = MaxCompilationIntervalMs;
+
+        // The budget is checked between patterns only, so a single long
+        // compilation can overrun it.
+        size_t patternsToCompileSize = PatternsToCompile.size();
+        for (; PatternToCompileIndex < patternsToCompileSize && compilationIntervalMs > 0; ++PatternToCompileIndex) {
+            auto & entry = PatternsToCompile[PatternToCompileIndex];
+            if (!entry->IsInCache.load()) {
+                // Entry is flagged as no longer in the cache; skip it.
+                continue;
+            }
+
+            timer.Reset();
+
+            entry->Pattern->Compile({}, nullptr);
+            Counters->CompiledComputationPatterns->Inc();
+
+            // Drop our reference as soon as the entry has been processed.
+            entry = nullptr;
+
+            compilationIntervalMs -= static_cast<i64>(timer.Get().MilliSeconds());
+        }
+
+        if (PatternToCompileIndex == patternsToCompileSize) {
+            // Queue drained: an empty queue makes the next wakeup reload it from the cache.
+            PatternsToCompile.clear();
+        }
+
+        ScheduleWakeup(ctx);
+    }
+
+private:
+    // Refills the work queue from the pattern cache when it is empty. Entries are
+    // ordered by the access count observed at load time, most accessed first.
+    void LoadPatternsToCompileIfNeeded(std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> & patternCache) {
+        if (PatternsToCompile.size() != 0) {
+            return;
+        }
+
+        THashMap<TString, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile;
+        patternCache->GetPatternsToCompile(patternsToCompile);
+
+        // Snapshot the atomic access counters once so the sort keys stay stable
+        // while sorting.
+        TVector<std::pair<std::shared_ptr<NMiniKQL::TPatternCacheEntry>, size_t>> patternsToCompileWithAccessSize;
+        patternsToCompileWithAccessSize.reserve(patternsToCompile.size());
+        for (auto & [_, entry] : patternsToCompile) {
+            patternsToCompileWithAccessSize.emplace_back(entry, entry->AccessTimes.load());
+        }
+
+        std::sort(patternsToCompileWithAccessSize.begin(), patternsToCompileWithAccessSize.end(), [](const auto & lhs, const auto & rhs) {
+            return lhs.second > rhs.second;
+        });
+
+        PatternsToCompile.reserve(patternsToCompileWithAccessSize.size());
+        for (auto & [entry, _] : patternsToCompileWithAccessSize) {
+            PatternsToCompile.push_back(std::move(entry));
+        }
+
+        // Report into this service's own gauge. CompileQueueSize ("Compilation/QueueSize")
+        // is a separate counter and must not be overwritten from here.
+        *Counters->CompileComputationPatternsQueueSize = PatternsToCompile.size();
+        PatternToCompileIndex = 0;
+    }
+
+    // Arms the next wakeup after WakeupInterval.
+    void ScheduleWakeup(const TActorContext& ctx) {
+        ctx.Schedule(WakeupInterval, new TEvents::TEvWakeup());
+    }
+
+    // Time budget for compilation within a single wakeup, in milliseconds.
+    static constexpr i64 MaxCompilationIntervalMs = 300;
+
+    TDuration WakeupInterval;
+    TIntrusivePtr<TKqpCounters> Counters;
+
+    using PatternsToCompileContainer = TVector<std::shared_ptr<NMiniKQL::TPatternCacheEntry>>;
+    using PatternsToCompileContainerIterator = PatternsToCompileContainer::iterator;
+    // Work queue for the current round and the index of the next entry to process.
+    PatternsToCompileContainer PatternsToCompile;
+    size_t PatternToCompileIndex = 0;
+};
+
+}
+
+// Creates the computation pattern compile service actor, configured from the
+// CompileComputationPatternServiceConfig section of the table service config.
+IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableServiceConfig& serviceConfig,
+    TIntrusivePtr<TKqpCounters> counters)
+{
+    const auto& patternServiceConfig = serviceConfig.GetCompileComputationPatternServiceConfig();
+    return new TKqpCompileComputationPatternService(patternServiceConfig, std::move(counters));
+}
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h
index f4dcfe7b8a..e351185d23 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.h
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.h
@@ -14,6 +14,9 @@ IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& servic
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYql::IHTTPGateway::TPtr httpGateway);
+IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableServiceConfig& serviceConfig,
+ TIntrusivePtr<TKqpCounters> counters);
+
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const NKikimrConfig::TTableServiceConfig& serviceConfig, NYql::IHTTPGateway::TPtr httpGateway,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
diff --git a/ydb/core/kqp/compile_service/ya.make b/ydb/core/kqp/compile_service/ya.make
index 3fadfdad44..8eb0152042 100644
--- a/ydb/core/kqp/compile_service/ya.make
+++ b/ydb/core/kqp/compile_service/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
kqp_compile_actor.cpp
kqp_compile_service.cpp
+ kqp_compile_computation_pattern_service.cpp
)
PEERDIR(
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 3dd3544763..fdf85085c8 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -753,6 +753,10 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
CompileQueueSize = KqpGroup->GetCounter("Compilation/QueueSize", false);
+ /* Compile computation pattern service */
+ CompiledComputationPatterns = KqpGroup->GetCounter("ComputationPatternCompilation/CompiledComputationPatterns");
+ CompileComputationPatternsQueueSize = KqpGroup->GetCounter("ComputationPatternCompilation/CompileComputationPatternsQueueSize");
+
/* Resource Manager */
RmComputeActors = KqpGroup->GetCounter("RM/ComputeActors", false);
RmMemory = KqpGroup->GetCounter("RM/Memory", false);
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index 4be02be974..d638e706d7 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -352,6 +352,10 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr CompileQueryCacheEvicted;
::NMonitoring::TDynamicCounters::TCounterPtr CompileQueueSize;
+ // Compile computation pattern service
+ ::NMonitoring::TDynamicCounters::TCounterPtr CompiledComputationPatterns;
+ ::NMonitoring::TDynamicCounters::TCounterPtr CompileComputationPatternsQueueSize;
+
// Resource Manager
::NMonitoring::TDynamicCounters::TCounterPtr RmComputeActors;
::NMonitoring::TDynamicCounters::TCounterPtr RmMemory;
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 3734154d32..b6e083b14b 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -185,7 +185,7 @@ public:
, KqpProxySharedResources(std::move(kqpProxySharedResources))
{}
- void Bootstrap() {
+ void Bootstrap(const TActorContext &ctx) {
if (TokenAccessorConfig.GetEnabled()) {
TString caContent;
if (const auto& path = TokenAccessorConfig.GetSslCaCert()) {
@@ -241,6 +241,14 @@ public:
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
MakeKqpCompileServiceID(SelfId().NodeId()), CompileService);
+ if (TableServiceConfig.GetEnableAsyncComputationPatternCompilation()) {
+ IActor* ComputationPatternServiceActor = CreateKqpCompileComputationPatternService(TableServiceConfig, Counters);
+ ui32 batchPoolId = AppData(ctx)->BatchPoolId;
+ CompileComputationPatternService = ctx.Register(ComputationPatternServiceActor, TMailboxType::HTSwap, batchPoolId);
+ TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
+ MakeKqpCompileComputationPatternServiceID(SelfId().NodeId()), CompileComputationPatternService);
+ }
+
KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters, nullptr, AsyncIoFactory));
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
MakeKqpNodeServiceID(SelfId().NodeId()), KqpNodeService);
@@ -423,6 +431,11 @@ public:
void PassAway() override {
Send(CompileService, new TEvents::TEvPoisonPill());
+
+ if (TableServiceConfig.GetEnableAsyncComputationPatternCompilation()) {
+ Send(CompileComputationPatternService, new TEvents::TEvPoisonPill());
+ }
+
Send(SpillingService, new TEvents::TEvPoison);
Send(KqpNodeService, new TEvents::TEvPoison);
if (BoardPublishActor) {
@@ -1568,6 +1581,7 @@ private:
TActorId BoardLookupActor;
TActorId BoardPublishActor;
TActorId CompileService;
+ TActorId CompileComputationPatternService;
TActorId KqpNodeService;
TActorId SpillingService;
TActorId WhiteBoardService;
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index 743d4e2bc8..14bb7e20a8 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -150,7 +150,7 @@ public:
}
ActorSystem = actorSystem;
SelfId = selfId;
- UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes());
+ UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes(), Config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
if (PublishResourcesByExchanger) {
CreateResourceInfoExchanger(Config.GetInfoExchangerSettings());
@@ -619,13 +619,15 @@ public:
ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources);
}
- void UpdatePatternCache(ui64 size) {
- if (size) {
- if (!PatternCache || PatternCache->GetMaxSize() != size) {
- PatternCache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(size, Counters->GetKqpCounters());
- }
- } else {
+ void UpdatePatternCache(ui64 maxSizeBytes, ui64 patternAccessTimesBeforeTryToCompile) {
+ if (maxSizeBytes == 0) {
PatternCache.reset();
+ return;
+ }
+
+ NMiniKQL::TComputationPatternLRUCache::Config config{maxSizeBytes, patternAccessTimesBeforeTryToCompile};
+ if (!PatternCache || PatternCache->GetConfiguration() != config) {
+ PatternCache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(config, Counters->GetKqpCounters());
}
}
@@ -886,7 +888,7 @@ private:
Send(ev->Sender, new NConsole::TEvConsole::TEvConfigNotificationResponse(event), IEventHandle::FlagTrackDelivery, ev->Cookie);
auto& config = *event.MutableConfig()->MutableTableServiceConfig()->MutableResourceManager();
- ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes());
+ ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(), config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
bool enablePublishResourcesByExchanger = config.GetEnablePublishResourcesByExchanger();
if (enablePublishResourcesByExchanger != PublishResourcesByExchanger) {
diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
index 8556a594de..4eed1d3124 100644
--- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
@@ -1,3 +1,4 @@
+#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <library/cpp/threading/local_executor/local_executor.h>
@@ -224,49 +225,78 @@ Y_UNIT_TEST_SUITE(KqpService) {
UNIT_ASSERT_VALUES_EQUAL_C(status.GetStatus(), EStatus::SUCCESS, status.GetIssues().ToString());
}
- Y_UNIT_TEST_TWIN(PatternCache, UseCache) {
+    // Fills in the table service settings used by the PatternCache test variants.
+    void ConfigureSettings(TKikimrSettings & settings, bool useCache, bool useAsyncPatternCompilation) {
+        auto & tableService = *settings.AppConfig.MutableTableServiceConfig();
+        auto & resourceManager = *tableService.MutableResourceManager();
+
+        // Cache disabled -> capacity 0; otherwise 10 MB with async compilation and 1 MB without.
+        size_t capacityBytes = 0;
+        if (useCache && useAsyncPatternCompilation) {
+            capacityBytes = 10_MB;
+        } else if (useCache) {
+            capacityBytes = 1_MB;
+        }
+
+        resourceManager.SetKqpPatternCacheCapacityBytes(capacityBytes);
+        tableService.SetEnableAsyncComputationPatternCompilation(useAsyncPatternCompilation);
+
+        if (!useAsyncPatternCompilation) {
+            return;
+        }
+
+        // Async mode: 1 ms wakeup interval and an access threshold of 0 before compilation is attempted.
+        tableService.MutableCompileComputationPatternServiceConfig()->SetWakeupIntervalMs(1);
+        resourceManager.SetKqpPatternCachePatternAccessTimesBeforeTryToCompile(0);
+    }
+
+ Y_UNIT_TEST_QUAD(PatternCache, UseCache, UseAsyncPatternCompilation) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
- size_t cacheSize = UseCache ? 1_MB : 0;
- settings.AppConfig.MutableTableServiceConfig()->MutableResourceManager()->SetKqpPatternCacheCapacityBytes(cacheSize);
+ ConfigureSettings(settings, UseCache, UseAsyncPatternCompilation);
+
auto kikimr = TKikimrRunner{settings};
auto driver = kikimr.GetDriver();
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ static constexpr i64 AsyncPatternCompilationUniqueRequestsSize = 5;
+
+ auto async_compilation_condition = [&]() {
+ if constexpr (UseCache && UseAsyncPatternCompilation) {
+ return *counters.CompiledComputationPatterns < AsyncPatternCompilationUniqueRequestsSize;
+ }
+
+ return false;
+ };
+
size_t InFlight = 10;
NPar::LocalExecutor().RunAdditionalThreads(InFlight);
- NPar::LocalExecutor().ExecRange([&driver](int /*id*/) {
- TTimer t;
+ NPar::LocalExecutor().ExecRange([&](int /*id*/) {
NYdb::NTable::TTableClient db(driver);
auto session = db.CreateSession().GetValueSync().GetSession();
- for (ui32 i = 0; i < 500; ++i) {
+
+ for (ui32 i = 0; i < 500 || async_compilation_condition(); ++i) {
+ ui32 value = UseCache && UseAsyncPatternCompilation ? i % AsyncPatternCompilationUniqueRequestsSize : i / 5;
ui64 total = 100500;
TString request = (TStringBuilder() << R"_(
$data = AsList(
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
-
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value),
-
- AsStruct("aaa" AS Key,)_" << total - 10 * (i / 5) << R"_(u AS Value),
-
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
-
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value),
- AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value)
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value),
+
+ AsStruct("aaa" AS Key,)_" << total - 10 * value << R"_(u AS Value),
+
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value),
+
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value),
+ AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value)
);
SELECT * FROM (
@@ -286,6 +316,10 @@ Y_UNIT_TEST_SUITE(KqpService) {
CompareYson(R"( [ ["aaa";100500u] ])", FormatResultSetYson(result.GetResultSet(0)));
}
}, 0, InFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+
+ if constexpr (UseCache && UseAsyncPatternCompilation) {
+ UNIT_ASSERT(*counters.CompiledComputationPatterns >= AsyncPatternCompilationUniqueRequestsSize);
+ }
}
// YQL-15582
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index c2f6e6681f..aaf6aa9af0 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1142,6 +1142,7 @@ message TTableServiceConfig {
optional uint64 KqpPatternCacheCapacityBytes = 17 [default = 104857600]; // 100 MiB, 0 is for disable
optional bool EnablePublishResourcesByExchanger = 18 [default = true];
optional TInfoExchangerSettings InfoExchangerSettings = 19;
+ optional uint64 KqpPatternCachePatternAccessTimesBeforeTryToCompile = 20 [default = 5];
}
message TSpillingServiceConfig {
@@ -1162,6 +1163,10 @@ message TTableServiceConfig {
optional TLocalFileConfig LocalFileConfig = 1;
}
+ message TCompileComputationPatternServiceConfig {
+ optional uint64 WakeupIntervalMs = 1 [default = 1000]; // 1 sec
+ }
+
message TQueryPhaseLimits {
optional uint32 AffectedShardsLimit = 1;
optional uint32 ReadsetCountLimit = 2;
@@ -1329,6 +1334,8 @@ message TTableServiceConfig {
optional bool EnableSequentialReads = 39 [default = false];
optional bool EnablePreparedDdl = 42 [default = false];
optional bool EnableSequences = 43 [default = true];
+ optional bool EnableAsyncComputationPatternCompilation = 46 [default = false];
+ optional TCompileComputationPatternServiceConfig CompileComputationPatternServiceConfig = 47;
enum EBindingsMode {
BM_ENABLED = 0;
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index 193a125c07..4e3d0e0296 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -224,6 +224,7 @@ enum EServiceKikimr {
KQP_NODE = 543;
KQP_LOAD_TEST = 544;
KQP_SESSION = 545;
+ KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 546;
TABLET_RESOURCE_BROKER = 540;
@@ -1000,5 +1001,6 @@ message TActivity {
PERSQUEUE_READ_QUOTER = 616;
KAFKA_PRODUCE_ACTOR = 617;
STAT_SERVICE = 618;
+ KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 619;
};
};
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
index 853b83dac4..91af29586a 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
@@ -51,5 +51,6 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort.cpp
)
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
index 031693fb3a..8b735265f7 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
@@ -52,5 +52,6 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort.cpp
)
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
index 031693fb3a..8b735265f7 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
@@ -52,5 +52,6 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort.cpp
)
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
index 853b83dac4..91af29586a 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
@@ -51,5 +51,6 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort.cpp
)
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h
index 4fbb13a650..25d2c08185 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h
@@ -64,7 +64,7 @@ struct TComputationOptsFull: public TComputationOpts {
, TimeProvider(timeProvider)
, ValidatePolicy(validatePolicy)
, SecureParamsProvider(secureParamsProvider)
- {}
+ {}
TAllocState& AllocState;
TTypeEnvironment* TypeEnv = nullptr;
@@ -116,7 +116,7 @@ struct TComputationContextLLVM {
struct TComputationContext : public TComputationContextLLVM {
IRandomProvider& RandomProvider;
ITimeProvider& TimeProvider;
- bool ExecuteLLVM = true;
+ bool ExecuteLLVM = false;
arrow::MemoryPool& ArrowMemoryPool;
std::vector<NUdf::TUnboxedValue*> WideFields;
TTypeEnvironment* TypeEnv = nullptr;
@@ -405,6 +405,8 @@ public:
typedef TIntrusivePtr<IComputationPattern> TPtr;
virtual ~IComputationPattern() = default;
+ virtual void Compile(TString optLLVM, IStatsRegistry* stats) = 0;
+ virtual bool IsCompiled() const = 0;
virtual THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) = 0;
virtual bool GetSuitableForCache() const = 0;
};
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
index 43b2e4bb2c..454a53b9a4 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
@@ -561,12 +561,13 @@ private:
const bool ExternalAlloc; // obsolete, will be removed after YQL-13977
};
-class TComputationGraph : public IComputationGraph {
+class TComputationGraph final : public IComputationGraph {
public:
- TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts)
+ TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts, bool executeLLVM = false)
: PatternNodes(patternNodes)
, MemInfo(MakeIntrusive<TMemoryUsageInfo>("ComputationGraph"))
, CompOpts(compOpts)
+ , ExecuteLLVM(executeLLVM)
{
#ifndef NDEBUG
CompOpts.AllocState.ActiveMemInfo.emplace(MemInfo.Get(), MemInfo);
@@ -597,8 +598,8 @@ public:
CompOpts,
PatternNodes->GetMutables(),
//*ArrowMemoryPool
- *arrow::default_memory_pool()
- ));
+ *arrow::default_memory_pool()));
+ Ctx->ExecuteLLVM = ExecuteLLVM;
ValueBuilder->SetCalleePositionHolder(Ctx->CalleePosition);
for (auto& node : PatternNodes->GetNodes()) {
node->InitNode(*Ctx);
@@ -711,6 +712,7 @@ public:
bool SetExecuteLLVM(bool value) override {
const bool old = Ctx->ExecuteLLVM;
Ctx->ExecuteLLVM = value;
+ ExecuteLLVM = value;
return old;
}
@@ -761,12 +763,11 @@ private:
THolder<TComputationContext> Ctx;
TComputationOptsFull CompOpts;
bool IsPrepared = false;
+ bool ExecuteLLVM = false;
std::optional<TArrowKernelsTopology> KernelsTopology;
};
-} // namespace
-
-class TComputationPatternImpl : public IComputationPattern {
+class TComputationPatternImpl final : public IComputationPattern {
public:
TComputationPatternImpl(THolder<TComputationGraphBuildingVisitor>&& builder, const TComputationPatternOpts& opts)
#if defined(MKQL_DISABLE_CODEGEN)
@@ -782,114 +783,141 @@ public:
node->PrepareStageOne();
for (const auto& node : nodes)
node->PrepareStageTwo();
+
MKQL_ADD_STAT(opts.Stats, Mkql_TotalNodes, nodes.size());
-#ifndef MKQL_DISABLE_CODEGEN
+ PatternNodes = builder->GetPatternNodes();
+
if (Codegen) {
- TStatTimer timerFull(CodeGen_FullTime);
- timerFull.Acquire();
- bool hasCode = false;
- {
- TStatTimer timerGen(CodeGen_GenerateTime);
- timerGen.Acquire();
- for (auto it = nodes.crbegin(); nodes.crend() != it; ++it) {
- if (const auto codegen = dynamic_cast<ICodegeneratorRootNode*>(it->Get())) {
- try {
- codegen->GenerateFunctions(Codegen);
- hasCode = true;
- } catch (const TNoCodegen&) {
- hasCode = false;
- break;
- }
+ Compile(opts.OptLLVM, opts.Stats);
+ }
+ }
+
+    ~TComputationPatternImpl() {
+        // No type environment attached: PatternNodes is released by the
+        // ordinary member destruction that follows this body.
+        if (!TypeEnv) {
+            return;
+        }
+
+        // Release the pattern nodes explicitly while the guard returned by
+        // BindAllocator() is still alive.
+        auto allocatorGuard = TypeEnv->BindAllocator();
+        PatternNodes.Reset();
+    }
+
+ void Compile(TString optLLVM, IStatsRegistry* stats) {
+ if (IsPatternCompiled.load())
+ return;
+
+#ifndef MKQL_DISABLE_CODEGEN
+ if (!Codegen)
+ Codegen = NYql::NCodegen::ICodegen::Make(NYql::NCodegen::ETarget::Native);
+
+ const auto& nodes = PatternNodes->GetNodes();
+
+ TStatTimer timerFull(CodeGen_FullTime);
+ timerFull.Acquire();
+ bool hasCode = false;
+ {
+ TStatTimer timerGen(CodeGen_GenerateTime);
+ timerGen.Acquire();
+ for (auto it = nodes.crbegin(); nodes.crend() != it; ++it) {
+ if (const auto codegen = dynamic_cast<ICodegeneratorRootNode*>(it->Get())) {
+ try {
+ codegen->GenerateFunctions(Codegen);
+ hasCode = true;
+ } catch (const TNoCodegen&) {
+ hasCode = false;
+ break;
}
}
- timerGen.Release();
- timerGen.Report(opts.Stats);
}
+ timerGen.Release();
+ timerGen.Report(stats);
+ }
- if (hasCode) {
- if (opts.OptLLVM.Contains("--dump-generated")) {
- Cerr << "############### Begin generated module ###############" << Endl;
- Codegen->GetModule().print(llvm::errs(), nullptr);
- Cerr << "################ End generated module ################" << Endl;
- }
+ if (hasCode) {
+ if (optLLVM.Contains("--dump-generated")) {
+ Cerr << "############### Begin generated module ###############" << Endl;
+ Codegen->GetModule().print(llvm::errs(), nullptr);
+ Cerr << "################ End generated module ################" << Endl;
+ }
- TStatTimer timerComp(CodeGen_CompileTime);
- timerComp.Acquire();
-
- NYql::NCodegen::TCodegenStats codegenStats;
- Codegen->GetStats(codegenStats);
- MKQL_ADD_STAT(opts.Stats, CodeGen_TotalFunctions, codegenStats.TotalFunctions);
- MKQL_ADD_STAT(opts.Stats, CodeGen_TotalInstructions, codegenStats.TotalInstructions);
- MKQL_SET_MAX_STAT(opts.Stats, CodeGen_MaxFunctionInstructions, codegenStats.MaxFunctionInstructions);
- if (opts.OptLLVM.Contains("--dump-stats")) {
- Cerr << "TotalFunctions: " << codegenStats.TotalFunctions << Endl;
- Cerr << "TotalInstructions: " << codegenStats.TotalInstructions << Endl;
- Cerr << "MaxFunctionInstructions: " << codegenStats.MaxFunctionInstructions << Endl;
- }
+ TStatTimer timerComp(CodeGen_CompileTime);
+ timerComp.Acquire();
+
+ NYql::NCodegen::TCodegenStats codegenStats;
+ Codegen->GetStats(codegenStats);
+ MKQL_ADD_STAT(stats, CodeGen_TotalFunctions, codegenStats.TotalFunctions);
+ MKQL_ADD_STAT(stats, CodeGen_TotalInstructions, codegenStats.TotalInstructions);
+ MKQL_SET_MAX_STAT(stats, CodeGen_MaxFunctionInstructions, codegenStats.MaxFunctionInstructions);
+ if (optLLVM.Contains("--dump-stats")) {
+ Cerr << "TotalFunctions: " << codegenStats.TotalFunctions << Endl;
+ Cerr << "TotalInstructions: " << codegenStats.TotalInstructions << Endl;
+ Cerr << "MaxFunctionInstructions: " << codegenStats.MaxFunctionInstructions << Endl;
+ }
- if (opts.OptLLVM.Contains("--dump-perf-map")) {
- Codegen->TogglePerfJITEventListener();
- }
+ if (optLLVM.Contains("--dump-perf-map")) {
+ Codegen->TogglePerfJITEventListener();
+ }
- if (codegenStats.TotalFunctions >= TotalFunctionsLimit ||
- codegenStats.TotalInstructions >= TotalInstructionsLimit ||
- codegenStats.MaxFunctionInstructions >= MaxFunctionInstructionsLimit) {
- Codegen.reset();
- } else {
- Codegen->Verify();
- NYql::NCodegen::TCompileStats compileStats;
- Codegen->Compile(GetCompileOptions(opts.OptLLVM), &compileStats);
- MKQL_ADD_STAT(opts.Stats, CodeGen_ModulePassTime, compileStats.ModulePassTime);
- MKQL_ADD_STAT(opts.Stats, CodeGen_FinalizeTime, compileStats.FinalizeTime);
- }
+ if (codegenStats.TotalFunctions >= TotalFunctionsLimit ||
+ codegenStats.TotalInstructions >= TotalInstructionsLimit ||
+ codegenStats.MaxFunctionInstructions >= MaxFunctionInstructionsLimit) {
+ Codegen.reset();
+ } else {
+ Codegen->Verify();
+ NYql::NCodegen::TCompileStats compileStats;
+ Codegen->Compile(GetCompileOptions(optLLVM), &compileStats);
- timerComp.Release();
- timerComp.Report(opts.Stats);
+ MKQL_ADD_STAT(stats, CodeGen_ModulePassTime, compileStats.ModulePassTime);
+ MKQL_ADD_STAT(stats, CodeGen_FinalizeTime, compileStats.FinalizeTime);
+ }
- if (Codegen) {
- if (opts.OptLLVM.Contains("--dump-compiled")) {
- Cerr << "############### Begin compiled module ###############" << Endl;
- Codegen->GetModule().print(llvm::errs(), nullptr);
- Cerr << "################ End compiled module ################" << Endl;
- }
+ timerComp.Release();
+ timerComp.Report(stats);
- if (opts.OptLLVM.Contains("--asm-compiled")) {
- Cerr << "############### Begin compiled asm ###############" << Endl;
- Codegen->ShowGeneratedFunctions(&Cerr);
- Cerr << "################ End compiled asm ################" << Endl;
- }
+ if (Codegen) {
+ if (optLLVM.Contains("--dump-compiled")) {
+ Cerr << "############### Begin compiled module ###############" << Endl;
+ Codegen->GetModule().print(llvm::errs(), nullptr);
+ Cerr << "################ End compiled module ################" << Endl;
+ }
- auto count = 0U;
- for (const auto& node : nodes) {
- if (const auto codegen = dynamic_cast<ICodegeneratorRootNode*>(node.Get())) {
- codegen->FinalizeFunctions(Codegen);
- ++count;
- }
+ if (optLLVM.Contains("--asm-compiled")) {
+ Cerr << "############### Begin compiled asm ###############" << Endl;
+ Codegen->ShowGeneratedFunctions(&Cerr);
+ Cerr << "################ End compiled asm ################" << Endl;
+ }
+
+ ui64 count = 0U;
+ for (const auto& node : nodes) {
+ if (const auto codegen = dynamic_cast<ICodegeneratorRootNode*>(node.Get())) {
+ codegen->FinalizeFunctions(Codegen);
+ ++count;
}
+ }
- if (count)
- MKQL_ADD_STAT(opts.Stats, Mkql_CodegenFunctions, count);
+ if (count) {
+ MKQL_ADD_STAT(stats, Mkql_CodegenFunctions, count);
}
}
-
- timerFull.Release();
- timerFull.Report(opts.Stats);
}
+
+ timerFull.Release();
+ timerFull.Report(stats);
+
+ IsPatternCompiled.store(true);
#endif
- PatternNodes = builder->GetPatternNodes();
}
- ~TComputationPatternImpl() {
- if (TypeEnv) {
- auto guard = TypeEnv->BindAllocator();
- PatternNodes.Reset();
- }
+ bool IsCompiled() const {
+ return IsPatternCompiled.load();
}
- void SetTypeEnv(TTypeEnvironment* typeEnv) {
- TypeEnv = typeEnv;
+ THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) {
+ return MakeHolder<TComputationGraph>(PatternNodes, compOpts, IsPatternCompiled.load());
}
+ bool GetSuitableForCache() const {
+ return PatternNodes->GetSuitableForCache();
+ }
+
+private:
TStringBuf GetCompileOptions(const TString& s) {
const TString flag = "--compile-options";
auto lpos = s.rfind(flag);
@@ -903,18 +931,10 @@ public:
return TStringBuf(s, lpos, rpos - lpos);
};
- THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) final {
- return MakeHolder<TComputationGraph>(PatternNodes, compOpts);
- }
-
- bool GetSuitableForCache() const final {
- return PatternNodes->GetSuitableForCache();
- }
-
-private:
TTypeEnvironment* TypeEnv = nullptr;
TPatternNodes::TPtr PatternNodes;
NYql::NCodegen::ICodegen::TPtr Codegen;
+ std::atomic<bool> IsPatternCompiled = false;
};
@@ -962,6 +982,8 @@ TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNode
return MakeIntrusive<TComputationPatternImpl>(std::move(builder), opts);
}
+} // namespace
+
IComputationPattern::TPtr MakeComputationPattern(TExploringNodeVisitor& explorer, const TRuntimeNode& root,
const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) {
return MakeComputationPatternImpl(explorer, root, entryPoints, opts);
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp
new file mode 100644
index 0000000000..a50c72e686
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp
@@ -0,0 +1,145 @@
+#include "mkql_computation_pattern_cache.h"
+
+namespace NKikimr::NMiniKQL {
+
+// Builds the cache with a fixed element capacity (CacheMaxElementsSize), keeps a
+// copy of the supplied configuration and resolves every monitoring counter under
+// the "PatternCache/" prefix of `counters`.
+// NOTE(review): the boolean passed to GetCounter() presumably selects a derivative
+// (rate) counter for Hits/Waits/Misses/NotSuitablePattern and an absolute gauge for
+// the size counters - confirm against NMonitoring::TDynamicCounters.
+TComputationPatternLRUCache::TComputationPatternLRUCache(TComputationPatternLRUCache::Config configuration, NMonitoring::TDynamicCounterPtr counters)
+ : Cache(CacheMaxElementsSize)
+ , Configuration(configuration)
+ , Hits(counters->GetCounter("PatternCache/Hits", true))
+ , Waits(counters->GetCounter("PatternCache/Waits", true))
+ , Misses(counters->GetCounter("PatternCache/Misses", true))
+ , NotSuitablePattern(counters->GetCounter("PatternCache/NotSuitablePattern", true))
+ , SizeItems(counters->GetCounter("PatternCache/SizeItems", false))
+ , SizeBytes(counters->GetCounter("PatternCache/SizeBytes", false))
+ , MaxSizeBytesCounter(counters->GetCounter("PatternCache/MaxSizeBytes", false))
+{
+ // Publish the configured byte budget so it is visible next to SizeBytes.
+ *MaxSizeBytesCounter = Configuration.MaxSizeBytes;
+}
+
+// Plain lookup by serialized program text. Bumps the Hits or Misses counter and
+// returns the cached entry, or an empty pointer when nothing is cached.
+// Unlike FindOrSubscribe() it neither subscribes nor records a pattern access.
+std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serialized) {
+    std::lock_guard<std::mutex> guard(Mutex);
+
+    auto found = Cache.Find(serialized);
+    if (found == Cache.End()) {
+        ++*Misses;
+        return {};
+    }
+
+    ++*Hits;
+    return *found;
+}
+
+// Looks the pattern up and returns a ticket describing one of three outcomes:
+//  * cached      - ticket carries an already fulfilled future (counted as a hit);
+//  * first miss  - ticket is created with `true` and a pointer to this cache, and
+//                  a Notify slot is registered for the key (counted as a miss);
+//  * in progress - a Notify slot already exists, so the caller gets a future that
+//                  is fulfilled by EmplacePattern()/NotifyMissing() (counted as a wait).
+TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serialized) {
+    using TEntryPtr = std::shared_ptr<TPatternCacheEntry>;
+
+    std::lock_guard<std::mutex> guard(Mutex);
+
+    auto cached = Cache.Find(serialized);
+    if (cached != Cache.End()) {
+        ++*Hits;
+        AccessPattern(serialized, *cached);
+        return TTicket(serialized, false, NThreading::MakeFuture<TEntryPtr>(*cached), nullptr);
+    }
+
+    auto registration = Notify.emplace(serialized, Nothing());
+    if (registration.second) {
+        // Nobody is building this pattern yet: the caller becomes the builder.
+        ++*Misses;
+        return TTicket(serialized, true, {}, this);
+    }
+
+    ++*Waits;
+
+    // The subscriber list is created lazily on the first waiter.
+    auto& waiters = registration.first->second;
+    if (!waiters) {
+        waiters.ConstructInPlace();
+    }
+
+    auto promise = NThreading::NewPromise<TEntryPtr>();
+    waiters->push_back(promise);
+    return TTicket(serialized, false, promise, nullptr);
+}
+
+// Drops the Notify slot registered for `serialized` and completes every waiting
+// promise with nullptr. Promises are fulfilled after the mutex is released.
+void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) {
+    TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> waiters;
+    {
+        std::lock_guard<std::mutex> guard(Mutex);
+        auto pending = Notify.find(serialized);
+        if (pending != Notify.end()) {
+            waiters.swap(pending->second);
+            Notify.erase(pending);
+        }
+    }
+
+    if (!waiters) {
+        return;
+    }
+
+    for (auto& waiter : *waiters) {
+        waiter.SetValue(nullptr);
+    }
+}
+
+// Stores a freshly built pattern under `serialized`, evicting old entries when the
+// byte budget is exceeded, and hands the entry to every subscriber that was waiting
+// for this key. Promises are fulfilled after the mutex is released.
+void TComputationPatternLRUCache::EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
+    Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern);
+
+    using TWaiters = TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>;
+    TMaybe<TWaiters> waiters;
+    {
+        std::lock_guard<std::mutex> guard(Mutex);
+        const size_t byteBudget = Configuration.MaxSizeBytes;
+
+        // normally remove only one old cache entry by iteration to prevent bursts
+        if (CurrentSizeBytes > byteBudget) {
+            RemoveOldest();
+        }
+        // to prevent huge memory overusage remove as much as needed
+        while (CurrentSizeBytes > 2 * byteBudget) {
+            RemoveOldest();
+        }
+
+        // Freeze the entry size first so the accounting below uses the locked value.
+        patternWithEnv->UpdateSizeForCache();
+        CurrentSizeBytes += patternWithEnv->SizeForCache;
+
+        Cache.Insert(serialized, patternWithEnv);
+        patternWithEnv->IsInCache.store(true);
+
+        if (auto pending = Notify.find(serialized); pending != Notify.end()) {
+            waiters.swap(pending->second);
+            Notify.erase(pending);
+        }
+
+        *SizeItems = Cache.Size();
+        *SizeBytes = CurrentSizeBytes;
+    }
+
+    if (!waiters) {
+        return;
+    }
+
+    for (auto& waiter : *waiters) {
+        waiter.SetValue(patternWithEnv);
+    }
+}
+
+// Empties the cache: every entry is flagged as no longer cached, the pending
+// compile set is dropped and the size accounting and gauges are reset to zero.
+//
+// The gauges are reset under the same lock that guards the cache itself, after the
+// cache has been cleared, so a concurrent EmplacePattern() (which writes SizeItems
+// and SizeBytes under the mutex) cannot interleave with a partially cleaned state.
+void TComputationPatternLRUCache::CleanCache() {
+    std::lock_guard<std::mutex> guard(Mutex);
+
+    // Entries may outlive the cache through shared_ptr copies held by callers,
+    // so tell them they are detached before dropping our references.
+    auto cacheEnd = Cache.End();
+    for (auto patternIt = Cache.Begin(); patternIt != cacheEnd; ++patternIt) {
+        // shared_ptr::operator-> is const and yields a mutable pointee, so no
+        // const_cast is needed (same access as in RemoveOldest()).
+        patternIt.Value()->IsInCache.store(false);
+    }
+    Cache.Clear();
+
+    PatternsToCompile.clear();
+    CurrentSizeBytes = 0;
+
+    *SizeItems = 0;
+    *SizeBytes = 0;
+    *MaxSizeBytesCounter = 0;
+}
+
+// Evicts the least recently used entry and updates the byte accounting.
+// Must be called with Mutex held (all callers in this file do so).
+//
+// Y_VERIFY_DEBUG is compiled out of release builds, so an explicit emptiness
+// check is required there: if the byte accounting ever disagrees with the cache
+// contents, the eviction loop in EmplacePattern() would otherwise keep asking an
+// empty cache for its oldest element. With nothing left to evict the accounted
+// size is reset to zero, which also lets that loop terminate.
+void TComputationPatternLRUCache::RemoveOldest() {
+    Y_VERIFY_DEBUG(Cache.Size() != 0);
+    if (Cache.Size() == 0) {
+        CurrentSizeBytes = 0;
+        return;
+    }
+
+    auto oldest = Cache.FindOldest();
+    Y_VERIFY_DEBUG(oldest != Cache.End());
+    CurrentSizeBytes -= oldest.Value()->SizeForCache;
+    oldest.Value()->IsInCache.store(false);
+    // An evicted pattern must not stay queued for compilation.
+    PatternsToCompile.erase(oldest.Key());
+    Cache.Erase(oldest);
+}
+
+// Counts one access to a cached pattern and queues it in PatternsToCompile when the
+// configured access threshold is reached. A threshold of zero queues the pattern on
+// its first access. Does nothing when no threshold is configured or the pattern is
+// already compiled.
+void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
+    const auto& threshold = Configuration.PatternAccessTimesBeforeTryToCompile;
+    if (!threshold) {
+        return;
+    }
+    if (entry->Pattern->IsCompiled()) {
+        return;
+    }
+
+    // fetch_add returns the previous value; +1 gives the count including this access.
+    const size_t accessCount = entry->AccessTimes.fetch_add(1) + 1;
+
+    const bool reachedThreshold = accessCount == *threshold;
+    const bool compileOnFirstAccess = *threshold == 0 && accessCount == 1;
+    if (reachedThreshold || compileOnFirstAccess) {
+        PatternsToCompile.emplace(serializedProgram, entry);
+    }
+}
+
+
+} // namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
index 1cb848bb93..86ffa65963 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h
@@ -27,6 +27,8 @@ struct TPatternCacheEntry {
TStructType* ParamsStruct;
IComputationPattern::TPtr Pattern;
size_t SizeForCache = 0; // set only by cache to lock the size, which can slightly vary when pattern is used
+ std::atomic<size_t> AccessTimes = 0; // set only by cache
+ std::atomic<bool> IsInCache = false; // set only by cache
void UpdateSizeForCache() {
Y_VERIFY_DEBUG(!SizeForCache);
@@ -50,22 +52,8 @@ struct TPatternCacheEntry {
}
}
};
-class TComputationPatternLRUCache {
- mutable std::mutex Mutex;
-
- THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
- TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache;
- size_t CurrentSizeBytes = 0;
- const size_t MaxSizeBytes = 0;
-public:
- NMonitoring::TDynamicCounters::TCounterPtr Hits;
- NMonitoring::TDynamicCounters::TCounterPtr Waits;
- NMonitoring::TDynamicCounters::TCounterPtr Misses;
- NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern;
- NMonitoring::TDynamicCounters::TCounterPtr SizeItems;
- NMonitoring::TDynamicCounters::TCounterPtr SizeBytes;
- NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter;
+class TComputationPatternLRUCache {
public:
class TTicket : private TNonCopyable {
public:
@@ -102,138 +90,63 @@ public:
TComputationPatternLRUCache* Cache;
};
- TComputationPatternLRUCache(size_t sizeBytes, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>())
- : Cache(10000)
- , MaxSizeBytes(sizeBytes)
- , Hits(counters->GetCounter("PatternCache/Hits", true))
- , Waits(counters->GetCounter("PatternCache/Waits", true))
- , Misses(counters->GetCounter("PatternCache/Misses", true))
- , NotSuitablePattern(counters->GetCounter("PatternCache/NotSuitablePattern", true))
- , SizeItems(counters->GetCounter("PatternCache/SizeItems", false))
- , SizeBytes(counters->GetCounter("PatternCache/SizeBytes", false))
- , MaxSizeBytesCounter(counters->GetCounter("PatternCache/MaxSizeBytes", false))
- {
- *MaxSizeBytesCounter = MaxSizeBytes;
- }
+ struct Config {
+ Config(size_t maxSizeBytes)
+ : MaxSizeBytes(maxSizeBytes)
+ {}
- static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) {
- return std::make_shared<TPatternCacheEntry>(useAlloc);
- }
+ Config(size_t maxSizeBytes, size_t patternAccessTimesBeforeTryToCompile)
+ : MaxSizeBytes(maxSizeBytes)
+ , PatternAccessTimesBeforeTryToCompile(patternAccessTimesBeforeTryToCompile)
+ {}
- std::shared_ptr<TPatternCacheEntry> Find(const TString& serialized) {
- auto guard = std::scoped_lock<std::mutex>(Mutex);
- if (auto it = Cache.Find(serialized); it != Cache.End()) {
- ++*Hits;
- return *it;
- } else {
- ++*Misses;
- return {};
- }
- }
+ size_t MaxSizeBytes = 0;
+ std::optional<size_t> PatternAccessTimesBeforeTryToCompile;
- TTicket FindOrSubscribe(const TString& serialized) {
- auto guard = std::scoped_lock<std::mutex>(Mutex);
- if (auto it = Cache.Find(serialized); it != Cache.End()) {
- ++*Hits;
- return TTicket(serialized, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr);
+        // Two configurations are equal when both the byte budget and the optional
+        // compile threshold match. Const-qualified so that const Config objects
+        // (e.g. the cache's own `const Config Configuration`) can be compared.
+        bool operator==(const Config & rhs) const {
+            return std::tie(MaxSizeBytes, PatternAccessTimesBeforeTryToCompile) ==
+                std::tie(rhs.MaxSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile);
+        }
- auto [notifyIt, isNew] = Notify.emplace(serialized, Nothing());
- if (isNew) {
- ++*Misses;
- return TTicket(serialized, true, {}, this);
+        // Negation of the field-wise comparison. Const-qualified so it is usable on
+        // const Config objects; compares the fields directly rather than through
+        // operator== so it does not depend on that operator's qualification.
+        bool operator!=(const Config & rhs) const {
+            return std::tie(MaxSizeBytes, PatternAccessTimesBeforeTryToCompile) !=
+                std::tie(rhs.MaxSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile);
         }
+ };
- ++*Waits;
- auto promise = NThreading::NewPromise<std::shared_ptr<TPatternCacheEntry>>();
- auto& subscribers = Notify[serialized];
- if (!subscribers) {
- subscribers.ConstructInPlace();
- }
+ TComputationPatternLRUCache(Config configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>());
- subscribers->push_back(promise);
- return TTicket(serialized, false, promise, nullptr);
+ ~TComputationPatternLRUCache() {
+ CleanCache();
}
- void RemoveOldest() {
- auto oldest = Cache.FindOldest();
- Y_VERIFY_DEBUG(oldest != Cache.End());
- CurrentSizeBytes -= oldest.Value()->SizeForCache;
- Cache.Erase(oldest);
+ static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) {
+ return std::make_shared<TPatternCacheEntry>(useAlloc);
}
- void NotifyMissing(const TString& serialized) {
- TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
- {
- auto guard = std::scoped_lock<std::mutex>(Mutex);
- auto notifyIt = Notify.find(serialized);
- if (notifyIt != Notify.end()) {
- subscribers.swap(notifyIt->second);
- Notify.erase(notifyIt);
- }
- }
+ std::shared_ptr<TPatternCacheEntry> Find(const TString& serialized);
- if (subscribers) {
- for (auto& s : *subscribers) {
- s.SetValue(nullptr);
- }
- }
- }
+ TTicket FindOrSubscribe(const TString& serialized);
- void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
- Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern);
- TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
- {
- auto guard = std::scoped_lock<std::mutex>(Mutex);
- // normally remove only one old cache entry by iteration to prevent bursts
- if (CurrentSizeBytes > MaxSizeBytes) {
- RemoveOldest();
- }
- // to prevent huge memory overusage remove as much as needed
- while (CurrentSizeBytes > 2 * MaxSizeBytes) {
- RemoveOldest();
- }
-
- patternWithEnv->UpdateSizeForCache();
- CurrentSizeBytes += patternWithEnv->SizeForCache;
-
- Cache.Insert(serialized, patternWithEnv);
- auto notifyIt = Notify.find(serialized);
- if (notifyIt != Notify.end()) {
- subscribers.swap(notifyIt->second);
- Notify.erase(notifyIt);
- }
-
- *SizeItems = Cache.Size();
- *SizeBytes = CurrentSizeBytes;
- }
+ void NotifyMissing(const TString& serialized);
- if (subscribers) {
- for (auto& s : *subscribers) {
- s.SetValue(patternWithEnv);
- }
- }
- }
+ void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
- void CleanCache() {
- *SizeItems = 0;
- *SizeBytes = 0;
- *MaxSizeBytesCounter = 0;
- auto guard = std::scoped_lock<std::mutex>(Mutex);
- CurrentSizeBytes = 0;
- Cache.Clear();
- }
+ void CleanCache();
size_t GetSize() const {
- auto guard = std::scoped_lock<std::mutex>(Mutex);
+ std::lock_guard lock(Mutex);
return Cache.Size();
}
- size_t GetMaxSize() const {
- auto guard = std::scoped_lock<std::mutex>(Mutex);
- return MaxSizeBytes;
+ Config GetConfiguration() const {
+ std::lock_guard lock(Mutex);
+ return Configuration;
}
+ size_t GetMaxSizeBytes() const {
+ std::lock_guard lock(Mutex);
+ return Configuration.MaxSizeBytes;
+ }
i64 GetCacheHits() const {
return *Hits;
@@ -243,9 +156,38 @@ public:
++*NotSuitablePattern;
}
- ~TComputationPatternLRUCache() {
- CleanCache();
+ size_t GetPatternsToCompileSize() const {
+ std::lock_guard lock(Mutex);
+ return PatternsToCompile.size();
+ }
+
+ // Hands the set of patterns queued for compilation over to the caller.
+ // The exchange is a swap performed under the mutex: `result` receives the queued
+ // patterns and whatever `result` held before becomes the new queue, so callers
+ // that want to drain the queue should pass an empty map.
+ void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) {
+ std::lock_guard lock(Mutex);
+ result.swap(PatternsToCompile);
}
+
+private:
+ void RemoveOldest();
+
+ void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
+
+ static constexpr size_t CacheMaxElementsSize = 10000;
+
+ mutable std::mutex Mutex;
+ THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
+ TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache;
+ size_t CurrentSizeBytes = 0;
+ THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
+
+ const Config Configuration;
+
+ NMonitoring::TDynamicCounters::TCounterPtr Hits;
+ NMonitoring::TDynamicCounters::TCounterPtr Waits;
+ NMonitoring::TDynamicCounters::TCounterPtr Misses;
+ NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern;
+ NMonitoring::TDynamicCounters::TCounterPtr SizeItems;
+ NMonitoring::TDynamicCounters::TCounterPtr SizeBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter;
};
} // namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/ya.make.inc b/ydb/library/yql/minikql/computation/ya.make.inc
index be501a3097..d8c67bba61 100644
--- a/ydb/library/yql/minikql/computation/ya.make.inc
+++ b/ydb/library/yql/minikql/computation/ya.make.inc
@@ -16,6 +16,7 @@ SRCS(
mkql_custom_list.cpp
mkql_validate.cpp
mkql_value_builder.cpp
+ mkql_computation_pattern_cache.cpp
presort.cpp
)