aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormdartemenko <mdartemenko@yandex-team.com>2022-08-05 10:13:53 +0300
committermdartemenko <mdartemenko@yandex-team.com>2022-08-05 10:13:53 +0300
commitb56dba67800643d2ec54af77210f51485c17231d (patch)
treefe36bbb583cff175d561dd3ceab404f7f8ccd4f3
parent7e04ccaa05215d096368b8807ea6ab05fe1da1da (diff)
downloadydb-b56dba67800643d2ec54af77210f51485c17231d.tar.gz
Improve the coverage of the request's execution by LWTRACE in SessionActor
remove unused code add executer route add kqp_compile_service route
-rw-r--r--ydb/core/kqp/common/kqp_gateway.h3
-rw-r--r--ydb/core/kqp/common/kqp_lwtrace_probes.h62
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp8
-rw-r--r--ydb/core/kqp/executer/kqp_executer.h3
-rw-r--r--ydb/core/kqp/executer/kqp_executer_impl.h9
-rw-r--r--ydb/core/kqp/executer/kqp_literal_executer.cpp6
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp8
-rw-r--r--ydb/core/kqp/kqp.h20
-rw-r--r--ydb/core/kqp/kqp_compile_request.cpp32
-rw-r--r--ydb/core/kqp/kqp_compile_service.cpp74
-rw-r--r--ydb/core/kqp/kqp_impl.h2
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp39
12 files changed, 212 insertions, 54 deletions
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h
index 7c826e2afc5..05eee6cb38e 100644
--- a/ydb/core/kqp/common/kqp_gateway.h
+++ b/ydb/core/kqp/common/kqp_gateway.h
@@ -9,6 +9,7 @@
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/lwtrace/shuttle.h>
namespace NKikimr {
namespace NKqp {
@@ -125,6 +126,8 @@ public:
NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
TMaybe<NKikimrKqp::TRlPath> RlPath;
bool NeedTxId = true;
+
+ NLWTrace::TOrbit Orbit;
};
struct TExecPhysicalResult : public TGenericResult {
diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.h b/ydb/core/kqp/common/kqp_lwtrace_probes.h
index 5a9394ba09e..e197c32bed3 100644
--- a/ydb/core/kqp/common/kqp_lwtrace_probes.h
+++ b/ydb/core/kqp/common/kqp_lwtrace_probes.h
@@ -23,26 +23,74 @@ struct TQueryAction {
};
#define KQP_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \
- PROBE(KqpQueryRequest, GROUPS("KQP"), \
+ PROBE(KqpSessionQueryRequest, GROUPS("KQP"), \
TYPES(TString, TQueryType, TQueryAction, TString), \
NAMES("database", "type", "action", "query")) \
- PROBE(KqpQueryCompiled, GROUPS("KQP"), \
+ PROBE(KqpSessionQueryCompiled, GROUPS("KQP"), \
TYPES(TString), \
NAMES("compileResultStatus")) \
- PROBE(KqpPhyQueryDefer, GROUPS("KQP"), \
+ PROBE(KqpSessionPhyQueryDefer, GROUPS("KQP"), \
TYPES(ui64), \
NAMES("currentTx")) \
- PROBE(KqpPhyQueryProposeTx, GROUPS("KQP"), \
+ PROBE(KqpSessionPhyQueryProposeTx, GROUPS("KQP"), \
TYPES(ui64, ui32, ui32, bool), \
NAMES("currentTx", "transactionsSize", "locksSize", "shouldAcquireLocks")) \
- PROBE(KqpPhyQueryTxResponse, GROUPS("KQP"), \
+ PROBE(KqpSessionPhyQueryTxResponse, GROUPS("KQP"), \
TYPES(ui64, ui32), \
NAMES("currentTx", "resultsSize")) \
- PROBE(KqpQueryReplySuccess, GROUPS("KQP"), \
+ PROBE(KqpSessionReplySuccess, GROUPS("KQP"), \
TYPES(ui64), \
NAMES("responseArenaSpaceUsed")) \
- PROBE(KqpQueryReplyError, GROUPS("KQP"), \
+ PROBE(KqpSessionReplyError, GROUPS("KQP"), \
TYPES(TString), \
NAMES("errMsg")) \
+ PROBE(KqpCompileRequestBootstrap, GROUPS("KQP"), \
+ TYPES(TString, ui64), \
+ NAMES("userSid", "queryHash")) \
+ PROBE(KqpCompileRequestHandleServiceReply, GROUPS("KQP"), \
+ TYPES(TString, ui64), \
+ NAMES("userSid", "queryHash")) \
+ PROBE(KqpCompileServiceHandleRequest, GROUPS("KQP"), \
+ TYPES(TString, ui64), \
+ NAMES("userSid", "queryHash")) \
+ PROBE(KqpCompileServiceEnqueued, GROUPS("KQP"), \
+ TYPES(TString, ui64), \
+ NAMES("userSid", "queryHash")) \
+ PROBE(KqpCompileServiceGetCompilation, GROUPS("KQP"), \
+ TYPES(TString, ui64, TString), \
+ NAMES("userSid", "queryHash", "compileActor")) \
+ PROBE(KqpCompileServiceReply, GROUPS("KQP"), \
+ TYPES(TString, ui64, TString), \
+ NAMES("userSid", "queryHash", "compileResult")) \
+ PROBE(KqpBaseExecuterHandleReady, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("TxId")) \
+ PROBE(KqpBaseExecuterReplyErrorAndDie, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("TxId")) \
+ PROBE(KqpDataExecuterStartExecute, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("TxId")) \
+ PROBE(KqpDataExecuterStartTasksAndTxs, GROUPS("KQP"), \
+ TYPES(ui64, ui64, ui64), \
+ NAMES("TxId", "tasksSize", "TxsSize")) \
+ PROBE(KqpDataExecuterFinalize, GROUPS("KQP"), \
+ TYPES(ui64, ui64, ui64, ui64), \
+ NAMES("TxId", "lastCompletedShard", "ResultRows", "resultSize")) \
+ PROBE(KqpDataExecuterReplyErrorAndDie, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("TxId")) \
+ PROBE(KqpScanExecutorReplyErrorAndDie, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("TxId")) \
+ PROBE(KqpScanExecutorFinalize, GROUPS("KQP"), \
+ TYPES(ui64, ui64, TString, ui64), \
+ NAMES("TxId", "lastCompletedTask", "lastCompletedComputeActor", "ResultsSize")) \
+ PROBE(KqpLiteralExecuterReplyErrorAndDie, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("TxId")) \
+ PROBE(KqpLiteralExecuterFinalize, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("TxId")) \
/**/
LWTRACE_DECLARE_PROVIDER(KQP_PROVIDER)
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 957e7f5de56..3cd2c8ff25d 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -603,6 +603,8 @@ private:
void HandleExecute(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) {
TEvDataShard::TEvProposeTransactionResult* res = ev->Get();
const ui64 shardId = res->GetOrigin();
+ LastShard = shardId;
+
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState);
@@ -1272,6 +1274,7 @@ private:
}
void Execute() {
+ LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
RequestControls.Reqister(TlsActivationContext->AsActorContext());
ReadOnlyTx = true;
@@ -1644,6 +1647,8 @@ private:
LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem());
}
+ LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), datashardTxs.size());
+
// first, start compute tasks
TVector<ui64> computeTaskIds{Reserve(computeTasks.size())};
for (auto&& taskDesc : computeTasks) {
@@ -1767,6 +1772,8 @@ private:
return;
}
+ LWTRACK(KqpDataExecuterFinalize, ResponseEv->Orbit, TxId, LastShard, response.GetResult().ResultsSize(), response.ByteSize());
+
LOG_D("Sending response to: " << Target << ", results: " << Results.size());
Send(Target, ResponseEv.release());
PassAway();
@@ -1881,6 +1888,7 @@ private:
// Lock handle for a newly acquired lock
TLockHandle LockHandle;
+ ui64 LastShard = 0;
};
} // namespace
diff --git a/ydb/core/kqp/executer/kqp_executer.h b/ydb/core/kqp/executer/kqp_executer.h
index 11ed8320270..d508753fc91 100644
--- a/ydb/core/kqp/executer/kqp_executer.h
+++ b/ydb/core/kqp/executer/kqp_executer.h
@@ -1,5 +1,6 @@
#pragma once
+#include <library/cpp/lwtrace/shuttle.h>
#include <ydb/core/kqp/common/kqp_common.h>
#include <ydb/core/kqp/common/kqp_gateway.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
@@ -19,6 +20,8 @@ struct TEvKqpExecuter {
{
NLongTxService::TLockHandle LockHandle;
+ NLWTrace::TOrbit Orbit;
+
bool IsSerializable() const override {
// We cannot serialize LockHandle, should always send locally
return false;
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index 1f4bb0ca315..49754c6b8a5 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -6,6 +6,7 @@
#include "kqp_table_resolver.h"
#include <ydb/core/kqp/common/kqp_ru_calc.h>
+#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
@@ -30,6 +31,8 @@
#include <util/generic/size_literals.h>
+LWTRACE_USING(KQP_PROVIDER);
+
namespace NKikimr {
namespace NKqp {
@@ -85,6 +88,7 @@ public:
, Counters(counters)
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>();
+ ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());
}
@@ -124,6 +128,8 @@ protected:
TxId = ev->Get()->Record.GetRequest().GetTxId();
Target = ActorIdFromProto(ev->Get()->Record.GetTarget());
+ LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId);
+
LOG_D("Report self actorId " << this->SelfId() << " to " << Target);
auto progressEv = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
ActorIdToProto(this->SelfId(), progressEv->Record.MutableExecuterActorId());
@@ -597,6 +603,9 @@ protected:
}
}
+ LWTRACK(KqpBaseExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId);
+ ResponseEv->Orbit = std::move(ResponseEv->Orbit);
+
this->Send(Target, ResponseEv.release());
this->PassAway();
}
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp
index a2b71317a78..3a4935f859b 100644
--- a/ydb/core/kqp/executer/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp
@@ -68,6 +68,7 @@ public:
, Counters(counters)
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>();
+ ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());
}
@@ -346,6 +347,8 @@ private:
}
}
+ LWTRACK(KqpLiteralExecuterFinalize, ResponseEv->Orbit, TxId);
+
LOG_D("Sending response to: " << Target << ", results: " << Results.size());
Send(Target, ResponseEv.release());
}
@@ -408,6 +411,9 @@ private:
response.SetStatus(status);
response.MutableIssues()->Swap(issues);
+ LWTRACK(KqpLiteralExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId);
+ ResponseEv->Orbit = std::move(ResponseEv->Orbit);
+
Send(Target, ResponseEv.release());
PassAway();
}
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 169e4aa469b..8a9d1ae957f 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -225,6 +225,9 @@ private:
Stats->AddComputeActorStats(computeActor.NodeId(), std::move(*state.MutableStats()));
}
+ LastTaskId = taskId;
+ LastComputeActorId = computeActor.ToString();
+
auto it = PendingComputeActors.find(computeActor);
if (it == PendingComputeActors.end()) {
LOG_W("Got execution state for compute actor: " << computeActor
@@ -870,6 +873,8 @@ private:
}
}
+ LWTRACK(KqpScanExecutorFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, Results.size());
+
LOG_D("Sending response to: " << Target);
Send(Target, ResponseEv.release());
PassAway();
@@ -1002,6 +1007,9 @@ private:
THashSet<ui64> PendingComputeTasks; // Not started yet, waiting resources
TMap<ui64, ui64> ShardIdToNodeId;
TMap<ui64, TVector<ui64>> ShardsOnNode;
+
+ ui64 LastTaskId = 0;
+ TString LastComputeActorId = "";
};
} // namespace
diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h
index 6de02925e0d..3652f129c7d 100644
--- a/ydb/core/kqp/kqp.h
+++ b/ydb/core/kqp/kqp.h
@@ -1,6 +1,7 @@
#pragma once
#include "kqp_query_replay.h"
+#include <library/cpp/lwtrace/shuttle.h>
#include <ydb/core/kqp/common/kqp_common.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/provider/yql_kikimr_query_traits.h>
@@ -367,13 +368,14 @@ struct TEvKqp {
struct TEvCompileRequest : public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
TEvCompileRequest(const TString& userToken, const TMaybe<TString>& uid, TMaybe<TKqpQueryId>&& query,
- bool keepInCache, TInstant deadline, TKqpDbCountersPtr dbCounters)
+ bool keepInCache, TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {})
: UserToken(userToken)
, Uid(uid)
, Query(std::move(query))
, KeepInCache(keepInCache)
, Deadline(deadline)
, DbCounters(dbCounters)
+ , Orbit(std::move(orbit))
{
Y_ENSURE(Uid.Defined() != Query.Defined());
}
@@ -386,16 +388,19 @@ struct TEvKqp {
TInstant Deadline;
TKqpDbCountersPtr DbCounters;
TMaybe<bool> DocumentApiRestricted;
+
+ NLWTrace::TOrbit Orbit;
};
struct TEvRecompileRequest : public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
TEvRecompileRequest(const TString& userToken, const TString& uid, const TMaybe<TKqpQueryId>& query,
- TInstant deadline, TKqpDbCountersPtr dbCounters)
+ TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {})
: UserToken(userToken)
, Uid(uid)
, Query(query)
, Deadline(deadline)
- , DbCounters(dbCounters) {}
+ , DbCounters(dbCounters)
+ , Orbit(std::move(orbit)) {}
TString UserToken;
TString Uid;
@@ -403,11 +408,14 @@ struct TEvKqp {
TInstant Deadline;
TKqpDbCountersPtr DbCounters;
+
+ NLWTrace::TOrbit Orbit;
};
struct TEvCompileResponse : public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
- TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult)
- : CompileResult(compileResult) {}
+ TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {})
+ : CompileResult(compileResult)
+ , Orbit(std::move(orbit)) {}
TKqpCompileResult::TConstPtr CompileResult;
NKqpProto::TKqpStatsCompile Stats;
@@ -415,6 +423,8 @@ struct TEvKqp {
ui32 ForceNewEnginePercent = 0;
ui32 ForceNewEngineLevel = 0;
+
+ NLWTrace::TOrbit Orbit;
};
struct TEvCompileInvalidateRequest : public TEventLocal<TEvCompileInvalidateRequest,
diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp
index 76e85a27e0f..3bc7e0fed0d 100644
--- a/ydb/core/kqp/kqp_compile_request.cpp
+++ b/ydb/core/kqp/kqp_compile_request.cpp
@@ -7,8 +7,12 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
+
#include <util/string/escape.h>
+LWTRACE_USING(KQP_PROVIDER);
+
namespace NKikimr {
namespace NKqp {
@@ -24,16 +28,22 @@ public:
}
TKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters)
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit)
: Owner(owner)
, UserToken(userToken)
, Uid(uid)
, Query(std::move(query))
, KeepInCache(keepInCache)
, Deadline(deadline)
- , DbCounters(dbCounters) {}
-
+ , DbCounters(dbCounters)
+ , Orbit{std::move(orbit)} {}
+
void Bootstrap(const TActorContext& ctx) {
+ LWTRACK(KqpCompileRequestBootstrap,
+ Orbit,
+ Query ? Query->UserSid : 0,
+ Query ? Query->GetHash() : 0);
+
TimeoutTimerId = CreateLongTimer(ctx, Deadline - TInstant::Now(),
new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup()));
@@ -41,13 +51,19 @@ public:
std::swap(Query, query);
auto compileEv = MakeHolder<TEvKqp::TEvCompileRequest>(UserToken, Uid, std::move(query),
- KeepInCache, Deadline, DbCounters);
+ KeepInCache, Deadline, DbCounters, std::move(Orbit));
ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release());
Become(&TKqpCompileRequestActor::MainState);
}
void Handle(TEvKqp::TEvCompileResponse::TPtr& ev, const TActorContext &ctx) {
+ const auto& query = ev->Get()->CompileResult->Query;
+ LWTRACK(KqpCompileRequestHandleServiceReply,
+ ev->Get()->Orbit,
+ query ? query->UserSid : 0,
+ query ? query->GetHash() : 0);
+
auto compileResult = ev->Get()->CompileResult;
const auto& stats = ev->Get()->Stats;
@@ -280,7 +296,7 @@ private:
}
void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) {
- auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues));
+ auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues), std::move(Orbit));
ctx.Send(Owner, responseEv.Release());
Die(ctx);
}
@@ -296,13 +312,15 @@ private:
TActorId TimeoutTimerId;
THashMap<TTableId, ui64> TableVersions;
THolder<TEvKqp::TEvCompileResponse> DeferredResponse;
+
+ NLWTrace::TOrbit Orbit;
};
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters)
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit)
{
- return new TKqpCompileRequestActor(owner, userToken, uid, std::move(query), keepInCache, deadline, dbCounters);
+ return new TKqpCompileRequestActor(owner, userToken, uid, std::move(query), keepInCache, deadline, dbCounters, std::move(orbit));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp
index 480c5ee6ef2..a992fec7ea2 100644
--- a/ydb/core/kqp/kqp_compile_service.cpp
+++ b/ydb/core/kqp/kqp_compile_service.cpp
@@ -1,11 +1,13 @@
#include "kqp_impl.h"
#include "kqp_query_replay.h"
+
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/cms/console/console.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/library/aclib/aclib.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -14,6 +16,8 @@
#include <util/string/escape.h>
+LWTRACE_USING(KQP_PROVIDER);
+
namespace NKikimr {
namespace NKqp {
@@ -183,14 +187,15 @@ private:
struct TKqpCompileRequest {
TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache,
- const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters)
+ const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {})
: Sender(sender)
, Query(std::move(query))
, Uid(uid)
, KeepInCache(keepInCache)
, UserToken(userToken)
, Deadline(deadline)
- , DbCounters(dbCounters) {}
+ , DbCounters(dbCounters)
+ , Orbit(std::move(orbit)) {}
TActorId Sender;
TKqpQueryId Query;
@@ -200,6 +205,8 @@ struct TKqpCompileRequest {
TInstant Deadline;
TKqpDbCountersPtr DbCounters;
TActorId CompileActor;
+
+ NLWTrace::TOrbit Orbit;
};
class TKqpRequestsQueue {
@@ -398,12 +405,18 @@ private:
}
void Handle(TEvKqp::TEvCompileRequest::TPtr& ev, const TActorContext& ctx) {
+ const auto& query = ev->Get()->Query;
+ LWTRACK(KqpCompileServiceHandleRequest,
+ ev->Get()->Orbit,
+ query ? query->UserSid : 0,
+ query ? query->GetHash() : 0);
+
try {
PerformRequest(ev, ctx);
}
catch (const std::exception& e) {
LogException("TEvCompileRequest", ev->Sender, e, ctx);
- ReplyInternalError(ev->Sender, "", e.what(), ctx);
+ ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit));
}
}
@@ -436,7 +449,7 @@ private:
<< ", queryUid: " << *request.Uid);
- ReplyFromCache(ev->Sender, compileResult, ctx);
+ ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit));
return;
} else {
LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query"
@@ -454,7 +467,7 @@ private:
<< ", queryUid: " << *request.Uid);
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid);
- ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx);
+ ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit));
return;
}
@@ -477,14 +490,20 @@ private:
<< ", sender: " << ev->Sender
<< ", queryUid: " << compileResult->Uid);
- ReplyFromCache(ev->Sender, compileResult, ctx);
+ ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit));
return;
}
Counters->ReportQueryCacheHit(dbCounters, false);
+ LWTRACK(KqpCompileServiceEnqueued,
+ ev->Get()->Orbit,
+ ev->Get()->Query ? ev->Get()->Query->UserSid : 0,
+ ev->Get()->Query ? ev->Get()->Query->GetHash() : 0);
+
+
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
- request.KeepInCache, request.UserToken, request.Deadline, dbCounters);
+ request.KeepInCache, request.UserToken, request.Deadline, dbCounters, std::move(ev->Get()->Orbit));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Counters->ReportCompileRequestRejected(dbCounters);
@@ -495,7 +514,7 @@ private:
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() <<
"Exceeded maximum number of requests in compile service queue.");
- ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx);
+ ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit));
return;
}
@@ -512,7 +531,7 @@ private:
}
catch (const std::exception& e) {
LogException("TEvRecompileRequest", ev->Sender, e, ctx);
- ReplyInternalError(ev->Sender, "", e.what(), ctx);
+ ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit));
}
}
@@ -541,7 +560,7 @@ private:
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() <<
"Exceeded maximum number of requests in compile service queue.");
- ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx);
+ ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit));
return;
}
} else {
@@ -550,7 +569,7 @@ private:
<< ", queryUid: " << request.Uid);
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << request.Uid);
- ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx);
+ ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit));
return;
}
@@ -593,7 +612,8 @@ private:
auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query);
for (auto& request : requests) {
- Reply(request.Sender, compileResult, compileStats, ctx);
+ LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, request.Query.GetHash(), compileActorId.ToString());
+ Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit));
}
} else {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
@@ -601,11 +621,12 @@ private:
}
}
- Reply(compileRequest.Sender, compileResult, compileStats, ctx);
+ LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileRequest.Query.GetHash(), compileActorId.ToString());
+ Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit));
}
catch (const std::exception& e) {
LogException("TEvCompileResponse", ev->Sender, e, ctx);
- ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx);
+ ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit));
}
ProcessQueue(ctx);
@@ -663,7 +684,7 @@ private:
Counters->ReportCompileRequestTimeout(request->DbCounters);
NYql::TIssue issue(NYql::TPosition(), "Compilation timed out.");
- ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx);
+ ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit));
} else {
StartCompilation(std::move(*request), ctx);
}
@@ -695,14 +716,21 @@ private:
}
void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
- const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx)
+ const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit)
{
+ const auto& query = compileResult->Query;
+ LWTRACK(KqpCompileServiceReply,
+ orbit,
+ query ? query->UserSid : 0,
+ query ? query->GetHash() : 0,
+ compileResult->Issues.ToString());
+
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Send response"
<< ", sender: " << sender
<< ", queryUid: " << compileResult->Uid
<< ", status:" << compileResult->Status);
- auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult);
+ auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit));
responseEv->Stats.CopyFrom(compileStats);
if (responseEv->CompileResult && responseEv->CompileResult->PreparedQueryNewEngine) {
@@ -714,27 +742,27 @@ private:
}
void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
- const TActorContext& ctx)
+ const TActorContext& ctx, NLWTrace::TOrbit orbit)
{
NKqpProto::TKqpStatsCompile stats;
stats.SetFromCache(true);
- Reply(sender, compileResult, stats, ctx);
+ Reply(sender, compileResult, stats, ctx, std::move(orbit));
}
void ReplyError(const TActorId& sender, const TString& uid, Ydb::StatusIds::StatusCode status,
- const TIssues& issues, const TActorContext& ctx)
+ const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit)
{
- Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx);
+ Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit));
}
void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message,
- const TActorContext& ctx)
+ const TActorContext& ctx, NLWTrace::TOrbit orbit)
{
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Internal error during query compilation.");
issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message));
- ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx);
+ ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit));
}
static void LogException(const TString& scope, const TActorId& sender, const std::exception& e,
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h
index f5240f5e81f..8adae6b1d8d 100644
--- a/ydb/core/kqp/kqp_impl.h
+++ b/ydb/core/kqp/kqp_impl.h
@@ -55,7 +55,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine);
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters);
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {});
struct TKqpWorkerSettings {
TString Cluster;
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 884da540e46..46c12550eb0 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -312,9 +312,12 @@ public:
YQL_ENSURE(queryRequest.HasAction());
auto action = queryRequest.GetAction();
- LWTRACK(KqpQueryRequest, QueryState->Orbit, queryRequest.GetDatabase(),
- queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED,
- action, queryRequest.GetQuery());
+ LWTRACK(KqpSessionQueryRequest,
+ QueryState->Orbit,
+ queryRequest.GetDatabase(),
+ queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED,
+ action,
+ queryRequest.GetQuery());
LOG_D(requestInfo << "Received request,"
<< " selfId : " << SelfId()
<< " proxyRequestId: " << proxyRequestId
@@ -422,8 +425,11 @@ public:
}
auto compileRequestActor = CreateKqpCompileRequestActor(SelfId(), QueryState->UserToken, uid,
- std::move(query), keepInCache, compileDeadline, Settings.DbCounters);
+ std::move(query), keepInCache, compileDeadline, Settings.DbCounters,
+ QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit());
+
TlsActivationContext->ExecutorThread.RegisterActor(compileRequestActor);
+
Become(&TKqpSessionActor::CompileState);
}
@@ -433,11 +439,12 @@ public:
void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) {
auto compileResult = ev->Get()->CompileResult;
+ QueryState->Orbit = std::move(ev->Get()->Orbit);
YQL_ENSURE(compileResult);
YQL_ENSURE(QueryState);
- LWTRACK(KqpQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status);
+ LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status);
if (compileResult->Status != Ydb::StatusIds::SUCCESS) {
ReplyQueryCompileError(compileResult);
@@ -898,7 +905,7 @@ public:
<< "Failed to mix queries with old- and new- engines";
}
if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
- LWTRACK(KqpPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
+ LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
++QueryState->CurrentTx;
tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery,
&phyQuery.GetTransactions(QueryState->CurrentTx));
@@ -980,16 +987,24 @@ public:
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
}
- LWTRACK(KqpPhyQueryProposeTx, QueryState->Orbit, QueryState->CurrentTx, request.Transactions.size(),
- request.Locks.size(), request.AcquireLocksTxId.Defined());
+ LWTRACK(KqpSessionPhyQueryProposeTx,
+ QueryState->Orbit,
+ QueryState->CurrentTx,
+ request.Transactions.size(),
+ request.Locks.size(),
+ request.AcquireLocksTxId.Defined());
SendToExecuter(std::move(request));
return false;
}
void SendToExecuter(IKqpGateway::TExecPhysicalRequest&& request) {
+ if (QueryState) {
+ request.Orbit = std::move(QueryState->Orbit);
+ }
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
(QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(),
RequestCounters);
+
ExecuterId = TlsActivationContext->ExecutorThread.RegisterActor(executerActor);
LOG_D("Created new KQP executer: " << ExecuterId);
@@ -1038,6 +1053,8 @@ public:
}
void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
+ QueryState->Orbit = std::move(ev->Get()->Orbit);
+
auto* response = ev->Get()->Record.MutableResponse();
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
LOG_D(SelfId() << " " << requestInfo << " TEvTxResponse, CurrentTx: " << QueryState->CurrentTx
@@ -1067,7 +1084,7 @@ public:
}
YQL_ENSURE(QueryState);
- LWTRACK(KqpPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize());
+ LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize());
auto& txResult = *response->MutableResult();
QueryState->QueryCtx->TxResults.emplace_back(ExtractTxResults(txResult));
@@ -1471,9 +1488,9 @@ public:
}
if (status == Ydb::StatusIds::SUCCESS) {
- LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, record.GetArena()->SpaceUsed());
+ LWTRACK(KqpSessionReplySuccess, QueryState->Orbit, record.GetArena() ? record.GetArena()->SpaceUsed() : 0);
} else {
- LWTRACK(KqpQueryReplyError, QueryState->Orbit, TStringBuilder() << status);
+ LWTRACK(KqpSessionReplyError, QueryState->Orbit, TStringBuilder() << status);
}
Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId);
LOG_D(requestInfo << "Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId