diff options
authorEvgeniy Ivanov <eivanov89@ydb.tech>2024-11-29 17:18:48 +0100
committerGitHub <noreply@github.com>2024-11-29 19:18:48 +0300
commit1b51ba97193c30e07e53536bd8f0e587b3335dc7 (patch)
parentb8297aebf9585e53bdfd7f30450c7aceb7fa8134 (diff)
Use shared compile cache to speedup queries (#11806)
7 files changed, 641 insertions, 329 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index aa93ee7cf3..fab575f53b 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -28,240 +28,6 @@ using namespace NKikimrConfig;
using namespace NYql;
-class TKqpQueryCache {
- TKqpQueryCache(size_t size, TDuration ttl)
- : List(size)
- , Ttl(ttl) {}
- void InsertQuery(const TKqpCompileResult::TConstPtr& compileResult) {
- Y_ENSURE(compileResult->Query);
- auto& query = *compileResult->Query;
- YQL_ENSURE(compileResult->PreparedQuery);
- auto queryIt = QueryIndex.emplace(query, compileResult->Uid);
- if (!queryIt.second) {
- EraseByUid(compileResult->Uid);
- QueryIndex.erase(query);
- }
- Y_ENSURE(queryIt.second);
- }
- void InsertAst(const TKqpCompileResult::TConstPtr& compileResult) {
- Y_ENSURE(compileResult->Query);
- Y_ENSURE(compileResult->GetAst());
- AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->GetAst()), compileResult->Uid);
- }
- bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution) {
- if (!isPerStatementExecution) {
- InsertQuery(compileResult);
- }
- if (isEnableAstCache && compileResult->GetAst()) {
- InsertAst(compileResult);
- }
- auto it = Index.emplace(compileResult->Uid, TCacheEntry{compileResult, TAppData::TimeProvider->Now() + Ttl});
- Y_ABORT_UNLESS(it.second);
- TItem* item = &const_cast<TItem&>(*it.first);
- auto removedItem = List.Insert(item);
- IncBytes(item->Value.CompileResult->PreparedQuery->ByteSize());
- if (removedItem) {
- DecBytes(removedItem->Value.CompileResult->PreparedQuery->ByteSize());
- auto queryId = *removedItem->Value.CompileResult->Query;
- QueryIndex.erase(queryId);
- if (removedItem->Value.CompileResult->GetAst()) {
- AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->GetAst()));
- }
- auto indexIt = Index.find(*removedItem);
- if (indexIt != Index.end()) {
- Index.erase(indexIt);
- }
- }
- Y_ABORT_UNLESS(List.GetSize() == Index.size());
- return removedItem != nullptr;
- }
- void AttachReplayMessage(const TString uid, TString replayMessage) {
- auto it = Index.find(TItem(uid));
- if (it != Index.end()) {
- TItem* item = &const_cast<TItem&>(*it);
- DecBytes(item->Value.ReplayMessage.size());
- item->Value.ReplayMessage = replayMessage;
- item->Value.LastReplayTime = TInstant::Now();
- IncBytes(replayMessage.size());
- }
- }
- TString ReplayMessageByUid(const TString uid, TDuration timeout) {
- auto it = Index.find(TItem(uid));
- if (it != Index.end()) {
- TInstant& lastReplayTime = const_cast<TItem&>(*it).Value.LastReplayTime;
- TInstant now = TInstant::Now();
- if (lastReplayTime + timeout < now) {
- lastReplayTime = now;
- return it->Value.ReplayMessage;
- }
- }
- return "";
- }
- TKqpCompileResult::TConstPtr FindByUid(const TString& uid, bool promote) {
- auto it = Index.find(TItem(uid));
- if (it != Index.end()) {
- TItem* item = &const_cast<TItem&>(*it);
- if (promote) {
- item->Value.ExpiredAt = TAppData::TimeProvider->Now() + Ttl;
- List.Promote(item);
- }
- return item->Value.CompileResult;
- }
- return nullptr;
- }
- void Replace(const TKqpCompileResult::TConstPtr& compileResult) {
- auto it = Index.find(TItem(compileResult->Uid));
- if (it != Index.end()) {
- TItem& item = const_cast<TItem&>(*it);
- item.Value.CompileResult = compileResult;
- }
- }
- TKqpQueryId GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast) {
- Y_ABORT_UNLESS(ast.Root);
- std::shared_ptr<std::map<TString, Ydb::Type>> astPgParams;
- if (query.QueryParameterTypes || ast.PgAutoParamValues) {
- astPgParams = std::make_shared<std::map<TString, Ydb::Type>>();
- if (query.QueryParameterTypes) {
- for (const auto& [name, param] : *query.QueryParameterTypes) {
- astPgParams->insert({name, param});
- }
- }
- if (ast.PgAutoParamValues) {
- const auto& params = dynamic_cast<TKqpAutoParamBuilder*>(ast.PgAutoParamValues.Get())->Values;
- for (const auto& [name, param] : params) {
- astPgParams->insert({name, param.Gettype()});
- }
- }
- }
- return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings};
- }
- TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) {
- auto uid = QueryIndex.FindPtr(query);
- if (!uid) {
- return nullptr;
- }
- return FindByUid(*uid, promote);
- }
- TKqpCompileResult::TConstPtr FindByAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast, bool promote) {
- auto uid = AstIndex.FindPtr(GetQueryIdWithAst(query, ast));
- if (!uid) {
- return nullptr;
- }
- return FindByUid(*uid, promote);
- }
- bool EraseByUid(const TString& uid) {
- auto it = Index.find(TItem(uid));
- if (it == Index.end()) {
- return false;
- }
- TItem* item = &const_cast<TItem&>(*it);
- List.Erase(item);
- DecBytes(item->Value.CompileResult->PreparedQuery->ByteSize());
- DecBytes(item->Value.ReplayMessage.size());
- Y_ABORT_UNLESS(item->Value.CompileResult);
- Y_ABORT_UNLESS(item->Value.CompileResult->Query);
- auto queryId = *item->Value.CompileResult->Query;
- QueryIndex.erase(queryId);
- if (item->Value.CompileResult->GetAst()) {
- AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->GetAst()));
- }
- Index.erase(it);
- Y_ABORT_UNLESS(List.GetSize() == Index.size());
- return true;
- }
- size_t Size() const {
- return Index.size();
- }
- ui64 Bytes() const {
- return ByteSize;
- }
- size_t EraseExpiredQueries() {
- auto prevSize = Size();
- auto now = TAppData::TimeProvider->Now();
- while (List.GetSize() && List.GetOldest()->Value.ExpiredAt <= now) {
- EraseByUid(List.GetOldest()->Key);
- }
- Y_ABORT_UNLESS(List.GetSize() == Index.size());
- return prevSize - Size();
- }
- void Clear() {
- List = TList(List.GetMaxSize());
- Index.clear();
- QueryIndex.clear();
- AstIndex.clear();
- ByteSize = 0;
- }
- void DecBytes(ui64 bytes) {
- if (bytes > ByteSize) {
- ByteSize = 0;
- } else {
- ByteSize -= bytes;
- }
- }
- void IncBytes(ui64 bytes) {
- ByteSize += bytes;
- }
- struct TCacheEntry {
- TKqpCompileResult::TConstPtr CompileResult;
- TInstant ExpiredAt;
- TString ReplayMessage = "";
- TInstant LastReplayTime = TInstant::Zero();
- };
- using TList = TLRUList<TString, TCacheEntry>;
- using TItem = TList::TItem;
- TList List;
- THashSet<TItem, TItem::THash> Index;
- THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> QueryIndex;
- THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> AstIndex;
- ui64 ByteSize = 0;
- TDuration Ttl;
struct TKqpCompileSettings {
TKqpCompileSettings(bool keepInCache, bool isQueryActionPrepare, bool perStatementResult,
const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE)
@@ -449,18 +215,20 @@ public:
return NKikimrServices::TActivity::KQP_COMPILE_SERVICE;
- TKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
+ TKqpCompileService(
+ TKqpQueryCachePtr queryCache,
+ const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
const TKqpSettings::TConstPtr& kqpSettings,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
- : TableServiceConfig(tableServiceConfig)
+ : QueryCache(std::move(queryCache))
+ , TableServiceConfig(tableServiceConfig)
, QueryServiceConfig(queryServiceConfig)
, KqpSettings(kqpSettings)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
- , QueryCache(TableServiceConfig.GetCompileQueryCacheSize(), TDuration::Seconds(TableServiceConfig.GetCompileQueryCacheTTLSec()))
, RequestsQueue(TableServiceConfig.GetCompileRequestQueueSize())
, QueryReplayFactory(std::move(queryReplayFactory))
, FederatedQuerySetup(federatedQuerySetup)
@@ -578,7 +346,7 @@ private:
TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams ||
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution) {
- QueryCache.Clear();
+ QueryCache->Clear();
LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE,
"Query cache was invalidated due to config change");
@@ -632,24 +400,24 @@ private:
<< ", split: " << request.Split
<< *request.UserRequestContext);
- *Counters->CompileQueryCacheSize = QueryCache.Size();
- *Counters->CompileQueryCacheBytes = QueryCache.Bytes();
auto userSid = request.UserToken->GetUserSID();
auto dbCounters = request.DbCounters;
- if (request.Uid) {
- Counters->ReportCompileRequestGet(dbCounters);
+ auto compileResult = QueryCache->Find(
+ request.Uid,
+ request.Query,
+ request.TempTablesState,
+ request.KeepInCache,
+ request.UserToken->GetUserSID(),
+ Counters,
+ dbCounters,
+ ev->Sender,
+ ctx);
- auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache);
- if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
- compileResult = nullptr;
- }
+ if (request.Uid) {
if (compileResult) {
if (compileResult->Query->UserSid == userSid) {
- Counters->ReportQueryCacheHit(dbCounters, true);
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid"
<< ", sender: " << ev->Sender
<< ", queryUid: " << *request.Uid);
@@ -663,38 +431,18 @@ private:
<< ", expected sid: " << compileResult->Query->UserSid
<< ", actual sid: " << userSid);
- }
- Counters->ReportQueryCacheHit(dbCounters, false);
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Query not found"
- << ", sender: " << ev->Sender
- << ", queryUid: " << *request.Uid);
- NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid);
- ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
- return;
- }
- Counters->ReportCompileRequestCompile(dbCounters);
- Y_ENSURE(request.Query);
- auto& query = *request.Query;
- if (query.UserSid.empty()) {
- query.UserSid = userSid;
- } else {
- Y_ENSURE(query.UserSid == userSid);
- }
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by queryId, queryId: " << query.SerializeToString());
- auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache);
- if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
- compileResult = nullptr;
- }
+ } else {
+ Counters->ReportQueryCacheHit(dbCounters, false);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Query not found"
+ << ", sender: " << ev->Sender
+ << ", queryUid: " << *request.Uid);
- if (compileResult) {
- Counters->ReportQueryCacheHit(dbCounters, true);
+ NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid);
+ ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
+ return;
+ }
+ } else if (compileResult) {
+ Y_ENSURE(request.Query);
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text"
<< ", sender: " << ev->Sender
@@ -704,6 +452,8 @@ private:
+ Counters->ReportCompileRequestCompile(dbCounters);
CollectDiagnostics = request.CollectDiagnostics;
@@ -768,7 +518,7 @@ private:
auto dbCounters = request.DbCounters;
- TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false);
+ TKqpCompileResult::TConstPtr compileResult = QueryCache->FindByUid(request.Uid, false);
if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
compileResult = nullptr;
@@ -879,7 +629,7 @@ private:
if (ev->Get()->ReplayMessage && !QueryReplayBackend->IsNull()) {
- QueryCache.AttachReplayMessage(compileRequest.Uid, *ev->Get()->ReplayMessage);
+ QueryCache->AttachReplayMessage(compileRequest.Uid, *ev->Get()->ReplayMessage);
auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query);
@@ -890,8 +640,8 @@ private:
} else {
if (!hasTempTablesNameClashes) {
- if (QueryCache.FindByUid(compileResult->Uid, false)) {
- QueryCache.EraseByUid(compileResult->Uid);
+ if (QueryCache->FindByUid(compileResult->Uid, false)) {
+ QueryCache->EraseByUid(compileResult->Uid);
@@ -929,13 +679,13 @@ private:
auto dbCounters = request.DbCounters;
- QueryCache.EraseByUid(request.Uid);
+ QueryCache->EraseByUid(request.Uid);
void HandleTtlTimer(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Received check queries TTL timeout");
- auto evicted = QueryCache.EraseExpiredQueries();
+ auto evicted = QueryCache->EraseExpiredQueries();
if (evicted != 0) {
@@ -943,30 +693,17 @@ private:
- bool HasTempTablesNameClashes(
- TKqpCompileResult::TConstPtr compileResult,
- TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId = false) {
- if (!compileResult) {
- return false;
- }
- if (!compileResult->PreparedQuery) {
- return false;
- }
- return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId);
- }
void UpdateQueryCache(const TActorContext& ctx, TKqpCompileResult::TConstPtr compileResult, bool keepInCache, bool isQueryActionPrepare, bool isPerStatementExecution) {
- if (QueryCache.FindByUid(compileResult->Uid, false)) {
- QueryCache.Replace(compileResult);
+ if (QueryCache->FindByUid(compileResult->Uid, false)) {
+ QueryCache->Replace(compileResult);
} else if (keepInCache) {
if (compileResult->Query) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert query into compile cache, queryId: " << compileResult->Query->SerializeToString());
- if (QueryCache.FindByQuery(*compileResult->Query, keepInCache)) {
+ if (QueryCache->FindByQuery(*compileResult->Query, keepInCache)) {
LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Trying to insert query into compile cache when it is already there");
- if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution)) {
+ if (QueryCache->Insert(compileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution)) {
if (compileResult->Query && isQueryActionPrepare) {
@@ -983,7 +720,7 @@ private:
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by ast, queryId: " << compileRequest.Query.SerializeToString()
<< ", ast: " << queryAst.Ast->Root->ToString());
- auto compileResult = QueryCache.FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache);
+ auto compileResult = QueryCache->FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache);
if (!compileRequest.FindInCache || HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) {
compileResult = nullptr;
@@ -1067,17 +804,17 @@ private:
queryParameterTypes->insert({param.GetName(), paramType});
query.QueryParameterTypes = queryParameterTypes;
- if (QueryCache.FindByQuery(query, keepInCache)) {
+ if (QueryCache->FindByQuery(query, keepInCache)) {
return false;
- if (compileResult->GetAst() && QueryCache.FindByAst(query, *compileResult->GetAst(), keepInCache)) {
+ if (compileResult->GetAst() && QueryCache->FindByAst(query, *compileResult->GetAst(), keepInCache)) {
return false;
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst);
newCompileResult->AllowCache = compileResult->AllowCache;
newCompileResult->PreparedQuery = compileResult->PreparedQuery;
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString());
- return QueryCache.Insert(newCompileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution);
+ return QueryCache->Insert(newCompileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution);
void ProcessQueue(const TActorContext& ctx) {
@@ -1158,7 +895,7 @@ private:
TKqpStatsCompile stats;
stats.FromCache = true;
- if (auto replayMessage = QueryCache.ReplayMessageByUid(compileResult->Uid, TDuration::Seconds(TableServiceConfig.GetQueryReplayCacheUploadTTLSec()))) {
+ if (auto replayMessage = QueryCache->ReplayMessageByUid(compileResult->Uid, TDuration::Seconds(TableServiceConfig.GetQueryReplayCacheUploadTTLSec()))) {
@@ -1219,6 +956,8 @@ private:
+ TKqpQueryCachePtr QueryCache;
TTableServiceConfig TableServiceConfig;
TQueryServiceConfig QueryServiceConfig;
TKqpSettings::TConstPtr KqpSettings;
@@ -1226,7 +965,6 @@ private:
TIntrusivePtr<TKqpCounters> Counters;
THolder<IQueryReplayBackend> QueryReplayBackend;
- TKqpQueryCache QueryCache;
TKqpRequestsQueue RequestsQueue;
std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory;
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
@@ -1234,13 +972,315 @@ private:
bool CollectDiagnostics = false;
-IActor* CreateKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
+// QueryCache
+bool TKqpQueryCache::Insert(
+ const TKqpCompileResult::TConstPtr& compileResult,
+ bool isEnableAstCache,
+ bool isPerStatementExecution)
+ TGuard<TAdaptiveLock> guard(Lock);
+ if (!isPerStatementExecution) {
+ InsertQuery(compileResult);
+ }
+ if (isEnableAstCache && compileResult->GetAst()) {
+ InsertAst(compileResult);
+ }
+ auto it = Index.emplace(compileResult->Uid, TCacheEntry{compileResult, TAppData::TimeProvider->Now() + Ttl});
+ Y_ABORT_UNLESS(it.second);
+ TItem* item = &const_cast<TItem&>(*it.first);
+ auto removedItem = List.Insert(item);
+ IncBytes(item->Value.CompileResult->PreparedQuery->ByteSize());
+ if (removedItem) {
+ DecBytes(removedItem->Value.CompileResult->PreparedQuery->ByteSize());
+ auto queryId = *removedItem->Value.CompileResult->Query;
+ QueryIndex.erase(queryId);
+ if (removedItem->Value.CompileResult->GetAst()) {
+ AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->GetAst()));
+ }
+ auto indexIt = Index.find(*removedItem);
+ if (indexIt != Index.end()) {
+ Index.erase(indexIt);
+ }
+ }
+ Y_ABORT_UNLESS(List.GetSize() == Index.size());
+ return removedItem != nullptr;
+void TKqpQueryCache::AttachReplayMessage(const TString uid, TString replayMessage) {
+ TGuard<TAdaptiveLock> guard(Lock);
+ auto it = Index.find(TItem(uid));
+ if (it != Index.end()) {
+ TItem* item = &const_cast<TItem&>(*it);
+ DecBytes(item->Value.ReplayMessage.size());
+ item->Value.ReplayMessage = replayMessage;
+ item->Value.LastReplayTime = TInstant::Now();
+ IncBytes(replayMessage.size());
+ }
+TString TKqpQueryCache::ReplayMessageByUid(const TString uid, TDuration timeout) {
+ TGuard<TAdaptiveLock> guard(Lock);
+ auto it = Index.find(TItem(uid));
+ if (it != Index.end()) {
+ TInstant& lastReplayTime = const_cast<TItem&>(*it).Value.LastReplayTime;
+ TInstant now = TInstant::Now();
+ if (lastReplayTime + timeout < now) {
+ lastReplayTime = now;
+ return it->Value.ReplayMessage;
+ }
+ }
+ return "";
+TKqpCompileResult::TConstPtr TKqpQueryCache::FindByUidImpl(const TString& uid, bool promote) {
+ auto it = Index.find(TItem(uid));
+ if (it != Index.end()) {
+ TItem* item = &const_cast<TItem&>(*it);
+ if (promote) {
+ item->Value.ExpiredAt = TAppData::TimeProvider->Now() + Ttl;
+ List.Promote(item);
+ }
+ return item->Value.CompileResult;
+ }
+ return nullptr;
+TKqpCompileResult::TConstPtr TKqpQueryCache::FindByQueryImpl(const TKqpQueryId& query, bool promote) {
+ auto uid = QueryIndex.FindPtr(query);
+ if (!uid) {
+ return nullptr;
+ }
+ // we're holding read and assume it's recursive
+ return FindByUidImpl(*uid, promote);
+bool TKqpQueryCache::EraseByUidImpl(const TString& uid) {
+ auto it = Index.find(TItem(uid));
+ if (it == Index.end()) {
+ return false;
+ }
+ TItem* item = &const_cast<TItem&>(*it);
+ List.Erase(item);
+ DecBytes(item->Value.CompileResult->PreparedQuery->ByteSize());
+ DecBytes(item->Value.ReplayMessage.size());
+ Y_ABORT_UNLESS(item->Value.CompileResult);
+ Y_ABORT_UNLESS(item->Value.CompileResult->Query);
+ auto queryId = *item->Value.CompileResult->Query;
+ QueryIndex.erase(queryId);
+ if (item->Value.CompileResult->GetAst()) {
+ AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->GetAst()));
+ }
+ Index.erase(it);
+ Y_ABORT_UNLESS(List.GetSize() == Index.size());
+ return true;
+void TKqpQueryCache::Replace(const TKqpCompileResult::TConstPtr& compileResult) {
+ TGuard<TAdaptiveLock> guard(Lock);
+ auto it = Index.find(TItem(compileResult->Uid));
+ if (it != Index.end()) {
+ TItem& item = const_cast<TItem&>(*it);
+ item.Value.CompileResult = compileResult;
+ }
+// find by either uid or query
+TKqpCompileResult::TConstPtr TKqpQueryCache::Find(
+ const TMaybe<TString>& uid,
+ TMaybe<TKqpQueryId>& query,
+ TKqpTempTablesState::TConstPtr tempTablesState,
+ bool promote,
+ const NACLib::TSID& userSid,
+ TIntrusivePtr<TKqpCounters> counters,
+ TKqpDbCountersPtr& dbCounters,
+ const TActorId& sender,
+ const TActorContext& ctx)
+ TGuard<TAdaptiveLock> guard(Lock);
+ *counters->CompileQueryCacheSize = SizeImpl();
+ *counters->CompileQueryCacheBytes = BytesImpl();
+ if (uid) {
+ counters->ReportCompileRequestGet(dbCounters);
+ auto compileResult = FindByUidImpl(*uid, promote);
+ if (HasTempTablesNameClashes(compileResult, tempTablesState)) {
+ compileResult = nullptr;
+ }
+ if (compileResult) {
+ Y_ENSURE(compileResult->Query);
+ if (compileResult->Query->UserSid == userSid) {
+ counters->ReportQueryCacheHit(dbCounters, true);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid"
+ << ", sender: " << sender
+ << ", queryUid: " << *uid);
+ return compileResult;
+ } else {
+ LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query"
+ << ", sender: " << sender
+ << ", queryUid: " << *uid
+ << ", expected sid: " << compileResult->Query->UserSid
+ << ", actual sid: " << userSid);
+ }
+ }
+ return nullptr;
+ }
+ Y_ENSURE(query);
+ if (query->UserSid.empty()) {
+ query->UserSid = userSid;
+ } else {
+ Y_ENSURE(query->UserSid == userSid);
+ }
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by queryId, queryId: "
+ << query->SerializeToString());
+ auto compileResult = FindByQueryImpl(*query, promote);
+ if (HasTempTablesNameClashes(compileResult, tempTablesState)) {
+ return nullptr;
+ }
+ if (compileResult) {
+ counters->ReportQueryCacheHit(dbCounters, true);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text"
+ << ", sender: " << sender
+ << ", queryUid: " << compileResult->Uid);
+ return compileResult;
+ }
+ // note, we don't report cache miss, because it's up to caller to decide what to do:
+ // in particular, session actor will go to the compile service, which will actually report the miss.
+ return nullptr;
+TKqpCompileResult::TConstPtr TKqpQueryCache::FindByAst(
+ const TKqpQueryId& query,
+ const NYql::TAstParseResult& ast,
+ bool promote)
+ TGuard<TAdaptiveLock> guard(Lock);
+ auto uid = AstIndex.FindPtr(GetQueryIdWithAst(query, ast));
+ if (!uid) {
+ return nullptr;
+ }
+ return FindByUidImpl(*uid, promote);
+size_t TKqpQueryCache::EraseExpiredQueries() {
+ TGuard<TAdaptiveLock> guard(Lock);
+ auto prevSize = SizeImpl();
+ auto now = TAppData::TimeProvider->Now();
+ while (List.GetSize() && List.GetOldest()->Value.ExpiredAt <= now) {
+ EraseByUidImpl(List.GetOldest()->Key);
+ }
+ Y_ABORT_UNLESS(List.GetSize() == Index.size());
+ return prevSize - SizeImpl();
+void TKqpQueryCache::Clear() {
+ TGuard<TAdaptiveLock> guard(Lock);
+ List = TList(List.GetMaxSize());
+ Index.clear();
+ QueryIndex.clear();
+ AstIndex.clear();
+ ByteSize = 0;
+void TKqpQueryCache::InsertQuery(const TKqpCompileResult::TConstPtr& compileResult) {
+ Y_ENSURE(compileResult->Query);
+ auto& query = *compileResult->Query;
+ YQL_ENSURE(compileResult->PreparedQuery);
+ auto queryIt = QueryIndex.emplace(query, compileResult->Uid);
+ if (!queryIt.second) {
+ EraseByUid(compileResult->Uid);
+ QueryIndex.erase(query);
+ }
+ Y_ENSURE(queryIt.second);
+TKqpQueryId TKqpQueryCache::GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast) {
+ Y_ABORT_UNLESS(ast.Root);
+ std::shared_ptr<std::map<TString, Ydb::Type>> astPgParams;
+ if (query.QueryParameterTypes || ast.PgAutoParamValues) {
+ astPgParams = std::make_shared<std::map<TString, Ydb::Type>>();
+ if (query.QueryParameterTypes) {
+ for (const auto& [name, param] : *query.QueryParameterTypes) {
+ astPgParams->insert({name, param});
+ }
+ }
+ if (ast.PgAutoParamValues) {
+ const auto& params = dynamic_cast<TKqpAutoParamBuilder*>(ast.PgAutoParamValues.Get())->Values;
+ for (const auto& [name, param] : params) {
+ astPgParams->insert({name, param.Gettype()});
+ }
+ }
+ }
+ return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings};
+bool HasTempTablesNameClashes(
+ TKqpCompileResult::TConstPtr compileResult,
+ TKqpTempTablesState::TConstPtr tempTablesState,
+ bool withSessionId)
+ if (!compileResult) {
+ return false;
+ }
+ if (!compileResult->PreparedQuery) {
+ return false;
+ }
+ return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId);
+IActor* CreateKqpCompileService(
+ TKqpQueryCachePtr queryCache,
+ const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
- std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
- )
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup)
- return new TKqpCompileService(tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters,
+ return new TKqpCompileService(
+ std::move(queryCache),
+ tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters,
std::move(queryReplayFactory), federatedQuerySetup);
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h
index 8da1cdead7..853ba5a359 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.h
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.h
@@ -4,17 +4,149 @@
#include <ydb/core/kqp/common/simple/temp_tables.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
+#include <ydb/core/kqp/host/kqp_translate.h>
+#include <util/system/spinlock.h>
namespace NKikimr {
namespace NKqp {
+class TKqpDbCounters;
enum class ECompileActorAction {
-IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
+// Cache is shared between query sessions, compile service and kqp proxy.
+// Currently we don't use RW lock, because of cache promotions during lookup:
+// in benchmark both versions (RW spinlock and adaptive W spinlock) show same
+// performance. Might consider RW lock again in future.
+class TKqpQueryCache: public TThrRefBase {
+ TKqpQueryCache(size_t size, TDuration ttl)
+ : List(size)
+ , Ttl(ttl) {}
+ bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution);
+ void AttachReplayMessage(const TString uid, TString replayMessage);
+ TString ReplayMessageByUid(const TString uid, TDuration timeout);
+ TKqpCompileResult::TConstPtr FindByUid(const TString& uid, bool promote) {
+ TGuard<TAdaptiveLock> guard(Lock);
+ return FindByUidImpl(uid, promote);
+ }
+ void Replace(const TKqpCompileResult::TConstPtr& compileResult);
+ TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) {
+ TGuard<TAdaptiveLock> guard(Lock);
+ return FindByQueryImpl(query, promote);
+ }
+ // find by either uid or query
+ TKqpCompileResult::TConstPtr Find(
+ const TMaybe<TString>& uid,
+ TMaybe<TKqpQueryId>& query,
+ TKqpTempTablesState::TConstPtr tempTablesState,
+ bool promote,
+ const NACLib::TSID& userSid,
+ TIntrusivePtr<TKqpCounters> counters,
+ TIntrusivePtr<TKqpDbCounters>& dbCounters,
+ const TActorId& sender,
+ const TActorContext& ctx);
+ TKqpCompileResult::TConstPtr FindByAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast, bool promote);
+ bool EraseByUid(const TString& uid) {
+ TGuard<TAdaptiveLock> guard(Lock);
+ return EraseByUidImpl(uid);
+ }
+ size_t Size() const {
+ TGuard<TAdaptiveLock> guard(Lock);
+ return SizeImpl();
+ }
+ ui64 Bytes() const {
+ TGuard<TAdaptiveLock> guard(Lock);
+ return BytesImpl();
+ }
+ ui64 BytesImpl() const {
+ return ByteSize;
+ }
+ size_t EraseExpiredQueries();
+ void Clear();
+ TKqpCompileResult::TConstPtr FindByUidImpl(const TString& uid, bool promote);
+ TKqpCompileResult::TConstPtr FindByQueryImpl(const TKqpQueryId& query, bool promote);
+ bool EraseByUidImpl(const TString& uid);
+ size_t SizeImpl() const {
+ return Index.size();
+ }
+ void InsertQuery(const TKqpCompileResult::TConstPtr& compileResult);
+ void InsertAst(const TKqpCompileResult::TConstPtr& compileResult) {
+ Y_ENSURE(compileResult->Query);
+ Y_ENSURE(compileResult->GetAst());
+ AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->GetAst()), compileResult->Uid);
+ }
+ TKqpQueryId GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast);
+ void DecBytes(ui64 bytes) {
+ if (bytes > ByteSize) {
+ ByteSize = 0;
+ } else {
+ ByteSize -= bytes;
+ }
+ }
+ void IncBytes(ui64 bytes) {
+ ByteSize += bytes;
+ }
+ struct TCacheEntry {
+ TKqpCompileResult::TConstPtr CompileResult;
+ TInstant ExpiredAt;
+ TString ReplayMessage = "";
+ TInstant LastReplayTime = TInstant::Zero();
+ };
+ using TList = TLRUList<TString, TCacheEntry>;
+ using TItem = TList::TItem;
+ TList List;
+ THashSet<TItem, TItem::THash> Index;
+ THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> QueryIndex;
+ THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> AstIndex;
+ ui64 ByteSize = 0;
+ TDuration Ttl;
+ TAdaptiveLock Lock;
+using TKqpQueryCachePtr = TIntrusivePtr<TKqpQueryCache>;
+bool HasTempTablesNameClashes(
+ TKqpCompileResult::TConstPtr compileResult,
+ TKqpTempTablesState::TConstPtr tempTablesState,
+ bool withSessionId = false);
+IActor* CreateKqpCompileService(
+ TKqpQueryCachePtr queryCache,
+ const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 340cdb4e2c..6d813c248b 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -209,6 +209,7 @@ public:
, ModuleResolverState()
, KqpProxySharedResources(std::move(kqpProxySharedResources))
, S3ActorsFactory(std::move(s3ActorsFactory))
+ , QueryCache(new TKqpQueryCache(TableServiceConfig.GetCompileQueryCacheSize(), TDuration::Seconds(TableServiceConfig.GetCompileQueryCacheTTLSec())))
void Bootstrap(const TActorContext &ctx) {
@@ -276,7 +277,9 @@ public:
// Create compile service
- CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig, QueryServiceConfig,
+ CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(
+ QueryCache,
+ TableServiceConfig, QueryServiceConfig,
KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), FederatedQuerySetup));
MakeKqpCompileServiceID(SelfId().NodeId()), CompileService);
@@ -1496,7 +1499,7 @@ private:
auto config = CreateConfig(KqpSettings, workerSettings);
- IActor* sessionActor = CreateKqpSessionActor(SelfId(), ResourceManager_, CaFactory_, sessionId, KqpSettings, workerSettings,
+ IActor* sessionActor = CreateKqpSessionActor(SelfId(), QueryCache, ResourceManager_, CaFactory_, sessionId, KqpSettings, workerSettings,
FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters,
QueryServiceConfig, KqpTempTablesAgentActor);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId);
@@ -1902,6 +1905,8 @@ private:
std::shared_ptr<TKqpProxySharedResources> KqpProxySharedResources;
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
+ TKqpQueryCachePtr QueryCache;
bool ServerWorkerBalancerComplete = false;
std::optional<TString> SelfDataCenterId;
TIntrusivePtr<IRandomProvider> RandomProvider;
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp
index f1ff6799af..4c02f46b15 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.cpp
+++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp
@@ -1,5 +1,6 @@
#include "kqp_query_state.h"
+#include <ydb/core/kqp/compile_service/kqp_compile_service.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
namespace NKikimr::NKqp {
@@ -136,27 +137,35 @@ std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildN
return std::make_unique<TEvTxProxySchemeCache::TEvNavigateKeySet>(navigate.Release());
bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
+ CompileStats = ev->Stats;
+ if (!SaveAndCheckCompileResult(ev->CompileResult)) {
+ return false;
+ }
+ Orbit = std::move(ev->Orbit);
+ if (ev->ReplayMessage) {
+ ReplayMessage = *ev->ReplayMessage;
+ }
+ return true;
+bool TKqpQueryState::SaveAndCheckCompileResult(TKqpCompileResult::TConstPtr compileResult) {
CompilationRunning = false;
- CompileResult = ev->CompileResult;
+ CompileResult = compileResult;
MaxReadType = CompileResult->MaxReadType;
- Orbit = std::move(ev->Orbit);
- if (CompileResult->Status != Ydb::StatusIds::SUCCESS)
+ if (CompileResult->Status != Ydb::StatusIds::SUCCESS) {
return false;
+ }
const ui32 compiledVersion = CompileResult->PreparedQuery->GetVersion();
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
"Unexpected prepared query version: " << compiledVersion);
- CompileStats = ev->Stats;
PreparedQuery = CompileResult->PreparedQuery;
- if (ev->ReplayMessage) {
- ReplayMessage = *ev->ReplayMessage;
- }
if (!CommandTagName) {
CommandTagName = CompileResult->CommandTagName;
@@ -185,6 +194,70 @@ bool TKqpQueryState::SaveAndCheckSplitResult(TEvKqp::TEvSplitResponse* ev) {
return ev->Status == Ydb::StatusIds::SUCCESS;
+bool TKqpQueryState::TryGetFromCache(
+ TKqpQueryCache& cache,
+ const TGUCSettings::TPtr& gUCSettingsPtr,
+ TIntrusivePtr<TKqpCounters>& counters,
+ const TActorId& sender)
+ TMaybe<TKqpQueryId> query;
+ TMaybe<TString> uid;
+ TKqpQuerySettings settings(GetType());
+ settings.DocumentApiRestricted = IsDocumentApiRestricted_;
+ settings.IsInternalCall = IsInternalCall();
+ settings.Syntax = GetSyntax();
+ TGUCSettings gUCSettings = gUCSettingsPtr ? *gUCSettingsPtr : TGUCSettings();
+ bool keepInCache = false;
+ switch (GetAction()) {
+ query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings);
+ keepInCache = GetQueryKeepInCache() && query->IsSql();
+ break;
+ query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings);
+ keepInCache = query->IsSql();
+ break;
+ uid = GetPreparedQuery();
+ keepInCache = GetQueryKeepInCache();
+ break;
+ query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings);
+ keepInCache = false;
+ break;
+ default:
+ YQL_ENSURE(false);
+ }
+ CompileStats = {};
+ auto compileResult = cache.Find(
+ uid,
+ query,
+ TempTablesState,
+ keepInCache,
+ UserToken->GetUserSID(),
+ counters,
+ DbCounters,
+ sender,
+ TlsActivationContext->AsActorContext());
+ if (compileResult) {
+ if (SaveAndCheckCompileResult(compileResult)) {
+ CompileStats.FromCache = true;
+ return true;
+ }
+ }
+ return false;
std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) {
TMaybe<TKqpQueryId> query;
TMaybe<TString> uid;
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index da4cc35472..467605df5c 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -26,6 +26,8 @@
namespace NKikimr::NKqp {
+class TKqpQueryCache;
// basically it's a state that holds all the context
// about the specific query execution.
// it holds the unique pointer to the query request, which may include
@@ -412,7 +414,7 @@ public:
const auto& phyQuery = PreparedQuery->GetPhysicalQuery();
auto tx = PreparedQuery->GetPhyTxOrEmpty(CurrentTx);
- if (TxCtx->CanDeferEffects()) {
+ if (TxCtx->CanDeferEffects()) {
// At current time sinks require separate tnx with commit.
while (tx && tx->GetHasEffects() && !TxCtx->HasOlapTable) {
@@ -470,9 +472,17 @@ public:
// execution.
std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> BuildNavigateKeySet();
// same the context of the compiled query to the query state.
+ bool SaveAndCheckCompileResult(TKqpCompileResult::TConstPtr compileResult);
bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev);
bool SaveAndCheckParseResult(TEvKqp::TEvParseResponse&& ev);
bool SaveAndCheckSplitResult(TEvKqp::TEvSplitResponse* ev);
+ bool TryGetFromCache(
+ TKqpQueryCache& cache,
+ const TGUCSettings::TPtr& gUCSettingsPtr,
+ TIntrusivePtr<TKqpCounters>& counters,
+ const TActorId& sender);
// build the compilation request.
std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr);
// TODO(gvit): get rid of code duplication in these requests,
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 71600ee6db..9e6db5c533 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -149,6 +149,7 @@ public:
TKqpSessionActor(const TActorId& owner,
+ TKqpQueryCachePtr queryCache,
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> resourceManager,
std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory> caFactory,
const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings,
@@ -159,6 +160,7 @@ public:
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const TActorId& kqpTempTablesAgentActor)
: Owner(owner)
+ , QueryCache(std::move(queryCache))
, SessionId(sessionId)
, ResourceManager_(std::move(resourceManager))
, CaFactory_(std::move(caFactory))
@@ -515,12 +517,34 @@ public:
void CompileQuery() {
QueryState->CompilationRunning = true;
+ Become(&TKqpSessionActor::ExecuteState);
+ // quick path
+ if (QueryState->TryGetFromCache(*QueryCache, GUCSettings, Counters, SelfId()) && !QueryState->CompileResult->NeedToSplit) {
+ LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status);
+ // even if we have successfully compilation result, it doesn't mean anything
+ // in terms of current schema version of the table if response of compilation is from the cache.
+ // because of that, we are forcing to run schema version check
+ if (QueryState->NeedCheckTableVersions()) {
+ auto ev = QueryState->BuildNavigateKeySet();
+ Send(MakeSchemeCacheID(), ev.release());
+ return;
+ }
+ OnSuccessCompileRequest();
+ return;
+ }
+ // TODO: in some cases we could reply right here (e.g. there is uid and query is missing), but
+ // for extra sanity we make extra hop to the compile service, which might handle the issue better
auto ev = QueryState->BuildCompileRequest(CompilationCookie, GUCSettings);
LOG_D("Sending CompileQuery request");
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId,
- Become(&TKqpSessionActor::ExecuteState);
void CompileSplittedQuery() {
@@ -611,8 +635,28 @@ public:
void CompileStatement() {
+ // quick path
+ if (QueryState->TryGetFromCache(*QueryCache, GUCSettings, Counters, SelfId()) && !QueryState->CompileResult->NeedToSplit) {
+ LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status);
+ // even if we have successfully compilation result, it doesn't mean anything
+ // in terms of current schema version of the table if response of compilation is from the cache.
+ // because of that, we are forcing to run schema version check
+ if (QueryState->NeedCheckTableVersions()) {
+ auto ev = QueryState->BuildNavigateKeySet();
+ Send(MakeSchemeCacheID(), ev.release());
+ return;
+ }
+ OnSuccessCompileRequest();
+ return;
+ }
+ // TODO: in some cases we could reply right here (e.g. there is uid and query is missing), but
+ // for extra sanity we make extra hop to the compile service, which might handle the issue better
auto request = QueryState->BuildCompileRequest(CompilationCookie, GUCSettings);
- LOG_D("Sending CompileQuery request");
+ LOG_D("Sending CompileQuery request (statement)");
Send(MakeKqpCompileServiceID(SelfId().NodeId()), request.release(), 0, QueryState->QueryId,
@@ -2633,6 +2677,7 @@ private:
TActorId Owner;
+ TKqpQueryCachePtr QueryCache;
TString SessionId;
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
@@ -2673,7 +2718,9 @@ private:
} // namespace
-IActor* CreateKqpSessionActor(const TActorId& owner, std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> resourceManager,
+IActor* CreateKqpSessionActor(const TActorId& owner,
+ TKqpQueryCachePtr queryCache,
+ std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> resourceManager,
std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory> caFactory, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
@@ -2682,7 +2729,9 @@ IActor* CreateKqpSessionActor(const TActorId& owner, std::shared_ptr<NKikimr::NK
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const TActorId& kqpTempTablesAgentActor)
- return new TKqpSessionActor(owner, std::move(resourceManager), std::move(caFactory), sessionId, kqpSettings, workerSettings, federatedQuerySetup,
+ return new TKqpSessionActor(
+ owner, std::move(queryCache),
+ std::move(resourceManager), std::move(caFactory), sessionId, kqpSettings, workerSettings, federatedQuerySetup,
std::move(asyncIoFactory), std::move(moduleResolverState), counters,
queryServiceConfig, kqpTempTablesAgentActor);
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h
index 50ad957d24..46e673c258 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.h
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.h
@@ -57,7 +57,10 @@ struct TKqpWorkerSettings {
+class TKqpQueryCache;
IActor* CreateKqpSessionActor(const TActorId& owner,
+ TIntrusivePtr<TKqpQueryCache> queryCache,
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> resourceManager_,
std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory> caFactory_,
const TString& sessionId,