diff options
author | Evgeniy Ivanov <eivanov89@ydb.tech> | 2024-11-29 17:18:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-29 19:18:48 +0300 |
commit | 1b51ba97193c30e07e53536bd8f0e587b3335dc7 (patch) | |
tree | c872f6169737ea00890d95da496098180b42ed90 | |
parent | b8297aebf9585e53bdfd7f30450c7aceb7fa8134 (diff) | |
download | ydb-1b51ba97193c30e07e53536bd8f0e587b3335dc7.tar.gz |
Use shared compile cache to speedup queries (#11806)
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.cpp | 666 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.h | 134 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.cpp | 89 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 12 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 57 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.h | 3 |
7 files changed, 641 insertions, 329 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index aa93ee7cf3..fab575f53b 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -28,240 +28,6 @@ using namespace NKikimrConfig; using namespace NYql; -class TKqpQueryCache { -public: - TKqpQueryCache(size_t size, TDuration ttl) - : List(size) - , Ttl(ttl) {} - - void InsertQuery(const TKqpCompileResult::TConstPtr& compileResult) { - Y_ENSURE(compileResult->Query); - auto& query = *compileResult->Query; - - YQL_ENSURE(compileResult->PreparedQuery); - - auto queryIt = QueryIndex.emplace(query, compileResult->Uid); - if (!queryIt.second) { - EraseByUid(compileResult->Uid); - QueryIndex.erase(query); - } - Y_ENSURE(queryIt.second); - } - - void InsertAst(const TKqpCompileResult::TConstPtr& compileResult) { - Y_ENSURE(compileResult->Query); - Y_ENSURE(compileResult->GetAst()); - - AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->GetAst()), compileResult->Uid); - } - - bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution) { - if (!isPerStatementExecution) { - InsertQuery(compileResult); - } - if (isEnableAstCache && compileResult->GetAst()) { - InsertAst(compileResult); - } - - auto it = Index.emplace(compileResult->Uid, TCacheEntry{compileResult, TAppData::TimeProvider->Now() + Ttl}); - Y_ABORT_UNLESS(it.second); - - TItem* item = &const_cast<TItem&>(*it.first); - auto removedItem = List.Insert(item); - - IncBytes(item->Value.CompileResult->PreparedQuery->ByteSize()); - - if (removedItem) { - DecBytes(removedItem->Value.CompileResult->PreparedQuery->ByteSize()); - - auto queryId = *removedItem->Value.CompileResult->Query; - QueryIndex.erase(queryId); - if (removedItem->Value.CompileResult->GetAst()) { - AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->GetAst())); - } - auto indexIt = Index.find(*removedItem); - if (indexIt != Index.end()) { - Index.erase(indexIt); - } - } - - Y_ABORT_UNLESS(List.GetSize() == Index.size()); - - return removedItem != nullptr; - } - - void AttachReplayMessage(const TString uid, TString replayMessage) { - auto it = Index.find(TItem(uid)); - if (it != Index.end()) { - TItem* item = &const_cast<TItem&>(*it); - DecBytes(item->Value.ReplayMessage.size()); - item->Value.ReplayMessage = replayMessage; - item->Value.LastReplayTime = TInstant::Now(); - IncBytes(replayMessage.size()); - } - } - - TString ReplayMessageByUid(const TString uid, TDuration timeout) { - auto it = Index.find(TItem(uid)); - if (it != Index.end()) { - TInstant& lastReplayTime = const_cast<TItem&>(*it).Value.LastReplayTime; - TInstant now = TInstant::Now(); - if (lastReplayTime + timeout < now) { - lastReplayTime = now; - return it->Value.ReplayMessage; - } - } - return ""; - } - - TKqpCompileResult::TConstPtr FindByUid(const TString& uid, bool promote) { - auto it = Index.find(TItem(uid)); - if (it != Index.end()) { - TItem* item = &const_cast<TItem&>(*it); - if (promote) { - item->Value.ExpiredAt = TAppData::TimeProvider->Now() + Ttl; - List.Promote(item); - } - - return item->Value.CompileResult; - } - - return nullptr; - } - - void Replace(const TKqpCompileResult::TConstPtr& compileResult) { - auto it = Index.find(TItem(compileResult->Uid)); - if (it != Index.end()) { - TItem& item = const_cast<TItem&>(*it); - item.Value.CompileResult = compileResult; - } - } - - TKqpQueryId GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast) { - Y_ABORT_UNLESS(ast.Root); - std::shared_ptr<std::map<TString, Ydb::Type>> astPgParams; - if (query.QueryParameterTypes || ast.PgAutoParamValues) { - astPgParams = std::make_shared<std::map<TString, Ydb::Type>>(); - if (query.QueryParameterTypes) { - for (const auto& [name, param] : *query.QueryParameterTypes) { - astPgParams->insert({name, param}); - } - } - if (ast.PgAutoParamValues) { - const auto& params = dynamic_cast<TKqpAutoParamBuilder*>(ast.PgAutoParamValues.Get())->Values; - for (const auto& [name, param] : params) { - astPgParams->insert({name, param.Gettype()}); - } - } - } - return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings}; - } - - TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) { - auto uid = QueryIndex.FindPtr(query); - if (!uid) { - return nullptr; - } - - return FindByUid(*uid, promote); - } - - TKqpCompileResult::TConstPtr FindByAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast, bool promote) { - auto uid = AstIndex.FindPtr(GetQueryIdWithAst(query, ast)); - if (!uid) { - return nullptr; - } - - return FindByUid(*uid, promote); - } - - bool EraseByUid(const TString& uid) { - auto it = Index.find(TItem(uid)); - if (it == Index.end()) { - return false; - } - - TItem* item = &const_cast<TItem&>(*it); - List.Erase(item); - - DecBytes(item->Value.CompileResult->PreparedQuery->ByteSize()); - DecBytes(item->Value.ReplayMessage.size()); - - Y_ABORT_UNLESS(item->Value.CompileResult); - Y_ABORT_UNLESS(item->Value.CompileResult->Query); - auto queryId = *item->Value.CompileResult->Query; - QueryIndex.erase(queryId); - if (item->Value.CompileResult->GetAst()) { - AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->GetAst())); - } - - Index.erase(it); - - Y_ABORT_UNLESS(List.GetSize() == Index.size()); - return true; - } - - size_t Size() const { - return Index.size(); - } - - ui64 Bytes() const { - return ByteSize; - } - - size_t EraseExpiredQueries() { - auto prevSize = Size(); - - auto now = TAppData::TimeProvider->Now(); - while (List.GetSize() && List.GetOldest()->Value.ExpiredAt <= now) { - EraseByUid(List.GetOldest()->Key); - } - - Y_ABORT_UNLESS(List.GetSize() == Index.size()); - return prevSize - Size(); - } - - void Clear() { - List = TList(List.GetMaxSize()); - Index.clear(); - QueryIndex.clear(); - AstIndex.clear(); - ByteSize = 0; - } - -private: - void DecBytes(ui64 bytes) { - if (bytes > ByteSize) { - ByteSize = 0; - } else { - ByteSize -= bytes; - } - } - - void IncBytes(ui64 bytes) { - ByteSize += bytes; - } - -private: - struct TCacheEntry { - TKqpCompileResult::TConstPtr CompileResult; - TInstant ExpiredAt; - TString ReplayMessage = ""; - TInstant LastReplayTime = TInstant::Zero(); - }; - - using TList = TLRUList<TString, TCacheEntry>; - using TItem = TList::TItem; - -private: - TList List; - THashSet<TItem, TItem::THash> Index; - THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> QueryIndex; - THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> AstIndex; - ui64 ByteSize = 0; - TDuration Ttl; -}; - struct TKqpCompileSettings { TKqpCompileSettings(bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE) @@ -449,18 +215,20 @@ public: return NKikimrServices::TActivity::KQP_COMPILE_SERVICE; } - TKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig, + TKqpCompileService( + TKqpQueryCachePtr queryCache, + const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup ) - : TableServiceConfig(tableServiceConfig) + : QueryCache(std::move(queryCache)) + , TableServiceConfig(tableServiceConfig) , QueryServiceConfig(queryServiceConfig) , KqpSettings(kqpSettings) , ModuleResolverState(moduleResolverState) , Counters(counters) - , QueryCache(TableServiceConfig.GetCompileQueryCacheSize(), TDuration::Seconds(TableServiceConfig.GetCompileQueryCacheTTLSec())) , RequestsQueue(TableServiceConfig.GetCompileRequestQueueSize()) , QueryReplayFactory(std::move(queryReplayFactory)) , FederatedQuerySetup(federatedQuerySetup) @@ -578,7 +346,7 @@ private: TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams || TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution) { - QueryCache.Clear(); + QueryCache->Clear(); LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Query cache was invalidated due to config change"); @@ -632,24 +400,24 @@ private: << ", split: " << request.Split << *request.UserRequestContext); - *Counters->CompileQueryCacheSize = QueryCache.Size(); - *Counters->CompileQueryCacheBytes = QueryCache.Bytes(); - auto userSid = request.UserToken->GetUserSID(); auto dbCounters = request.DbCounters; - if (request.Uid) { - Counters->ReportCompileRequestGet(dbCounters); + auto compileResult = QueryCache->Find( + request.Uid, + request.Query, + request.TempTablesState, + request.KeepInCache, + request.UserToken->GetUserSID(), + Counters, + dbCounters, + ev->Sender, + ctx); - auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache); - if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { - compileResult = nullptr; - } + if (request.Uid) { if (compileResult) { Y_ENSURE(compileResult->Query); if (compileResult->Query->UserSid == userSid) { - Counters->ReportQueryCacheHit(dbCounters, true); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid" << ", sender: " << ev->Sender << ", queryUid: " << *request.Uid); @@ -663,38 +431,18 @@ private: << ", expected sid: " << compileResult->Query->UserSid << ", actual sid: " << userSid); } - } - - Counters->ReportQueryCacheHit(dbCounters, false); - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Query not found" - << ", sender: " << ev->Sender - << ", queryUid: " << *request.Uid); - - NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid); - ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); - return; - } - - Counters->ReportCompileRequestCompile(dbCounters); - - Y_ENSURE(request.Query); - auto& query = *request.Query; - - if (query.UserSid.empty()) { - query.UserSid = userSid; - } else { - Y_ENSURE(query.UserSid == userSid); - } - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by queryId, queryId: " << query.SerializeToString()); - auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache); - if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { - compileResult = nullptr; - } + } else { + Counters->ReportQueryCacheHit(dbCounters, false); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Query not found" + << ", sender: " << ev->Sender + << ", queryUid: " << *request.Uid); - if (compileResult) { - Counters->ReportQueryCacheHit(dbCounters, true); + NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid); + ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); + return; + } + } else if (compileResult) { + Y_ENSURE(request.Query); LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text" << ", sender: " << ev->Sender @@ -704,6 +452,8 @@ private: return; } + Counters->ReportCompileRequestCompile(dbCounters); + CollectDiagnostics = request.CollectDiagnostics; LWTRACK(KqpCompileServiceEnqueued, @@ -768,7 +518,7 @@ private: auto dbCounters = request.DbCounters; Counters->ReportRecompileRequestGet(dbCounters); - TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false); + TKqpCompileResult::TConstPtr compileResult = QueryCache->FindByUid(request.Uid, false); if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { compileResult = nullptr; } @@ -879,7 +629,7 @@ private: if (ev->Get()->ReplayMessage && !QueryReplayBackend->IsNull()) { QueryReplayBackend->Collect(*ev->Get()->ReplayMessage); - QueryCache.AttachReplayMessage(compileRequest.Uid, *ev->Get()->ReplayMessage); + QueryCache->AttachReplayMessage(compileRequest.Uid, *ev->Get()->ReplayMessage); } auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query); @@ -890,8 +640,8 @@ private: } } else { if (!hasTempTablesNameClashes) { - if (QueryCache.FindByUid(compileResult->Uid, false)) { - QueryCache.EraseByUid(compileResult->Uid); + if (QueryCache->FindByUid(compileResult->Uid, false)) { + QueryCache->EraseByUid(compileResult->Uid); } } } @@ -929,13 +679,13 @@ private: auto dbCounters = request.DbCounters; Counters->ReportCompileRequestInvalidate(dbCounters); - QueryCache.EraseByUid(request.Uid); + QueryCache->EraseByUid(request.Uid); } void HandleTtlTimer(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Received check queries TTL timeout"); - auto evicted = QueryCache.EraseExpiredQueries(); + auto evicted = QueryCache->EraseExpiredQueries(); if (evicted != 0) { Counters->CompileQueryCacheEvicted->Add(evicted); } @@ -943,30 +693,17 @@ private: StartCheckQueriesTtlTimer(); } - bool HasTempTablesNameClashes( - TKqpCompileResult::TConstPtr compileResult, - TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId = false) { - if (!compileResult) { - return false; - } - if (!compileResult->PreparedQuery) { - return false; - } - - return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId); - } - void UpdateQueryCache(const TActorContext& ctx, TKqpCompileResult::TConstPtr compileResult, bool keepInCache, bool isQueryActionPrepare, bool isPerStatementExecution) { - if (QueryCache.FindByUid(compileResult->Uid, false)) { - QueryCache.Replace(compileResult); + if (QueryCache->FindByUid(compileResult->Uid, false)) { + QueryCache->Replace(compileResult); } else if (keepInCache) { if (compileResult->Query) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert query into compile cache, queryId: " << compileResult->Query->SerializeToString()); - if (QueryCache.FindByQuery(*compileResult->Query, keepInCache)) { + if (QueryCache->FindByQuery(*compileResult->Query, keepInCache)) { LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Trying to insert query into compile cache when it is already there"); } } - if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution)) { + if (QueryCache->Insert(compileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution)) { Counters->CompileQueryCacheEvicted->Inc(); } if (compileResult->Query && isQueryActionPrepare) { @@ -983,7 +720,7 @@ private: YQL_ENSURE(queryAst.Ast->Root); LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by ast, queryId: " << compileRequest.Query.SerializeToString() << ", ast: " << queryAst.Ast->Root->ToString()); - auto compileResult = QueryCache.FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache); + auto compileResult = QueryCache->FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache); if (!compileRequest.FindInCache || HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) { compileResult = nullptr; @@ -1067,17 +804,17 @@ private: queryParameterTypes->insert({param.GetName(), paramType}); } query.QueryParameterTypes = queryParameterTypes; - if (QueryCache.FindByQuery(query, keepInCache)) { + if (QueryCache->FindByQuery(query, keepInCache)) { return false; } - if (compileResult->GetAst() && QueryCache.FindByAst(query, *compileResult->GetAst(), keepInCache)) { + if (compileResult->GetAst() && QueryCache->FindByAst(query, *compileResult->GetAst(), keepInCache)) { return false; } auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst); newCompileResult->AllowCache = compileResult->AllowCache; newCompileResult->PreparedQuery = compileResult->PreparedQuery; LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString()); - return QueryCache.Insert(newCompileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution); + return QueryCache->Insert(newCompileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution); } void ProcessQueue(const TActorContext& ctx) { @@ -1158,7 +895,7 @@ private: TKqpStatsCompile stats; stats.FromCache = true; - if (auto replayMessage = QueryCache.ReplayMessageByUid(compileResult->Uid, TDuration::Seconds(TableServiceConfig.GetQueryReplayCacheUploadTTLSec()))) { + if (auto replayMessage = QueryCache->ReplayMessageByUid(compileResult->Uid, TDuration::Seconds(TableServiceConfig.GetQueryReplayCacheUploadTTLSec()))) { QueryReplayBackend->Collect(replayMessage); } @@ -1219,6 +956,8 @@ private: } private: + TKqpQueryCachePtr QueryCache; + TTableServiceConfig TableServiceConfig; TQueryServiceConfig QueryServiceConfig; TKqpSettings::TConstPtr KqpSettings; @@ -1226,7 +965,6 @@ private: TIntrusivePtr<TKqpCounters> Counters; THolder<IQueryReplayBackend> QueryReplayBackend; - TKqpQueryCache QueryCache; TKqpRequestsQueue RequestsQueue; std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory; std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; @@ -1234,13 +972,315 @@ private: bool CollectDiagnostics = false; }; -IActor* CreateKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig, +// QueryCache + +bool TKqpQueryCache::Insert( + const TKqpCompileResult::TConstPtr& compileResult, + bool isEnableAstCache, + bool isPerStatementExecution) +{ + TGuard<TAdaptiveLock> guard(Lock); + + if (!isPerStatementExecution) { + InsertQuery(compileResult); + } + if (isEnableAstCache && compileResult->GetAst()) { + InsertAst(compileResult); + } + + auto it = Index.emplace(compileResult->Uid, TCacheEntry{compileResult, TAppData::TimeProvider->Now() + Ttl}); + Y_ABORT_UNLESS(it.second); + + TItem* item = &const_cast<TItem&>(*it.first); + auto removedItem = List.Insert(item); + + IncBytes(item->Value.CompileResult->PreparedQuery->ByteSize()); + + if (removedItem) { + DecBytes(removedItem->Value.CompileResult->PreparedQuery->ByteSize()); + + auto queryId = *removedItem->Value.CompileResult->Query; + QueryIndex.erase(queryId); + if (removedItem->Value.CompileResult->GetAst()) { + AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->GetAst())); + } + auto indexIt = Index.find(*removedItem); + if (indexIt != Index.end()) { + Index.erase(indexIt); + } + } + + Y_ABORT_UNLESS(List.GetSize() == Index.size()); + + return removedItem != nullptr; +} + +void TKqpQueryCache::AttachReplayMessage(const TString uid, TString replayMessage) { + TGuard<TAdaptiveLock> guard(Lock); + + auto it = Index.find(TItem(uid)); + if (it != Index.end()) { + TItem* item = &const_cast<TItem&>(*it); + DecBytes(item->Value.ReplayMessage.size()); + item->Value.ReplayMessage = replayMessage; + item->Value.LastReplayTime = TInstant::Now(); + IncBytes(replayMessage.size()); + } +} + +TString TKqpQueryCache::ReplayMessageByUid(const TString uid, TDuration timeout) { + TGuard<TAdaptiveLock> guard(Lock); + + auto it = Index.find(TItem(uid)); + if (it != Index.end()) { + TInstant& lastReplayTime = const_cast<TItem&>(*it).Value.LastReplayTime; + TInstant now = TInstant::Now(); + if (lastReplayTime + timeout < now) { + lastReplayTime = now; + return it->Value.ReplayMessage; + } + } + return ""; +} + +TKqpCompileResult::TConstPtr TKqpQueryCache::FindByUidImpl(const TString& uid, bool promote) { + auto it = Index.find(TItem(uid)); + if (it != Index.end()) { + TItem* item = &const_cast<TItem&>(*it); + if (promote) { + item->Value.ExpiredAt = TAppData::TimeProvider->Now() + Ttl; + List.Promote(item); + } + + return item->Value.CompileResult; + } + + return nullptr; +} + +TKqpCompileResult::TConstPtr TKqpQueryCache::FindByQueryImpl(const TKqpQueryId& query, bool promote) { + auto uid = QueryIndex.FindPtr(query); + if (!uid) { + return nullptr; + } + + // we're holding read and assume it's recursive + return FindByUidImpl(*uid, promote); +} + +bool TKqpQueryCache::EraseByUidImpl(const TString& uid) { + auto it = Index.find(TItem(uid)); + if (it == Index.end()) { + return false; + } + + TItem* item = &const_cast<TItem&>(*it); + List.Erase(item); + + DecBytes(item->Value.CompileResult->PreparedQuery->ByteSize()); + DecBytes(item->Value.ReplayMessage.size()); + + Y_ABORT_UNLESS(item->Value.CompileResult); + Y_ABORT_UNLESS(item->Value.CompileResult->Query); + auto queryId = *item->Value.CompileResult->Query; + QueryIndex.erase(queryId); + if (item->Value.CompileResult->GetAst()) { + AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->GetAst())); + } + + Index.erase(it); + + Y_ABORT_UNLESS(List.GetSize() == Index.size()); + return true; +} + +void TKqpQueryCache::Replace(const TKqpCompileResult::TConstPtr& compileResult) { + TGuard<TAdaptiveLock> guard(Lock); + + auto it = Index.find(TItem(compileResult->Uid)); + if (it != Index.end()) { + TItem& item = const_cast<TItem&>(*it); + item.Value.CompileResult = compileResult; + } +} + +// find by either uid or query +TKqpCompileResult::TConstPtr TKqpQueryCache::Find( + const TMaybe<TString>& uid, + TMaybe<TKqpQueryId>& query, + TKqpTempTablesState::TConstPtr tempTablesState, + bool promote, + const NACLib::TSID& userSid, + TIntrusivePtr<TKqpCounters> counters, + TKqpDbCountersPtr& dbCounters, + const TActorId& sender, + const TActorContext& ctx) +{ + TGuard<TAdaptiveLock> guard(Lock); + + *counters->CompileQueryCacheSize = SizeImpl(); + *counters->CompileQueryCacheBytes = BytesImpl(); + + if (uid) { + counters->ReportCompileRequestGet(dbCounters); + + auto compileResult = FindByUidImpl(*uid, promote); + if (HasTempTablesNameClashes(compileResult, tempTablesState)) { + compileResult = nullptr; + } + + if (compileResult) { + Y_ENSURE(compileResult->Query); + if (compileResult->Query->UserSid == userSid) { + counters->ReportQueryCacheHit(dbCounters, true); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid" + << ", sender: " << sender + << ", queryUid: " << *uid); + + return compileResult; + } else { + LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query" + << ", sender: " << sender + << ", queryUid: " << *uid + << ", expected sid: " << compileResult->Query->UserSid + << ", actual sid: " << userSid); + } + } + + return nullptr; + } + + Y_ENSURE(query); + + if (query->UserSid.empty()) { + query->UserSid = userSid; + } else { + Y_ENSURE(query->UserSid == userSid); + } + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by queryId, queryId: " + << query->SerializeToString()); + auto compileResult = FindByQueryImpl(*query, promote); + if (HasTempTablesNameClashes(compileResult, tempTablesState)) { + return nullptr; + } + + if (compileResult) { + counters->ReportQueryCacheHit(dbCounters, true); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text" + << ", sender: " << sender + << ", queryUid: " << compileResult->Uid); + + return compileResult; + } + + // note, we don't report cache miss, because it's up to caller to decide what to do: + // in particular, session actor will go to the compile service, which will actually report the miss. + + return nullptr; +} + +TKqpCompileResult::TConstPtr TKqpQueryCache::FindByAst( + const TKqpQueryId& query, + const NYql::TAstParseResult& ast, + bool promote) +{ + TGuard<TAdaptiveLock> guard(Lock); + + auto uid = AstIndex.FindPtr(GetQueryIdWithAst(query, ast)); + if (!uid) { + return nullptr; + } + + return FindByUidImpl(*uid, promote); +} + +size_t TKqpQueryCache::EraseExpiredQueries() { + TGuard<TAdaptiveLock> guard(Lock); + + auto prevSize = SizeImpl(); + + auto now = TAppData::TimeProvider->Now(); + while (List.GetSize() && List.GetOldest()->Value.ExpiredAt <= now) { + EraseByUidImpl(List.GetOldest()->Key); + } + + Y_ABORT_UNLESS(List.GetSize() == Index.size()); + return prevSize - SizeImpl(); +} + +void TKqpQueryCache::Clear() { + TGuard<TAdaptiveLock> guard(Lock); + + List = TList(List.GetMaxSize()); + Index.clear(); + QueryIndex.clear(); + AstIndex.clear(); + ByteSize = 0; +} + +void TKqpQueryCache::InsertQuery(const TKqpCompileResult::TConstPtr& compileResult) { + Y_ENSURE(compileResult->Query); + auto& query = *compileResult->Query; + + YQL_ENSURE(compileResult->PreparedQuery); + + auto queryIt = QueryIndex.emplace(query, compileResult->Uid); + if (!queryIt.second) { + EraseByUid(compileResult->Uid); + QueryIndex.erase(query); + } + Y_ENSURE(queryIt.second); +} + +TKqpQueryId TKqpQueryCache::GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast) { + Y_ABORT_UNLESS(ast.Root); + std::shared_ptr<std::map<TString, Ydb::Type>> astPgParams; + if (query.QueryParameterTypes || ast.PgAutoParamValues) { + astPgParams = std::make_shared<std::map<TString, Ydb::Type>>(); + if (query.QueryParameterTypes) { + for (const auto& [name, param] : *query.QueryParameterTypes) { + astPgParams->insert({name, param}); + } + } + if (ast.PgAutoParamValues) { + const auto& params = dynamic_cast<TKqpAutoParamBuilder*>(ast.PgAutoParamValues.Get())->Values; + for (const auto& [name, param] : params) { + astPgParams->insert({name, param.Gettype()}); + } + } + } + return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings}; +} + +// + +bool HasTempTablesNameClashes( + TKqpCompileResult::TConstPtr compileResult, + TKqpTempTablesState::TConstPtr tempTablesState, + bool withSessionId) +{ + if (!compileResult) { + return false; + } + if (!compileResult->PreparedQuery) { + return false; + } + + return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId); +} + +IActor* CreateKqpCompileService( + TKqpQueryCachePtr queryCache, + const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, - std::optional<TKqpFederatedQuerySetup> federatedQuerySetup - ) + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup) { - return new TKqpCompileService(tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters, + return new TKqpCompileService( + std::move(queryCache), + tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters, std::move(queryReplayFactory), federatedQuerySetup); } diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h index 8da1cdead7..853ba5a359 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.h +++ b/ydb/core/kqp/compile_service/kqp_compile_service.h @@ -4,17 +4,149 @@ #include <ydb/core/kqp/common/simple/temp_tables.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> +#include <ydb/core/kqp/host/kqp_translate.h> + +#include <util/system/spinlock.h> namespace NKikimr { namespace NKqp { +class TKqpDbCounters; + enum class ECompileActorAction { COMPILE, PARSE, SPLIT, }; -IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig, +// Cache is shared between query sessions, compile service and kqp proxy. +// Currently we don't use RW lock, because of cache promotions during lookup: +// in benchmark both versions (RW spinlock and adaptive W spinlock) show same +// performance. Might consider RW lock again in future. +class TKqpQueryCache: public TThrRefBase { +public: + TKqpQueryCache(size_t size, TDuration ttl) + : List(size) + , Ttl(ttl) {} + + bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution); + void AttachReplayMessage(const TString uid, TString replayMessage); + + TString ReplayMessageByUid(const TString uid, TDuration timeout); + + TKqpCompileResult::TConstPtr FindByUid(const TString& uid, bool promote) { + TGuard<TAdaptiveLock> guard(Lock); + return FindByUidImpl(uid, promote); + } + + void Replace(const TKqpCompileResult::TConstPtr& compileResult); + + TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) { + TGuard<TAdaptiveLock> guard(Lock); + return FindByQueryImpl(query, promote); + } + + // find by either uid or query + TKqpCompileResult::TConstPtr Find( + const TMaybe<TString>& uid, + TMaybe<TKqpQueryId>& query, + TKqpTempTablesState::TConstPtr tempTablesState, + bool promote, + const NACLib::TSID& userSid, + TIntrusivePtr<TKqpCounters> counters, + TIntrusivePtr<TKqpDbCounters>& dbCounters, + const TActorId& sender, + const TActorContext& ctx); + + TKqpCompileResult::TConstPtr FindByAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast, bool promote); + + bool EraseByUid(const TString& uid) { + TGuard<TAdaptiveLock> guard(Lock); + return EraseByUidImpl(uid); + } + + size_t Size() const { + TGuard<TAdaptiveLock> guard(Lock); + return SizeImpl(); + } + + ui64 Bytes() const { + TGuard<TAdaptiveLock> guard(Lock); + return BytesImpl(); + } + + ui64 BytesImpl() const { + return ByteSize; + } + + size_t EraseExpiredQueries(); + + void Clear(); + +private: + TKqpCompileResult::TConstPtr FindByUidImpl(const TString& uid, bool promote); + TKqpCompileResult::TConstPtr FindByQueryImpl(const TKqpQueryId& query, bool promote); + bool EraseByUidImpl(const TString& uid); + + size_t SizeImpl() const { + return Index.size(); + } + + void InsertQuery(const TKqpCompileResult::TConstPtr& compileResult); + + void InsertAst(const TKqpCompileResult::TConstPtr& compileResult) { + Y_ENSURE(compileResult->Query); + Y_ENSURE(compileResult->GetAst()); + + AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->GetAst()), compileResult->Uid); + } + + TKqpQueryId GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast); + + void DecBytes(ui64 bytes) { + if (bytes > ByteSize) { + ByteSize = 0; + } else { + ByteSize -= bytes; + } + } + + void IncBytes(ui64 bytes) { + ByteSize += bytes; + } + +private: + struct TCacheEntry { + TKqpCompileResult::TConstPtr CompileResult; + TInstant ExpiredAt; + TString ReplayMessage = ""; + TInstant LastReplayTime = TInstant::Zero(); + }; + + using TList = TLRUList<TString, TCacheEntry>; + using TItem = TList::TItem; + +private: + TList List; + THashSet<TItem, TItem::THash> Index; + THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> QueryIndex; + THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> AstIndex; + ui64 ByteSize = 0; + TDuration Ttl; + + TAdaptiveLock Lock; +}; + +using TKqpQueryCachePtr = TIntrusivePtr<TKqpQueryCache>; + +bool HasTempTablesNameClashes( + TKqpCompileResult::TConstPtr compileResult, + TKqpTempTablesState::TConstPtr tempTablesState, + bool withSessionId = false); + +IActor* CreateKqpCompileService( + TKqpQueryCachePtr queryCache, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 340cdb4e2c..6d813c248b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -209,6 +209,7 @@ public: , ModuleResolverState() , KqpProxySharedResources(std::move(kqpProxySharedResources)) , S3ActorsFactory(std::move(s3ActorsFactory)) + , QueryCache(new TKqpQueryCache(TableServiceConfig.GetCompileQueryCacheSize(), TDuration::Seconds(TableServiceConfig.GetCompileQueryCacheTTLSec()))) {} void Bootstrap(const TActorContext &ctx) { @@ -276,7 +277,9 @@ public: } // Create compile service - CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig, QueryServiceConfig, + CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService( + QueryCache, + TableServiceConfig, QueryServiceConfig, KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), FederatedQuerySetup)); TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpCompileServiceID(SelfId().NodeId()), CompileService); @@ -1496,7 +1499,7 @@ private: auto config = CreateConfig(KqpSettings, workerSettings); - IActor* sessionActor = CreateKqpSessionActor(SelfId(), ResourceManager_, CaFactory_, sessionId, KqpSettings, workerSettings, + IActor* sessionActor = CreateKqpSessionActor(SelfId(), QueryCache, ResourceManager_, CaFactory_, sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, QueryServiceConfig, KqpTempTablesAgentActor); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); @@ -1902,6 +1905,8 @@ private: std::shared_ptr<TKqpProxySharedResources> KqpProxySharedResources; std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory; + TKqpQueryCachePtr QueryCache; + bool ServerWorkerBalancerComplete = false; std::optional<TString> SelfDataCenterId; TIntrusivePtr<IRandomProvider> RandomProvider; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index f1ff6799af..4c02f46b15 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -1,5 +1,6 @@ #include "kqp_query_state.h" +#include <ydb/core/kqp/compile_service/kqp_compile_service.h> #include <ydb/library/persqueue/topic_parser/topic_parser.h> namespace NKikimr::NKqp { @@ -136,27 +137,35 @@ std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildN return std::make_unique<TEvTxProxySchemeCache::TEvNavigateKeySet>(navigate.Release()); } - bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) { + CompileStats = ev->Stats; + if (!SaveAndCheckCompileResult(ev->CompileResult)) { + return false; + } + Orbit = std::move(ev->Orbit); + if (ev->ReplayMessage) { + ReplayMessage = *ev->ReplayMessage; + } + + return true; +} + +bool TKqpQueryState::SaveAndCheckCompileResult(TKqpCompileResult::TConstPtr compileResult) { CompilationRunning = false; - CompileResult = ev->CompileResult; + CompileResult = compileResult; YQL_ENSURE(CompileResult); MaxReadType = CompileResult->MaxReadType; - Orbit = std::move(ev->Orbit); - if (CompileResult->Status != Ydb::StatusIds::SUCCESS) + if (CompileResult->Status != Ydb::StatusIds::SUCCESS) { return false; + } YQL_ENSURE(CompileResult->PreparedQuery); const ui32 compiledVersion = CompileResult->PreparedQuery->GetVersion(); YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1, "Unexpected prepared query version: " << compiledVersion); - CompileStats = ev->Stats; PreparedQuery = CompileResult->PreparedQuery; - if (ev->ReplayMessage) { - ReplayMessage = *ev->ReplayMessage; - } if (!CommandTagName) { CommandTagName = CompileResult->CommandTagName; } @@ -185,6 +194,70 @@ bool TKqpQueryState::SaveAndCheckSplitResult(TEvKqp::TEvSplitResponse* ev) { return ev->Status == Ydb::StatusIds::SUCCESS; } +bool TKqpQueryState::TryGetFromCache( + TKqpQueryCache& cache, + const TGUCSettings::TPtr& gUCSettingsPtr, + TIntrusivePtr<TKqpCounters>& counters, + const TActorId& sender) +{ + TMaybe<TKqpQueryId> query; + TMaybe<TString> uid; + + TKqpQuerySettings settings(GetType()); + settings.DocumentApiRestricted = IsDocumentApiRestricted_; + settings.IsInternalCall = IsInternalCall(); + settings.Syntax = GetSyntax(); + + TGUCSettings gUCSettings = gUCSettingsPtr ? *gUCSettingsPtr : TGUCSettings(); + bool keepInCache = false; + switch (GetAction()) { + case NKikimrKqp::QUERY_ACTION_EXECUTE: + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + keepInCache = GetQueryKeepInCache() && query->IsSql(); + break; + + case NKikimrKqp::QUERY_ACTION_PREPARE: + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + keepInCache = query->IsSql(); + break; + + case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: + uid = GetPreparedQuery(); + keepInCache = GetQueryKeepInCache(); + break; + + case NKikimrKqp::QUERY_ACTION_EXPLAIN: + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + keepInCache = false; + break; + + default: + YQL_ENSURE(false); + } + + CompileStats = {}; + + auto compileResult = cache.Find( + uid, + query, + TempTablesState, + keepInCache, + UserToken->GetUserSID(), + counters, + DbCounters, + sender, + TlsActivationContext->AsActorContext()); + + if (compileResult) { + if (SaveAndCheckCompileResult(compileResult)) { + CompileStats.FromCache = true; + return true; + } + } + + return false; +} + std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) { TMaybe<TKqpQueryId> query; TMaybe<TString> uid; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index da4cc35472..467605df5c 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -26,6 +26,8 @@ namespace NKikimr::NKqp { +class TKqpQueryCache; + // basically it's a state that holds all the context // about the specific query execution. // it holds the unique pointer to the query request, which may include @@ -412,7 +414,7 @@ public: const auto& phyQuery = PreparedQuery->GetPhysicalQuery(); auto tx = PreparedQuery->GetPhyTxOrEmpty(CurrentTx); - if (TxCtx->CanDeferEffects()) { + if (TxCtx->CanDeferEffects()) { // At current time sinks require separate tnx with commit. while (tx && tx->GetHasEffects() && !TxCtx->HasOlapTable) { QueryData->CreateKqpValueMap(tx); @@ -470,9 +472,17 @@ public: // execution. std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> BuildNavigateKeySet(); // same the context of the compiled query to the query state. + bool SaveAndCheckCompileResult(TKqpCompileResult::TConstPtr compileResult); bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev); bool SaveAndCheckParseResult(TEvKqp::TEvParseResponse&& ev); bool SaveAndCheckSplitResult(TEvKqp::TEvSplitResponse* ev); + + bool TryGetFromCache( + TKqpQueryCache& cache, + const TGUCSettings::TPtr& gUCSettingsPtr, + TIntrusivePtr<TKqpCounters>& counters, + const TActorId& sender); + // build the compilation request. std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr); // TODO(gvit): get rid of code duplication in these requests, diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 71600ee6db..9e6db5c533 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -149,6 +149,7 @@ public: } TKqpSessionActor(const TActorId& owner, + TKqpQueryCachePtr queryCache, std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> resourceManager, std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory> caFactory, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, @@ -159,6 +160,7 @@ public: const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TActorId& kqpTempTablesAgentActor) : Owner(owner) + , QueryCache(std::move(queryCache)) , SessionId(sessionId) , ResourceManager_(std::move(resourceManager)) , CaFactory_(std::move(caFactory)) @@ -515,12 +517,34 @@ public: void CompileQuery() { YQL_ENSURE(QueryState); QueryState->CompilationRunning = true; + + Become(&TKqpSessionActor::ExecuteState); + + // quick path + if (QueryState->TryGetFromCache(*QueryCache, GUCSettings, Counters, SelfId()) && !QueryState->CompileResult->NeedToSplit) { + LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status); + + // even if we have successfully compilation result, it doesn't mean anything + // in terms of current schema version of the table if response of compilation is from the cache. + // because of that, we are forcing to run schema version check + if (QueryState->NeedCheckTableVersions()) { + auto ev = QueryState->BuildNavigateKeySet(); + Send(MakeSchemeCacheID(), ev.release()); + return; + } + + OnSuccessCompileRequest(); + return; + } + + // TODO: in some cases we could reply right here (e.g. there is uid and query is missing), but + // for extra sanity we make extra hop to the compile service, which might handle the issue better + auto ev = QueryState->BuildCompileRequest(CompilationCookie, GUCSettings); LOG_D("Sending CompileQuery request"); Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId, QueryState->KqpSessionSpan.GetTraceId()); - Become(&TKqpSessionActor::ExecuteState); } void CompileSplittedQuery() { @@ -611,8 +635,28 @@ public: } void CompileStatement() { + // quick path + if (QueryState->TryGetFromCache(*QueryCache, GUCSettings, Counters, SelfId()) && !QueryState->CompileResult->NeedToSplit) { + LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status); + + // even if we have successfully compilation result, it doesn't mean anything + // in terms of current schema version of the table if response of compilation is from the cache. + // because of that, we are forcing to run schema version check + if (QueryState->NeedCheckTableVersions()) { + auto ev = QueryState->BuildNavigateKeySet(); + Send(MakeSchemeCacheID(), ev.release()); + return; + } + + OnSuccessCompileRequest(); + return; + } + + // TODO: in some cases we could reply right here (e.g. there is uid and query is missing), but + // for extra sanity we make extra hop to the compile service, which might handle the issue better + auto request = QueryState->BuildCompileRequest(CompilationCookie, GUCSettings); - LOG_D("Sending CompileQuery request"); + LOG_D("Sending CompileQuery request (statement)"); Send(MakeKqpCompileServiceID(SelfId().NodeId()), request.release(), 0, QueryState->QueryId, QueryState->KqpSessionSpan.GetTraceId()); @@ -2633,6 +2677,7 @@ private: private: TActorId Owner; + TKqpQueryCachePtr QueryCache; TString SessionId; std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_; @@ -2673,7 +2718,9 @@ private: } // namespace -IActor* CreateKqpSessionActor(const TActorId& owner, std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> resourceManager, +IActor* CreateKqpSessionActor(const TActorId& owner, + TKqpQueryCachePtr queryCache, + std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> resourceManager, std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory> caFactory, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, @@ -2682,7 +2729,9 @@ IActor* CreateKqpSessionActor(const TActorId& owner, std::shared_ptr<NKikimr::NK const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TActorId& kqpTempTablesAgentActor) { - return new TKqpSessionActor(owner, std::move(resourceManager), std::move(caFactory), sessionId, kqpSettings, workerSettings, federatedQuerySetup, + return new TKqpSessionActor( + owner, std::move(queryCache), + std::move(resourceManager), std::move(caFactory), sessionId, kqpSettings, workerSettings, federatedQuerySetup, std::move(asyncIoFactory), std::move(moduleResolverState), counters, queryServiceConfig, kqpTempTablesAgentActor); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index 50ad957d24..46e673c258 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -57,7 +57,10 @@ struct TKqpWorkerSettings { } }; +class TKqpQueryCache; + IActor* CreateKqpSessionActor(const TActorId& owner, + TIntrusivePtr<TKqpQueryCache> queryCache, std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> resourceManager_, std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory> caFactory_, const TString& sessionId, |