aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormdartemenko <mdartemenko@yandex-team.com>2022-09-12 15:31:50 +0300
committermdartemenko <mdartemenko@yandex-team.com>2022-09-12 15:31:50 +0300
commite6355abc70da5262b2ac0b696e3354bbcfeafc5e (patch)
tree5251a4ee42ad8c92ac74915952a2b04006f5936b
parent71cbed4fa838d7e7a4bebf535d71d382af05113c (diff)
downloadydb-e6355abc70da5262b2ac0b696e3354bbcfeafc5e.tar.gz
move scan/full_scan fetch from reply in session_actor to compile_actor
-rw-r--r--ydb/core/kqp/kqp.h27
-rw-r--r--ydb/core/kqp/kqp_compile_actor.cpp8
-rw-r--r--ydb/core/kqp/kqp_compile_request.cpp34
-rw-r--r--ydb/core/kqp/kqp_compile_service.cpp28
-rw-r--r--ydb/core/kqp/kqp_impl.h2
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp12
-rw-r--r--ydb/core/kqp/kqp_worker_common.h7
7 files changed, 63 insertions, 55 deletions
diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h
index 3652f129c7..9168308a31 100644
--- a/ydb/core/kqp/kqp.h
+++ b/ydb/core/kqp/kqp.h
@@ -16,6 +16,12 @@
namespace NKikimr {
namespace NKqp {
+enum class ETableReadType {
+ Other = 0,
+ Scan = 1,
+ FullScan = 2,
+};
+
const TStringBuf DefaultKikimrPublicClusterName = "db";
inline NActors::TActorId MakeKqpProxyID(ui32 nodeId) {
@@ -174,27 +180,30 @@ struct TKqpCompileResult {
using TConstPtr = std::shared_ptr<const TKqpCompileResult>;
TKqpCompileResult(const TString& uid, TKqpQueryId&& query, const Ydb::StatusIds::StatusCode& status,
- const NYql::TIssues& issues)
+ const NYql::TIssues& issues, ETableReadType maxReadType)
: Status(status)
, Issues(issues)
, Query(std::move(query))
- , Uid(uid) {}
+ , Uid(uid)
+ , MaxReadType(maxReadType) {}
- TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues)
+ TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues,
+ ETableReadType maxReadType)
: Status(status)
, Issues(issues)
- , Uid(uid) {}
+ , Uid(uid)
+ , MaxReadType(maxReadType) {}
static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, TKqpQueryId&& query,
- const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues)
+ const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues, ETableReadType maxReadType)
{
- return std::make_shared<TKqpCompileResult>(uid, std::move(query), status, issues);
+ return std::make_shared<TKqpCompileResult>(uid, std::move(query), status, issues, maxReadType);
}
static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, const Ydb::StatusIds::StatusCode& status,
- const NYql::TIssues& issues)
+ const NYql::TIssues& issues, ETableReadType maxReadType)
{
- return std::make_shared<TKqpCompileResult>(uid, status, issues);
+ return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType);
}
Ydb::StatusIds::StatusCode Status;
@@ -203,6 +212,8 @@ struct TKqpCompileResult {
TMaybe<TKqpQueryId> Query;
TString Uid;
+ ETableReadType MaxReadType;
+
TPreparedQueryConstPtr PreparedQuery;
TPreparedQueryConstPtr PreparedQueryNewEngine;
std::optional<TQueryTraits> QueryTraits;
diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp
index 44f2a035f5..e571dc5a6b 100644
--- a/ydb/core/kqp/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/kqp_compile_actor.cpp
@@ -1,6 +1,8 @@
#include "kqp_impl.h"
#include "kqp_metadata_loader.h"
+#include "kqp_worker_common.h"
+
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/wilson.h>
@@ -230,7 +232,7 @@ private:
}
void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) {
- Reply(TKqpCompileResult::Make(Uid, std::move(Query), status, issues), ctx);
+ Reply(TKqpCompileResult::Make(Uid, std::move(Query), status, issues, ETableReadType::Other), ctx);
}
void InternalError(const TString message, const TActorContext &ctx) {
@@ -273,7 +275,9 @@ private:
AddMessageToReplayLog(kqpResult.QueryPlan);
}
- KqpCompileResult = TKqpCompileResult::Make(Uid, std::move(Query), status, kqpResult.Issues());
+ ETableReadType maxReadType = ExtractMostHeavyReadType(kqpResult.QueryPlan);
+
+ KqpCompileResult = TKqpCompileResult::Make(Uid, std::move(Query), status, kqpResult.Issues(), maxReadType);
if (status == Ydb::StatusIds::SUCCESS) {
YQL_ENSURE(kqpResult.PreparingQuery);
diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp
index a06a1a43c0..75ff3733c0 100644
--- a/ydb/core/kqp/kqp_compile_request.cpp
+++ b/ydb/core/kqp/kqp_compile_request.cpp
@@ -41,10 +41,10 @@ public:
, DbCounters(dbCounters)
, Orbit{std::move(orbit)}
, CompileRequestSpan(TWilsonKqp::CompileRequest, std::move(traceId), "CompileRequest") {}
-
+
void Bootstrap(const TActorContext& ctx) {
- LWTRACK(KqpCompileRequestBootstrap,
- Orbit,
+ LWTRACK(KqpCompileRequestBootstrap,
+ Orbit,
Query ? Query->UserSid : 0);
TimeoutTimerId = CreateLongTimer(ctx, Deadline - TInstant::Now(),
@@ -62,10 +62,10 @@ public:
void Handle(TEvKqp::TEvCompileResponse::TPtr& ev, const TActorContext &ctx) {
const auto& query = ev->Get()->CompileResult->Query;
- LWTRACK(KqpCompileRequestHandleServiceReply,
- ev->Get()->Orbit,
+ LWTRACK(KqpCompileRequestHandleServiceReply,
+ ev->Get()->Orbit,
query ? query->UserSid : 0);
-
+
auto compileResult = ev->Get()->CompileResult;
const auto& stats = ev->Get()->Stats;
@@ -81,7 +81,7 @@ public:
}
if (!NavigateTables(*compileResult->PreparedQuery, compileResult->Query->Database, ctx)) {
-
+
if (CompileRequestSpan) {
CompileRequestSpan.End();
}
@@ -313,12 +313,12 @@ private:
}
void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) {
- auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues), std::move(Orbit));
+ auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues, ETableReadType::Other), std::move(Orbit));
if (CompileRequestSpan) {
CompileRequestSpan.EndError(issues.ToOneLineString());
}
-
+
ctx.Send(Owner, responseEv.Release());
Die(ctx);
}
@@ -341,17 +341,17 @@ private:
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit,
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit,
NWilson::TTraceId traceId)
{
return new TKqpCompileRequestActor(
- owner,
- userToken,
- uid,
- std::move(query),
- keepInCache,
- deadline,
- dbCounters,
+ owner,
+ userToken,
+ uid,
+ std::move(query),
+ keepInCache,
+ deadline,
+ dbCounters,
std::move(orbit),
std::move(traceId));
}
diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp
index efba6222ff..cc11fa520c 100644
--- a/ydb/core/kqp/kqp_compile_service.cpp
+++ b/ydb/core/kqp/kqp_compile_service.cpp
@@ -189,7 +189,7 @@ private:
struct TKqpCompileRequest {
TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache,
- const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
+ const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {})
: Sender(sender)
, Query(std::move(query))
@@ -411,8 +411,8 @@ private:
void Handle(TEvKqp::TEvCompileRequest::TPtr& ev, const TActorContext& ctx) {
const auto& query = ev->Get()->Query;
- LWTRACK(KqpCompileServiceHandleRequest,
- ev->Get()->Orbit,
+ LWTRACK(KqpCompileServiceHandleRequest,
+ ev->Get()->Orbit,
query ? query->UserSid : 0);
try {
@@ -503,13 +503,13 @@ private:
Counters->ReportQueryCacheHit(dbCounters, false);
- LWTRACK(KqpCompileServiceEnqueued,
- ev->Get()->Orbit,
+ LWTRACK(KqpCompileServiceEnqueued,
+ ev->Get()->Orbit,
ev->Get()->Query ? ev->Get()->Query->UserSid : 0);
-
+
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
- request.KeepInCache, request.UserToken, request.Deadline, dbCounters,
+ request.KeepInCache, request.UserToken, request.Deadline, dbCounters,
std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
@@ -558,8 +558,8 @@ private:
NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
- true, request.UserToken, request.Deadline, dbCounters,
- ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
+ true, request.UserToken, request.Deadline, dbCounters,
+ ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
std::move(CompileServiceSpan));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
@@ -733,9 +733,9 @@ private:
const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
const auto& query = compileResult->Query;
- LWTRACK(KqpCompileServiceReply,
- orbit,
- query ? query->UserSid : 0,
+ LWTRACK(KqpCompileServiceReply,
+ orbit,
+ query ? query->UserSid : 0,
compileResult->Issues.ToString());
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Send response"
@@ -754,7 +754,7 @@ private:
if (span) {
span.End();
}
-
+
ctx.Send(sender, responseEv.Release());
}
@@ -772,7 +772,7 @@ private:
const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReplyError, orbit);
- Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span));
+ Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span));
}
void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message,
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h
index 0a34d68f84..f9b3333bc2 100644
--- a/ydb/core/kqp/kqp_impl.h
+++ b/ydb/core/kqp/kqp_impl.h
@@ -55,7 +55,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId = {});
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {});
struct TKqpWorkerSettings {
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index ae5e3cdabf..abdb105a5c 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -92,6 +92,7 @@ struct TKqpQueryState {
NLWTrace::TOrbit Orbit;
NWilson::TSpan KqpSessionSpan;
+ ETableReadType MaxReadType = ETableReadType::Other;
TULID TxId; // User tx
TString TxId_Human = "";
@@ -496,6 +497,7 @@ public:
void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) {
auto compileResult = ev->Get()->CompileResult;
QueryState->Orbit = std::move(ev->Get()->Orbit);
+ QueryState->MaxReadType = compileResult->MaxReadType;
YQL_ENSURE(compileResult);
YQL_ENSURE(QueryState);
@@ -1396,16 +1398,14 @@ public:
Counters->ReportQueryLatency(Settings.DbCounters, QueryState->Request.GetAction(), queryDuration);
- auto& stats = QueryState->Stats;
- auto plan = SerializeAnalyzePlan(stats);
-
- auto maxReadType = ExtractMostHeavyReadType(plan);
- if (maxReadType == ETableReadType::FullScan) {
+ if (QueryState->MaxReadType == ETableReadType::FullScan) {
Counters->ReportQueryWithFullScan(Settings.DbCounters);
- } else if (maxReadType == ETableReadType::Scan) {
+ } else if (QueryState->MaxReadType == ETableReadType::Scan) {
Counters->ReportQueryWithRangeScan(Settings.DbCounters);
}
+ auto& stats = QueryState->Stats;
+
ui32 affectedShardsCount = 0;
ui64 readBytesCount = 0;
ui64 readRowsCount = 0;
diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h
index bc29a7e93d..be308c066e 100644
--- a/ydb/core/kqp/kqp_worker_common.h
+++ b/ydb/core/kqp/kqp_worker_common.h
@@ -5,7 +5,6 @@
#include <util/datetime/base.h>
#include <util/generic/string.h>
-
namespace NKikimr::NKqp {
struct TSessionShutdownState {
@@ -82,12 +81,6 @@ inline TIntrusivePtr<NYql::TKikimrConfiguration> CreateConfig(const TKqpSettings
return cfg;
}
-enum ETableReadType {
- Other = 0,
- Scan = 1,
- FullScan = 2,
-};
-
inline ETableReadType ExtractMostHeavyReadType(const TString& queryPlan) {
ETableReadType maxReadType = ETableReadType::Other;