diff options
author | Maksim Kita <kitaetoya@gmail.com> | 2023-08-14 17:19:17 +0300 |
---|---|---|
committer | maksim-kita <maksim-kita@yandex-team.com> | 2023-08-15 00:17:35 +0300 |
commit | 226a03bca496b97f4e309091b41e2f92a47cc17a (patch) | |
tree | 6a8928176cbac656734e56385e97ad46d0912801 | |
parent | 87b28fbd7cfe91a115f8c5998cb30c325f75858b (diff) | |
download | ydb-226a03bca496b97f4e309091b41e2f92a47cc17a.tar.gz |
JIT enable by default
JIT enable by default
Pull Request resolved: #311
24 files changed, 593 insertions, 267 deletions
diff --git a/ydb/core/kqp/common/simple/services.h b/ydb/core/kqp/common/simple/services.h index 3d598c95d6..80002d604f 100644 --- a/ydb/core/kqp/common/simple/services.h +++ b/ydb/core/kqp/common/simple/services.h @@ -36,4 +36,9 @@ inline NActors::TActorId MakeKqpLocalFileSpillingServiceID(ui32 nodeId) { return NActors::TActorId(nodeId, TStringBuf(name, 12)); } +inline NActors::TActorId MakeKqpCompileComputationPatternServiceID(ui32 nodeId) { + const char name[12] = "kqp_comp_cp"; + return NActors::TActorId(nodeId, TStringBuf(name, 12)); +} + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt index b3ac2e0be9..647bd7bc53 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt @@ -22,4 +22,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp ) diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt index e786b9cc55..d02df338a9 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt @@ -23,4 +23,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp ) diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt index e786b9cc55..d02df338a9 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt @@ -23,4 +23,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp ) diff --git a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt index b3ac2e0be9..647bd7bc53 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt @@ -22,4 +22,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp ) diff --git a/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp new file mode 100644 index 0000000000..b41d4bf6d6 --- /dev/null +++ b/ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp @@ -0,0 +1,130 @@ +#include "kqp_compile_service.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/events.h> + +#include <ydb/library/aclib/aclib.h> + +#include <ydb/core/kqp/rm_service/kqp_rm_service.h> + +namespace NKikimr { +namespace NKqp { + +namespace { + +class TKqpCompileComputationPatternService : public TActorBootstrapped<TKqpCompileComputationPatternService> { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::KQP_COMPILE_COMPUTATION_PATTERN_SERVICE; + } + + TKqpCompileComputationPatternService(const NKikimrConfig::TTableServiceConfig_TCompileComputationPatternServiceConfig& config, + TIntrusivePtr<TKqpCounters> counters) + : WakeupInterval(TDuration::MilliSeconds(config.GetWakeupIntervalMs())) + , Counters(std::move(counters)) + {} + + void Bootstrap(const TActorContext& ctx) { + Become(&TKqpCompileComputationPatternService::MainState); + ScheduleWakeup(ctx); + } +private: + STFUNC(MainState) { + switch (ev->GetTypeRewrite()) { + CFunc(TEvents::TSystem::Wakeup, HandleWakeup); + cFunc(TEvents::TEvPoison::EventType, PassAway); + default: + Y_FAIL("TKqpCompileComputationPatternService: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } + + void HandleWakeup(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_COMPUTATION_PATTERN_SERVICE, "Received wakeup"); + auto patternCache = GetKqpResourceManager()->GetPatternCache(); + if (!patternCache) { + ScheduleWakeup(ctx); + return; + } + + LoadPatternsToCompileIfNeeded(patternCache); + + TSimpleTimer timer; + i64 compilationIntervalMs = MaxCompilationIntervalMs; + + size_t patternsToCompileSize = PatternsToCompile.size(); + for (; PatternToCompileIndex < patternsToCompileSize && compilationIntervalMs > 0; ++PatternToCompileIndex) { + auto & entry = PatternsToCompile[PatternToCompileIndex]; + if (!entry->IsInCache.load()) { + continue; + } + + timer.Reset(); + + entry->Pattern->Compile({}, nullptr); + Counters->CompiledComputationPatterns->Inc(); + + entry = nullptr; + + compilationIntervalMs -= static_cast<i64>(timer.Get().MilliSeconds()); + } + + if (PatternToCompileIndex == patternsToCompileSize) { + PatternsToCompile.clear(); + } + + ScheduleWakeup(ctx); + } + +private: + void LoadPatternsToCompileIfNeeded(std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> & patternCache) { + if (PatternsToCompile.size() != 0) { + return; + } + + THashMap<TString, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile; + patternCache->GetPatternsToCompile(patternsToCompile); + + TVector<std::pair<std::shared_ptr<NMiniKQL::TPatternCacheEntry>, size_t>> patternsToCompileWithAccessSize; + for (auto & [_, entry] : patternsToCompile) { + patternsToCompileWithAccessSize.emplace_back(entry, entry->AccessTimes.load()); + } + + std::sort(patternsToCompileWithAccessSize.begin(), patternsToCompileWithAccessSize.end(), [](auto & lhs, auto & rhs) { + return lhs.second > rhs.second; + }); + + PatternsToCompile.reserve(patternsToCompileWithAccessSize.size()); + for (auto & [entry, _] : patternsToCompileWithAccessSize) { + PatternsToCompile.push_back(entry); + } + + *Counters->CompileQueueSize = PatternsToCompile.size(); + PatternToCompileIndex = 0; + } + + void ScheduleWakeup(const TActorContext& ctx) { + ctx.Schedule(WakeupInterval, new TEvents::TEvWakeup()); + } + + static constexpr i64 MaxCompilationIntervalMs = 300; + + TDuration WakeupInterval; + TIntrusivePtr<TKqpCounters> Counters; + + using PatternsToCompileContainer = TVector<std::shared_ptr<NMiniKQL::TPatternCacheEntry>>; + using PatternsToCompileContainerIterator = PatternsToCompileContainer::iterator; + PatternsToCompileContainer PatternsToCompile; + size_t PatternToCompileIndex = 0; +}; + +} + +IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableServiceConfig& serviceConfig, + TIntrusivePtr<TKqpCounters> counters) +{ + return new TKqpCompileComputationPatternService(serviceConfig.GetCompileComputationPatternServiceConfig(), std::move(counters)); +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h index f4dcfe7b8a..e351185d23 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.h +++ b/ydb/core/kqp/compile_service/kqp_compile_service.h @@ -14,6 +14,9 @@ IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& servic NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NYql::IHTTPGateway::TPtr httpGateway); +IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableServiceConfig& serviceConfig, + TIntrusivePtr<TKqpCounters> counters); + IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const NKikimrConfig::TTableServiceConfig& serviceConfig, NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, diff --git a/ydb/core/kqp/compile_service/ya.make b/ydb/core/kqp/compile_service/ya.make index 3fadfdad44..8eb0152042 100644 --- a/ydb/core/kqp/compile_service/ya.make +++ b/ydb/core/kqp/compile_service/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( kqp_compile_actor.cpp kqp_compile_service.cpp + kqp_compile_computation_pattern_service.cpp ) PEERDIR( diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 3dd3544763..fdf85085c8 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -753,6 +753,10 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co CompileQueueSize = KqpGroup->GetCounter("Compilation/QueueSize", false); + /* Compile computation pattern service */ + CompiledComputationPatterns = KqpGroup->GetCounter("ComputationPatternCompilation/CompiledComputationPatterns"); + CompileComputationPatternsQueueSize = KqpGroup->GetCounter("ComputationPatternCompilation/CompileComputationPatternsQueueSize"); + /* Resource Manager */ RmComputeActors = KqpGroup->GetCounter("RM/ComputeActors", false); RmMemory = KqpGroup->GetCounter("RM/Memory", false); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 4be02be974..d638e706d7 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -352,6 +352,10 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr CompileQueryCacheEvicted; ::NMonitoring::TDynamicCounters::TCounterPtr CompileQueueSize; + // Compile computation pattern service + ::NMonitoring::TDynamicCounters::TCounterPtr CompiledComputationPatterns; + ::NMonitoring::TDynamicCounters::TCounterPtr CompileComputationPatternsQueueSize; + // Resource Manager ::NMonitoring::TDynamicCounters::TCounterPtr RmComputeActors; ::NMonitoring::TDynamicCounters::TCounterPtr RmMemory; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 3734154d32..b6e083b14b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -185,7 +185,7 @@ public: , KqpProxySharedResources(std::move(kqpProxySharedResources)) {} - void Bootstrap() { + void Bootstrap(const TActorContext &ctx) { if (TokenAccessorConfig.GetEnabled()) { TString caContent; if (const auto& path = TokenAccessorConfig.GetSslCaCert()) { @@ -241,6 +241,14 @@ public: TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpCompileServiceID(SelfId().NodeId()), CompileService); + if (TableServiceConfig.GetEnableAsyncComputationPatternCompilation()) { + IActor* ComputationPatternServiceActor = CreateKqpCompileComputationPatternService(TableServiceConfig, Counters); + ui32 batchPoolId = AppData(ctx)->BatchPoolId; + CompileComputationPatternService = ctx.Register(ComputationPatternServiceActor, TMailboxType::HTSwap, batchPoolId); + TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( + MakeKqpCompileComputationPatternServiceID(SelfId().NodeId()), CompileComputationPatternService); + } + KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters, nullptr, AsyncIoFactory)); TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpNodeServiceID(SelfId().NodeId()), KqpNodeService); @@ -423,6 +431,11 @@ public: void PassAway() override { Send(CompileService, new TEvents::TEvPoisonPill()); + + if (TableServiceConfig.GetEnableAsyncComputationPatternCompilation()) { + Send(CompileComputationPatternService, new TEvents::TEvPoisonPill()); + } + Send(SpillingService, new TEvents::TEvPoison); Send(KqpNodeService, new TEvents::TEvPoison); if (BoardPublishActor) { @@ -1568,6 +1581,7 @@ private: TActorId BoardLookupActor; TActorId BoardPublishActor; TActorId CompileService; + TActorId CompileComputationPatternService; TActorId KqpNodeService; TActorId SpillingService; TActorId WhiteBoardService; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 743d4e2bc8..14bb7e20a8 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -150,7 +150,7 @@ public: } ActorSystem = actorSystem; SelfId = selfId; - UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes()); + UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes(), Config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); if (PublishResourcesByExchanger) { CreateResourceInfoExchanger(Config.GetInfoExchangerSettings()); @@ -619,13 +619,15 @@ public: ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources); } - void UpdatePatternCache(ui64 size) { - if (size) { - if (!PatternCache || PatternCache->GetMaxSize() != size) { - PatternCache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(size, Counters->GetKqpCounters()); - } - } else { + void UpdatePatternCache(ui64 maxSizeBytes, ui64 patternAccessTimesBeforeTryToCompile) { + if (maxSizeBytes == 0) { PatternCache.reset(); + return; + } + + NMiniKQL::TComputationPatternLRUCache::Config config{maxSizeBytes, patternAccessTimesBeforeTryToCompile}; + if (!PatternCache || PatternCache->GetConfiguration() != config) { + PatternCache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(config, Counters->GetKqpCounters()); } } @@ -886,7 +888,7 @@ private: Send(ev->Sender, new NConsole::TEvConsole::TEvConfigNotificationResponse(event), IEventHandle::FlagTrackDelivery, ev->Cookie); auto& config = *event.MutableConfig()->MutableTableServiceConfig()->MutableResourceManager(); - ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes()); + ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(), config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); bool enablePublishResourcesByExchanger = config.GetEnablePublishResourcesByExchanger(); if (enablePublishResourcesByExchanger != PublishResourcesByExchanger) { diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp index 8556a594de..4eed1d3124 100644 --- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp @@ -1,3 +1,4 @@ +#include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <library/cpp/threading/local_executor/local_executor.h> @@ -224,49 +225,78 @@ Y_UNIT_TEST_SUITE(KqpService) { UNIT_ASSERT_VALUES_EQUAL_C(status.GetStatus(), EStatus::SUCCESS, status.GetIssues().ToString()); } - Y_UNIT_TEST_TWIN(PatternCache, UseCache) { + void ConfigureSettings(TKikimrSettings & settings, bool useCache, bool useAsyncPatternCompilation) { + size_t cacheSize = 0; + if (useCache) { + cacheSize = useAsyncPatternCompilation ? 10_MB : 1_MB; + } + + auto * tableServiceConfig = settings.AppConfig.MutableTableServiceConfig(); + tableServiceConfig->MutableResourceManager()->SetKqpPatternCacheCapacityBytes(cacheSize); + tableServiceConfig->SetEnableAsyncComputationPatternCompilation(useAsyncPatternCompilation); + + if (useAsyncPatternCompilation) { + tableServiceConfig->MutableCompileComputationPatternServiceConfig()->SetWakeupIntervalMs(1); + tableServiceConfig->MutableResourceManager()->SetKqpPatternCachePatternAccessTimesBeforeTryToCompile(0); + } + } + + Y_UNIT_TEST_QUAD(PatternCache, UseCache, UseAsyncPatternCompilation) { auto settings = TKikimrSettings() .SetWithSampleTables(false); - size_t cacheSize = UseCache ? 1_MB : 0; - settings.AppConfig.MutableTableServiceConfig()->MutableResourceManager()->SetKqpPatternCacheCapacityBytes(cacheSize); + ConfigureSettings(settings, UseCache, UseAsyncPatternCompilation); + auto kikimr = TKikimrRunner{settings}; auto driver = kikimr.GetDriver(); + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + static constexpr i64 AsyncPatternCompilationUniqueRequestsSize = 5; + + auto async_compilation_condition = [&]() { + if constexpr (UseCache && UseAsyncPatternCompilation) { + return *counters.CompiledComputationPatterns < AsyncPatternCompilationUniqueRequestsSize; + } + + return false; + }; + size_t InFlight = 10; NPar::LocalExecutor().RunAdditionalThreads(InFlight); - NPar::LocalExecutor().ExecRange([&driver](int /*id*/) { - TTimer t; + NPar::LocalExecutor().ExecRange([&](int /*id*/) { NYdb::NTable::TTableClient db(driver); auto session = db.CreateSession().GetValueSync().GetSession(); - for (ui32 i = 0; i < 500; ++i) { + + for (ui32 i = 0; i < 500 || async_compilation_condition(); ++i) { + ui32 value = UseCache && UseAsyncPatternCompilation ? i % AsyncPatternCompilationUniqueRequestsSize : i / 5; ui64 total = 100500; TString request = (TStringBuilder() << R"_( $data = AsList( - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("aaa" AS Key,)_" << i / 5 << R"_(u AS Value), - - AsStruct("aaa" AS Key,)_" << total - 10 * (i / 5) << R"_(u AS Value), - - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), - - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value), - AsStruct("bbb" AS Key,)_" << i / 5 << R"_(u AS Value) + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + AsStruct("aaa" AS Key,)_" << value << R"_(u AS Value), + + AsStruct("aaa" AS Key,)_" << total - 10 * value << R"_(u AS Value), + + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value), + + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value), + AsStruct("bbb" AS Key,)_" << value << R"_(u AS Value) ); SELECT * FROM ( @@ -286,6 +316,10 @@ Y_UNIT_TEST_SUITE(KqpService) { CompareYson(R"( [ ["aaa";100500u] ])", FormatResultSetYson(result.GetResultSet(0))); } }, 0, InFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); + + if constexpr (UseCache && UseAsyncPatternCompilation) { + UNIT_ASSERT(*counters.CompiledComputationPatterns >= AsyncPatternCompilationUniqueRequestsSize); + } } // YQL-15582 diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index c2f6e6681f..aaf6aa9af0 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1142,6 +1142,7 @@ message TTableServiceConfig { optional uint64 KqpPatternCacheCapacityBytes = 17 [default = 104857600]; // 100 MiB, 0 is for disable optional bool EnablePublishResourcesByExchanger = 18 [default = true]; optional TInfoExchangerSettings InfoExchangerSettings = 19; + optional uint64 KqpPatternCachePatternAccessTimesBeforeTryToCompile = 20 [default = 5]; } message TSpillingServiceConfig { @@ -1162,6 +1163,10 @@ message TTableServiceConfig { optional TLocalFileConfig LocalFileConfig = 1; } + message TCompileComputationPatternServiceConfig { + optional uint64 WakeupIntervalMs = 1 [default = 1000]; // 1 sec + } + message TQueryPhaseLimits { optional uint32 AffectedShardsLimit = 1; optional uint32 ReadsetCountLimit = 2; @@ -1329,6 +1334,8 @@ message TTableServiceConfig { optional bool EnableSequentialReads = 39 [default = false]; optional bool EnablePreparedDdl = 42 [default = false]; optional bool EnableSequences = 43 [default = true]; + optional bool EnableAsyncComputationPatternCompilation = 46 [default = false]; + optional TCompileComputationPatternServiceConfig CompileComputationPatternServiceConfig = 47; enum EBindingsMode { BM_ENABLED = 0; diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 193a125c07..4e3d0e0296 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -224,6 +224,7 @@ enum EServiceKikimr { KQP_NODE = 543; KQP_LOAD_TEST = 544; KQP_SESSION = 545; + KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 546; TABLET_RESOURCE_BROKER = 540; @@ -1000,5 +1001,6 @@ message TActivity { PERSQUEUE_READ_QUOTER = 616; KAFKA_PRODUCE_ACTOR = 617; STAT_SERVICE = 618; + KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 619; }; }; diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt index 853b83dac4..91af29586a 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt @@ -51,5 +51,6 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort.cpp ) diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt index 031693fb3a..8b735265f7 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt @@ -52,5 +52,6 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort.cpp ) diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt index 031693fb3a..8b735265f7 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt @@ -52,5 +52,6 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort.cpp ) diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt index 853b83dac4..91af29586a 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt @@ -51,5 +51,6 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort.cpp ) diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h index 4fbb13a650..25d2c08185 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h @@ -64,7 +64,7 @@ struct TComputationOptsFull: public TComputationOpts { , TimeProvider(timeProvider) , ValidatePolicy(validatePolicy) , SecureParamsProvider(secureParamsProvider) - {} + {} TAllocState& AllocState; TTypeEnvironment* TypeEnv = nullptr; @@ -116,7 +116,7 @@ struct TComputationContextLLVM { struct TComputationContext : public TComputationContextLLVM { IRandomProvider& RandomProvider; ITimeProvider& TimeProvider; - bool ExecuteLLVM = true; + bool ExecuteLLVM = false; arrow::MemoryPool& ArrowMemoryPool; std::vector<NUdf::TUnboxedValue*> WideFields; TTypeEnvironment* TypeEnv = nullptr; @@ -405,6 +405,8 @@ public: typedef TIntrusivePtr<IComputationPattern> TPtr; virtual ~IComputationPattern() = default; + virtual void Compile(TString optLLVM, IStatsRegistry* stats) = 0; + virtual bool IsCompiled() const = 0; virtual THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) = 0; virtual bool GetSuitableForCache() const = 0; }; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp index 43b2e4bb2c..454a53b9a4 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp @@ -561,12 +561,13 @@ private: const bool ExternalAlloc; // obsolete, will be removed after YQL-13977 }; -class TComputationGraph : public IComputationGraph { +class TComputationGraph final : public IComputationGraph { public: - TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts) + TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts, bool executeLLVM = false) : PatternNodes(patternNodes) , MemInfo(MakeIntrusive<TMemoryUsageInfo>("ComputationGraph")) , CompOpts(compOpts) + , ExecuteLLVM(executeLLVM) { #ifndef NDEBUG CompOpts.AllocState.ActiveMemInfo.emplace(MemInfo.Get(), MemInfo); @@ -597,8 +598,8 @@ public: CompOpts, PatternNodes->GetMutables(), //*ArrowMemoryPool - *arrow::default_memory_pool() - )); + *arrow::default_memory_pool())); + Ctx->ExecuteLLVM = ExecuteLLVM; ValueBuilder->SetCalleePositionHolder(Ctx->CalleePosition); for (auto& node : PatternNodes->GetNodes()) { node->InitNode(*Ctx); @@ -711,6 +712,7 @@ public: bool SetExecuteLLVM(bool value) override { const bool old = Ctx->ExecuteLLVM; Ctx->ExecuteLLVM = value; + ExecuteLLVM = value; return old; } @@ -761,12 +763,11 @@ private: THolder<TComputationContext> Ctx; TComputationOptsFull CompOpts; bool IsPrepared = false; + bool ExecuteLLVM = false; std::optional<TArrowKernelsTopology> KernelsTopology; }; -} // namespace - -class TComputationPatternImpl : public IComputationPattern { +class TComputationPatternImpl final : public IComputationPattern { public: TComputationPatternImpl(THolder<TComputationGraphBuildingVisitor>&& builder, const TComputationPatternOpts& opts) #if defined(MKQL_DISABLE_CODEGEN) @@ -782,114 +783,141 @@ public: node->PrepareStageOne(); for (const auto& node : nodes) node->PrepareStageTwo(); + MKQL_ADD_STAT(opts.Stats, Mkql_TotalNodes, nodes.size()); -#ifndef MKQL_DISABLE_CODEGEN + PatternNodes = builder->GetPatternNodes(); + if (Codegen) { - TStatTimer timerFull(CodeGen_FullTime); - timerFull.Acquire(); - bool hasCode = false; - { - TStatTimer timerGen(CodeGen_GenerateTime); - timerGen.Acquire(); - for (auto it = nodes.crbegin(); nodes.crend() != it; ++it) { - if (const auto codegen = dynamic_cast<ICodegeneratorRootNode*>(it->Get())) { - try { - codegen->GenerateFunctions(Codegen); - hasCode = true; - } catch (const TNoCodegen&) { - hasCode = false; - break; - } + Compile(opts.OptLLVM, opts.Stats); + } + } + + ~TComputationPatternImpl() { + if (TypeEnv) { + auto guard = TypeEnv->BindAllocator(); + PatternNodes.Reset(); + } + } + + void Compile(TString optLLVM, IStatsRegistry* stats) { + if (IsPatternCompiled.load()) + return; + +#ifndef MKQL_DISABLE_CODEGEN + if (!Codegen) + Codegen = NYql::NCodegen::ICodegen::Make(NYql::NCodegen::ETarget::Native); + + const auto& nodes = PatternNodes->GetNodes(); + + TStatTimer timerFull(CodeGen_FullTime); + timerFull.Acquire(); + bool hasCode = false; + { + TStatTimer timerGen(CodeGen_GenerateTime); + timerGen.Acquire(); + for (auto it = nodes.crbegin(); nodes.crend() != it; ++it) { + if (const auto codegen = dynamic_cast<ICodegeneratorRootNode*>(it->Get())) { + try { + codegen->GenerateFunctions(Codegen); + hasCode = true; + } catch (const TNoCodegen&) { + hasCode = false; + break; } } - timerGen.Release(); - timerGen.Report(opts.Stats); } + timerGen.Release(); + timerGen.Report(stats); + } - if (hasCode) { - if (opts.OptLLVM.Contains("--dump-generated")) { - Cerr << "############### Begin generated module ###############" << Endl; - Codegen->GetModule().print(llvm::errs(), nullptr); - Cerr << "################ End generated module ################" << Endl; - } + if (hasCode) { + if (optLLVM.Contains("--dump-generated")) { + Cerr << "############### Begin generated module ###############" << Endl; + Codegen->GetModule().print(llvm::errs(), nullptr); + Cerr << "################ End generated module ################" << Endl; + } - TStatTimer timerComp(CodeGen_CompileTime); - timerComp.Acquire(); - - NYql::NCodegen::TCodegenStats codegenStats; - Codegen->GetStats(codegenStats); - MKQL_ADD_STAT(opts.Stats, CodeGen_TotalFunctions, codegenStats.TotalFunctions); - MKQL_ADD_STAT(opts.Stats, CodeGen_TotalInstructions, codegenStats.TotalInstructions); - MKQL_SET_MAX_STAT(opts.Stats, CodeGen_MaxFunctionInstructions, codegenStats.MaxFunctionInstructions); - if (opts.OptLLVM.Contains("--dump-stats")) { - Cerr << "TotalFunctions: " << codegenStats.TotalFunctions << Endl; - Cerr << "TotalInstructions: " << codegenStats.TotalInstructions << Endl; - Cerr << "MaxFunctionInstructions: " << codegenStats.MaxFunctionInstructions << Endl; - } + TStatTimer timerComp(CodeGen_CompileTime); + timerComp.Acquire(); + + NYql::NCodegen::TCodegenStats codegenStats; + Codegen->GetStats(codegenStats); + MKQL_ADD_STAT(stats, CodeGen_TotalFunctions, codegenStats.TotalFunctions); + MKQL_ADD_STAT(stats, CodeGen_TotalInstructions, codegenStats.TotalInstructions); + MKQL_SET_MAX_STAT(stats, CodeGen_MaxFunctionInstructions, codegenStats.MaxFunctionInstructions); + if (optLLVM.Contains("--dump-stats")) { + Cerr << "TotalFunctions: " << codegenStats.TotalFunctions << Endl; + Cerr << "TotalInstructions: " << codegenStats.TotalInstructions << Endl; + Cerr << "MaxFunctionInstructions: " << codegenStats.MaxFunctionInstructions << Endl; + } - if (opts.OptLLVM.Contains("--dump-perf-map")) { - Codegen->TogglePerfJITEventListener(); - } + if (optLLVM.Contains("--dump-perf-map")) { + Codegen->TogglePerfJITEventListener(); + } - if (codegenStats.TotalFunctions >= TotalFunctionsLimit || - codegenStats.TotalInstructions >= TotalInstructionsLimit || - codegenStats.MaxFunctionInstructions >= MaxFunctionInstructionsLimit) { - Codegen.reset(); - } else { - Codegen->Verify(); - NYql::NCodegen::TCompileStats compileStats; - Codegen->Compile(GetCompileOptions(opts.OptLLVM), &compileStats); - MKQL_ADD_STAT(opts.Stats, CodeGen_ModulePassTime, compileStats.ModulePassTime); - MKQL_ADD_STAT(opts.Stats, CodeGen_FinalizeTime, compileStats.FinalizeTime); - } + if (codegenStats.TotalFunctions >= TotalFunctionsLimit || + codegenStats.TotalInstructions >= TotalInstructionsLimit || + codegenStats.MaxFunctionInstructions >= MaxFunctionInstructionsLimit) { + Codegen.reset(); + } else { + Codegen->Verify(); + NYql::NCodegen::TCompileStats compileStats; + Codegen->Compile(GetCompileOptions(optLLVM), &compileStats); - timerComp.Release(); - timerComp.Report(opts.Stats); + MKQL_ADD_STAT(stats, CodeGen_ModulePassTime, compileStats.ModulePassTime); + MKQL_ADD_STAT(stats, CodeGen_FinalizeTime, compileStats.FinalizeTime); + } - if (Codegen) { - if (opts.OptLLVM.Contains("--dump-compiled")) { - Cerr << "############### Begin compiled module ###############" << Endl; - Codegen->GetModule().print(llvm::errs(), nullptr); - Cerr << "################ End compiled module ################" << Endl; - } + timerComp.Release(); + timerComp.Report(stats); - if (opts.OptLLVM.Contains("--asm-compiled")) { - Cerr << "############### Begin compiled asm ###############" << Endl; - Codegen->ShowGeneratedFunctions(&Cerr); - Cerr << "################ End compiled asm ################" << Endl; - } + if (Codegen) { + if (optLLVM.Contains("--dump-compiled")) { + Cerr << "############### Begin compiled module ###############" << Endl; + Codegen->GetModule().print(llvm::errs(), nullptr); + Cerr << "################ End compiled module ################" << Endl; + } - auto count = 0U; - for (const auto& node : nodes) { - if (const auto codegen = dynamic_cast<ICodegeneratorRootNode*>(node.Get())) { - codegen->FinalizeFunctions(Codegen); - ++count; - } + if (optLLVM.Contains("--asm-compiled")) { + Cerr << "############### Begin compiled asm ###############" << Endl; + Codegen->ShowGeneratedFunctions(&Cerr); + Cerr << "################ End compiled asm ################" << Endl; + } + + ui64 count = 0U; + for (const auto& node : nodes) { + if (const auto codegen = dynamic_cast<ICodegeneratorRootNode*>(node.Get())) { + codegen->FinalizeFunctions(Codegen); + ++count; } + } - if (count) - MKQL_ADD_STAT(opts.Stats, Mkql_CodegenFunctions, count); + if (count) { + MKQL_ADD_STAT(stats, Mkql_CodegenFunctions, count); } } - - timerFull.Release(); - timerFull.Report(opts.Stats); } + + timerFull.Release(); + timerFull.Report(stats); + + IsPatternCompiled.store(true); #endif - PatternNodes = builder->GetPatternNodes(); } - ~TComputationPatternImpl() { - if (TypeEnv) { - auto guard = TypeEnv->BindAllocator(); - PatternNodes.Reset(); - } + bool IsCompiled() const { + return IsPatternCompiled.load(); } - void SetTypeEnv(TTypeEnvironment* typeEnv) { - TypeEnv = typeEnv; + THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) { + return MakeHolder<TComputationGraph>(PatternNodes, compOpts, IsPatternCompiled.load()); } + bool GetSuitableForCache() const { + return PatternNodes->GetSuitableForCache(); + } + +private: TStringBuf GetCompileOptions(const TString& s) { const TString flag = "--compile-options"; auto lpos = s.rfind(flag); @@ -903,18 +931,10 @@ public: return TStringBuf(s, lpos, rpos - lpos); }; - THolder<IComputationGraph> Clone(const TComputationOptsFull& compOpts) final { - return MakeHolder<TComputationGraph>(PatternNodes, compOpts); - } - - bool GetSuitableForCache() const final { - return PatternNodes->GetSuitableForCache(); - } - -private: TTypeEnvironment* TypeEnv = nullptr; TPatternNodes::TPtr PatternNodes; NYql::NCodegen::ICodegen::TPtr Codegen; + std::atomic<bool> IsPatternCompiled = false; }; @@ -962,6 +982,8 @@ TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNode return MakeIntrusive<TComputationPatternImpl>(std::move(builder), opts); } +} // namespace + IComputationPattern::TPtr MakeComputationPattern(TExploringNodeVisitor& explorer, const TRuntimeNode& root, const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) { return MakeComputationPatternImpl(explorer, root, entryPoints, opts); diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp new file mode 100644 index 0000000000..a50c72e686 --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp @@ -0,0 +1,145 @@ +#include "mkql_computation_pattern_cache.h" + +namespace NKikimr::NMiniKQL { + +TComputationPatternLRUCache::TComputationPatternLRUCache(TComputationPatternLRUCache::Config configuration, NMonitoring::TDynamicCounterPtr counters) + : Cache(CacheMaxElementsSize) + , Configuration(configuration) + , Hits(counters->GetCounter("PatternCache/Hits", true)) + , Waits(counters->GetCounter("PatternCache/Waits", true)) + , Misses(counters->GetCounter("PatternCache/Misses", true)) + , NotSuitablePattern(counters->GetCounter("PatternCache/NotSuitablePattern", true)) + , SizeItems(counters->GetCounter("PatternCache/SizeItems", false)) + , SizeBytes(counters->GetCounter("PatternCache/SizeBytes", false)) + , MaxSizeBytesCounter(counters->GetCounter("PatternCache/MaxSizeBytes", false)) +{ + *MaxSizeBytesCounter = Configuration.MaxSizeBytes; +} + +std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serialized) { + std::lock_guard<std::mutex> lock(Mutex); + if (auto it = Cache.Find(serialized); it != Cache.End()) { + ++*Hits; + return *it; + } + + ++*Misses; + return {}; +} + +TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serialized) { + std::lock_guard lock(Mutex); + if (auto it = Cache.Find(serialized); it != Cache.End()) { + ++*Hits; + AccessPattern(serialized, *it); + return TTicket(serialized, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr); + } + + auto [notifyIt, isNew] = Notify.emplace(serialized, Nothing()); + if (isNew) { + ++*Misses; + return TTicket(serialized, true, {}, this); + } + + ++*Waits; + auto promise = NThreading::NewPromise<std::shared_ptr<TPatternCacheEntry>>(); + auto& subscribers = notifyIt->second; + if (!subscribers) { + subscribers.ConstructInPlace(); + } + + subscribers->push_back(promise); + return TTicket(serialized, false, promise, nullptr); +} + +void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) { + TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; + { + std::lock_guard<std::mutex> lock(Mutex); + auto notifyIt = Notify.find(serialized); + if (notifyIt != Notify.end()) { + subscribers.swap(notifyIt->second); + Notify.erase(notifyIt); + } + } + + if (subscribers) { + for (auto& subscriber : *subscribers) { + subscriber.SetValue(nullptr); + } + } +} + +void TComputationPatternLRUCache::EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) { + Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern); + TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; + { + std::lock_guard<std::mutex> lock(Mutex); + // normally remove only one old cache entry by iteration to prevent bursts + if (CurrentSizeBytes > Configuration.MaxSizeBytes) { + RemoveOldest(); + } + // to prevent huge memory overusage remove as much as needed + while (CurrentSizeBytes > 2 * Configuration.MaxSizeBytes) { + RemoveOldest(); + } + + patternWithEnv->UpdateSizeForCache(); + CurrentSizeBytes += patternWithEnv->SizeForCache; + + Cache.Insert(serialized, patternWithEnv); + patternWithEnv->IsInCache.store(true); + auto notifyIt = Notify.find(serialized); + if (notifyIt != Notify.end()) { + subscribers.swap(notifyIt->second); + Notify.erase(notifyIt); + } + + *SizeItems = Cache.Size(); + *SizeBytes = CurrentSizeBytes; + } + + if (subscribers) { + for (auto& subscriber : *subscribers) { + subscriber.SetValue(patternWithEnv); + } + } +} + +void TComputationPatternLRUCache::CleanCache() { + *SizeItems = 0; + *SizeBytes = 0; + *MaxSizeBytesCounter = 0; + std::lock_guard lock(Mutex); + CurrentSizeBytes = 0; + PatternsToCompile.clear(); + auto CacheEnd = Cache.End(); + for (auto PatternIt = Cache.Begin(); PatternIt != CacheEnd; ++PatternIt) { + const_cast<std::shared_ptr<TPatternCacheEntry> &>(PatternIt.Value())->IsInCache.store(false); + } + Cache.Clear(); +} + +void TComputationPatternLRUCache::RemoveOldest() { + auto oldest = Cache.FindOldest(); + Y_VERIFY_DEBUG(oldest != Cache.End()); + CurrentSizeBytes -= oldest.Value()->SizeForCache; + oldest.Value()->IsInCache.store(false); + PatternsToCompile.erase(oldest.Key()); + Cache.Erase(oldest); +} + +void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) { + if (!Configuration.PatternAccessTimesBeforeTryToCompile || entry->Pattern->IsCompiled()) { + return; + } + + size_t PatternAccessTimes = entry->AccessTimes.fetch_add(1) + 1; + if (PatternAccessTimes == *Configuration.PatternAccessTimesBeforeTryToCompile || + (*Configuration.PatternAccessTimesBeforeTryToCompile == 0 && PatternAccessTimes == 1)) { + PatternsToCompile.emplace(serializedProgram, entry); + } +} + + +} // namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h index 1cb848bb93..86ffa65963 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h @@ -27,6 +27,8 @@ struct TPatternCacheEntry { TStructType* ParamsStruct; IComputationPattern::TPtr Pattern; size_t SizeForCache = 0; // set only by cache to lock the size, which can slightly vary when pattern is used + std::atomic<size_t> AccessTimes = 0; // set only by cache + std::atomic<bool> IsInCache = false; // set only by cache void UpdateSizeForCache() { Y_VERIFY_DEBUG(!SizeForCache); @@ -50,22 +52,8 @@ struct TPatternCacheEntry { } } }; -class TComputationPatternLRUCache { - mutable std::mutex Mutex; - - THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify; - TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache; - size_t CurrentSizeBytes = 0; - const size_t MaxSizeBytes = 0; -public: - NMonitoring::TDynamicCounters::TCounterPtr Hits; - NMonitoring::TDynamicCounters::TCounterPtr Waits; - NMonitoring::TDynamicCounters::TCounterPtr Misses; - NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern; - NMonitoring::TDynamicCounters::TCounterPtr SizeItems; - NMonitoring::TDynamicCounters::TCounterPtr SizeBytes; - NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter; +class TComputationPatternLRUCache { public: class TTicket : private TNonCopyable { public: @@ -102,138 +90,63 @@ public: TComputationPatternLRUCache* Cache; }; - TComputationPatternLRUCache(size_t sizeBytes, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>()) - : Cache(10000) - , MaxSizeBytes(sizeBytes) - , Hits(counters->GetCounter("PatternCache/Hits", true)) - , Waits(counters->GetCounter("PatternCache/Waits", true)) - , Misses(counters->GetCounter("PatternCache/Misses", true)) - , NotSuitablePattern(counters->GetCounter("PatternCache/NotSuitablePattern", true)) - , SizeItems(counters->GetCounter("PatternCache/SizeItems", false)) - , SizeBytes(counters->GetCounter("PatternCache/SizeBytes", false)) - , MaxSizeBytesCounter(counters->GetCounter("PatternCache/MaxSizeBytes", false)) - { - *MaxSizeBytesCounter = MaxSizeBytes; - } + struct Config { + Config(size_t maxSizeBytes) + : MaxSizeBytes(maxSizeBytes) + {} - static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) { - return std::make_shared<TPatternCacheEntry>(useAlloc); - } + Config(size_t maxSizeBytes, size_t patternAccessTimesBeforeTryToCompile) + : MaxSizeBytes(maxSizeBytes) + , PatternAccessTimesBeforeTryToCompile(patternAccessTimesBeforeTryToCompile) + {} - std::shared_ptr<TPatternCacheEntry> Find(const TString& serialized) { - auto guard = std::scoped_lock<std::mutex>(Mutex); - if (auto it = Cache.Find(serialized); it != Cache.End()) { - ++*Hits; - return *it; - } else { - ++*Misses; - return {}; - } - } + size_t MaxSizeBytes = 0; + std::optional<size_t> PatternAccessTimesBeforeTryToCompile; - TTicket FindOrSubscribe(const TString& serialized) { - auto guard = std::scoped_lock<std::mutex>(Mutex); - if (auto it = Cache.Find(serialized); it != Cache.End()) { - ++*Hits; - return TTicket(serialized, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr); + bool operator==(const Config & rhs) { + return std::tie(MaxSizeBytes, PatternAccessTimesBeforeTryToCompile) == + std::tie(rhs.MaxSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile); } - auto [notifyIt, isNew] = Notify.emplace(serialized, Nothing()); - if (isNew) { - ++*Misses; - return TTicket(serialized, true, {}, this); + bool operator!=(const Config & rhs) { + return !(*this == rhs); } + }; - ++*Waits; - auto promise = NThreading::NewPromise<std::shared_ptr<TPatternCacheEntry>>(); - auto& subscribers = Notify[serialized]; - if (!subscribers) { - subscribers.ConstructInPlace(); - } + TComputationPatternLRUCache(Config configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>()); - subscribers->push_back(promise); - return TTicket(serialized, false, promise, nullptr); + ~TComputationPatternLRUCache() { + CleanCache(); } - void RemoveOldest() { - auto oldest = Cache.FindOldest(); - Y_VERIFY_DEBUG(oldest != Cache.End()); - CurrentSizeBytes -= oldest.Value()->SizeForCache; - Cache.Erase(oldest); + static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) { + return std::make_shared<TPatternCacheEntry>(useAlloc); } - void NotifyMissing(const TString& serialized) { - TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; - { - auto guard = std::scoped_lock<std::mutex>(Mutex); - auto notifyIt = Notify.find(serialized); - if (notifyIt != Notify.end()) { - subscribers.swap(notifyIt->second); - Notify.erase(notifyIt); - } - } + std::shared_ptr<TPatternCacheEntry> Find(const TString& serialized); - if (subscribers) { - for (auto& s : *subscribers) { - s.SetValue(nullptr); - } - } - } + TTicket FindOrSubscribe(const TString& serialized); - void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv) { - Y_VERIFY_DEBUG(patternWithEnv && patternWithEnv->Pattern); - TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers; - { - auto guard = std::scoped_lock<std::mutex>(Mutex); - // normally remove only one old cache entry by iteration to prevent bursts - if (CurrentSizeBytes > MaxSizeBytes) { - RemoveOldest(); - } - // to prevent huge memory overusage remove as much as needed - while (CurrentSizeBytes > 2 * MaxSizeBytes) { - RemoveOldest(); - } - - patternWithEnv->UpdateSizeForCache(); - CurrentSizeBytes += patternWithEnv->SizeForCache; - - Cache.Insert(serialized, patternWithEnv); - auto notifyIt = Notify.find(serialized); - if (notifyIt != Notify.end()) { - subscribers.swap(notifyIt->second); - Notify.erase(notifyIt); - } - - *SizeItems = Cache.Size(); - *SizeBytes = CurrentSizeBytes; - } + void NotifyMissing(const TString& serialized); - if (subscribers) { - for (auto& s : *subscribers) { - s.SetValue(patternWithEnv); - } - } - } + void EmplacePattern(const TString& serialized, std::shared_ptr<TPatternCacheEntry> patternWithEnv); - void CleanCache() { - *SizeItems = 0; - *SizeBytes = 0; - *MaxSizeBytesCounter = 0; - auto guard = std::scoped_lock<std::mutex>(Mutex); - CurrentSizeBytes = 0; - Cache.Clear(); - } + void CleanCache(); size_t GetSize() const { - auto guard = std::scoped_lock<std::mutex>(Mutex); + std::lock_guard lock(Mutex); return Cache.Size(); } - size_t GetMaxSize() const { - auto guard = std::scoped_lock<std::mutex>(Mutex); - return MaxSizeBytes; + Config GetConfiguration() const { + std::lock_guard lock(Mutex); + return Configuration; } + size_t GetMaxSizeBytes() const { + std::lock_guard lock(Mutex); + return Configuration.MaxSizeBytes; + } i64 GetCacheHits() const { return *Hits; @@ -243,9 +156,38 @@ public: ++*NotSuitablePattern; } - ~TComputationPatternLRUCache() { - CleanCache(); + size_t GetPatternsToCompileSize() const { + std::lock_guard lock(Mutex); + return PatternsToCompile.size(); + } + + void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) { + std::lock_guard lock(Mutex); + result.swap(PatternsToCompile); } + +private: + void RemoveOldest(); + + void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry); + + static constexpr size_t CacheMaxElementsSize = 10000; + + mutable std::mutex Mutex; + THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify; + TLRUCache<TString, std::shared_ptr<TPatternCacheEntry>> Cache; + size_t CurrentSizeBytes = 0; + THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile; + + const Config Configuration; + + NMonitoring::TDynamicCounters::TCounterPtr Hits; + NMonitoring::TDynamicCounters::TCounterPtr Waits; + NMonitoring::TDynamicCounters::TCounterPtr Misses; + NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern; + NMonitoring::TDynamicCounters::TCounterPtr SizeItems; + NMonitoring::TDynamicCounters::TCounterPtr SizeBytes; + NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter; }; } // namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/ya.make.inc b/ydb/library/yql/minikql/computation/ya.make.inc index be501a3097..d8c67bba61 100644 --- a/ydb/library/yql/minikql/computation/ya.make.inc +++ b/ydb/library/yql/minikql/computation/ya.make.inc @@ -16,6 +16,7 @@ SRCS( mkql_custom_list.cpp mkql_validate.cpp mkql_value_builder.cpp + mkql_computation_pattern_cache.cpp presort.cpp ) |