diff options
author | mdartemenko <mdartemenko@yandex-team.com> | 2022-09-19 17:01:48 +0300 |
---|---|---|
committer | mdartemenko <mdartemenko@yandex-team.com> | 2022-09-19 17:01:48 +0300 |
commit | 168273e822f2d73c568d8c8ce32220abe497d60b (patch) | |
tree | 936cdf2f8af13ce134fa6738e33975656691ca5c | |
parent | ddbed3bdc9fbfec40475f0e3939187bb6ce716c3 (diff) | |
download | ydb-168273e822f2d73c568d8c8ce32220abe497d60b.tar.gz |
PR from branch users/mdartemenko/fix_whitespaces
Revert "move scan/full_scan fetch from reply in session_actor to compile_actor"
This reverts commit 2823e7875f243a68929c7d325725e1f3fbb85af2, reversing
changes made to cf96b119658d4b4a43f18537d6f81e3d9dfcbf1b.
-rw-r--r-- | ydb/core/kqp/kqp.h | 27 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_request.cpp | 34 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_service.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_common.h | 7 |
7 files changed, 55 insertions, 63 deletions
diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h index 9168308a314..3652f129c7d 100644 --- a/ydb/core/kqp/kqp.h +++ b/ydb/core/kqp/kqp.h @@ -16,12 +16,6 @@ namespace NKikimr { namespace NKqp { -enum class ETableReadType { - Other = 0, - Scan = 1, - FullScan = 2, -}; - const TStringBuf DefaultKikimrPublicClusterName = "db"; inline NActors::TActorId MakeKqpProxyID(ui32 nodeId) { @@ -180,30 +174,27 @@ struct TKqpCompileResult { using TConstPtr = std::shared_ptr<const TKqpCompileResult>; TKqpCompileResult(const TString& uid, TKqpQueryId&& query, const Ydb::StatusIds::StatusCode& status, - const NYql::TIssues& issues, ETableReadType maxReadType) + const NYql::TIssues& issues) : Status(status) , Issues(issues) , Query(std::move(query)) - , Uid(uid) - , MaxReadType(maxReadType) {} + , Uid(uid) {} - TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues, - ETableReadType maxReadType) + TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues) : Status(status) , Issues(issues) - , Uid(uid) - , MaxReadType(maxReadType) {} + , Uid(uid) {} static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, TKqpQueryId&& query, - const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues, ETableReadType maxReadType) + const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues) { - return std::make_shared<TKqpCompileResult>(uid, std::move(query), status, issues, maxReadType); + return std::make_shared<TKqpCompileResult>(uid, std::move(query), status, issues); } static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, const Ydb::StatusIds::StatusCode& status, - const NYql::TIssues& issues, ETableReadType maxReadType) + const NYql::TIssues& issues) { - return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType); + return std::make_shared<TKqpCompileResult>(uid, status, issues); } Ydb::StatusIds::StatusCode Status; @@ -212,8 +203,6 @@ struct TKqpCompileResult { TMaybe<TKqpQueryId> Query; TString Uid; - ETableReadType MaxReadType; - TPreparedQueryConstPtr PreparedQuery; TPreparedQueryConstPtr PreparedQueryNewEngine; std::optional<TQueryTraits> QueryTraits; diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp index e571dc5a6b0..44f2a035f5e 100644 --- a/ydb/core/kqp/kqp_compile_actor.cpp +++ b/ydb/core/kqp/kqp_compile_actor.cpp @@ -1,8 +1,6 @@ #include "kqp_impl.h" #include "kqp_metadata_loader.h" -#include "kqp_worker_common.h" - #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/wilson.h> @@ -232,7 +230,7 @@ private: } void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) { - Reply(TKqpCompileResult::Make(Uid, std::move(Query), status, issues, ETableReadType::Other), ctx); + Reply(TKqpCompileResult::Make(Uid, std::move(Query), status, issues), ctx); } void InternalError(const TString message, const TActorContext &ctx) { @@ -275,9 +273,7 @@ private: AddMessageToReplayLog(kqpResult.QueryPlan); } - ETableReadType maxReadType = ExtractMostHeavyReadType(kqpResult.QueryPlan); - - KqpCompileResult = TKqpCompileResult::Make(Uid, std::move(Query), status, kqpResult.Issues(), maxReadType); + KqpCompileResult = TKqpCompileResult::Make(Uid, std::move(Query), status, kqpResult.Issues()); if (status == Ydb::StatusIds::SUCCESS) { YQL_ENSURE(kqpResult.PreparingQuery); diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp index 75ff3733c03..a06a1a43c0d 100644 --- a/ydb/core/kqp/kqp_compile_request.cpp +++ b/ydb/core/kqp/kqp_compile_request.cpp @@ -41,10 +41,10 @@ public: , DbCounters(dbCounters) , Orbit{std::move(orbit)} , CompileRequestSpan(TWilsonKqp::CompileRequest, std::move(traceId), "CompileRequest") {} - + void Bootstrap(const TActorContext& ctx) { - LWTRACK(KqpCompileRequestBootstrap, - Orbit, + LWTRACK(KqpCompileRequestBootstrap, + Orbit, Query ? Query->UserSid : 0); TimeoutTimerId = CreateLongTimer(ctx, Deadline - TInstant::Now(), @@ -62,10 +62,10 @@ public: void Handle(TEvKqp::TEvCompileResponse::TPtr& ev, const TActorContext &ctx) { const auto& query = ev->Get()->CompileResult->Query; - LWTRACK(KqpCompileRequestHandleServiceReply, - ev->Get()->Orbit, + LWTRACK(KqpCompileRequestHandleServiceReply, + ev->Get()->Orbit, query ? query->UserSid : 0); - + auto compileResult = ev->Get()->CompileResult; const auto& stats = ev->Get()->Stats; @@ -81,7 +81,7 @@ public: } if (!NavigateTables(*compileResult->PreparedQuery, compileResult->Query->Database, ctx)) { - + if (CompileRequestSpan) { CompileRequestSpan.End(); } @@ -313,12 +313,12 @@ private: } void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) { - auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues, ETableReadType::Other), std::move(Orbit)); + auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues), std::move(Orbit)); if (CompileRequestSpan) { CompileRequestSpan.EndError(issues.ToOneLineString()); } - + ctx.Send(Owner, responseEv.Release()); Die(ctx); } @@ -341,17 +341,17 @@ private: IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit, + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit, NWilson::TTraceId traceId) { return new TKqpCompileRequestActor( - owner, - userToken, - uid, - std::move(query), - keepInCache, - deadline, - dbCounters, + owner, + userToken, + uid, + std::move(query), + keepInCache, + deadline, + dbCounters, std::move(orbit), std::move(traceId)); } diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp index cc11fa520c2..efba6222ffd 100644 --- a/ydb/core/kqp/kqp_compile_service.cpp +++ b/ydb/core/kqp/kqp_compile_service.cpp @@ -189,7 +189,7 @@ private: struct TKqpCompileRequest { TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache, - const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, + const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}) : Sender(sender) , Query(std::move(query)) @@ -411,8 +411,8 @@ private: void Handle(TEvKqp::TEvCompileRequest::TPtr& ev, const TActorContext& ctx) { const auto& query = ev->Get()->Query; - LWTRACK(KqpCompileServiceHandleRequest, - ev->Get()->Orbit, + LWTRACK(KqpCompileServiceHandleRequest, + ev->Get()->Orbit, query ? query->UserSid : 0); try { @@ -503,13 +503,13 @@ private: Counters->ReportQueryCacheHit(dbCounters, false); - LWTRACK(KqpCompileServiceEnqueued, - ev->Get()->Orbit, + LWTRACK(KqpCompileServiceEnqueued, + ev->Get()->Orbit, ev->Get()->Query ? ev->Get()->Query->UserSid : 0); - + TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), - request.KeepInCache, request.UserToken, request.Deadline, dbCounters, + request.KeepInCache, request.UserToken, request.Deadline, dbCounters, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { @@ -558,8 +558,8 @@ private: NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, - true, request.UserToken, request.Deadline, dbCounters, - ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), + true, request.UserToken, request.Deadline, dbCounters, + ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), std::move(CompileServiceSpan)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { @@ -733,9 +733,9 @@ private: const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) { const auto& query = compileResult->Query; - LWTRACK(KqpCompileServiceReply, - orbit, - query ? query->UserSid : 0, + LWTRACK(KqpCompileServiceReply, + orbit, + query ? query->UserSid : 0, compileResult->Issues.ToString()); LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Send response" @@ -754,7 +754,7 @@ private: if (span) { span.End(); } - + ctx.Send(sender, responseEv.Release()); } @@ -772,7 +772,7 @@ private: const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) { LWTRACK(KqpCompileServiceReplyError, orbit); - Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span)); + Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span)); } void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message, diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h index f9b3333bc29..0a34d68f845 100644 --- a/ydb/core/kqp/kqp_impl.h +++ b/ydb/core/kqp/kqp_impl.h @@ -55,7 +55,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId = {}); IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {}); struct TKqpWorkerSettings { diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 28ea418d34e..c37fab6f9bb 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -92,7 +92,6 @@ struct TKqpQueryState { NLWTrace::TOrbit Orbit; NWilson::TSpan KqpSessionSpan; - ETableReadType MaxReadType = ETableReadType::Other; TULID TxId; // User tx TString TxId_Human = ""; @@ -515,7 +514,6 @@ public: void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) { auto compileResult = ev->Get()->CompileResult; QueryState->Orbit = std::move(ev->Get()->Orbit); - QueryState->MaxReadType = compileResult->MaxReadType; YQL_ENSURE(compileResult); YQL_ENSURE(QueryState); @@ -1433,14 +1431,16 @@ public: Counters->ReportQueryLatency(Settings.DbCounters, QueryState->Request.GetAction(), queryDuration); - if (QueryState->MaxReadType == ETableReadType::FullScan) { + auto& stats = QueryState->Stats; + auto plan = SerializeAnalyzePlan(stats); + + auto maxReadType = ExtractMostHeavyReadType(plan); + if (maxReadType == ETableReadType::FullScan) { Counters->ReportQueryWithFullScan(Settings.DbCounters); - } else if (QueryState->MaxReadType == ETableReadType::Scan) { + } else if (maxReadType == ETableReadType::Scan) { Counters->ReportQueryWithRangeScan(Settings.DbCounters); } - auto& stats = QueryState->Stats; - ui32 affectedShardsCount = 0; ui64 readBytesCount = 0; ui64 readRowsCount = 0; diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h index be308c066e0..bc29a7e93d1 100644 --- a/ydb/core/kqp/kqp_worker_common.h +++ b/ydb/core/kqp/kqp_worker_common.h @@ -5,6 +5,7 @@ #include <util/datetime/base.h> #include <util/generic/string.h> + namespace NKikimr::NKqp { struct TSessionShutdownState { @@ -81,6 +82,12 @@ inline TIntrusivePtr<NYql::TKikimrConfiguration> CreateConfig(const TKqpSettings return cfg; } +enum ETableReadType { + Other = 0, + Scan = 1, + FullScan = 2, +}; + inline ETableReadType ExtractMostHeavyReadType(const TString& queryPlan) { ETableReadType maxReadType = ETableReadType::Other; |