diff options
author | gvit <gvit@ydb.tech> | 2023-03-17 09:56:11 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-03-17 09:56:11 +0300 |
commit | b968d8bab7e871258ee24a0b36cbc52f54c57e93 (patch) | |
tree | e6afb4ef2bbdc2ab4c7401d6a02ee24fe5525118 | |
parent | 702e198576ad3a00300f3f321dfef634f347a971 (diff) | |
download | ydb-b968d8bab7e871258ee24a0b36cbc52f54c57e93.tar.gz |
decouple kqp session actor state into the separate file and introduce cookies to differentiate events; get rid of compile request actor
17 files changed, 582 insertions, 635 deletions
diff --git a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt index c8431e865bd..b3ac2e0be94 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt @@ -21,6 +21,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC ) target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_request.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp ) diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt index 985a5c70ff3..e786b9cc559 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt @@ -22,6 +22,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC ) target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_request.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp ) diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt index 985a5c70ff3..e786b9cc559 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt @@ -22,6 +22,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC ) target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_request.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp ) diff --git a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt index c8431e865bd..b3ac2e0be94 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt @@ -21,6 +21,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC ) target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_request.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp ) diff --git a/ydb/core/kqp/compile_service/kqp_compile_request.cpp b/ydb/core/kqp/compile_service/kqp_compile_request.cpp deleted file mode 100644 index 3b5c7e55622..00000000000 --- a/ydb/core/kqp/compile_service/kqp_compile_request.cpp +++ /dev/null @@ -1,339 +0,0 @@ -#include "kqp_compile_service.h" - -#include <ydb/core/actorlib_impl/long_timer.h> -#include <ydb/core/tx/scheme_cache/scheme_cache.h> - -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/actors/wilson/wilson_span.h> -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/actors/core/log.h> - -#include <ydb/core/kqp/common/kqp_lwtrace_probes.h> -#include <ydb/core/base/wilson.h> - -#include <util/string/escape.h> - -LWTRACE_USING(KQP_PROVIDER); - -namespace NKikimr { -namespace NKqp { - -using namespace NSchemeCache; -using namespace NYql; - -class TKqpCompileRequestActor : public TActorBootstrapped<TKqpCompileRequestActor> { -public: - using TBase = TActorBootstrapped<TKqpCompileRequestActor>; - - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::KQP_COMPILE_REQUEST; - } - - TKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit, - NWilson::TTraceId traceId) - : Owner(owner) - , UserToken(userToken) - , Uid(uid) - , Query(std::move(query)) - , KeepInCache(keepInCache) - , Deadline(deadline) - , DbCounters(dbCounters) - , Orbit{std::move(orbit)} - , CompileRequestSpan(TWilsonKqp::CompileRequest, std::move(traceId), "CompileRequest") {} - - void Bootstrap(const TActorContext& ctx) { - LWTRACK(KqpCompileRequestBootstrap, - Orbit, - Query ? Query->UserSid : 0); - - TMaybe<TKqpQueryId> query; - std::swap(Query, query); - - auto compileEv = MakeHolder<TEvKqp::TEvCompileRequest>(UserToken, Uid, std::move(query), - KeepInCache, Deadline, DbCounters, std::move(Orbit)); - ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId()); - - Become(&TKqpCompileRequestActor::MainState); - } - - void Handle(TEvKqp::TEvCompileResponse::TPtr& ev, const TActorContext &ctx) { - const auto& query = ev->Get()->CompileResult->Query; - LWTRACK(KqpCompileRequestHandleServiceReply, - ev->Get()->Orbit, - query ? query->UserSid : 0); - - auto compileResult = ev->Get()->CompileResult; - const auto& stats = ev->Get()->Stats; - - if (compileResult->Status != Ydb::StatusIds::SUCCESS || !stats.GetFromCache()) { - - if (CompileRequestSpan) { - CompileRequestSpan.End(); - } - - ctx.Send(Owner, ev.Release()); - Die(ctx); - return; - } - - if (!NavigateTables(compileResult->PreparedQuery, compileResult->Query->Database, ctx)) { - - if (CompileRequestSpan) { - CompileRequestSpan.End(); - } - - ctx.Send(Owner, ev.Release()); - Die(ctx); - return; - } - - DeferredResponse.Reset(ev.Release()); - } - - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext &ctx) { - if (ValidateTables(*ev->Get(), ctx)) { - - if (CompileRequestSpan) { - CompileRequestSpan.EndOk(); - } - - ctx.Send(Owner, DeferredResponse.Release()); - Die(ctx); - return; - } - - auto& compileResult = *DeferredResponse->CompileResult; - - LOG_INFO_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Recompiling query due to scheme error" - << ", self: " << ctx.SelfID - << ", queryUid: " << compileResult.Uid); - - auto recompileEv = MakeHolder<TEvKqp::TEvRecompileRequest>(UserToken, compileResult.Uid, compileResult.Query, - Deadline, DbCounters, std::move(DeferredResponse->Orbit)); - ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId()); - - DeferredResponse.Reset(); - } - - void Handle(TEvKqp::TEvAbortExecution::TPtr& , const TActorContext &ctx) { - this->Die(ctx); - } - -private: - STFUNC(MainState) { - try { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKqp::TEvCompileResponse, Handle); - HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - HFunc(TEvKqp::TEvAbortExecution, Handle); - default: - UnexpectedEvent("MainState", ev->GetTypeRewrite(), ctx); - } - } catch (const yexception& e) { - InternalError(e.what(), ctx); - } - } - -private: - void FillTables(const NKqpProto::TKqpPhyTx& phyTx) { - for (const auto& stage : phyTx.GetStages()) { - auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) { - TTableId tableId(table.GetOwnerId(), table.GetTableId()); - auto it = TableVersions.find(tableId); - if (it != TableVersions.end()) { - Y_ENSURE(it->second == table.GetVersion()); - } else { - TableVersions.emplace(tableId, table.GetVersion()); - } - }; - for (const auto& tableOp : stage.GetTableOps()) { - addTable(tableOp.GetTable()); - } - for (const auto& input : stage.GetInputs()) { - if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { - addTable(input.GetStreamLookup().GetTable()); - } - } - - for (const auto& source : stage.GetSources()) { - if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) { - addTable(source.GetReadRangesSource().GetTable()); - } - } - } - } - - bool NavigateTables(const TPreparedQueryHolder::TConstPtr& query, const TString& database, const TActorContext& ctx) { - TableVersions.clear(); - - switch (query->GetVersion()) { - case NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1: - for (const auto& tx : query->GetPhysicalQuery().GetTransactions()) { - FillTables(tx); - } - break; - - default: - LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, - "Unexpected prepared query version" - << ", self: " << ctx.SelfID - << ", version: " << (ui32)query->GetVersion()); - return false; - } - - if (TableVersions.empty()) { - return false; - } - - auto navigate = MakeHolder<TSchemeCacheNavigate>(); - navigate->DatabaseName = database; - if (UserToken && !UserToken->GetSerializedToken().empty()) { - navigate->UserToken = UserToken; - } - - for (const auto& [tableId, _] : TableVersions) { - TSchemeCacheNavigate::TEntry entry; - entry.TableId = tableId; - entry.RequestType = TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; - entry.Operation = TSchemeCacheNavigate::EOp::OpTable; - entry.SyncVersion = false; - entry.ShowPrivatePath = true; - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Query has dependency on table, check the table schema version" - << ", self: " << ctx.SelfID - << ", pathId: " << entry.TableId.PathId - << ", version: " << entry.TableId.SchemaVersion); - - navigate->ResultSet.emplace_back(entry); - } - - auto ev = MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(navigate.Release()); - ctx.Send(MakeSchemeCacheID(), ev.Release()); - return true; - } - - bool ValidateTables(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response, const TActorContext& ctx) { - Y_ENSURE(response.Request); - const auto& navigate = *response.Request; - - for (const auto& entry : navigate.ResultSet) { - switch (entry.Status) { - case TSchemeCacheNavigate::EStatus::Ok: { - auto expectedVersion = TableVersions.FindPtr(TTableId(entry.TableId.PathId)); - if (!expectedVersion) { - LOG_WARN_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, - "Unexpected tableId in scheme cache navigate reply" - << ", self: " << ctx.SelfID - << ", tableId: " << entry.TableId); - continue; - } - - if (!*expectedVersion) { - // Do not check tables with zero version. - continue; - } - - if (entry.TableId.SchemaVersion && entry.TableId.SchemaVersion != *expectedVersion) { - LOG_INFO_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Scheme version mismatch" - << ", self: " << ctx.SelfID - << ", pathId: " << entry.TableId.PathId - << ", expected version: " << *expectedVersion - << ", actual version: " << entry.TableId.SchemaVersion); - return false; - } - - break; - } - - case TSchemeCacheNavigate::EStatus::PathErrorUnknown: - case TSchemeCacheNavigate::EStatus::PathNotTable: - case TSchemeCacheNavigate::EStatus::TableCreationNotComplete: - LOG_INFO_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Scheme error" - << ", self: " << ctx.SelfID - << ", pathId: " << entry.TableId.PathId - << ", status: " << entry.Status); - return false; - - case TSchemeCacheNavigate::EStatus::LookupError: - case TSchemeCacheNavigate::EStatus::RedirectLookupError: - // Transient error, do not invalidate the query. - // Hard validation will be performed later during the query execution. - break; - - default: - // Unexpected reply, do not invalidate the query as it may block the query execution. - // Hard validation will be performed later during the query execution. - LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Unexpected reply from scheme cache" - << ", self: " << ctx.SelfID - << ", pathId: " << entry.TableId.PathId - << ", status: " << entry.Status); - break; - } - } - - return true; - } - -private: - void UnexpectedEvent(const TString& state, ui32 eventType, const TActorContext &ctx) { - InternalError(TStringBuilder() << "TKqpCompileRequestActor, unexpected event: " << eventType - << ", at state:" << state, ctx); - } - - void InternalError(const TString& message, const TActorContext &ctx) { - LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Internal error" - << ", self: " << ctx.SelfID - << ", message: " << message); - - - NYql::TIssue issue(NYql::TPosition(), "Internal error while proccessing query compilation request."); - issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message)); - - ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx); - } - - void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) { - auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues, ETableReadType::Other), std::move(Orbit)); - - if (CompileRequestSpan) { - CompileRequestSpan.EndError(issues.ToOneLineString()); - } - - ctx.Send(Owner, responseEv.Release()); - Die(ctx); - } - -private: - TActorId Owner; - TIntrusiveConstPtr<NACLib::TUserToken> UserToken; - TMaybe<TString> Uid; - TMaybe<TKqpQueryId> Query; - bool KeepInCache = false; - TInstant Deadline; - TKqpDbCountersPtr DbCounters; - THashMap<TTableId, ui64> TableVersions; - THolder<TEvKqp::TEvCompileResponse> DeferredResponse; - NLWTrace::TOrbit Orbit; - NWilson::TSpan CompileRequestSpan; -}; - - -IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit, - NWilson::TTraceId traceId) -{ - return new TKqpCompileRequestActor( - owner, - userToken, - uid, - std::move(query), - keepInCache, - deadline, - dbCounters, - std::move(orbit), - std::move(traceId)); -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 9ad13e9e662..711b9e927bc 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -178,7 +178,7 @@ private: struct TKqpCompileRequest { TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, - NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}) + ui64 cookie, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}) : Sender(sender) , Query(std::move(query)) , Uid(uid) @@ -187,7 +187,9 @@ struct TKqpCompileRequest { , Deadline(deadline) , DbCounters(dbCounters) , Orbit(std::move(orbit)) - , CompileServiceSpan(std::move(span)) {} + , CompileServiceSpan(std::move(span)) + , Cookie(cookie) + {} TActorId Sender; TKqpQueryId Query; @@ -200,6 +202,7 @@ struct TKqpCompileRequest { NLWTrace::TOrbit Orbit; NWilson::TSpan CompileServiceSpan; + ui64 Cookie; }; class TKqpRequestsQueue { @@ -418,7 +421,7 @@ private: } catch (const std::exception& e) { LogException("TEvCompileRequest", ev->Sender, e, ctx); - ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {}); + ReplyInternalError(ev->Sender, "", e.what(), ctx, ev->Cookie, std::move(ev->Get()->Orbit), {}); } } @@ -454,7 +457,7 @@ private: << ", sender: " << ev->Sender << ", queryUid: " << *request.Uid); - ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); + ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); return; } else { LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query" @@ -472,7 +475,7 @@ private: << ", queryUid: " << *request.Uid); NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid); - ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); + ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); return; } @@ -495,7 +498,7 @@ private: << ", sender: " << ev->Sender << ", queryUid: " << compileResult->Uid); - ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); + ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); return; } @@ -508,6 +511,7 @@ private: TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), request.KeepInCache, request.UserToken, request.Deadline, dbCounters, + ev->Cookie, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { @@ -519,7 +523,7 @@ private: NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Exceeded maximum number of requests in compile service queue."); - ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); return; } @@ -536,7 +540,7 @@ private: } catch (const std::exception& e) { LogException("TEvRecompileRequest", ev->Sender, e, ctx); - ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {}); + ReplyInternalError(ev->Sender, "", e.what(), ctx, ev->Cookie, std::move(ev->Get()->Orbit), {}); } } @@ -557,6 +561,7 @@ private: TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, true, request.UserToken, request.Deadline, dbCounters, + ev->Cookie, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), std::move(CompileServiceSpan)); @@ -569,7 +574,7 @@ private: NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Exceeded maximum number of requests in compile service queue."); - ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); return; } } else { @@ -581,7 +586,8 @@ private: NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); - ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); + ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, + ev->Cookie, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); return; } @@ -625,7 +631,8 @@ private: auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query); for (auto& request : requests) { LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString()); - Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit), std::move(request.CompileServiceSpan)); + Reply(request.Sender, compileResult, compileStats, ctx, + request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan)); } } else { if (QueryCache.FindByUid(compileResult->Uid, false)) { @@ -634,11 +641,13 @@ private: } LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString()); - Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + Reply(compileRequest.Sender, compileResult, compileStats, ctx, + compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); } catch (const std::exception& e) { LogException("TEvCompileResponse", ev->Sender, e, ctx); - ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, + compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); } ProcessQueue(ctx); @@ -696,7 +705,8 @@ private: Counters->ReportCompileRequestTimeout(request->DbCounters); NYql::TIssue issue(NYql::TPosition(), "Compilation timed out."); - ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit), std::move(request->CompileServiceSpan)); + ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, + request->Cookie, std::move(request->Orbit), std::move(request->CompileServiceSpan)); } else { StartCompilation(std::move(*request), ctx); } @@ -724,7 +734,8 @@ private: } void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult, - const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) + const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie, + NLWTrace::TOrbit orbit, NWilson::TSpan span) { const auto& query = compileResult->Query; LWTRACK(KqpCompileServiceReply, @@ -744,34 +755,34 @@ private: span.End(); } - ctx.Send(sender, responseEv.Release()); + ctx.Send(sender, responseEv.Release(), 0, cookie); } void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult, - const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) + const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) { NKqpProto::TKqpStatsCompile stats; stats.SetFromCache(true); LWTRACK(KqpCompileServiceReplyFromCache, orbit); - Reply(sender, compileResult, stats, ctx, std::move(orbit), std::move(span)); + Reply(sender, compileResult, stats, ctx, cookie, std::move(orbit), std::move(span)); } void ReplyError(const TActorId& sender, const TString& uid, Ydb::StatusIds::StatusCode status, - const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) + const TIssues& issues, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) { LWTRACK(KqpCompileServiceReplyError, orbit); - Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span)); + Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, cookie, std::move(orbit), std::move(span)); } void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message, - const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) + const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) { NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Internal error during query compilation."); issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message)); LWTRACK(KqpCompileServiceReplyInternalError, orbit); - ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit), std::move(span)); + ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, cookie, std::move(orbit), std::move(span)); } static void LogException(const TString& scope, const TActorId& sender, const std::exception& e, diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h index 5f7b408724f..aca598f2748 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.h +++ b/ydb/core/kqp/compile_service/kqp_compile_service.h @@ -21,7 +21,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, - NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {}); + ui64 cookie, NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {}); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/compile_service/ya.make b/ydb/core/kqp/compile_service/ya.make index 2a0bfa651da..3fadfdad449 100644 --- a/ydb/core/kqp/compile_service/ya.make +++ b/ydb/core/kqp/compile_service/ya.make @@ -2,7 +2,6 @@ LIBRARY() SRCS( kqp_compile_actor.cpp - kqp_compile_request.cpp kqp_compile_service.cpp ) diff --git a/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt index d241a939b3a..3b2e287ad0f 100644 --- a/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt @@ -23,4 +23,5 @@ target_sources(core-kqp-session_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_query_state.cpp ) diff --git a/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt index d54d507f651..88931dbd4f6 100644 --- a/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt @@ -24,4 +24,5 @@ target_sources(core-kqp-session_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_query_state.cpp ) diff --git a/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt index d54d507f651..88931dbd4f6 100644 --- a/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt @@ -24,4 +24,5 @@ target_sources(core-kqp-session_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_query_state.cpp ) diff --git a/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt index d241a939b3a..3b2e287ad0f 100644 --- a/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt @@ -23,4 +23,5 @@ target_sources(core-kqp-session_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_query_state.cpp ) diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp new file mode 100644 index 00000000000..ac6b84def53 --- /dev/null +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -0,0 +1,192 @@ +#include "kqp_query_state.h" + +namespace NKikimr::NKqp { + +using namespace NSchemeCache; + +#define LOG_C(msg) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) +#define LOG_E(msg) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) +#define LOG_W(msg) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) +#define LOG_N(msg) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) +#define LOG_I(msg) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) +#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) +#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) + +bool TKqpQueryState::EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response) { + Y_ENSURE(response.Request); + const auto& navigate = *response.Request; + + for (const auto& entry : navigate.ResultSet) { + switch (entry.Status) { + case TSchemeCacheNavigate::EStatus::Ok: { + auto expectedVersion = TableVersions.FindPtr(TTableId(entry.TableId.PathId)); + if (!expectedVersion) { + LOG_W("Unexpected tableId in scheme cache navigate reply" + << ", tableId: " << entry.TableId); + continue; + } + + if (!*expectedVersion) { + // Do not check tables with zero version. + continue; + } + + if (entry.TableId.SchemaVersion && entry.TableId.SchemaVersion != *expectedVersion) { + LOG_I("Scheme version mismatch" + << ", pathId: " << entry.TableId.PathId + << ", expected version: " << *expectedVersion + << ", actual version: " << entry.TableId.SchemaVersion); + return false; + } + + break; + } + + case TSchemeCacheNavigate::EStatus::PathErrorUnknown: + case TSchemeCacheNavigate::EStatus::PathNotTable: + case TSchemeCacheNavigate::EStatus::TableCreationNotComplete: + LOG_I("Scheme error" + << ", pathId: " << entry.TableId.PathId + << ", status: " << entry.Status); + return false; + + case TSchemeCacheNavigate::EStatus::LookupError: + case TSchemeCacheNavigate::EStatus::RedirectLookupError: + // Transient error, do not invalidate the query. + // Hard validation will be performed later during the query execution. + break; + + default: + // Unexpected reply, do not invalidate the query as it may block the query execution. + // Hard validation will be performed later during the query execution. + LOG_E("Unexpected reply from scheme cache" + << ", pathId: " << entry.TableId.PathId + << ", status: " << entry.Status); + break; + } + } + + return true; +} + + +std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildNavigateKeySet() { + TableVersions.clear(); + + for (const auto& tx : PreparedQuery->GetPhysicalQuery().GetTransactions()) { + FillTables(tx); + } + + auto navigate = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + navigate->DatabaseName = Database; + if (UserToken && !UserToken->GetSerializedToken().empty()) { + navigate->UserToken = UserToken; + } + + for (const auto& [tableId, _] : TableVersions) { + NSchemeCache::TSchemeCacheNavigate::TEntry entry; + entry.TableId = tableId; + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable; + entry.SyncVersion = false; + entry.ShowPrivatePath = true; + navigate->ResultSet.emplace_back(entry); + } + + return std::make_unique<TEvTxProxySchemeCache::TEvNavigateKeySet>(navigate.Release()); +} + + +bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) { + CompileResult = ev->CompileResult; + YQL_ENSURE(CompileResult); + MaxReadType = CompileResult->MaxReadType; + Orbit = std::move(ev->Orbit); + + if (CompileResult->Status != Ydb::StatusIds::SUCCESS) + return false; + + YQL_ENSURE(CompileResult->PreparedQuery); + const ui32 compiledVersion = CompileResult->PreparedQuery->GetVersion(); + YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1, + "Unexpected prepared query version: " << compiledVersion); + + CompileStats.Swap(&ev->Stats); + PreparedQuery = CompileResult->PreparedQuery; + return true; +} + +std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest() { + TMaybe<TKqpQueryId> query; + TMaybe<TString> uid; + + bool keepInCache = false; + switch (GetAction()) { + case NKikimrKqp::QUERY_ACTION_EXECUTE: + query = TKqpQueryId(Cluster, Database, GetQuery(), GetType()); + keepInCache = GetQueryKeepInCache() && query->IsSql(); + break; + + case NKikimrKqp::QUERY_ACTION_PREPARE: + query = TKqpQueryId(Cluster, Database, GetQuery(), GetType()); + keepInCache = query->IsSql(); + break; + + case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: + uid = GetPreparedQuery(); + keepInCache = GetQueryKeepInCache(); + break; + + default: + YQL_ENSURE(false); + } + + if (query) { + query->Settings.DocumentApiRestricted = IsDocumentApiRestricted_; + } + + auto compileDeadline = QueryDeadlines.TimeoutAt; + if (QueryDeadlines.CancelAt) { + compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt); + } + + return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, uid, + std::move(query), keepInCache, compileDeadline, DbCounters, std::move(Orbit)); +} + +std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest() { + YQL_ENSURE(CompileResult); + TMaybe<TKqpQueryId> query; + TMaybe<TString> uid; + + switch (GetAction()) { + case NKikimrKqp::QUERY_ACTION_EXECUTE: + query = TKqpQueryId(Cluster, Database, GetQuery(), GetType()); + break; + + case NKikimrKqp::QUERY_ACTION_PREPARE: + query = TKqpQueryId(Cluster, Database, GetQuery(), GetType()); + break; + + case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: + uid = GetPreparedQuery(); + break; + + default: + YQL_ENSURE(false); + } + + if (query) { + query->Settings.DocumentApiRestricted = IsDocumentApiRestricted_; + } + + auto compileDeadline = QueryDeadlines.TimeoutAt; + if (QueryDeadlines.CancelAt) { + compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt); + } + + return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, + CompileResult->Query, compileDeadline, DbCounters, std::move(Orbit)); +} + +} diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h new file mode 100644 index 00000000000..6d2fb731fe4 --- /dev/null +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -0,0 +1,288 @@ +#pragma once + +#include "kqp_worker_common.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/wilson/wilson_span.h> +#include <library/cpp/actors/wilson/wilson_trace.h> + +#include <ydb/core/base/cputime.h> +#include <ydb/core/base/wilson.h> +#include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/common/kqp_resolve.h> +#include <ydb/core/kqp/common/kqp_timeouts.h> +#include <ydb/core/kqp/session_actor/kqp_tx.h> + +#include <util/generic/noncopyable.h> + +namespace NKikimr::NKqp { + +// basically it's a state that holds all the context +// about the specific query execution. +// it holds the unique pointer to the query request, which may include +// the context of user RPC (if session is on the same node with user RPC, which is the most +// common case). +class TKqpQueryState : public TNonCopyable { +public: + TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, + const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, + const NKikimrConfig::TTableServiceConfig& config, NWilson::TTraceId&& traceId) + : QueryId(queryId) + , Database(database) + , Cluster(cluster) + , DbCounters(dbCounters) + , Sender(ev->Sender) + , ProxyRequestId(ev->Cookie) + , ParametersSize(ev->Get()->GetParametersSize()) + , RequestActorId(ev->Get()->GetRequestActorId()) + , TraceId(ev->Get()->GetTraceId()) + , IsDocumentApiRestricted_(IsDocumentApiRestricted(ev->Get()->GetRequestType())) + , StartTime(TInstant::Now()) + , KeepSession(ev->Get()->GetKeepSession() || longSession) + , UserToken(ev->Get()->GetUserToken()) + { + RequestEv.reset(ev->Release().Release()); + SetQueryDeadlines(config); + auto action = GetAction(); + KqpSessionSpan = NWilson::TSpan( + TWilsonKqp::KqpSession, std::move(traceId), + "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END); + } + + // the monotonously growing counter, the ordinal number of the query, + // executed by the session. + // this counter may be used as a cookie by a session actor to reject events + // with cookie less than current QueryId. + ui64 QueryId = 0; + TString Database; + TString Cluster; + TKqpDbCountersPtr DbCounters; + TActorId Sender; + ui64 ProxyRequestId = 0; + std::unique_ptr<TEvKqp::TEvQueryRequest> RequestEv; + ui64 ParametersSize = 0; + TPreparedQueryHolder::TConstPtr PreparedQuery; + TKqpCompileResult::TConstPtr CompileResult; + NKqpProto::TKqpStatsCompile CompileStats; + TIntrusivePtr<TKqpTransactionContext> TxCtx; + TQueryData::TPtr QueryData; + + TActorId RequestActorId; + + ui64 CurrentTx = 0; + TString TraceId; + bool IsDocumentApiRestricted_ = false; + + TInstant StartTime; + NYql::TKikimrQueryDeadlines QueryDeadlines; + + NKqpProto::TKqpStatsQuery Stats; + bool KeepSession = false; + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + + THashMap<NKikimr::TTableId, ui64> TableVersions; + + NLWTrace::TOrbit Orbit; + NWilson::TSpan KqpSessionSpan; + ETableReadType MaxReadType = ETableReadType::Other; + + TTxId TxId; // User tx + bool Commit = false; + bool Commited = false; + + NTopic::TTopicOperations TopicOperations; + TDuration CpuTime; + std::optional<NCpuTime::TCpuTimer> CurrentTimer; + + NKikimrKqp::EQueryAction GetAction() const { + return RequestEv->GetAction(); + } + + bool GetKeepSession() const { + return RequestEv->GetKeepSession(); + } + + const TString& GetQuery() const { + return RequestEv->GetQuery(); + } + + const TString& GetPreparedQuery() const { + return RequestEv->GetPreparedQuery(); + } + + NKikimrKqp::EQueryType GetType() const { + return RequestEv->GetType(); + } + + void EnsureAction() { + YQL_ENSURE(RequestEv->HasAction()); + } + + bool GetUsePublicResponseDataFormat() const { + return RequestEv->GetUsePublicResponseDataFormat(); + } + + void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) { + auto now = TAppData::TimeProvider->Now(); + auto cancelAfter = RequestEv->GetCancelAfter(); + auto timeout = RequestEv->GetOperationTimeout(); + if (cancelAfter.MilliSeconds() > 0) { + QueryDeadlines.CancelAt = now + cancelAfter; + } + + auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), service); + QueryDeadlines.TimeoutAt = now + timeoutMs; + } + + bool HasTopicOperations() const { + return RequestEv->HasTopicOperations(); + } + + bool GetQueryKeepInCache() const { + return RequestEv->GetQueryKeepInCache(); + } + + const TString& GetDatabase() const { + return RequestEv->GetDatabase(); + } + + // todo: gvit + // fill this hash set only once on query compilation. + void FillTables(const NKqpProto::TKqpPhyTx& phyTx) { + for (const auto& stage : phyTx.GetStages()) { + auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) { + NKikimr::TTableId tableId(table.GetOwnerId(), table.GetTableId()); + auto it = TableVersions.find(tableId); + if (it != TableVersions.end()) { + Y_ENSURE(it->second == table.GetVersion()); + } else { + TableVersions.emplace(tableId, table.GetVersion()); + } + }; + for (const auto& tableOp : stage.GetTableOps()) { + addTable(tableOp.GetTable()); + } + for (const auto& input : stage.GetInputs()) { + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { + addTable(input.GetStreamLookup().GetTable()); + } + } + + for (const auto& source : stage.GetSources()) { + if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) { + addTable(source.GetReadRangesSource().GetTable()); + } + } + } + } + + bool NeedCheckTableVersions() const { + return CompileStats.GetFromCache(); + } + + TString ExtractQueryText() const { + if (CompileResult) { + if (CompileResult->Query) { + return CompileResult->Query->Text; + } + return {}; + } + return RequestEv->GetQuery(); + } + + const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const { + return RequestEv->GetTopicOperations(); + } + + bool NeedPersistentSnapshot() const { + auto type = GetType(); + return ( + type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || + type == NKikimrKqp::QUERY_TYPE_AST_SCAN + ); + } + + bool NeedSnapshot(const NYql::TKikimrConfiguration& config) const { + return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery()); + } + + bool HasTxControl() const { + return RequestEv->HasTxControl(); + } + + const ::Ydb::Table::TransactionControl& GetTxControl() const { + return RequestEv->GetTxControl(); + } + + const ::NKikimrMiniKQL::TParams& GetParameters() const { + return RequestEv->GetParameters(); + } + + // validate the compiled query response and ensure that all table versions are not + // changed since the last compilation. + bool EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response); + // builds a request to navigate schema of all tables, that participate in query + // execution. + std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> BuildNavigateKeySet(); + // same the context of the compiled query to the query state. + bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev); + // build the compilation request. + std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest(); + // TODO(gvit): get rid of code duplication in these requests, + // use only one of these requests. + std::unique_ptr<TEvKqp::TEvRecompileRequest> BuildReCompileRequest(); + + const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const { + return RequestEv->GetYdbParameters(); + } + + Ydb::Table::QueryStatsCollection::Mode GetStatsMode() const { + if (!RequestEv->HasCollectStats()) { + return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; + } + + auto cStats = RequestEv->GetCollectStats(); + if (cStats == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) { + return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; + } + + return cStats; + } + + bool ReportStats() const { + return GetStatsMode() != Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE + // always report stats for scripting subrequests + || GetType() == NKikimrKqp::QUERY_TYPE_AST_DML + || GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN + ; + } + + bool HasPreparedQuery() const { + return RequestEv->HasPreparedQuery(); + } + + bool IsStreamResult() const { + auto type = GetType(); + return ( + type == NKikimrKqp::QUERY_TYPE_AST_SCAN || + type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || + type == NKikimrKqp::QUERY_TYPE_SQL_QUERY || + type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY + ); + } + + void ResetTimer() { + if (CurrentTimer) { + CpuTime += CurrentTimer->GetTime(); + CurrentTimer.reset(); + } + } + + TDuration GetCpuTime() { + ResetTimer(); + return CpuTime; + } +}; + + +} diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index ac2ce73af44..a52d197028f 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1,6 +1,7 @@ #include "kqp_session_actor.h" #include "kqp_tx.h" #include "kqp_worker_common.h" +#include "kqp_query_state.h" #include <ydb/core/kqp/common/kqp_lwtrace_probes.h> #include <ydb/core/kqp/common/kqp_ru_calc.h> @@ -44,6 +45,7 @@ namespace NKikimr { namespace NKqp { using namespace NYql; +using namespace NSchemeCache; namespace { @@ -67,180 +69,6 @@ public: {} }; -struct TKqpQueryState { - TActorId Sender; - ui64 ProxyRequestId = 0; - std::unique_ptr<TEvKqp::TEvQueryRequest> RequestEv; - ui64 ParametersSize = 0; - TPreparedQueryHolder::TConstPtr PreparedQuery; - TKqpCompileResult::TConstPtr CompileResult; - NKqpProto::TKqpStatsCompile CompileStats; - TIntrusivePtr<TKqpTransactionContext> TxCtx; - TQueryData::TPtr QueryData; - - TActorId RequestActorId; - - ui64 CurrentTx = 0; - TString TraceId; - bool IsDocumentApiRestricted = false; - - TInstant StartTime; - NYql::TKikimrQueryDeadlines QueryDeadlines; - - NKqpProto::TKqpStatsQuery Stats; - bool KeepSession = false; - TIntrusiveConstPtr<NACLib::TUserToken> UserToken; - - NLWTrace::TOrbit Orbit; - NWilson::TSpan KqpSessionSpan; - ETableReadType MaxReadType = ETableReadType::Other; - - TTxId TxId; // User tx - bool Commit = false; - bool Commited = false; - - NTopic::TTopicOperations TopicOperations; - TDuration CpuTime; - std::optional<NCpuTime::TCpuTimer> CurrentTimer; - - NKikimrKqp::EQueryAction GetAction() const { - return RequestEv->GetAction(); - } - - bool GetKeepSession() const { - return RequestEv->GetKeepSession(); - } - - const TString& GetQuery() const { - return RequestEv->GetQuery(); - } - - const TString& GetPreparedQuery() const { - return RequestEv->GetPreparedQuery(); - } - - NKikimrKqp::EQueryType GetType() const { - return RequestEv->GetType(); - } - - void EnsureAction() { - YQL_ENSURE(RequestEv->HasAction()); - } - - bool GetUsePublicResponseDataFormat() const { - return RequestEv->GetUsePublicResponseDataFormat(); - } - - void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) { - auto now = TAppData::TimeProvider->Now(); - auto cancelAfter = RequestEv->GetCancelAfter(); - auto timeout = RequestEv->GetOperationTimeout(); - if (cancelAfter.MilliSeconds() > 0) { - QueryDeadlines.CancelAt = now + cancelAfter; - } - - auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), service); - QueryDeadlines.TimeoutAt = now + timeoutMs; - } - - bool HasTopicOperations() const { - return RequestEv->HasTopicOperations(); - } - - bool GetQueryKeepInCache() const { - return RequestEv->GetQueryKeepInCache(); - } - - const TString& GetDatabase() const { - return RequestEv->GetDatabase(); - } - - TString ExtractQueryText() const { - if (CompileResult) { - if (CompileResult->Query) { - return CompileResult->Query->Text; - } - return {}; - } - return RequestEv->GetQuery(); - } - - const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const { - return RequestEv->GetTopicOperations(); - } - - bool NeedPersistentSnapshot() const { - auto type = GetType(); - return ( - type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || - type == NKikimrKqp::QUERY_TYPE_AST_SCAN - ); - } - - bool HasTxControl() const { - return RequestEv->HasTxControl(); - } - - const ::Ydb::Table::TransactionControl& GetTxControl() const { - return RequestEv->GetTxControl(); - } - - const ::NKikimrMiniKQL::TParams& GetParameters() const { - return RequestEv->GetParameters(); - } - - const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const { - return RequestEv->GetYdbParameters(); - } - - Ydb::Table::QueryStatsCollection::Mode GetStatsMode() const { - if (!RequestEv->HasCollectStats()) { - return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; - } - - auto cStats = RequestEv->GetCollectStats(); - if (cStats == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) { - return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; - } - - return cStats; - } - - bool ReportStats() const { - return GetStatsMode() != Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE - // always report stats for scripting subrequests - || GetType() == NKikimrKqp::QUERY_TYPE_AST_DML - || GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN - ; - } - - bool HasPreparedQuery() const { - return RequestEv->HasPreparedQuery(); - } - - bool IsStreamResult() const { - auto type = GetType(); - return ( - type == NKikimrKqp::QUERY_TYPE_AST_SCAN || - type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || - type == NKikimrKqp::QUERY_TYPE_SQL_QUERY || - type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY - ); - } - - void ResetTimer() { - if (CurrentTimer) { - CpuTime += CurrentTimer->GetTime(); - CurrentTimer.reset(); - } - } - - TDuration GetCpuTime() { - ResetTimer(); - return CpuTime; - } -}; - struct TKqpCleanupCtx { std::deque<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; bool IsWaitingForWorkerToClose = false; @@ -325,29 +153,17 @@ public: void MakeNewQueryState(TEvKqp::TEvQueryRequest::TPtr& ev) { ++QueryId; YQL_ENSURE(!QueryState); - QueryState = std::make_shared<TKqpQueryState>(); - QueryState->Sender = ev->Sender; - QueryState->ProxyRequestId = ev->Cookie; - QueryState->TraceId = ev->Get()->GetTraceId(); - QueryState->IsDocumentApiRestricted = IsDocumentApiRestricted(ev->Get()->GetRequestType()); - QueryState->StartTime = TInstant::Now(); - QueryState->UserToken = ev->Get()->GetUserToken(); - QueryState->ParametersSize = ev->Get()->GetParametersSize(); - QueryState->RequestActorId = ev->Get()->GetRequestActorId(); - auto selfId = SelfId(); - auto as = TActivationContext::ActorSystem(); - ev->Get()->SetClientLostAction(selfId, as); - QueryState->RequestEv.reset(ev->Release().Release()); - QueryState->KeepSession = Settings.LongSession || QueryState->GetKeepSession(); - QueryState->SetQueryDeadlines(Settings.Service); - auto action = QueryState->GetAction(); - NWilson::TTraceId id; if (false) { // change to enable Wilson tracing id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>()); LOG_I("wilson tracing started, id: " + std::to_string(id.GetTraceId())); } - QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END); + auto selfId = SelfId(); + auto as = TActivationContext::ActorSystem(); + ev->Get()->SetClientLostAction(selfId, as); + QueryState = std::make_shared<TKqpQueryState>( + ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession, + Settings.Service, std::move(id)); } bool ConvertParameters() { @@ -614,46 +430,8 @@ public: void CompileQuery() { YQL_ENSURE(QueryState); - TMaybe<TKqpQueryId> query; - TMaybe<TString> uid; - - bool keepInCache = false; - switch (QueryState->GetAction()) { - case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, QueryState->GetQuery(), QueryState->GetType()); - keepInCache = QueryState->GetQueryKeepInCache() && query->IsSql(); - break; - - case NKikimrKqp::QUERY_ACTION_PREPARE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, QueryState->GetQuery(), QueryState->GetType()); - keepInCache = query->IsSql(); - break; - - case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: - uid = QueryState->GetPreparedQuery(); - keepInCache = QueryState->GetQueryKeepInCache(); - break; - - default: - YQL_ENSURE(false); - } - - if (query) { - query->Settings.DocumentApiRestricted = QueryState->IsDocumentApiRestricted; - } - - auto compileDeadline = QueryState->QueryDeadlines.TimeoutAt; - if (QueryState->QueryDeadlines.CancelAt) { - compileDeadline = Min(compileDeadline, QueryState->QueryDeadlines.CancelAt); - } - - auto compileRequestActor = CreateKqpCompileRequestActor(SelfId(), QueryState->UserToken, uid, - std::move(query), keepInCache, compileDeadline, Settings.DbCounters, - QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit(), - QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId()); - - CompileActorId = RegisterWithSameMailbox(compileRequestActor); - + auto ev = QueryState->BuildCompileRequest(); + Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId); Become(&TKqpSessionActor::CompileState); } @@ -661,38 +439,53 @@ public: ReplyBusy(ev); } + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + const auto* response = ev->Get(); + + // table versions are not the same. need the query recompilation. + if (!QueryState->EnsureTableVersions(*response)) { + auto ev = QueryState->BuildReCompileRequest(); + Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId); + return; + } + + OnSuccessCompileRequest(); + } + void Handle(TEvKqp::TEvCompileResponse::TPtr& ev) { - if (ev->Sender != CompileActorId) { + // outdated event from previous query. + // ignoring that. + if (ev->Cookie < QueryId) { return; } - CompileActorId = TActorId{}; - TTimerGuard timer(this); - auto compileResult = ev->Get()->CompileResult; - YQL_ENSURE(compileResult); YQL_ENSURE(QueryState); - QueryState->MaxReadType = compileResult->MaxReadType; - QueryState->Orbit = std::move(ev->Get()->Orbit); - - LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status); + TTimerGuard timer(this); - if (compileResult->Status != Ydb::StatusIds::SUCCESS) { - ReplyQueryCompileError(compileResult); + // saving compile response and checking that compilation status + // is success. + if (!QueryState->SaveAndCheckCompileResult(ev->Get())) { + LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status); + ReplyQueryCompileError(); return; } - YQL_ENSURE(compileResult->PreparedQuery); - const ui32 compiledVersion = compileResult->PreparedQuery->GetVersion(); - YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1, - "Unexpected prepared query version: " << compiledVersion); + LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status); + // even if we have successfully compilation result, it doesn't mean anything + // in terms of current schema version of the table if response of compilation is from the cache. + // because of that, we are forcing to run schema version check + if (QueryState->NeedCheckTableVersions()) { + auto ev = QueryState->BuildNavigateKeySet(); + Send(MakeSchemeCacheID(), ev.release()); + return; + } - QueryState->CompileResult = compileResult; - QueryState->CompileStats.Swap(&ev->Get()->Stats); - QueryState->PreparedQuery = compileResult->PreparedQuery; + OnSuccessCompileRequest(); + } + void OnSuccessCompileRequest() { if (QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) { - ReplyPrepareResult(compileResult); - return; + return ReplyPrepareResult(); } if (!PrepareQueryContext()) { @@ -704,11 +497,11 @@ public: QueryState->TxCtx->OnBeginQuery(); if (QueryState->NeedPersistentSnapshot()) { - return AcquirePersistentSnapshot(); - } else if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit, - QueryState->PreparedQuery->GetPhysicalQuery())) - { - return AcquireMvccSnapshot(); + AcquirePersistentSnapshot(); + return; + } else if (QueryState->NeedSnapshot(*Config)) { + AcquireMvccSnapshot(); + return; } // Can reply inside (in case of deferred-only transactions) and become ReadyState @@ -917,9 +710,10 @@ public: } } - void ReplyPrepareResult(const TKqpCompileResult::TConstPtr& compileResult) { + void ReplyPrepareResult() { + YQL_ENSURE(QueryState); QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>(); - FillCompileStatus(compileResult, QueryResponse->Record); + FillCompileStatus(QueryState->CompileResult, QueryResponse->Record); auto ru = NRuCalc::CpuTimeToUnit(TDuration::MicroSeconds(QueryState->CompileStats.GetCpuTimeUs())); auto& record = QueryResponse->Record.GetRef(); @@ -1405,11 +1199,6 @@ public: void Handle(TEvKqp::TEvAbortExecution::TPtr& ev) { auto& msg = ev->Get()->Record; - if (CompileActorId) { - auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded"); - Send(std::exchange(CompileActorId, {}), abortEv.Release()); - } - if (ExecuterId) { auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded"); Send(std::exchange(ExecuterId, {}), abortEv.Release()); @@ -1639,9 +1428,10 @@ public: Cleanup(); } - void ReplyQueryCompileError(const TKqpCompileResult::TConstPtr& compileResult) { + void ReplyQueryCompileError() { + YQL_ENSURE(QueryState); QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>(); - FillCompileStatus(compileResult, QueryResponse->Record); + FillCompileStatus(QueryState->CompileResult, QueryResponse->Record); auto txId = TTxId(); if (QueryState->HasTxControl()) { @@ -1651,7 +1441,7 @@ public: } } - LOG_W("ReplyQueryCompileError, status " << compileResult->Status << " remove tx with tx_id: " << txId.GetHumanStr()); + LOG_W("ReplyQueryCompileError, status " << QueryState->CompileResult->Status << " remove tx with tx_id: " << txId.GetHumanStr()); if (auto ctx = Transactions.ReleaseTransaction(txId)) { ctx->Invalidate(); Transactions.AddToBeAborted(std::move(ctx)); @@ -2008,6 +1798,7 @@ public: // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, HandleNoop); hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); default: UnexpectedEvent("ReadyState", ev); } @@ -2032,6 +1823,7 @@ public: // forgotten messages from previous aborted request hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); default: UnexpectedEvent("CompileState", ev); } @@ -2061,6 +1853,7 @@ public: // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); // always come from WorkerActor hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); @@ -2089,6 +1882,7 @@ public: // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, HandleNoop); hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); // always come from WorkerActor hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); @@ -2261,7 +2055,6 @@ private: TIntrusivePtr<TModuleResolverState> ModuleResolverState; TKqpSettings::TConstPtr KqpSettings; std::optional<TActorId> WorkerId; - TActorId CompileActorId; TActorId ExecuterId; std::shared_ptr<TKqpQueryState> QueryState; diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h index cb2ee2a7cf8..481fa2ce896 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_common.h +++ b/ydb/core/kqp/session_actor/kqp_worker_common.h @@ -1,3 +1,5 @@ +#pragma once + #include "kqp_session_actor.h" #include <ydb/core/kqp/common/kqp.h> diff --git a/ydb/core/kqp/session_actor/ya.make b/ydb/core/kqp/session_actor/ya.make index 600976976f1..3fa3c490f82 100644 --- a/ydb/core/kqp/session_actor/ya.make +++ b/ydb/core/kqp/session_actor/ya.make @@ -6,6 +6,7 @@ SRCS( kqp_tx.cpp kqp_worker_actor.cpp kqp_worker_common.cpp + kqp_query_state.cpp ) PEERDIR( |