diff options
author | mdartemenko <mdartemenko@yandex-team.com> | 2022-09-12 15:31:50 +0300 |
---|---|---|
committer | mdartemenko <mdartemenko@yandex-team.com> | 2022-09-12 15:31:50 +0300 |
commit | e6355abc70da5262b2ac0b696e3354bbcfeafc5e (patch) | |
tree | 5251a4ee42ad8c92ac74915952a2b04006f5936b | |
parent | 71cbed4fa838d7e7a4bebf535d71d382af05113c (diff) | |
download | ydb-e6355abc70da5262b2ac0b696e3354bbcfeafc5e.tar.gz |
move scan/full_scan fetch from reply in session_actor to compile_actor
-rw-r--r-- | ydb/core/kqp/kqp.h | 27 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_request.cpp | 34 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_service.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_common.h | 7 |
7 files changed, 63 insertions, 55 deletions
diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h index 3652f129c7..9168308a31 100644 --- a/ydb/core/kqp/kqp.h +++ b/ydb/core/kqp/kqp.h @@ -16,6 +16,12 @@ namespace NKikimr { namespace NKqp { +enum class ETableReadType { + Other = 0, + Scan = 1, + FullScan = 2, +}; + const TStringBuf DefaultKikimrPublicClusterName = "db"; inline NActors::TActorId MakeKqpProxyID(ui32 nodeId) { @@ -174,27 +180,30 @@ struct TKqpCompileResult { using TConstPtr = std::shared_ptr<const TKqpCompileResult>; TKqpCompileResult(const TString& uid, TKqpQueryId&& query, const Ydb::StatusIds::StatusCode& status, - const NYql::TIssues& issues) + const NYql::TIssues& issues, ETableReadType maxReadType) : Status(status) , Issues(issues) , Query(std::move(query)) - , Uid(uid) {} + , Uid(uid) + , MaxReadType(maxReadType) {} - TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues) + TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues, + ETableReadType maxReadType) : Status(status) , Issues(issues) - , Uid(uid) {} + , Uid(uid) + , MaxReadType(maxReadType) {} static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, TKqpQueryId&& query, - const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues) + const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues, ETableReadType maxReadType) { - return std::make_shared<TKqpCompileResult>(uid, std::move(query), status, issues); + return std::make_shared<TKqpCompileResult>(uid, std::move(query), status, issues, maxReadType); } static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, const Ydb::StatusIds::StatusCode& status, - const NYql::TIssues& issues) + const NYql::TIssues& issues, ETableReadType maxReadType) { - return std::make_shared<TKqpCompileResult>(uid, status, issues); + return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType); } Ydb::StatusIds::StatusCode Status; @@ -203,6 +212,8 @@ struct TKqpCompileResult { TMaybe<TKqpQueryId> Query; TString Uid; + ETableReadType MaxReadType; + TPreparedQueryConstPtr PreparedQuery; TPreparedQueryConstPtr PreparedQueryNewEngine; std::optional<TQueryTraits> QueryTraits; diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp index 44f2a035f5..e571dc5a6b 100644 --- a/ydb/core/kqp/kqp_compile_actor.cpp +++ b/ydb/core/kqp/kqp_compile_actor.cpp @@ -1,6 +1,8 @@ #include "kqp_impl.h" #include "kqp_metadata_loader.h" +#include "kqp_worker_common.h" + #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/wilson.h> @@ -230,7 +232,7 @@ private: } void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) { - Reply(TKqpCompileResult::Make(Uid, std::move(Query), status, issues), ctx); + Reply(TKqpCompileResult::Make(Uid, std::move(Query), status, issues, ETableReadType::Other), ctx); } void InternalError(const TString message, const TActorContext &ctx) { @@ -273,7 +275,9 @@ private: AddMessageToReplayLog(kqpResult.QueryPlan); } - KqpCompileResult = TKqpCompileResult::Make(Uid, std::move(Query), status, kqpResult.Issues()); + ETableReadType maxReadType = ExtractMostHeavyReadType(kqpResult.QueryPlan); + + KqpCompileResult = TKqpCompileResult::Make(Uid, std::move(Query), status, kqpResult.Issues(), maxReadType); if (status == Ydb::StatusIds::SUCCESS) { YQL_ENSURE(kqpResult.PreparingQuery); diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp index a06a1a43c0..75ff3733c0 100644 --- a/ydb/core/kqp/kqp_compile_request.cpp +++ b/ydb/core/kqp/kqp_compile_request.cpp @@ -41,10 +41,10 @@ public: , DbCounters(dbCounters) , Orbit{std::move(orbit)} , CompileRequestSpan(TWilsonKqp::CompileRequest, std::move(traceId), "CompileRequest") {} - + void Bootstrap(const TActorContext& ctx) { - LWTRACK(KqpCompileRequestBootstrap, - Orbit, + LWTRACK(KqpCompileRequestBootstrap, + Orbit, Query ? Query->UserSid : 0); TimeoutTimerId = CreateLongTimer(ctx, Deadline - TInstant::Now(), @@ -62,10 +62,10 @@ public: void Handle(TEvKqp::TEvCompileResponse::TPtr& ev, const TActorContext &ctx) { const auto& query = ev->Get()->CompileResult->Query; - LWTRACK(KqpCompileRequestHandleServiceReply, - ev->Get()->Orbit, + LWTRACK(KqpCompileRequestHandleServiceReply, + ev->Get()->Orbit, query ? query->UserSid : 0); - + auto compileResult = ev->Get()->CompileResult; const auto& stats = ev->Get()->Stats; @@ -81,7 +81,7 @@ public: } if (!NavigateTables(*compileResult->PreparedQuery, compileResult->Query->Database, ctx)) { - + if (CompileRequestSpan) { CompileRequestSpan.End(); } @@ -313,12 +313,12 @@ private: } void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) { - auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues), std::move(Orbit)); + auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues, ETableReadType::Other), std::move(Orbit)); if (CompileRequestSpan) { CompileRequestSpan.EndError(issues.ToOneLineString()); } - + ctx.Send(Owner, responseEv.Release()); Die(ctx); } @@ -341,17 +341,17 @@ private: IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit, + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit, NWilson::TTraceId traceId) { return new TKqpCompileRequestActor( - owner, - userToken, - uid, - std::move(query), - keepInCache, - deadline, - dbCounters, + owner, + userToken, + uid, + std::move(query), + keepInCache, + deadline, + dbCounters, std::move(orbit), std::move(traceId)); } diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp index efba6222ff..cc11fa520c 100644 --- a/ydb/core/kqp/kqp_compile_service.cpp +++ b/ydb/core/kqp/kqp_compile_service.cpp @@ -189,7 +189,7 @@ private: struct TKqpCompileRequest { TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache, - const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, + const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}) : Sender(sender) , Query(std::move(query)) @@ -411,8 +411,8 @@ private: void Handle(TEvKqp::TEvCompileRequest::TPtr& ev, const TActorContext& ctx) { const auto& query = ev->Get()->Query; - LWTRACK(KqpCompileServiceHandleRequest, - ev->Get()->Orbit, + LWTRACK(KqpCompileServiceHandleRequest, + ev->Get()->Orbit, query ? query->UserSid : 0); try { @@ -503,13 +503,13 @@ private: Counters->ReportQueryCacheHit(dbCounters, false); - LWTRACK(KqpCompileServiceEnqueued, - ev->Get()->Orbit, + LWTRACK(KqpCompileServiceEnqueued, + ev->Get()->Orbit, ev->Get()->Query ? ev->Get()->Query->UserSid : 0); - + TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), - request.KeepInCache, request.UserToken, request.Deadline, dbCounters, + request.KeepInCache, request.UserToken, request.Deadline, dbCounters, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { @@ -558,8 +558,8 @@ private: NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, - true, request.UserToken, request.Deadline, dbCounters, - ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), + true, request.UserToken, request.Deadline, dbCounters, + ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), std::move(CompileServiceSpan)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { @@ -733,9 +733,9 @@ private: const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) { const auto& query = compileResult->Query; - LWTRACK(KqpCompileServiceReply, - orbit, - query ? query->UserSid : 0, + LWTRACK(KqpCompileServiceReply, + orbit, + query ? query->UserSid : 0, compileResult->Issues.ToString()); LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Send response" @@ -754,7 +754,7 @@ private: if (span) { span.End(); } - + ctx.Send(sender, responseEv.Release()); } @@ -772,7 +772,7 @@ private: const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) { LWTRACK(KqpCompileServiceReplyError, orbit); - Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span)); + Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span)); } void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message, diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h index 0a34d68f84..f9b3333bc2 100644 --- a/ydb/core/kqp/kqp_impl.h +++ b/ydb/core/kqp/kqp_impl.h @@ -55,7 +55,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId = {}); IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {}); struct TKqpWorkerSettings { diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index ae5e3cdabf..abdb105a5c 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -92,6 +92,7 @@ struct TKqpQueryState { NLWTrace::TOrbit Orbit; NWilson::TSpan KqpSessionSpan; + ETableReadType MaxReadType = ETableReadType::Other; TULID TxId; // User tx TString TxId_Human = ""; @@ -496,6 +497,7 @@ public: void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) { auto compileResult = ev->Get()->CompileResult; QueryState->Orbit = std::move(ev->Get()->Orbit); + QueryState->MaxReadType = compileResult->MaxReadType; YQL_ENSURE(compileResult); YQL_ENSURE(QueryState); @@ -1396,16 +1398,14 @@ public: Counters->ReportQueryLatency(Settings.DbCounters, QueryState->Request.GetAction(), queryDuration); - auto& stats = QueryState->Stats; - auto plan = SerializeAnalyzePlan(stats); - - auto maxReadType = ExtractMostHeavyReadType(plan); - if (maxReadType == ETableReadType::FullScan) { + if (QueryState->MaxReadType == ETableReadType::FullScan) { Counters->ReportQueryWithFullScan(Settings.DbCounters); - } else if (maxReadType == ETableReadType::Scan) { + } else if (QueryState->MaxReadType == ETableReadType::Scan) { Counters->ReportQueryWithRangeScan(Settings.DbCounters); } + auto& stats = QueryState->Stats; + ui32 affectedShardsCount = 0; ui64 readBytesCount = 0; ui64 readRowsCount = 0; diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h index bc29a7e93d..be308c066e 100644 --- a/ydb/core/kqp/kqp_worker_common.h +++ b/ydb/core/kqp/kqp_worker_common.h @@ -5,7 +5,6 @@ #include <util/datetime/base.h> #include <util/generic/string.h> - namespace NKikimr::NKqp { struct TSessionShutdownState { @@ -82,12 +81,6 @@ inline TIntrusivePtr<NYql::TKikimrConfiguration> CreateConfig(const TKqpSettings return cfg; } -enum ETableReadType { - Other = 0, - Scan = 1, - FullScan = 2, -}; - inline ETableReadType ExtractMostHeavyReadType(const TString& queryPlan) { ETableReadType maxReadType = ETableReadType::Other; |