aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author mdartemenko <mdartemenko@yandex-team.com> 2022-08-26 17:27:23 +0300
committer mdartemenko <mdartemenko@yandex-team.com> 2022-08-26 17:27:23 +0300
commit 5ed0649ded71642e99fbfad64f95ccf0b8dd18fb (patch)
tree e3b0f67baa07e87e52799649df544062c826af61
parent c0ec5c1dde9b0d33aba0d5e4e942fc0c64d99b30 (diff)
download ydb-5ed0649ded71642e99fbfad64f95ccf0b8dd18fb.tar.gz
Add support of Wilson tracing lib for kqp execution
Add tracing to scan executer; add tracing to data and literal executer; add local traces; add scan executer coverage.
-rw-r--r-- library/cpp/actors/wilson/wilson_span.cpp | 4
-rw-r--r-- ydb/core/base/wilson.h | 36
-rw-r--r-- ydb/core/kqp/CMakeLists.txt | 2
-rw-r--r-- ydb/core/kqp/common/kqp_gateway.h | 2
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_actor.h | 5
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp | 10
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 10
-rw-r--r-- ydb/core/kqp/executer/kqp_data_executer.cpp | 51
-rw-r--r-- ydb/core/kqp/executer/kqp_executer_impl.h | 39
-rw-r--r-- ydb/core/kqp/executer/kqp_literal_executer.cpp | 23
-rw-r--r-- ydb/core/kqp/executer/kqp_planner.cpp | 43
-rw-r--r-- ydb/core/kqp/executer/kqp_planner.h | 6
-rw-r--r-- ydb/core/kqp/executer/kqp_scan_executer.cpp | 23
-rw-r--r-- ydb/core/kqp/kqp_compile_actor.cpp | 19
-rw-r--r-- ydb/core/kqp/kqp_compile_request.cpp | 47
-rw-r--r-- ydb/core/kqp/kqp_compile_service.cpp | 70
-rw-r--r-- ydb/core/kqp/kqp_impl.h | 5
-rw-r--r-- ydb/core/kqp/kqp_session_actor.cpp | 22
-rw-r--r-- ydb/core/kqp/node/kqp_node.cpp | 9
-rw-r--r-- ydb/core/tx/datashard/datashard__propose_tx_base.cpp | 14
-rw-r--r-- ydb/core/tx/datashard/datashard_txs.h | 3
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 32
22 files changed, 407 insertions, 68 deletions
diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp
index c932560254d..dcd458be7c6 100644
--- a/library/cpp/actors/wilson/wilson_span.cpp
+++ b/library/cpp/actors/wilson/wilson_span.cpp
@@ -53,7 +53,9 @@ namespace NWilson {
}
void TSpan::Send() {
- TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
+ if (TlsActivationContext) {
+ TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
+ }
Data->Sent = true;
}
diff --git a/ydb/core/base/wilson.h b/ydb/core/base/wilson.h
index 5b62bd51c91..47dd6076988 100644
--- a/ydb/core/base/wilson.h
+++ b/ydb/core/base/wilson.h
@@ -11,4 +11,40 @@ namespace NKikimr {
};
};
+ struct TWilsonKqp {
+ enum {
+ KqpSession = 10,
+ CompileRequest = 9,
+ CompileService = 8,
+ CompileActor = 7,
+
+ KqpSessionCreateAndSendPropose = 9,
+
+ ExecuterReadyState = 9,
+ ExecuterWaitResolveState = 9,
+ ExecuterTableResolve = 9,
+ ExecuterZombieState = 9,
+
+ ComputeActor = 6,
+
+ LiteralExecuter = 9,
+ LiteralExecuterPrepareTasks = 9,
+ LiteralExecuterRunTasks = 9,
+
+ DataExecuter = 9,
+ DataExecuterPrepareState = 9,
+ DataExecuterPrepateTasks = 9,
+ DataExecuterExecuteState = 9,
+ DataExecuterSendTasksAndTxs = 9,
+ DataExecuterWaitSnapshotState = 9,
+ ProposeTransaction = 8,
+
+ ScanExecuter = 9,
+ ScanExecuterExecuteState = 9,
+ ScanExecuterPrepareTasks = 9,
+ KqpPlanner = 8,
+ KqpNodeSendTasks = 7,
+ };
+ };
+
} // NKikimr
diff --git a/ydb/core/kqp/CMakeLists.txt b/ydb/core/kqp/CMakeLists.txt
index 041ed5ad1ae..7d8bf5d77c3 100644
--- a/ydb/core/kqp/CMakeLists.txt
+++ b/ydb/core/kqp/CMakeLists.txt
@@ -19,6 +19,7 @@ target_link_libraries(ydb-core-kqp PUBLIC
cpp-actors-helpers
cpp-digest-md5
cpp-string_utils-base64
+ cpp-actors-wilson
ydb-core-actorlib_impl
ydb-core-base
core-client-minikql_compile
@@ -72,6 +73,7 @@ target_link_libraries(ydb-core-kqp.global PUBLIC
cpp-actors-helpers
cpp-digest-md5
cpp-string_utils-base64
+ cpp-actors-wilson
ydb-core-actorlib_impl
ydb-core-base
core-client-minikql_compile
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h
index 05eee6cb38e..f7d18b682dd 100644
--- a/ydb/core/kqp/common/kqp_gateway.h
+++ b/ydb/core/kqp/common/kqp_gateway.h
@@ -8,6 +8,7 @@
#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
+#include <library/cpp/actors/wilson/wilson_trace.h>
#include <library/cpp/actors/core/actorid.h>
#include <library/cpp/lwtrace/shuttle.h>
@@ -128,6 +129,7 @@ public:
bool NeedTxId = true;
NLWTrace::TOrbit Orbit;
+ NWilson::TTraceId TraceId;
};
struct TExecPhysicalResult : public TGenericResult {
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index d9c57edc276..702918be8af 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -19,13 +19,14 @@ namespace NKqp {
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits);
+ const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
+ NWilson::TTraceId traceId = {});
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
- TIntrusivePtr<TKqpCounters> counters);
+ TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {});
namespace NComputeActor {
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 3c6d20918cc..aeb2cafe70a 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -35,8 +35,9 @@ public:
TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
- : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ NWilson::TTraceId traceId)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
, ComputeCtx(settings.StatsMode)
{
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
@@ -305,10 +306,11 @@ private:
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ NWilson::TTraceId traceId)
{
return new TKqpComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits);
+ functionRegistry, settings, memoryLimits, std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 8152d361782..1351c6a35b7 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -103,8 +103,9 @@ public:
TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
- : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters,
+ NWilson::TTraceId traceId)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
, Counters(counters)
@@ -1045,10 +1046,11 @@ private:
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters,
+ NWilson::TTraceId traceId)
{
return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits, counters);
+ functionRegistry, settings, memoryLimits, counters, std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 2b8438bafba..4728b59d004 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/client/minikql_compile/db_key_resolver.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
@@ -133,7 +134,7 @@ public:
TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
TKqpRequestCounters::TPtr counters)
- : TBase(std::move(request), database, userToken, counters)
+ : TBase(std::move(request), database, userToken, counters, TWilsonKqp::DataExecuter, "DataExecuter")
{
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
@@ -181,6 +182,10 @@ public:
return;
}
+ if (ExecuterTableResolveSpan) {
+ ExecuterTableResolveSpan.End();
+ }
+
Execute();
}
@@ -503,6 +508,11 @@ private:
LOG_D("All shards prepared, become ExecuteState.");
Become(&TKqpDataExecuter::ExecuteState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END);
+ }
+
ExecutePlanned();
}
@@ -1231,7 +1241,11 @@ private:
ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0);
}
- Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward(ev, shardId, true));
+ auto traceId = ExecuterSpan.GetTraceId();
+
+ LOG_D("ExecuteDatashardTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity()));
+
+ Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward(ev, shardId, true), 0, 0, std::move(traceId));
auto result = ShardStates.emplace(shardId, std::move(shardState));
YQL_ENSURE(result.second);
@@ -1263,7 +1277,7 @@ private:
return false;
};
- auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits);
+ auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits, ExecuterSpan.GetTraceId());
auto computeActorId = Register(computeActor);
task.ComputeActorId = computeActorId;
@@ -1274,6 +1288,7 @@ private:
}
void Execute() {
+ NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
RequestControls.Reqister(TlsActivationContext->AsActorContext());
@@ -1513,9 +1528,17 @@ private:
LOG_T("Create temporary mvcc snapshot, ebcome WaitSnapshotState");
Become(&TKqpDataExecuter::WaitSnapshotState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterWaitSnapshotState, ExecuterSpan.GetTraceId(), "WaitSnapshotState", NWilson::EFlags::AUTO_END);
+ }
+
return;
}
+ if (prepareTasksSpan) {
+ prepareTasksSpan.End();
+ }
ContinueExecute(computeTasks, datashardTxs);
}
@@ -1579,9 +1602,17 @@ private:
if (ImmediateTx) {
LOG_T("Immediate tx, become ExecuteState");
Become(&TKqpDataExecuter::ExecuteState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END);
+ }
} else {
LOG_T("Not immediate tx, become PrepareState");
Become(&TKqpDataExecuter::PrepareState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterPrepareState, ExecuterSpan.GetTraceId(), "PrepareState", NWilson::EFlags::AUTO_END);
+ }
}
}
@@ -1647,6 +1678,7 @@ private:
LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem());
}
+ NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), datashardTxs.size());
// first, start compute tasks
@@ -1705,6 +1737,10 @@ private:
ExecuteDatashardTransaction(shardId, shardTx, lockTxId);
}
+ if (sendTasksSpan) {
+ sendTasksSpan.End();
+ }
+
LOG_I("Total tasks: " << TasksGraph.GetTasks().size()
<< ", readonly: " << ReadOnlyTx
<< ", datashardTxs: " << datashardTxs.size()
@@ -1773,6 +1809,15 @@ private:
}
LWTRACK(KqpDataExecuterFinalize, ResponseEv->Orbit, TxId, LastShard, response.GetResult().ResultsSize(), response.ByteSize());
+
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = {};
+ }
+
+ if (ExecuterSpan) {
+ ExecuterSpan.EndOk();
+ }
LOG_D("Sending response to: " << Target << ", results: " << Results.size());
Send(Target, ResponseEv.release());
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index e1830b9f5b1..250299766d2 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -10,6 +10,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/base/kikimr_issue.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/kqp/executer/kqp_tasks_graph.h>
@@ -26,6 +27,7 @@
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
@@ -81,11 +83,12 @@ template <class TDerived, EExecType ExecType>
class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
public:
TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
- TKqpRequestCounters::TPtr counters)
+ TKqpRequestCounters::TPtr counters, ui64 spanVerbosity = 0, TString spanName = "no_name")
: Request(std::move(request))
, Database(database)
, UserToken(userToken)
, Counters(counters)
+ , ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName)
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>();
ResponseEv->Orbit = std::move(Request.Orbit);
@@ -104,6 +107,7 @@ public:
LOG_T("Bootstrap done, become ReadyState");
this->Become(&TKqpExecuterBase::ReadyState);
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterReadyState, ExecuterSpan.GetTraceId(), "ReadyState", NWilson::EFlags::AUTO_END);
}
void ReportEventElapsedTime() {
@@ -166,6 +170,13 @@ protected:
LOG_T("Got request, become WaitResolveState");
this->Become(&TDerived::WaitResolveState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterWaitResolveState, ExecuterSpan.GetTraceId(), "WaitResolveState", NWilson::EFlags::AUTO_END);
+ }
+
+ ExecuterTableResolveSpan = NWilson::TSpan(TWilsonKqp::ExecuterTableResolve, ExecuterStateSpan.GetTraceId(), "ExecuterTableResolve", NWilson::EFlags::AUTO_END);
+
StartResolveTime = now;
if (Stats) {
@@ -184,6 +195,11 @@ protected:
InternalError(issues);
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
+
+ if (ExecuterSpan) {
+ ExecuterSpan.EndError("timeout");
+ }
+
this->Send(Target, abortEv.Release());
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, "timeout");
@@ -199,10 +215,20 @@ protected:
if (cancel) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::CANCELLED, "Request timeout exceeded");
+
+ if (ExecuterSpan) {
+ ExecuterSpan.EndError("timeout");
+ }
+
this->Send(Target, abortEv.Release());
CancelAtActor = {};
} else {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
+
+ if (ExecuterSpan) {
+ ExecuterSpan.EndError("timeout");
+ }
+
this->Send(Target, abortEv.Release());
DeadlineActor = {};
}
@@ -605,6 +631,10 @@ protected:
LWTRACK(KqpBaseExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId);
+ if (ExecuterSpan) {
+ ExecuterSpan.EndError(response.DebugString());
+ }
+
this->Send(Target, ResponseEv.release());
this->PassAway();
}
@@ -638,6 +668,10 @@ protected:
this->Send(this->SelfId(), new TEvents::TEvPoison);
LOG_T("Terminate, become ZombieState");
this->Become(&TKqpExecuterBase::ZombieState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterZombieState, ExecuterSpan.GetTraceId(), "ZombieState", NWilson::EFlags::AUTO_END);
+ }
} else {
IActor::PassAway();
}
@@ -686,6 +720,9 @@ protected:
TInstant LastResourceUsageUpdate;
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv;
+ NWilson::TSpan ExecuterSpan;
+ NWilson::TSpan ExecuterStateSpan;
+ NWilson::TSpan ExecuterTableResolveSpan;
private:
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
};
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp
index 5ca5367accd..0c663161f26 100644
--- a/ydb/core/kqp/executer/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp
@@ -9,6 +9,8 @@
#include <ydb/core/kqp/prepare/kqp_query_plan.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/core/base/wilson.h>
+
namespace NKikimr {
namespace NKqp {
@@ -66,6 +68,7 @@ public:
TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters)
: Request(std::move(request))
, Counters(counters)
+ , LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter")
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>();
ResponseEv->Orbit = std::move(Request.Orbit);
@@ -109,6 +112,7 @@ private:
}
void Handle(TEvKqpExecuter::TEvTxRequest::TPtr& ev) {
+ NWilson::TSpan prepareTasksSpan(TWilsonKqp::LiteralExecuterPrepareTasks, LiteralExecuterSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END);
if (Stats) {
Stats->StartTs = TInstant::Now();
}
@@ -171,6 +175,12 @@ private:
}
});
+ if (prepareTasksSpan) {
+ prepareTasksSpan.EndOk();
+ }
+
+ NWilson::TSpan runTasksSpan(TWilsonKqp::LiteralExecuterRunTasks, LiteralExecuterSpan.GetTraceId(), "RunTasks", NWilson::EFlags::AUTO_END);
+
// task runner settings
NMiniKQL::TKqpComputeContextBase computeCtx;
TDqTaskRunnerContext context = CreateTaskRunnerContext(&computeCtx, &alloc, &typeEnv);
@@ -190,6 +200,10 @@ private:
}
}
+ if (runTasksSpan) {
+ runTasksSpan.End();
+ }
+
Finalize(context, holderFactory);
PassAway();
}
@@ -349,6 +363,10 @@ private:
LWTRACK(KqpLiteralExecuterFinalize, ResponseEv->Orbit, TxId);
+ if (LiteralExecuterSpan) {
+ LiteralExecuterSpan.EndOk();
+ }
+
LOG_D("Sending response to: " << Target << ", results: " << Results.size());
Send(Target, ResponseEv.release());
}
@@ -413,6 +431,10 @@ private:
LWTRACK(KqpLiteralExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId);
+ if (LiteralExecuterSpan) {
+ LiteralExecuterSpan.EndError(response.DebugString());
+ }
+
Send(Target, ResponseEv.release());
PassAway();
}
@@ -438,6 +460,7 @@ private:
TVector<TIntrusivePtr<IDqTaskRunner>> TaskRunners;
std::unordered_map<ui64, ui32> TaskId2StageId;
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv;
+ NWilson::TSpan LiteralExecuterSpan;
};
} // anonymous namespace
diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp
index 313c8b0779b..a2258184358 100644
--- a/ydb/core/kqp/executer/kqp_planner.cpp
+++ b/ydb/core/kqp/executer/kqp_planner.cpp
@@ -4,6 +4,7 @@
#include "kqp_shards_resolver.h"
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/kqp/rm/kqp_rm.h>
#include <ydb/core/kqp/rm/kqp_resource_estimation.h>
@@ -26,7 +27,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
- bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath)
+ bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId)
: TxId(txId)
, ExecuterId(executer)
, Tasks(std::move(tasks))
@@ -40,6 +41,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
, EnableLlvm(enableLlvm)
, WithSpilling(withSpilling)
, RlPath(rlPath)
+ , KqpPlannerSpan(TWilsonKqp::KqpPlanner, std::move(traceId), "KqpPlanner")
{
if (!Database) {
// a piece of magic for tests
@@ -135,6 +137,11 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
LOG_E("Not enough resources to execute query locally and no information about other nodes");
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
"Not enough resources to execute query locally and no information about other nodes");
+
+ if (KqpPlannerSpan) {
+ KqpPlannerSpan.EndError("Not enough resources to execute query locally and no information about other nodes");
+ }
+
Send(ExecuterId, ev.Release());
PassAway();
return;
@@ -154,6 +161,8 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
auto plan = planner->Plan(snapshot, std::move(est));
+ long requestsCnt = 0;
+
if (!plan.empty()) {
for (auto& group : plan) {
auto ev = PrepareKqpNodeRequest(group.TaskIds);
@@ -161,7 +170,8 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
auto target = MakeKqpNodeServiceID(group.NodeId);
TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId())));
+ CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ ++requestsCnt;
}
TVector<ui64> nodes;
@@ -177,15 +187,26 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
auto target = MakeKqpNodeServiceID(nodeId);
LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);
TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId())));
+ CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ ++requestsCnt;
}
Y_VERIFY(ScanTasks.empty());
} else {
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
"Not enough resources to execute query");
+
+ if (KqpPlannerSpan) {
+ KqpPlannerSpan.EndError("Not enough resources to execute query");
+ }
+
Send(ExecuterId, ev.Release());
}
+ if (KqpPlannerSpan) {
+ KqpPlannerSpan.Attribute("RequestsCnt", requestsCnt);
+ KqpPlannerSpan.EndOk();
+ }
+
PassAway();
}
@@ -200,8 +221,8 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho
auto target = MakeKqpNodeServiceID(SelfId().NodeId());
LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);
- TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery));
-
+ TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery, 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ long requestsCnt = 1;
TVector<ui64> nodes;
for (const auto& pair: ScanTasks) {
@@ -220,10 +241,16 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho
AddScansToKqpNodeRequest(ev, nodeId);
auto target = MakeKqpNodeServiceID(nodeId);
TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId())));
+ CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ ++requestsCnt;
}
Y_VERIFY(ScanTasks.size() == 0);
+ if (KqpPlannerSpan) {
+ KqpPlannerSpan.Attribute("requestsCnt", requestsCnt);
+ KqpPlannerSpan.EndOk();
+ }
+
PassAway();
}
@@ -352,10 +379,10 @@ IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& token, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
- bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath)
+ bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId)
{
return new TKqpPlanner(txId, executer, std::move(tasks), std::move(scanTasks), snapshot,
- database, token, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath);
+ database, token, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, std::move(traceId));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer/kqp_planner.h b/ydb/core/kqp/executer/kqp_planner.h
index ae1cb852c13..5c5cfeabb69 100644
--- a/ydb/core/kqp/executer/kqp_planner.h
+++ b/ydb/core/kqp/executer/kqp_planner.h
@@ -6,6 +6,7 @@
#include <ydb/core/kqp/rm/kqp_rm.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
@@ -35,7 +36,7 @@ public:
THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages,
- bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath);
+ bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId);
void Bootstrap(const TActorContext& ctx);
@@ -71,12 +72,13 @@ private:
const bool WithSpilling;
const TMaybe<NKikimrKqp::TRlPath> RlPath;
THashSet<ui32> TrackingNodes;
+ NWilson::TSpan KqpPlannerSpan;
};
IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
- bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath);
+ bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId = {});
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 228b36899c3..da8e800f5e2 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -44,7 +44,7 @@ public:
TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters)
- : TBase(std::move(request), database, userToken, counters)
+ : TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter")
{
YQL_ENSURE(Request.Transactions.size() == 1);
YQL_ENSURE(Request.Locks.empty());
@@ -101,6 +101,10 @@ public:
}
}
+ if (ExecuterTableResolveSpan) {
+ ExecuterTableResolveSpan.End();
+ }
+
if (shardIds.size() > 0) {
LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")");
auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds));
@@ -636,6 +640,8 @@ private:
NMiniKQL::TScopedAlloc alloc(TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators());
NMiniKQL::TTypeEnvironment typeEnv(alloc);
+ NWilson::TSpan prepareTasksSpan(TWilsonKqp::ScanExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END);
+
NMiniKQL::TMemoryUsageInfo memInfo("PrepareTasks");
NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, &funcRegistry);
@@ -812,6 +818,10 @@ private:
return;
}
+ if (prepareTasksSpan) {
+ prepareTasksSpan.End();
+ }
+
LOG_D("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: true"
<< ", " << nScanTasks << " scan tasks on " << scanTasks.size() << " nodes"
<< ", totalShardScans: " << nShardScans << ", execType: Scan"
@@ -820,6 +830,11 @@ private:
ExecuteScanTx(std::move(computeTasks), std::move(scanTasks));
Become(&TKqpScanExecuter::ExecuteState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ScanExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END);
+ }
+
}
void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks) {
@@ -838,7 +853,7 @@ private:
auto planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks),
std::move(scanTasks), Request.Snapshot,
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
- Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath);
+ Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan.GetTraceId());
RegisterWithSameMailbox(planner);
}
@@ -877,6 +892,10 @@ private:
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, Results.size());
+ if (ExecuterSpan) {
+ ExecuterSpan.EndOk();
+ }
+
LOG_D("Sending response to: " << Target);
Send(Target, ResponseEv.release());
PassAway();
diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp
index ee471926fb2..44f2a035f5e 100644
--- a/ydb/core/kqp/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/kqp_compile_actor.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/host/kqp_host.h>
@@ -10,6 +11,7 @@
#include <ydb/library/yql/utils/actor_log/log.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/json/json_writer.h>
#include <library/cpp/string_utils/base64/base64.h>
@@ -46,7 +48,7 @@ public:
TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const TTableServiceConfig& serviceConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TString& userToken,
- TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine)
+ TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId)
: Owner(owner)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
@@ -57,6 +59,7 @@ public:
, Config(MakeIntrusive<TKikimrConfiguration>())
, CompilationTimeout(TDuration::MilliSeconds(serviceConfig.GetCompileTimeoutMs()))
, RecompileWithNewEngine(recompileWithNewEngine)
+ , CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
{
Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), Query.Cluster, kqpSettings->Settings, false);
@@ -74,6 +77,10 @@ public:
Counters->ReportCompileStart(DbCounters);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "traceId: verbosity = "
+ << std::to_string(CompileActorSpan.GetTraceId().GetVerbosity()) << ", trace_id = "
+ << std::to_string(CompileActorSpan.GetTraceId().GetTraceId()));
+
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Start compilation"
<< ", self: " << ctx.SelfID
<< ", cluster: " << Query.Cluster
@@ -215,6 +222,10 @@ private:
Counters->ReportCompileFinish(DbCounters);
+ if (CompileActorSpan) {
+ CompileActorSpan.End();
+ }
+
Die(ctx);
}
@@ -390,6 +401,8 @@ private:
TIntrusivePtr<IKqpHost::IAsyncQueryResult> AsyncCompileResult;
std::shared_ptr<TKqpCompileResult> KqpCompileResult;
std::optional<TString> ReplayMessage;
+
+ NWilson::TSpan CompileActorSpan;
};
void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) {
@@ -405,10 +418,10 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const TTableServiceConfig& serviceConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TString& userToken,
- TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine)
+ TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId)
{
return new TKqpCompileActor(owner, kqpSettings, serviceConfig, moduleResolverState, counters, uid,
- std::move(query), userToken, dbCounters, recompileWithNewEngine);
+ std::move(query), userToken, dbCounters, recompileWithNewEngine, std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp
index 0d5f80ce409..a06a1a43c0d 100644
--- a/ydb/core/kqp/kqp_compile_request.cpp
+++ b/ydb/core/kqp/kqp_compile_request.cpp
@@ -4,10 +4,12 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
+#include <ydb/core/base/wilson.h>
#include <util/string/escape.h>
@@ -28,7 +30,8 @@ public:
}
TKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit)
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit,
+ NWilson::TTraceId traceId)
: Owner(owner)
, UserToken(userToken)
, Uid(uid)
@@ -36,7 +39,8 @@ public:
, KeepInCache(keepInCache)
, Deadline(deadline)
, DbCounters(dbCounters)
- , Orbit{std::move(orbit)} {}
+ , Orbit{std::move(orbit)}
+ , CompileRequestSpan(TWilsonKqp::CompileRequest, std::move(traceId), "CompileRequest") {}
void Bootstrap(const TActorContext& ctx) {
LWTRACK(KqpCompileRequestBootstrap,
@@ -51,7 +55,7 @@ public:
auto compileEv = MakeHolder<TEvKqp::TEvCompileRequest>(UserToken, Uid, std::move(query),
KeepInCache, Deadline, DbCounters, std::move(Orbit));
- ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release());
+ ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId());
Become(&TKqpCompileRequestActor::MainState);
}
@@ -66,12 +70,22 @@ public:
const auto& stats = ev->Get()->Stats;
if (compileResult->Status != Ydb::StatusIds::SUCCESS || !stats.GetFromCache()) {
+
+ if (CompileRequestSpan) {
+ CompileRequestSpan.End();
+ }
+
ctx.Send(Owner, ev->Release().Release());
Die(ctx);
return;
}
if (!NavigateTables(*compileResult->PreparedQuery, compileResult->Query->Database, ctx)) {
+
+ if (CompileRequestSpan) {
+ CompileRequestSpan.End();
+ }
+
ctx.Send(Owner, ev->Release().Release());
Die(ctx);
return;
@@ -82,6 +96,11 @@ public:
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext &ctx) {
if (ValidateTables(*ev->Get(), ctx)) {
+
+ if (CompileRequestSpan) {
+ CompileRequestSpan.EndOk();
+ }
+
ctx.Send(Owner, DeferredResponse.Release());
Die(ctx);
return;
@@ -95,7 +114,7 @@ public:
auto recompileEv = MakeHolder<TEvKqp::TEvRecompileRequest>(UserToken, compileResult.Uid, compileResult.Query,
Deadline, DbCounters, std::move(DeferredResponse->Orbit));
- ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release());
+ ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId());
DeferredResponse.Reset();
}
@@ -295,6 +314,11 @@ private:
void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) {
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues), std::move(Orbit));
+
+ if (CompileRequestSpan) {
+ CompileRequestSpan.EndError(issues.ToOneLineString());
+ }
+
ctx.Send(Owner, responseEv.Release());
Die(ctx);
}
@@ -312,13 +336,24 @@ private:
THolder<TEvKqp::TEvCompileResponse> DeferredResponse;
NLWTrace::TOrbit Orbit;
+ NWilson::TSpan CompileRequestSpan;
};
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit)
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit,
+ NWilson::TTraceId traceId)
{
- return new TKqpCompileRequestActor(owner, userToken, uid, std::move(query), keepInCache, deadline, dbCounters, std::move(orbit));
+ return new TKqpCompileRequestActor(
+ owner,
+ userToken,
+ uid,
+ std::move(query),
+ keepInCache,
+ deadline,
+ dbCounters,
+ std::move(orbit),
+ std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp
index 2c1ee8e2f78..efba6222ffd 100644
--- a/ydb/core/kqp/kqp_compile_service.cpp
+++ b/ydb/core/kqp/kqp_compile_service.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/cms/console/console.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
@@ -11,6 +12,7 @@
#include <ydb/library/aclib/aclib.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/cache/cache.h>
@@ -187,7 +189,8 @@ private:
struct TKqpCompileRequest {
TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache,
- const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {})
+ const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
+ NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {})
: Sender(sender)
, Query(std::move(query))
, Uid(uid)
@@ -195,7 +198,8 @@ struct TKqpCompileRequest {
, UserToken(userToken)
, Deadline(deadline)
, DbCounters(dbCounters)
- , Orbit(std::move(orbit)) {}
+ , Orbit(std::move(orbit))
+ , CompileServiceSpan(std::move(span)) {}
TActorId Sender;
TKqpQueryId Query;
@@ -207,6 +211,7 @@ struct TKqpCompileRequest {
TActorId CompileActor;
NLWTrace::TOrbit Orbit;
+ NWilson::TSpan CompileServiceSpan;
};
class TKqpRequestsQueue {
@@ -415,13 +420,17 @@ private:
}
catch (const std::exception& e) {
LogException("TEvCompileRequest", ev->Sender, e, ctx);
- ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit));
+ ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {});
}
}
void PerformRequest(TEvKqp::TEvCompileRequest::TPtr& ev, const TActorContext& ctx) {
auto& request = *ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Perform request, TraceId.SpanIdPtr: " << ev->TraceId.GetSpanIdPtr());
+
+ NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, std::move(ev->TraceId), "CompileService");
+
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Received compile request"
<< ", sender: " << ev->Sender
<< ", queryUid: " << (request.Uid ? *request.Uid : "<empty>")
@@ -447,8 +456,7 @@ private:
<< ", sender: " << ev->Sender
<< ", queryUid: " << *request.Uid);
-
- ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit));
+ ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
} else {
LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query"
@@ -466,7 +474,7 @@ private:
<< ", queryUid: " << *request.Uid);
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid);
- ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit));
+ ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
}
@@ -489,7 +497,7 @@ private:
<< ", sender: " << ev->Sender
<< ", queryUid: " << compileResult->Uid);
- ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit));
+ ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
}
@@ -501,7 +509,8 @@ private:
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
- request.KeepInCache, request.UserToken, request.Deadline, dbCounters, std::move(ev->Get()->Orbit));
+ request.KeepInCache, request.UserToken, request.Deadline, dbCounters,
+ std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Counters->ReportCompileRequestRejected(dbCounters);
@@ -512,7 +521,7 @@ private:
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() <<
"Exceeded maximum number of requests in compile service queue.");
- ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit));
+ ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
return;
}
@@ -529,7 +538,7 @@ private:
}
catch (const std::exception& e) {
LogException("TEvRecompileRequest", ev->Sender, e, ctx);
- ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit));
+ ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {});
}
}
@@ -546,8 +555,12 @@ private:
if (compileResult || request.Query) {
Counters->ReportCompileRequestCompile(dbCounters);
+ NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
+
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
- true, request.UserToken, request.Deadline, dbCounters, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit());
+ true, request.UserToken, request.Deadline, dbCounters,
+ ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
+ std::move(CompileServiceSpan));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Counters->ReportCompileRequestRejected(dbCounters);
@@ -558,7 +571,7 @@ private:
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() <<
"Exceeded maximum number of requests in compile service queue.");
- ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit));
+ ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
return;
}
} else {
@@ -567,7 +580,10 @@ private:
<< ", queryUid: " << request.Uid);
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << request.Uid);
- ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit));
+
+ NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
+
+ ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
}
@@ -611,7 +627,7 @@ private:
auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query);
for (auto& request : requests) {
LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString());
- Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit));
+ Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit), std::move(request.CompileServiceSpan));
}
} else {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
@@ -620,11 +636,11 @@ private:
}
LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString());
- Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit));
+ Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
}
catch (const std::exception& e) {
LogException("TEvCompileResponse", ev->Sender, e, ctx);
- ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit));
+ ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
}
ProcessQueue(ctx);
@@ -682,7 +698,7 @@ private:
Counters->ReportCompileRequestTimeout(request->DbCounters);
NYql::TIssue issue(NYql::TPosition(), "Compilation timed out.");
- ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit));
+ ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit), std::move(request->CompileServiceSpan));
} else {
StartCompilation(std::move(*request), ctx);
}
@@ -695,7 +711,7 @@ private:
bool recompileWithNewEngine = Config.GetForceNewEnginePercent() > 0;
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, ModuleResolverState, Counters,
- request.Uid, request.Query, request.UserToken, request.DbCounters, recompileWithNewEngine);
+ request.Uid, request.Query, request.UserToken, request.DbCounters, recompileWithNewEngine, request.CompileServiceSpan.GetTraceId());
auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap,
AppData(ctx)->UserPoolId);
@@ -714,7 +730,7 @@ private:
}
void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
- const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit)
+ const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
const auto& query = compileResult->Query;
LWTRACK(KqpCompileServiceReply,
@@ -735,34 +751,38 @@ private:
responseEv->ForceNewEngineLevel = Config.GetForceNewEngineLevel();
}
+ if (span) {
+ span.End();
+ }
+
ctx.Send(sender, responseEv.Release());
}
void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
- const TActorContext& ctx, NLWTrace::TOrbit orbit)
+ const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
NKqpProto::TKqpStatsCompile stats;
stats.SetFromCache(true);
LWTRACK(KqpCompileServiceReplyFromCache, orbit);
- Reply(sender, compileResult, stats, ctx, std::move(orbit));
+ Reply(sender, compileResult, stats, ctx, std::move(orbit), std::move(span));
}
void ReplyError(const TActorId& sender, const TString& uid, Ydb::StatusIds::StatusCode status,
- const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit)
+ const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReplyError, orbit);
- Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit));
+ Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span));
}
void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message,
- const TActorContext& ctx, NLWTrace::TOrbit orbit)
+ const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Internal error during query compilation.");
issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message));
LWTRACK(KqpCompileServiceReplyInternalError, orbit);
- ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit));
+ ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit), std::move(span));
}
static void LogException(const TString& scope, const TActorId& sender, const std::exception& e,
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h
index 8adae6b1d8d..0a34d68f845 100644
--- a/ydb/core/kqp/kqp_impl.h
+++ b/ydb/core/kqp/kqp_impl.h
@@ -52,10 +52,11 @@ IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& servic
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const NKikimrConfig::TTableServiceConfig& serviceConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TString& userToken,
- TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine);
+ TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId = {});
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {});
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
+ NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {});
struct TKqpWorkerSettings {
TString Cluster;
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 18cebe401fd..cfb8f81be38 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -16,6 +16,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/cputime.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/sys_view/service/sysview_service.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
@@ -29,6 +30,9 @@
#include <util/string/printf.h>
#include <util/string/escape.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
+#include <library/cpp/actors/wilson/wilson_trace.h>
+
LWTRACE_USING(KQP_PROVIDER);
namespace NKikimr {
@@ -83,6 +87,7 @@ struct TKqpQueryState {
TString UserToken;
NLWTrace::TOrbit Orbit;
+ NWilson::TSpan KqpSessionSpan;
TString TxId; // User tx
bool Commit = false;
@@ -312,6 +317,10 @@ public:
YQL_ENSURE(queryRequest.HasAction());
auto action = queryRequest.GetAction();
+ auto id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>());
+ LOG_I("Wilson Tracing started, id: " + std::to_string(id.GetTraceId()));
+ QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
+
LWTRACK(KqpSessionQueryRequest,
QueryState->Orbit,
queryRequest.GetDatabase(),
@@ -426,7 +435,8 @@ public:
auto compileRequestActor = CreateKqpCompileRequestActor(SelfId(), QueryState->UserToken, uid,
std::move(query), keepInCache, compileDeadline, Settings.DbCounters,
- QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit());
+ QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit(),
+ QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId());
TlsActivationContext->ExecutorThread.RegisterActor(compileRequestActor);
@@ -1001,6 +1011,9 @@ public:
if (QueryState) {
request.Orbit = std::move(QueryState->Orbit);
}
+ request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId();
+ LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());
+
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
(QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(),
RequestCounters);
@@ -1488,8 +1501,14 @@ public:
}
if (status == Ydb::StatusIds::SUCCESS) {
+ if (QueryState && QueryState->KqpSessionSpan) {
+ QueryState->KqpSessionSpan.EndOk();
+ }
LWTRACK(KqpSessionReplySuccess, QueryState->Orbit, record.GetArena() ? record.GetArena()->SpaceUsed() : 0);
} else {
+ if (QueryState && QueryState->KqpSessionSpan) {
+ QueryState->KqpSessionSpan.EndError(response.DebugString());
+ }
LWTRACK(KqpSessionReplyError, QueryState->Orbit, TStringBuilder() << status);
}
Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId);
@@ -1947,6 +1966,7 @@ private:
TActorId IdleTimerActorId;
ui32 IdleTimerId = 0;
std::optional<TSessionShutdownState> ShutdownState;
+
};
} // namespace
diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp
index 632b36b924a..34c23cd0a4c 100644
--- a/ydb/core/kqp/node/kqp_node.cpp
+++ b/ydb/core/kqp/node/kqp_node.cpp
@@ -14,10 +14,13 @@
#include <ydb/core/kqp/common/kqp_resolve.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
+#include <ydb/core/base/wilson.h>
+
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <util/string/join.h>
@@ -117,6 +120,8 @@ private:
}
void HandleWork(TEvKqpNode::TEvStartKqpTasksRequest::TPtr& ev) {
+ NWilson::TSpan sendTasksSpan(TWilsonKqp::KqpNodeSendTasks, NWilson::TTraceId(ev->TraceId), "KqpNode.SendTasks", NWilson::EFlags::AUTO_END);
+
auto& msg = ev->Get()->Record;
auto requester = ev->Sender;
@@ -314,12 +319,12 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters);
+ CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateAsyncIoFactory(),
- nullptr, runtimeSettings, memoryLimits);
+ nullptr, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
computeActor = CaFactory->CreateKqpComputeActor(request.Executer, txId, std::move(dqTask),
diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
index 7dfb1a294e6..94ff3791e6d 100644
--- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
+++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
@@ -3,6 +3,7 @@
#include "operation.h"
#include <ydb/core/util/pb.h>
+#include <ydb/core/base/wilson.h>
namespace NKikimr {
namespace NDataShard {
@@ -18,6 +19,7 @@ TDataShard::TTxProposeTransactionBase::TTxProposeTransactionBase(TDataShard *sel
, Kind(static_cast<EOperationKind>(Ev->Get()->GetTxKind()))
, TxId(Ev->Get()->GetTxId())
, Acked(!delayed)
+ , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END)
{
}
@@ -66,6 +68,10 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
TActorId target = Op ? Op->GetTarget() : Ev->Get()->GetSource();
ui64 cookie = Op ? Op->GetCookie() : Ev->Cookie;
+
+ if (ProposeTransactionSpan) {
+ ProposeTransactionSpan.EndOk();
+ }
ctx.Send(target, result.Release(), 0, cookie);
return true;
@@ -84,6 +90,10 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
// Unsuccessful operation parse.
if (op->IsAborted()) {
Y_VERIFY(op->Result());
+
+ if (ProposeTransactionSpan) {
+ ProposeTransactionSpan.EndError("Unsuccessful operation parse");
+ }
ctx.Send(op->GetTarget(), op->Result().Release());
return true;
}
@@ -158,6 +168,10 @@ void TDataShard::TTxProposeTransactionBase::Complete(const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"TTxProposeTransactionBase::Complete at " << Self->TabletID());
+ if (ProposeTransactionSpan) {
+ ProposeTransactionSpan.End();
+ }
+
if (Op) {
Y_VERIFY(!Op->GetExecutionPlan().empty());
if (!CompleteList.empty()) {
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index a73e24b1991..9447bf36e87 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -4,6 +4,8 @@
#include "datashard_impl.h"
#include "execution_unit_kind.h"
+#include <library/cpp/actors/wilson/wilson_span.h>
+
namespace NKikimr {
namespace NDataShard {
@@ -106,6 +108,7 @@ protected:
TInstant CommitStart;
bool Acked;
bool Rescheduled = false;
+ NWilson::TSpan ProposeTransactionSpan;
};
class TDataShard::TTxReadSet : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index c71f512809c..1925de10c38 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -8,6 +8,7 @@
#include "dq_compute_memory_quota.h"
#include <ydb/core/base/kikimr_issue.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/protos/services.pb.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
@@ -20,6 +21,7 @@
#include <ydb/library/yql/dq/actors/dq.h>
#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <util/generic/size_literals.h>
#include <util/string/join.h>
@@ -169,7 +171,8 @@ protected:
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
bool ownMemoryQuota = true, bool passExceptions = false,
- const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr)
+ const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr,
+ NWilson::TTraceId traceId = {})
: ExecuterId(executerId)
, TxId(txId)
, Task(std::move(task))
@@ -183,6 +186,7 @@ protected:
, MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr)
, Running(!Task.GetCreateSuspended())
, PassExceptions(passExceptions)
+ , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
{
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
@@ -195,7 +199,8 @@ protected:
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr)
+ const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr,
+ NWilson::TTraceId traceId = {})
: ExecuterId(executerId)
, TxId(txId)
, Task(task)
@@ -207,6 +212,7 @@ protected:
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, MemoryQuota(InitMemoryQuota())
, Running(!Task.GetCreateSuspended())
+ , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
{
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
@@ -571,6 +577,10 @@ protected:
}
IssuesToMessage(issues, record.MutableIssues());
+ if (ComputeActorSpan) {
+ ComputeActorSpan.End();
+ }
+
this->Send(ExecuterId, execEv.Release());
if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) {
@@ -1023,6 +1033,14 @@ protected:
<< "Timeout event from compute actor " << this->SelfId()
<< ", TxId: " << TxId << ", task: " << Task.GetId());
+ if (ComputeActorSpan) {
+ ComputeActorSpan.EndError(
+ TStringBuilder()
+ << "Timeout event from compute actor " << this->SelfId()
+ << ", TxId: " << TxId << ", task: " << Task.GetId()
+ );
+ }
+
this->Send(ExecuterId, abortEv.Release());
TerminateSources("timeout exceeded", false);
@@ -1115,6 +1133,11 @@ protected:
this->TerminateSources(issues, success);
if (ev->Sender != ExecuterId) {
+
+ if (ComputeActorSpan) {
+ ComputeActorSpan.End();
+ }
+
NActors::TActivationContext::Send(ev->Forward(ExecuterId));
}
@@ -1777,6 +1800,10 @@ protected:
CA_LOG_D("Send stats to executor actor " << ExecuterId << " TaskId: " << Task.GetId()
<< " Stats: " << dbgPrintStats());
+ if (ComputeActorSpan) {
+ ComputeActorSpan.End();
+ }
+
this->Send(ExecuterId, evState.release(), NActors::IEventHandle::FlagTrackDelivery);
LastSendStatsTime = now;
@@ -1825,6 +1852,7 @@ private:
bool Running = true;
TInstant LastSendStatsTime;
bool PassExceptions = false;
+ NWilson::TSpan ComputeActorSpan;
protected:
bool MonCountersProvided = false;
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage;