diff options
author | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-05 10:13:53 +0300 |
---|---|---|
committer | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-05 10:13:53 +0300 |
commit | b56dba67800643d2ec54af77210f51485c17231d (patch) | |
tree | fe36bbb583cff175d561dd3ceab404f7f8ccd4f3 | |
parent | 7e04ccaa05215d096368b8807ea6ab05fe1da1da (diff) | |
download | ydb-b56dba67800643d2ec54af77210f51485c17231d.tar.gz |
Improve the coverage of the request's execution by LWTRACE in SessionActor
remove unused code
add executer route
add kqp_compile_service route
-rw-r--r-- | ydb/core/kqp/common/kqp_gateway.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_lwtrace_probes.h | 62 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_data_executer.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_executer.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_executer_impl.h | 9 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_literal_executer.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/kqp.h | 20 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_request.cpp | 32 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_service.cpp | 74 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 39 |
12 files changed, 212 insertions, 54 deletions
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h index 7c826e2afc5..05eee6cb38e 100644 --- a/ydb/core/kqp/common/kqp_gateway.h +++ b/ydb/core/kqp/common/kqp_gateway.h @@ -9,6 +9,7 @@ #include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <library/cpp/actors/core/actorid.h> +#include <library/cpp/lwtrace/shuttle.h> namespace NKikimr { namespace NKqp { @@ -125,6 +126,8 @@ public: NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; TMaybe<NKikimrKqp::TRlPath> RlPath; bool NeedTxId = true; + + NLWTrace::TOrbit Orbit; }; struct TExecPhysicalResult : public TGenericResult { diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.h b/ydb/core/kqp/common/kqp_lwtrace_probes.h index 5a9394ba09e..e197c32bed3 100644 --- a/ydb/core/kqp/common/kqp_lwtrace_probes.h +++ b/ydb/core/kqp/common/kqp_lwtrace_probes.h @@ -23,26 +23,74 @@ struct TQueryAction { }; #define KQP_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ - PROBE(KqpQueryRequest, GROUPS("KQP"), \ + PROBE(KqpSessionQueryRequest, GROUPS("KQP"), \ TYPES(TString, TQueryType, TQueryAction, TString), \ NAMES("database", "type", "action", "query")) \ - PROBE(KqpQueryCompiled, GROUPS("KQP"), \ + PROBE(KqpSessionQueryCompiled, GROUPS("KQP"), \ TYPES(TString), \ NAMES("compileResultStatus")) \ - PROBE(KqpPhyQueryDefer, GROUPS("KQP"), \ + PROBE(KqpSessionPhyQueryDefer, GROUPS("KQP"), \ TYPES(ui64), \ NAMES("currentTx")) \ - PROBE(KqpPhyQueryProposeTx, GROUPS("KQP"), \ + PROBE(KqpSessionPhyQueryProposeTx, GROUPS("KQP"), \ TYPES(ui64, ui32, ui32, bool), \ NAMES("currentTx", "transactionsSize", "locksSize", "shouldAcquireLocks")) \ - PROBE(KqpPhyQueryTxResponse, GROUPS("KQP"), \ + PROBE(KqpSessionPhyQueryTxResponse, GROUPS("KQP"), \ TYPES(ui64, ui32), \ NAMES("currentTx", "resultsSize")) \ - PROBE(KqpQueryReplySuccess, GROUPS("KQP"), \ + PROBE(KqpSessionReplySuccess, GROUPS("KQP"), \ TYPES(ui64), \ NAMES("responseArenaSpaceUsed")) \ - PROBE(KqpQueryReplyError, GROUPS("KQP"), \ + PROBE(KqpSessionReplyError, GROUPS("KQP"), \ TYPES(TString), \ NAMES("errMsg")) \ + PROBE(KqpCompileRequestBootstrap, GROUPS("KQP"), \ + TYPES(TString, ui64), \ + NAMES("userSid", "queryHash")) \ + PROBE(KqpCompileRequestHandleServiceReply, GROUPS("KQP"), \ + TYPES(TString, ui64), \ + NAMES("userSid", "queryHash")) \ + PROBE(KqpCompileServiceHandleRequest, GROUPS("KQP"), \ + TYPES(TString, ui64), \ + NAMES("userSid", "queryHash")) \ + PROBE(KqpCompileServiceEnqueued, GROUPS("KQP"), \ + TYPES(TString, ui64), \ + NAMES("userSid", "queryHash")) \ + PROBE(KqpCompileServiceGetCompilation, GROUPS("KQP"), \ + TYPES(TString, ui64, TString), \ + NAMES("userSid", "queryHash", "compileActor")) \ + PROBE(KqpCompileServiceReply, GROUPS("KQP"), \ + TYPES(TString, ui64, TString), \ + NAMES("userSid", "queryHash", "compileResult")) \ + PROBE(KqpBaseExecuterHandleReady, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("TxId")) \ + PROBE(KqpBaseExecuterReplyErrorAndDie, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("TxId")) \ + PROBE(KqpDataExecuterStartExecute, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("TxId")) \ + PROBE(KqpDataExecuterStartTasksAndTxs, GROUPS("KQP"), \ + TYPES(ui64, ui64, ui64), \ + NAMES("TxId", "tasksSize", "TxsSize")) \ + PROBE(KqpDataExecuterFinalize, GROUPS("KQP"), \ + TYPES(ui64, ui64, ui64, ui64), \ + NAMES("TxId", "lastCompletedShard", "ResultRows", "resultSize")) \ + PROBE(KqpDataExecuterReplyErrorAndDie, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("TxId")) \ + PROBE(KqpScanExecutorReplyErrorAndDie, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("TxId")) \ + PROBE(KqpScanExecutorFinalize, GROUPS("KQP"), \ + TYPES(ui64, ui64, TString, ui64), \ + NAMES("TxId", "lastCompletedTask", "lastCompletedComputeActor", "ResultsSize")) \ + PROBE(KqpLiteralExecuterReplyErrorAndDie, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("TxId")) \ + PROBE(KqpLiteralExecuterFinalize, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("TxId")) \ /**/ LWTRACE_DECLARE_PROVIDER(KQP_PROVIDER) diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index 957e7f5de56..3cd2c8ff25d 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -603,6 +603,8 @@ private: void HandleExecute(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) { TEvDataShard::TEvProposeTransactionResult* res = ev->Get(); const ui64 shardId = res->GetOrigin(); + LastShard = shardId; + TShardState* shardState = ShardStates.FindPtr(shardId); YQL_ENSURE(shardState); @@ -1272,6 +1274,7 @@ private: } void Execute() { + LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); RequestControls.Reqister(TlsActivationContext->AsActorContext()); ReadOnlyTx = true; @@ -1644,6 +1647,8 @@ private: LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem()); } + LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), datashardTxs.size()); + // first, start compute tasks TVector<ui64> computeTaskIds{Reserve(computeTasks.size())}; for (auto&& taskDesc : computeTasks) { @@ -1767,6 +1772,8 @@ private: return; } + LWTRACK(KqpDataExecuterFinalize, ResponseEv->Orbit, TxId, LastShard, response.GetResult().ResultsSize(), response.ByteSize()); + LOG_D("Sending response to: " << Target << ", results: " << Results.size()); Send(Target, ResponseEv.release()); PassAway(); @@ -1881,6 +1888,7 @@ private: // Lock handle for a newly acquired lock TLockHandle LockHandle; + ui64 LastShard = 0; }; } // namespace diff --git a/ydb/core/kqp/executer/kqp_executer.h b/ydb/core/kqp/executer/kqp_executer.h index 11ed8320270..d508753fc91 100644 --- a/ydb/core/kqp/executer/kqp_executer.h +++ b/ydb/core/kqp/executer/kqp_executer.h @@ -1,5 +1,6 @@ #pragma once +#include <library/cpp/lwtrace/shuttle.h> #include <ydb/core/kqp/common/kqp_common.h> #include <ydb/core/kqp/common/kqp_gateway.h> #include <ydb/core/kqp/counters/kqp_counters.h> @@ -19,6 +20,8 @@ struct TEvKqpExecuter { { NLongTxService::TLockHandle LockHandle; + NLWTrace::TOrbit Orbit; + bool IsSerializable() const override { // We cannot serialize LockHandle, should always send locally return false; diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index 1f4bb0ca315..49754c6b8a5 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -6,6 +6,7 @@ #include "kqp_table_resolver.h" #include <ydb/core/kqp/common/kqp_ru_calc.h> +#include <ydb/core/kqp/common/kqp_lwtrace_probes.h> #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> @@ -30,6 +31,8 @@ #include <util/generic/size_literals.h> +LWTRACE_USING(KQP_PROVIDER); + namespace NKikimr { namespace NKqp { @@ -85,6 +88,7 @@ public: , Counters(counters) { ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(); + ResponseEv->Orbit = std::move(Request.Orbit); Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); } @@ -124,6 +128,8 @@ protected: TxId = ev->Get()->Record.GetRequest().GetTxId(); Target = ActorIdFromProto(ev->Get()->Record.GetTarget()); + LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId); + LOG_D("Report self actorId " << this->SelfId() << " to " << Target); auto progressEv = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>(); ActorIdToProto(this->SelfId(), progressEv->Record.MutableExecuterActorId()); @@ -597,6 +603,9 @@ protected: } } + LWTRACK(KqpBaseExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId); + ResponseEv->Orbit = std::move(ResponseEv->Orbit); + this->Send(Target, ResponseEv.release()); this->PassAway(); } diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp index a2b71317a78..3a4935f859b 100644 --- a/ydb/core/kqp/executer/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp @@ -68,6 +68,7 @@ public: , Counters(counters) { ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(); + ResponseEv->Orbit = std::move(Request.Orbit); Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); } @@ -346,6 +347,8 @@ private: } } + LWTRACK(KqpLiteralExecuterFinalize, ResponseEv->Orbit, TxId); + LOG_D("Sending response to: " << Target << ", results: " << Results.size()); Send(Target, ResponseEv.release()); } @@ -408,6 +411,9 @@ private: response.SetStatus(status); response.MutableIssues()->Swap(issues); + LWTRACK(KqpLiteralExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId); + ResponseEv->Orbit = std::move(ResponseEv->Orbit); + Send(Target, ResponseEv.release()); PassAway(); } diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 169e4aa469b..8a9d1ae957f 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -225,6 +225,9 @@ private: Stats->AddComputeActorStats(computeActor.NodeId(), std::move(*state.MutableStats())); } + LastTaskId = taskId; + LastComputeActorId = computeActor.ToString(); + auto it = PendingComputeActors.find(computeActor); if (it == PendingComputeActors.end()) { LOG_W("Got execution state for compute actor: " << computeActor @@ -870,6 +873,8 @@ private: } } + LWTRACK(KqpScanExecutorFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, Results.size()); + LOG_D("Sending response to: " << Target); Send(Target, ResponseEv.release()); PassAway(); @@ -1002,6 +1007,9 @@ private: THashSet<ui64> PendingComputeTasks; // Not started yet, waiting resources TMap<ui64, ui64> ShardIdToNodeId; TMap<ui64, TVector<ui64>> ShardsOnNode; + + ui64 LastTaskId = 0; + TString LastComputeActorId = ""; }; } // namespace diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h index 6de02925e0d..3652f129c7d 100644 --- a/ydb/core/kqp/kqp.h +++ b/ydb/core/kqp/kqp.h @@ -1,6 +1,7 @@ #pragma once #include "kqp_query_replay.h" +#include <library/cpp/lwtrace/shuttle.h> #include <ydb/core/kqp/common/kqp_common.h> #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/provider/yql_kikimr_query_traits.h> @@ -367,13 +368,14 @@ struct TEvKqp { struct TEvCompileRequest : public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> { TEvCompileRequest(const TString& userToken, const TMaybe<TString>& uid, TMaybe<TKqpQueryId>&& query, - bool keepInCache, TInstant deadline, TKqpDbCountersPtr dbCounters) + bool keepInCache, TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}) : UserToken(userToken) , Uid(uid) , Query(std::move(query)) , KeepInCache(keepInCache) , Deadline(deadline) , DbCounters(dbCounters) + , Orbit(std::move(orbit)) { Y_ENSURE(Uid.Defined() != Query.Defined()); } @@ -386,16 +388,19 @@ struct TEvKqp { TInstant Deadline; TKqpDbCountersPtr DbCounters; TMaybe<bool> DocumentApiRestricted; + + NLWTrace::TOrbit Orbit; }; struct TEvRecompileRequest : public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> { TEvRecompileRequest(const TString& userToken, const TString& uid, const TMaybe<TKqpQueryId>& query, - TInstant deadline, TKqpDbCountersPtr dbCounters) + TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}) : UserToken(userToken) , Uid(uid) , Query(query) , Deadline(deadline) - , DbCounters(dbCounters) {} + , DbCounters(dbCounters) + , Orbit(std::move(orbit)) {} TString UserToken; TString Uid; @@ -403,11 +408,14 @@ struct TEvKqp { TInstant Deadline; TKqpDbCountersPtr DbCounters; + + NLWTrace::TOrbit Orbit; }; struct TEvCompileResponse : public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> { - TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult) - : CompileResult(compileResult) {} + TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {}) + : CompileResult(compileResult) + , Orbit(std::move(orbit)) {} TKqpCompileResult::TConstPtr CompileResult; NKqpProto::TKqpStatsCompile Stats; @@ -415,6 +423,8 @@ struct TEvKqp { ui32 ForceNewEnginePercent = 0; ui32 ForceNewEngineLevel = 0; + + NLWTrace::TOrbit Orbit; }; struct TEvCompileInvalidateRequest : public TEventLocal<TEvCompileInvalidateRequest, diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp index 76e85a27e0f..3bc7e0fed0d 100644 --- a/ydb/core/kqp/kqp_compile_request.cpp +++ b/ydb/core/kqp/kqp_compile_request.cpp @@ -7,8 +7,12 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <ydb/core/kqp/common/kqp_lwtrace_probes.h> + #include <util/string/escape.h> +LWTRACE_USING(KQP_PROVIDER); + namespace NKikimr { namespace NKqp { @@ -24,16 +28,22 @@ public: } TKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters) + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit) : Owner(owner) , UserToken(userToken) , Uid(uid) , Query(std::move(query)) , KeepInCache(keepInCache) , Deadline(deadline) - , DbCounters(dbCounters) {} - + , DbCounters(dbCounters) + , Orbit{std::move(orbit)} {} + void Bootstrap(const TActorContext& ctx) { + LWTRACK(KqpCompileRequestBootstrap, + Orbit, + Query ? Query->UserSid : 0, + Query ? Query->GetHash() : 0); + TimeoutTimerId = CreateLongTimer(ctx, Deadline - TInstant::Now(), new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup())); @@ -41,13 +51,19 @@ public: std::swap(Query, query); auto compileEv = MakeHolder<TEvKqp::TEvCompileRequest>(UserToken, Uid, std::move(query), - KeepInCache, Deadline, DbCounters); + KeepInCache, Deadline, DbCounters, std::move(Orbit)); ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release()); Become(&TKqpCompileRequestActor::MainState); } void Handle(TEvKqp::TEvCompileResponse::TPtr& ev, const TActorContext &ctx) { + const auto& query = ev->Get()->CompileResult->Query; + LWTRACK(KqpCompileRequestHandleServiceReply, + ev->Get()->Orbit, + query ? query->UserSid : 0, + query ? query->GetHash() : 0); + auto compileResult = ev->Get()->CompileResult; const auto& stats = ev->Get()->Stats; @@ -280,7 +296,7 @@ private: } void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) { - auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues)); + auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues), std::move(Orbit)); ctx.Send(Owner, responseEv.Release()); Die(ctx); } @@ -296,13 +312,15 @@ private: TActorId TimeoutTimerId; THashMap<TTableId, ui64> TableVersions; THolder<TEvKqp::TEvCompileResponse> DeferredResponse; + + NLWTrace::TOrbit Orbit; }; IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters) + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit) { - return new TKqpCompileRequestActor(owner, userToken, uid, std::move(query), keepInCache, deadline, dbCounters); + return new TKqpCompileRequestActor(owner, userToken, uid, std::move(query), keepInCache, deadline, dbCounters, std::move(orbit)); } } // namespace NKqp diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp index 480c5ee6ef2..a992fec7ea2 100644 --- a/ydb/core/kqp/kqp_compile_service.cpp +++ b/ydb/core/kqp/kqp_compile_service.cpp @@ -1,11 +1,13 @@ #include "kqp_impl.h" #include "kqp_query_replay.h" + #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> #include <ydb/core/cms/console/console.h> #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/kqp/common/kqp_lwtrace_probes.h> #include <ydb/library/aclib/aclib.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -14,6 +16,8 @@ #include <util/string/escape.h> +LWTRACE_USING(KQP_PROVIDER); + namespace NKikimr { namespace NKqp { @@ -183,14 +187,15 @@ private: struct TKqpCompileRequest { TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache, - const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters) + const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}) : Sender(sender) , Query(std::move(query)) , Uid(uid) , KeepInCache(keepInCache) , UserToken(userToken) , Deadline(deadline) - , DbCounters(dbCounters) {} + , DbCounters(dbCounters) + , Orbit(std::move(orbit)) {} TActorId Sender; TKqpQueryId Query; @@ -200,6 +205,8 @@ struct TKqpCompileRequest { TInstant Deadline; TKqpDbCountersPtr DbCounters; TActorId CompileActor; + + NLWTrace::TOrbit Orbit; }; class TKqpRequestsQueue { @@ -398,12 +405,18 @@ private: } void Handle(TEvKqp::TEvCompileRequest::TPtr& ev, const TActorContext& ctx) { + const auto& query = ev->Get()->Query; + LWTRACK(KqpCompileServiceHandleRequest, + ev->Get()->Orbit, + query ? query->UserSid : 0, + query ? query->GetHash() : 0); + try { PerformRequest(ev, ctx); } catch (const std::exception& e) { LogException("TEvCompileRequest", ev->Sender, e, ctx); - ReplyInternalError(ev->Sender, "", e.what(), ctx); + ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit)); } } @@ -436,7 +449,7 @@ private: << ", queryUid: " << *request.Uid); - ReplyFromCache(ev->Sender, compileResult, ctx); + ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit)); return; } else { LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query" @@ -454,7 +467,7 @@ private: << ", queryUid: " << *request.Uid); NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid); - ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx); + ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit)); return; } @@ -477,14 +490,20 @@ private: << ", sender: " << ev->Sender << ", queryUid: " << compileResult->Uid); - ReplyFromCache(ev->Sender, compileResult, ctx); + ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit)); return; } Counters->ReportQueryCacheHit(dbCounters, false); + LWTRACK(KqpCompileServiceEnqueued, + ev->Get()->Orbit, + ev->Get()->Query ? ev->Get()->Query->UserSid : 0, + ev->Get()->Query ? ev->Get()->Query->GetHash() : 0); + + TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), - request.KeepInCache, request.UserToken, request.Deadline, dbCounters); + request.KeepInCache, request.UserToken, request.Deadline, dbCounters, std::move(ev->Get()->Orbit)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(dbCounters); @@ -495,7 +514,7 @@ private: NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Exceeded maximum number of requests in compile service queue."); - ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx); + ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit)); return; } @@ -512,7 +531,7 @@ private: } catch (const std::exception& e) { LogException("TEvRecompileRequest", ev->Sender, e, ctx); - ReplyInternalError(ev->Sender, "", e.what(), ctx); + ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit)); } } @@ -541,7 +560,7 @@ private: NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Exceeded maximum number of requests in compile service queue."); - ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx); + ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit)); return; } } else { @@ -550,7 +569,7 @@ private: << ", queryUid: " << request.Uid); NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << request.Uid); - ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx); + ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit)); return; } @@ -593,7 +612,8 @@ private: auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query); for (auto& request : requests) { - Reply(request.Sender, compileResult, compileStats, ctx); + LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, request.Query.GetHash(), compileActorId.ToString()); + Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit)); } } else { if (QueryCache.FindByUid(compileResult->Uid, false)) { @@ -601,11 +621,12 @@ private: } } - Reply(compileRequest.Sender, compileResult, compileStats, ctx); + LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileRequest.Query.GetHash(), compileActorId.ToString()); + Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit)); } catch (const std::exception& e) { LogException("TEvCompileResponse", ev->Sender, e, ctx); - ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx); + ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit)); } ProcessQueue(ctx); @@ -663,7 +684,7 @@ private: Counters->ReportCompileRequestTimeout(request->DbCounters); NYql::TIssue issue(NYql::TPosition(), "Compilation timed out."); - ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx); + ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit)); } else { StartCompilation(std::move(*request), ctx); } @@ -695,14 +716,21 @@ private: } void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult, - const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx) + const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit) { + const auto& query = compileResult->Query; + LWTRACK(KqpCompileServiceReply, + orbit, + query ? query->UserSid : 0, + query ? query->GetHash() : 0, + compileResult->Issues.ToString()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Send response" << ", sender: " << sender << ", queryUid: " << compileResult->Uid << ", status:" << compileResult->Status); - auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult); + auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit)); responseEv->Stats.CopyFrom(compileStats); if (responseEv->CompileResult && responseEv->CompileResult->PreparedQueryNewEngine) { @@ -714,27 +742,27 @@ private: } void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult, - const TActorContext& ctx) + const TActorContext& ctx, NLWTrace::TOrbit orbit) { NKqpProto::TKqpStatsCompile stats; stats.SetFromCache(true); - Reply(sender, compileResult, stats, ctx); + Reply(sender, compileResult, stats, ctx, std::move(orbit)); } void ReplyError(const TActorId& sender, const TString& uid, Ydb::StatusIds::StatusCode status, - const TIssues& issues, const TActorContext& ctx) + const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit) { - Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx); + Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit)); } void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message, - const TActorContext& ctx) + const TActorContext& ctx, NLWTrace::TOrbit orbit) { NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Internal error during query compilation."); issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message)); - ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx); + ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit)); } static void LogException(const TString& scope, const TActorId& sender, const std::exception& e, diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h index f5240f5e81f..8adae6b1d8d 100644 --- a/ydb/core/kqp/kqp_impl.h +++ b/ydb/core/kqp/kqp_impl.h @@ -55,7 +55,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine); IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters); + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}); struct TKqpWorkerSettings { TString Cluster; diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 884da540e46..46c12550eb0 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -312,9 +312,12 @@ public: YQL_ENSURE(queryRequest.HasAction()); auto action = queryRequest.GetAction(); - LWTRACK(KqpQueryRequest, QueryState->Orbit, queryRequest.GetDatabase(), - queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED, - action, queryRequest.GetQuery()); + LWTRACK(KqpSessionQueryRequest, + QueryState->Orbit, + queryRequest.GetDatabase(), + queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED, + action, + queryRequest.GetQuery()); LOG_D(requestInfo << "Received request," << " selfId : " << SelfId() << " proxyRequestId: " << proxyRequestId @@ -422,8 +425,11 @@ public: } auto compileRequestActor = CreateKqpCompileRequestActor(SelfId(), QueryState->UserToken, uid, - std::move(query), keepInCache, compileDeadline, Settings.DbCounters); + std::move(query), keepInCache, compileDeadline, Settings.DbCounters, + QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit()); + TlsActivationContext->ExecutorThread.RegisterActor(compileRequestActor); + Become(&TKqpSessionActor::CompileState); } @@ -433,11 +439,12 @@ public: void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) { auto compileResult = ev->Get()->CompileResult; + QueryState->Orbit = std::move(ev->Get()->Orbit); YQL_ENSURE(compileResult); YQL_ENSURE(QueryState); - LWTRACK(KqpQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status); + LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status); if (compileResult->Status != Ydb::StatusIds::SUCCESS) { ReplyQueryCompileError(compileResult); @@ -898,7 +905,7 @@ public: << "Failed to mix queries with old- and new- engines"; } if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { - LWTRACK(KqpPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); + LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); ++QueryState->CurrentTx; tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery, &phyQuery.GetTransactions(QueryState->CurrentTx)); @@ -980,16 +987,24 @@ public: request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); } - LWTRACK(KqpPhyQueryProposeTx, QueryState->Orbit, QueryState->CurrentTx, request.Transactions.size(), - request.Locks.size(), request.AcquireLocksTxId.Defined()); + LWTRACK(KqpSessionPhyQueryProposeTx, + QueryState->Orbit, + QueryState->CurrentTx, + request.Transactions.size(), + request.Locks.size(), + request.AcquireLocksTxId.Defined()); SendToExecuter(std::move(request)); return false; } void SendToExecuter(IKqpGateway::TExecPhysicalRequest&& request) { + if (QueryState) { + request.Orbit = std::move(QueryState->Orbit); + } auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, (QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(), RequestCounters); + ExecuterId = TlsActivationContext->ExecutorThread.RegisterActor(executerActor); LOG_D("Created new KQP executer: " << ExecuterId); @@ -1038,6 +1053,8 @@ public: } void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { + QueryState->Orbit = std::move(ev->Get()->Orbit); + auto* response = ev->Get()->Record.MutableResponse(); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); LOG_D(SelfId() << " " << requestInfo << " TEvTxResponse, CurrentTx: " << QueryState->CurrentTx @@ -1067,7 +1084,7 @@ public: } YQL_ENSURE(QueryState); - LWTRACK(KqpPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize()); + LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize()); auto& txResult = *response->MutableResult(); QueryState->QueryCtx->TxResults.emplace_back(ExtractTxResults(txResult)); @@ -1471,9 +1488,9 @@ public: } if (status == Ydb::StatusIds::SUCCESS) { - LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, record.GetArena()->SpaceUsed()); + LWTRACK(KqpSessionReplySuccess, QueryState->Orbit, record.GetArena() ? record.GetArena()->SpaceUsed() : 0); } else { - LWTRACK(KqpQueryReplyError, QueryState->Orbit, TStringBuilder() << status); + LWTRACK(KqpSessionReplyError, QueryState->Orbit, TStringBuilder() << status); } Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId); LOG_D(requestInfo << "Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId |