summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author mdartemenko <[email protected]> 2022-08-26 17:27:23 +0300
committer mdartemenko <[email protected]> 2022-08-26 17:27:23 +0300
commit 5ed0649ded71642e99fbfad64f95ccf0b8dd18fb (patch)
tree e3b0f67baa07e87e52799649df544062c826af61
parent c0ec5c1dde9b0d33aba0d5e4e942fc0c64d99b30 (diff)
Add support of Wilson tracing lib for kqp execution
Add tracing to scan executer; add tracing to data and literal executer; add local traces; add scan executer coverage.
-rw-r--r-- library/cpp/actors/wilson/wilson_span.cpp 4
-rw-r--r-- ydb/core/base/wilson.h 36
-rw-r--r-- ydb/core/kqp/CMakeLists.txt 2
-rw-r--r-- ydb/core/kqp/common/kqp_gateway.h 2
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_actor.h 5
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp 10
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp 10
-rw-r--r-- ydb/core/kqp/executer/kqp_data_executer.cpp 51
-rw-r--r-- ydb/core/kqp/executer/kqp_executer_impl.h 39
-rw-r--r-- ydb/core/kqp/executer/kqp_literal_executer.cpp 23
-rw-r--r-- ydb/core/kqp/executer/kqp_planner.cpp 43
-rw-r--r-- ydb/core/kqp/executer/kqp_planner.h 6
-rw-r--r-- ydb/core/kqp/executer/kqp_scan_executer.cpp 23
-rw-r--r-- ydb/core/kqp/kqp_compile_actor.cpp 19
-rw-r--r-- ydb/core/kqp/kqp_compile_request.cpp 47
-rw-r--r-- ydb/core/kqp/kqp_compile_service.cpp 70
-rw-r--r-- ydb/core/kqp/kqp_impl.h 5
-rw-r--r-- ydb/core/kqp/kqp_session_actor.cpp 22
-rw-r--r-- ydb/core/kqp/node/kqp_node.cpp 9
-rw-r--r-- ydb/core/tx/datashard/datashard__propose_tx_base.cpp 14
-rw-r--r-- ydb/core/tx/datashard/datashard_txs.h 3
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h 32
22 files changed, 407 insertions, 68 deletions
diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp
index c932560254d..dcd458be7c6 100644
--- a/library/cpp/actors/wilson/wilson_span.cpp
+++ b/library/cpp/actors/wilson/wilson_span.cpp
@@ -53,7 +53,9 @@ namespace NWilson {
}
void TSpan::Send() {
- TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
+ if (TlsActivationContext) {
+ TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
+ }
Data->Sent = true;
}
diff --git a/ydb/core/base/wilson.h b/ydb/core/base/wilson.h
index 5b62bd51c91..47dd6076988 100644
--- a/ydb/core/base/wilson.h
+++ b/ydb/core/base/wilson.h
@@ -11,4 +11,40 @@ namespace NKikimr {
};
};
+ struct TWilsonKqp {
+ enum {
+ KqpSession = 10,
+ CompileRequest = 9,
+ CompileService = 8,
+ CompileActor = 7,
+
+ KqpSessionCreateAndSendPropose = 9,
+
+ ExecuterReadyState = 9,
+ ExecuterWaitResolveState = 9,
+ ExecuterTableResolve = 9,
+ ExecuterZombieState = 9,
+
+ ComputeActor = 6,
+
+ LiteralExecuter = 9,
+ LiteralExecuterPrepareTasks = 9,
+ LiteralExecuterRunTasks = 9,
+
+ DataExecuter = 9,
+ DataExecuterPrepareState = 9,
+ DataExecuterPrepateTasks = 9,
+ DataExecuterExecuteState = 9,
+ DataExecuterSendTasksAndTxs = 9,
+ DataExecuterWaitSnapshotState = 9,
+ ProposeTransaction = 8,
+
+ ScanExecuter = 9,
+ ScanExecuterExecuteState = 9,
+ ScanExecuterPrepareTasks = 9,
+ KqpPlanner = 8,
+ KqpNodeSendTasks = 7,
+ };
+ };
+
} // NKikimr
diff --git a/ydb/core/kqp/CMakeLists.txt b/ydb/core/kqp/CMakeLists.txt
index 041ed5ad1ae..7d8bf5d77c3 100644
--- a/ydb/core/kqp/CMakeLists.txt
+++ b/ydb/core/kqp/CMakeLists.txt
@@ -19,6 +19,7 @@ target_link_libraries(ydb-core-kqp PUBLIC
cpp-actors-helpers
cpp-digest-md5
cpp-string_utils-base64
+ cpp-actors-wilson
ydb-core-actorlib_impl
ydb-core-base
core-client-minikql_compile
@@ -72,6 +73,7 @@ target_link_libraries(ydb-core-kqp.global PUBLIC
cpp-actors-helpers
cpp-digest-md5
cpp-string_utils-base64
+ cpp-actors-wilson
ydb-core-actorlib_impl
ydb-core-base
core-client-minikql_compile
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h
index 05eee6cb38e..f7d18b682dd 100644
--- a/ydb/core/kqp/common/kqp_gateway.h
+++ b/ydb/core/kqp/common/kqp_gateway.h
@@ -8,6 +8,7 @@
#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
+#include <library/cpp/actors/wilson/wilson_trace.h>
#include <library/cpp/actors/core/actorid.h>
#include <library/cpp/lwtrace/shuttle.h>
@@ -128,6 +129,7 @@ public:
bool NeedTxId = true;
NLWTrace::TOrbit Orbit;
+ NWilson::TTraceId TraceId;
};
struct TExecPhysicalResult : public TGenericResult {
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index d9c57edc276..702918be8af 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -19,13 +19,14 @@ namespace NKqp {
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits);
+ const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
+ NWilson::TTraceId traceId = {});
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
- TIntrusivePtr<TKqpCounters> counters);
+ TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {});
namespace NComputeActor {
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 3c6d20918cc..aeb2cafe70a 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -35,8 +35,9 @@ public:
TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
- : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ NWilson::TTraceId traceId)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
, ComputeCtx(settings.StatsMode)
{
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
@@ -305,10 +306,11 @@ private:
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ NWilson::TTraceId traceId)
{
return new TKqpComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits);
+ functionRegistry, settings, memoryLimits, std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 8152d361782..1351c6a35b7 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -103,8 +103,9 @@ public:
TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
- : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters,
+ NWilson::TTraceId traceId)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
, Counters(counters)
@@ -1045,10 +1046,11 @@ private:
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters,
+ NWilson::TTraceId traceId)
{
return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits, counters);
+ functionRegistry, settings, memoryLimits, counters, std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 2b8438bafba..4728b59d004 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/client/minikql_compile/db_key_resolver.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
@@ -133,7 +134,7 @@ public:
TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
TKqpRequestCounters::TPtr counters)
- : TBase(std::move(request), database, userToken, counters)
+ : TBase(std::move(request), database, userToken, counters, TWilsonKqp::DataExecuter, "DataExecuter")
{
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
@@ -181,6 +182,10 @@ public:
return;
}
+ if (ExecuterTableResolveSpan) {
+ ExecuterTableResolveSpan.End();
+ }
+
Execute();
}
@@ -503,6 +508,11 @@ private:
LOG_D("All shards prepared, become ExecuteState.");
Become(&TKqpDataExecuter::ExecuteState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END);
+ }
+
ExecutePlanned();
}
@@ -1231,7 +1241,11 @@ private:
ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0);
}
- Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward(ev, shardId, true));
+ auto traceId = ExecuterSpan.GetTraceId();
+
+ LOG_D("ExecuteDatashardTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity()));
+
+ Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward(ev, shardId, true), 0, 0, std::move(traceId));
auto result = ShardStates.emplace(shardId, std::move(shardState));
YQL_ENSURE(result.second);
@@ -1263,7 +1277,7 @@ private:
return false;
};
- auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits);
+ auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits, ExecuterSpan.GetTraceId());
auto computeActorId = Register(computeActor);
task.ComputeActorId = computeActorId;
@@ -1274,6 +1288,7 @@ private:
}
void Execute() {
+ NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
RequestControls.Reqister(TlsActivationContext->AsActorContext());
@@ -1513,9 +1528,17 @@ private:
LOG_T("Create temporary mvcc snapshot, ebcome WaitSnapshotState");
Become(&TKqpDataExecuter::WaitSnapshotState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterWaitSnapshotState, ExecuterSpan.GetTraceId(), "WaitSnapshotState", NWilson::EFlags::AUTO_END);
+ }
+
return;
}
+ if (prepareTasksSpan) {
+ prepareTasksSpan.End();
+ }
ContinueExecute(computeTasks, datashardTxs);
}
@@ -1579,9 +1602,17 @@ private:
if (ImmediateTx) {
LOG_T("Immediate tx, become ExecuteState");
Become(&TKqpDataExecuter::ExecuteState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END);
+ }
} else {
LOG_T("Not immediate tx, become PrepareState");
Become(&TKqpDataExecuter::PrepareState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterPrepareState, ExecuterSpan.GetTraceId(), "PrepareState", NWilson::EFlags::AUTO_END);
+ }
}
}
@@ -1647,6 +1678,7 @@ private:
LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem());
}
+ NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), datashardTxs.size());
// first, start compute tasks
@@ -1705,6 +1737,10 @@ private:
ExecuteDatashardTransaction(shardId, shardTx, lockTxId);
}
+ if (sendTasksSpan) {
+ sendTasksSpan.End();
+ }
+
LOG_I("Total tasks: " << TasksGraph.GetTasks().size()
<< ", readonly: " << ReadOnlyTx
<< ", datashardTxs: " << datashardTxs.size()
@@ -1773,6 +1809,15 @@ private:
}
LWTRACK(KqpDataExecuterFinalize, ResponseEv->Orbit, TxId, LastShard, response.GetResult().ResultsSize(), response.ByteSize());
+
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = {};
+ }
+
+ if (ExecuterSpan) {
+ ExecuterSpan.EndOk();
+ }
LOG_D("Sending response to: " << Target << ", results: " << Results.size());
Send(Target, ResponseEv.release());
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index e1830b9f5b1..250299766d2 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -10,6 +10,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/base/kikimr_issue.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/kqp/executer/kqp_tasks_graph.h>
@@ -26,6 +27,7 @@
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
@@ -81,11 +83,12 @@ template <class TDerived, EExecType ExecType>
class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
public:
TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
- TKqpRequestCounters::TPtr counters)
+ TKqpRequestCounters::TPtr counters, ui64 spanVerbosity = 0, TString spanName = "no_name")
: Request(std::move(request))
, Database(database)
, UserToken(userToken)
, Counters(counters)
+ , ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName)
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>();
ResponseEv->Orbit = std::move(Request.Orbit);
@@ -104,6 +107,7 @@ public:
LOG_T("Bootstrap done, become ReadyState");
this->Become(&TKqpExecuterBase::ReadyState);
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterReadyState, ExecuterSpan.GetTraceId(), "ReadyState", NWilson::EFlags::AUTO_END);
}
void ReportEventElapsedTime() {
@@ -166,6 +170,13 @@ protected:
LOG_T("Got request, become WaitResolveState");
this->Become(&TDerived::WaitResolveState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterWaitResolveState, ExecuterSpan.GetTraceId(), "WaitResolveState", NWilson::EFlags::AUTO_END);
+ }
+
+ ExecuterTableResolveSpan = NWilson::TSpan(TWilsonKqp::ExecuterTableResolve, ExecuterStateSpan.GetTraceId(), "ExecuterTableResolve", NWilson::EFlags::AUTO_END);
+
StartResolveTime = now;
if (Stats) {
@@ -184,6 +195,11 @@ protected:
InternalError(issues);
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
+
+ if (ExecuterSpan) {
+ ExecuterSpan.EndError("timeout");
+ }
+
this->Send(Target, abortEv.Release());
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, "timeout");
@@ -199,10 +215,20 @@ protected:
if (cancel) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::CANCELLED, "Request timeout exceeded");
+
+ if (ExecuterSpan) {
+ ExecuterSpan.EndError("timeout");
+ }
+
this->Send(Target, abortEv.Release());
CancelAtActor = {};
} else {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
+
+ if (ExecuterSpan) {
+ ExecuterSpan.EndError("timeout");
+ }
+
this->Send(Target, abortEv.Release());
DeadlineActor = {};
}
@@ -605,6 +631,10 @@ protected:
LWTRACK(KqpBaseExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId);
+ if (ExecuterSpan) {
+ ExecuterSpan.EndError(response.DebugString());
+ }
+
this->Send(Target, ResponseEv.release());
this->PassAway();
}
@@ -638,6 +668,10 @@ protected:
this->Send(this->SelfId(), new TEvents::TEvPoison);
LOG_T("Terminate, become ZombieState");
this->Become(&TKqpExecuterBase::ZombieState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterZombieState, ExecuterSpan.GetTraceId(), "ZombieState", NWilson::EFlags::AUTO_END);
+ }
} else {
IActor::PassAway();
}
@@ -686,6 +720,9 @@ protected:
TInstant LastResourceUsageUpdate;
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv;
+ NWilson::TSpan ExecuterSpan;
+ NWilson::TSpan ExecuterStateSpan;
+ NWilson::TSpan ExecuterTableResolveSpan;
private:
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
};
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp
index 5ca5367accd..0c663161f26 100644
--- a/ydb/core/kqp/executer/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp
@@ -9,6 +9,8 @@
#include <ydb/core/kqp/prepare/kqp_query_plan.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/core/base/wilson.h>
+
namespace NKikimr {
namespace NKqp {
@@ -66,6 +68,7 @@ public:
TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters)
: Request(std::move(request))
, Counters(counters)
+ , LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter")
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>();
ResponseEv->Orbit = std::move(Request.Orbit);
@@ -109,6 +112,7 @@ private:
}
void Handle(TEvKqpExecuter::TEvTxRequest::TPtr& ev) {
+ NWilson::TSpan prepareTasksSpan(TWilsonKqp::LiteralExecuterPrepareTasks, LiteralExecuterSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END);
if (Stats) {
Stats->StartTs = TInstant::Now();
}
@@ -171,6 +175,12 @@ private:
}
});
+ if (prepareTasksSpan) {
+ prepareTasksSpan.EndOk();
+ }
+
+ NWilson::TSpan runTasksSpan(TWilsonKqp::LiteralExecuterRunTasks, LiteralExecuterSpan.GetTraceId(), "RunTasks", NWilson::EFlags::AUTO_END);
+
// task runner settings
NMiniKQL::TKqpComputeContextBase computeCtx;
TDqTaskRunnerContext context = CreateTaskRunnerContext(&computeCtx, &alloc, &typeEnv);
@@ -190,6 +200,10 @@ private:
}
}
+ if (runTasksSpan) {
+ runTasksSpan.End();
+ }
+
Finalize(context, holderFactory);
PassAway();
}
@@ -349,6 +363,10 @@ private:
LWTRACK(KqpLiteralExecuterFinalize, ResponseEv->Orbit, TxId);
+ if (LiteralExecuterSpan) {
+ LiteralExecuterSpan.EndOk();
+ }
+
LOG_D("Sending response to: " << Target << ", results: " << Results.size());
Send(Target, ResponseEv.release());
}
@@ -413,6 +431,10 @@ private:
LWTRACK(KqpLiteralExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId);
+ if (LiteralExecuterSpan) {
+ LiteralExecuterSpan.EndError(response.DebugString());
+ }
+
Send(Target, ResponseEv.release());
PassAway();
}
@@ -438,6 +460,7 @@ private:
TVector<TIntrusivePtr<IDqTaskRunner>> TaskRunners;
std::unordered_map<ui64, ui32> TaskId2StageId;
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv;
+ NWilson::TSpan LiteralExecuterSpan;
};
} // anonymous namespace
diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp
index 313c8b0779b..a2258184358 100644
--- a/ydb/core/kqp/executer/kqp_planner.cpp
+++ b/ydb/core/kqp/executer/kqp_planner.cpp
@@ -4,6 +4,7 @@
#include "kqp_shards_resolver.h"
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/kqp/rm/kqp_rm.h>
#include <ydb/core/kqp/rm/kqp_resource_estimation.h>
@@ -26,7 +27,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
- bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath)
+ bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId)
: TxId(txId)
, ExecuterId(executer)
, Tasks(std::move(tasks))
@@ -40,6 +41,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
, EnableLlvm(enableLlvm)
, WithSpilling(withSpilling)
, RlPath(rlPath)
+ , KqpPlannerSpan(TWilsonKqp::KqpPlanner, std::move(traceId), "KqpPlanner")
{
if (!Database) {
// a piece of magic for tests
@@ -135,6 +137,11 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
LOG_E("Not enough resources to execute query locally and no information about other nodes");
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
"Not enough resources to execute query locally and no information about other nodes");
+
+ if (KqpPlannerSpan) {
+ KqpPlannerSpan.EndError("Not enough resources to execute query locally and no information about other nodes");
+ }
+
Send(ExecuterId, ev.Release());
PassAway();
return;
@@ -154,6 +161,8 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
auto plan = planner->Plan(snapshot, std::move(est));
+ long requestsCnt = 0;
+
if (!plan.empty()) {
for (auto& group : plan) {
auto ev = PrepareKqpNodeRequest(group.TaskIds);
@@ -161,7 +170,8 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
auto target = MakeKqpNodeServiceID(group.NodeId);
TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId())));
+ CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ ++requestsCnt;
}
TVector<ui64> nodes;
@@ -177,15 +187,26 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
auto target = MakeKqpNodeServiceID(nodeId);
LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);
TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId())));
+ CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ ++requestsCnt;
}
Y_VERIFY(ScanTasks.empty());
} else {
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
"Not enough resources to execute query");
+
+ if (KqpPlannerSpan) {
+ KqpPlannerSpan.EndError("Not enough resources to execute query");
+ }
+
Send(ExecuterId, ev.Release());
}
+ if (KqpPlannerSpan) {
+ KqpPlannerSpan.Attribute("RequestsCnt", requestsCnt);
+ KqpPlannerSpan.EndOk();
+ }
+
PassAway();
}
@@ -200,8 +221,8 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho
auto target = MakeKqpNodeServiceID(SelfId().NodeId());
LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);
- TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery));
-
+ TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery, 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ long requestsCnt = 1;
TVector<ui64> nodes;
for (const auto& pair: ScanTasks) {
@@ -220,10 +241,16 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho
AddScansToKqpNodeRequest(ev, nodeId);
auto target = MakeKqpNodeServiceID(nodeId);
TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId())));
+ CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ ++requestsCnt;
}
Y_VERIFY(ScanTasks.size() == 0);
+ if (KqpPlannerSpan) {
+ KqpPlannerSpan.Attribute("requestsCnt", requestsCnt);
+ KqpPlannerSpan.EndOk();
+ }
+
PassAway();
}
@@ -352,10 +379,10 @@ IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& token, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
- bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath)
+ bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId)
{
return new TKqpPlanner(txId, executer, std::move(tasks), std::move(scanTasks), snapshot,
- database, token, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath);
+ database, token, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, std::move(traceId));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer/kqp_planner.h b/ydb/core/kqp/executer/kqp_planner.h
index ae1cb852c13..5c5cfeabb69 100644
--- a/ydb/core/kqp/executer/kqp_planner.h
+++ b/ydb/core/kqp/executer/kqp_planner.h
@@ -6,6 +6,7 @@
#include <ydb/core/kqp/rm/kqp_rm.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
@@ -35,7 +36,7 @@ public:
THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages,
- bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath);
+ bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId);
void Bootstrap(const TActorContext& ctx);
@@ -71,12 +72,13 @@ private:
const bool WithSpilling;
const TMaybe<NKikimrKqp::TRlPath> RlPath;
THashSet<ui32> TrackingNodes;
+ NWilson::TSpan KqpPlannerSpan;
};
IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
- bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath);
+ bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId = {});
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 228b36899c3..da8e800f5e2 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -44,7 +44,7 @@ public:
TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters)
- : TBase(std::move(request), database, userToken, counters)
+ : TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter")
{
YQL_ENSURE(Request.Transactions.size() == 1);
YQL_ENSURE(Request.Locks.empty());
@@ -101,6 +101,10 @@ public:
}
}
+ if (ExecuterTableResolveSpan) {
+ ExecuterTableResolveSpan.End();
+ }
+
if (shardIds.size() > 0) {
LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")");
auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds));
@@ -636,6 +640,8 @@ private:
NMiniKQL::TScopedAlloc alloc(TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators());
NMiniKQL::TTypeEnvironment typeEnv(alloc);
+ NWilson::TSpan prepareTasksSpan(TWilsonKqp::ScanExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END);
+
NMiniKQL::TMemoryUsageInfo memInfo("PrepareTasks");
NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, &funcRegistry);
@@ -812,6 +818,10 @@ private:
return;
}
+ if (prepareTasksSpan) {
+ prepareTasksSpan.End();
+ }
+
LOG_D("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: true"
<< ", " << nScanTasks << " scan tasks on " << scanTasks.size() << " nodes"
<< ", totalShardScans: " << nShardScans << ", execType: Scan"
@@ -820,6 +830,11 @@ private:
ExecuteScanTx(std::move(computeTasks), std::move(scanTasks));
Become(&TKqpScanExecuter::ExecuteState);
+ if (ExecuterStateSpan) {
+ ExecuterStateSpan.End();
+ ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ScanExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END);
+ }
+
}
void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks) {
@@ -838,7 +853,7 @@ private:
auto planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks),
std::move(scanTasks), Request.Snapshot,
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
- Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath);
+ Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan.GetTraceId());
RegisterWithSameMailbox(planner);
}
@@ -877,6 +892,10 @@ private:
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, Results.size());
+ if (ExecuterSpan) {
+ ExecuterSpan.EndOk();
+ }
+
LOG_D("Sending response to: " << Target);
Send(Target, ResponseEv.release());
PassAway();
diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp
index ee471926fb2..44f2a035f5e 100644
--- a/ydb/core/kqp/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/kqp_compile_actor.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/host/kqp_host.h>
@@ -10,6 +11,7 @@
#include <ydb/library/yql/utils/actor_log/log.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/json/json_writer.h>
#include <library/cpp/string_utils/base64/base64.h>
@@ -46,7 +48,7 @@ public:
TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const TTableServiceConfig& serviceConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TString& userToken,
- TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine)
+ TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId)
: Owner(owner)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
@@ -57,6 +59,7 @@ public:
, Config(MakeIntrusive<TKikimrConfiguration>())
, CompilationTimeout(TDuration::MilliSeconds(serviceConfig.GetCompileTimeoutMs()))
, RecompileWithNewEngine(recompileWithNewEngine)
+ , CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
{
Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), Query.Cluster, kqpSettings->Settings, false);
@@ -74,6 +77,10 @@ public:
Counters->ReportCompileStart(DbCounters);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "traceId: verbosity = "
+ << std::to_string(CompileActorSpan.GetTraceId().GetVerbosity()) << ", trace_id = "
+ << std::to_string(CompileActorSpan.GetTraceId().GetTraceId()));
+
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Start compilation"
<< ", self: " << ctx.SelfID
<< ", cluster: " << Query.Cluster
@@ -215,6 +222,10 @@ private:
Counters->ReportCompileFinish(DbCounters);
+ if (CompileActorSpan) {
+ CompileActorSpan.End();
+ }
+
Die(ctx);
}
@@ -390,6 +401,8 @@ private:
TIntrusivePtr<IKqpHost::IAsyncQueryResult> AsyncCompileResult;
std::shared_ptr<TKqpCompileResult> KqpCompileResult;
std::optional<TString> ReplayMessage;
+
+ NWilson::TSpan CompileActorSpan;
};
void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) {
@@ -405,10 +418,10 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const TTableServiceConfig& serviceConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TString& userToken,
- TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine)
+ TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId)
{
return new TKqpCompileActor(owner, kqpSettings, serviceConfig, moduleResolverState, counters, uid,
- std::move(query), userToken, dbCounters, recompileWithNewEngine);
+ std::move(query), userToken, dbCounters, recompileWithNewEngine, std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp
index 0d5f80ce409..a06a1a43c0d 100644
--- a/ydb/core/kqp/kqp_compile_request.cpp
+++ b/ydb/core/kqp/kqp_compile_request.cpp
@@ -4,10 +4,12 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
+#include <ydb/core/base/wilson.h>
#include <util/string/escape.h>
@@ -28,7 +30,8 @@ public:
}
TKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit)
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit,
+ NWilson::TTraceId traceId)
: Owner(owner)
, UserToken(userToken)
, Uid(uid)
@@ -36,7 +39,8 @@ public:
, KeepInCache(keepInCache)
, Deadline(deadline)
, DbCounters(dbCounters)
- , Orbit{std::move(orbit)} {}
+ , Orbit{std::move(orbit)}
+ , CompileRequestSpan(TWilsonKqp::CompileRequest, std::move(traceId), "CompileRequest") {}
void Bootstrap(const TActorContext& ctx) {
LWTRACK(KqpCompileRequestBootstrap,
@@ -51,7 +55,7 @@ public:
auto compileEv = MakeHolder<TEvKqp::TEvCompileRequest>(UserToken, Uid, std::move(query),
KeepInCache, Deadline, DbCounters, std::move(Orbit));
- ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release());
+ ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId());
Become(&TKqpCompileRequestActor::MainState);
}
@@ -66,12 +70,22 @@ public:
const auto& stats = ev->Get()->Stats;
if (compileResult->Status != Ydb::StatusIds::SUCCESS || !stats.GetFromCache()) {
+
+ if (CompileRequestSpan) {
+ CompileRequestSpan.End();
+ }
+
ctx.Send(Owner, ev->Release().Release());
Die(ctx);
return;
}
if (!NavigateTables(*compileResult->PreparedQuery, compileResult->Query->Database, ctx)) {
+
+ if (CompileRequestSpan) {
+ CompileRequestSpan.End();
+ }
+
ctx.Send(Owner, ev->Release().Release());
Die(ctx);
return;
@@ -82,6 +96,11 @@ public:
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext &ctx) {
if (ValidateTables(*ev->Get(), ctx)) {
+
+ if (CompileRequestSpan) {
+ CompileRequestSpan.EndOk();
+ }
+
ctx.Send(Owner, DeferredResponse.Release());
Die(ctx);
return;
@@ -95,7 +114,7 @@ public:
auto recompileEv = MakeHolder<TEvKqp::TEvRecompileRequest>(UserToken, compileResult.Uid, compileResult.Query,
Deadline, DbCounters, std::move(DeferredResponse->Orbit));
- ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release());
+ ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId());
DeferredResponse.Reset();
}
@@ -295,6 +314,11 @@ private:
void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) {
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues), std::move(Orbit));
+
+ if (CompileRequestSpan) {
+ CompileRequestSpan.EndError(issues.ToOneLineString());
+ }
+
ctx.Send(Owner, responseEv.Release());
Die(ctx);
}
@@ -312,13 +336,24 @@ private:
THolder<TEvKqp::TEvCompileResponse> DeferredResponse;
NLWTrace::TOrbit Orbit;
+ NWilson::TSpan CompileRequestSpan;
};
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit)
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit,
+ NWilson::TTraceId traceId)
{
- return new TKqpCompileRequestActor(owner, userToken, uid, std::move(query), keepInCache, deadline, dbCounters, std::move(orbit));
+ return new TKqpCompileRequestActor(
+ owner,
+ userToken,
+ uid,
+ std::move(query),
+ keepInCache,
+ deadline,
+ dbCounters,
+ std::move(orbit),
+ std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp
index 2c1ee8e2f78..efba6222ffd 100644
--- a/ydb/core/kqp/kqp_compile_service.cpp
+++ b/ydb/core/kqp/kqp_compile_service.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/cms/console/console.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
@@ -11,6 +12,7 @@
#include <ydb/library/aclib/aclib.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/cache/cache.h>
@@ -187,7 +189,8 @@ private:
struct TKqpCompileRequest {
TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache,
- const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {})
+ const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
+ NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {})
: Sender(sender)
, Query(std::move(query))
, Uid(uid)
@@ -195,7 +198,8 @@ struct TKqpCompileRequest {
, UserToken(userToken)
, Deadline(deadline)
, DbCounters(dbCounters)
- , Orbit(std::move(orbit)) {}
+ , Orbit(std::move(orbit))
+ , CompileServiceSpan(std::move(span)) {}
TActorId Sender;
TKqpQueryId Query;
@@ -207,6 +211,7 @@ struct TKqpCompileRequest {
TActorId CompileActor;
NLWTrace::TOrbit Orbit;
+ NWilson::TSpan CompileServiceSpan;
};
class TKqpRequestsQueue {
@@ -415,13 +420,17 @@ private:
}
catch (const std::exception& e) {
LogException("TEvCompileRequest", ev->Sender, e, ctx);
- ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit));
+ ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {});
}
}
void PerformRequest(TEvKqp::TEvCompileRequest::TPtr& ev, const TActorContext& ctx) {
auto& request = *ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Perform request, TraceId.SpanIdPtr: " << ev->TraceId.GetSpanIdPtr());
+
+ NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, std::move(ev->TraceId), "CompileService");
+
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Received compile request"
<< ", sender: " << ev->Sender
<< ", queryUid: " << (request.Uid ? *request.Uid : "<empty>")
@@ -447,8 +456,7 @@ private:
<< ", sender: " << ev->Sender
<< ", queryUid: " << *request.Uid);
-
- ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit));
+ ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
} else {
LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query"
@@ -466,7 +474,7 @@ private:
<< ", queryUid: " << *request.Uid);
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid);
- ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit));
+ ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
}
@@ -489,7 +497,7 @@ private:
<< ", sender: " << ev->Sender
<< ", queryUid: " << compileResult->Uid);
- ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit));
+ ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
}
@@ -501,7 +509,8 @@ private:
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
- request.KeepInCache, request.UserToken, request.Deadline, dbCounters, std::move(ev->Get()->Orbit));
+ request.KeepInCache, request.UserToken, request.Deadline, dbCounters,
+ std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Counters->ReportCompileRequestRejected(dbCounters);
@@ -512,7 +521,7 @@ private:
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() <<
"Exceeded maximum number of requests in compile service queue.");
- ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit));
+ ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
return;
}
@@ -529,7 +538,7 @@ private:
}
catch (const std::exception& e) {
LogException("TEvRecompileRequest", ev->Sender, e, ctx);
- ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit));
+ ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {});
}
}
@@ -546,8 +555,12 @@ private:
if (compileResult || request.Query) {
Counters->ReportCompileRequestCompile(dbCounters);
+ NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
+
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
- true, request.UserToken, request.Deadline, dbCounters, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit());
+ true, request.UserToken, request.Deadline, dbCounters,
+ ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
+ std::move(CompileServiceSpan));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Counters->ReportCompileRequestRejected(dbCounters);
@@ -558,7 +571,7 @@ private:
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() <<
"Exceeded maximum number of requests in compile service queue.");
- ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit));
+ ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
return;
}
} else {
@@ -567,7 +580,10 @@ private:
<< ", queryUid: " << request.Uid);
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << request.Uid);
- ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit));
+
+ NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
+
+ ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
}
@@ -611,7 +627,7 @@ private:
auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query);
for (auto& request : requests) {
LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString());
- Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit));
+ Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit), std::move(request.CompileServiceSpan));
}
} else {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
@@ -620,11 +636,11 @@ private:
}
LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString());
- Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit));
+ Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
}
catch (const std::exception& e) {
LogException("TEvCompileResponse", ev->Sender, e, ctx);
- ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit));
+ ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
}
ProcessQueue(ctx);
@@ -682,7 +698,7 @@ private:
Counters->ReportCompileRequestTimeout(request->DbCounters);
NYql::TIssue issue(NYql::TPosition(), "Compilation timed out.");
- ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit));
+ ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit), std::move(request->CompileServiceSpan));
} else {
StartCompilation(std::move(*request), ctx);
}
@@ -695,7 +711,7 @@ private:
bool recompileWithNewEngine = Config.GetForceNewEnginePercent() > 0;
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, ModuleResolverState, Counters,
- request.Uid, request.Query, request.UserToken, request.DbCounters, recompileWithNewEngine);
+ request.Uid, request.Query, request.UserToken, request.DbCounters, recompileWithNewEngine, request.CompileServiceSpan.GetTraceId());
auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap,
AppData(ctx)->UserPoolId);
@@ -714,7 +730,7 @@ private:
}
void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
- const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit)
+ const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
const auto& query = compileResult->Query;
LWTRACK(KqpCompileServiceReply,
@@ -735,34 +751,38 @@ private:
responseEv->ForceNewEngineLevel = Config.GetForceNewEngineLevel();
}
+ if (span) {
+ span.End();
+ }
+
ctx.Send(sender, responseEv.Release());
}
void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
- const TActorContext& ctx, NLWTrace::TOrbit orbit)
+ const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
NKqpProto::TKqpStatsCompile stats;
stats.SetFromCache(true);
LWTRACK(KqpCompileServiceReplyFromCache, orbit);
- Reply(sender, compileResult, stats, ctx, std::move(orbit));
+ Reply(sender, compileResult, stats, ctx, std::move(orbit), std::move(span));
}
void ReplyError(const TActorId& sender, const TString& uid, Ydb::StatusIds::StatusCode status,
- const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit)
+ const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReplyError, orbit);
- Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit));
+ Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span));
}
void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message,
- const TActorContext& ctx, NLWTrace::TOrbit orbit)
+ const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Internal error during query compilation.");
issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message));
LWTRACK(KqpCompileServiceReplyInternalError, orbit);
- ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit));
+ ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit), std::move(span));
}
static void LogException(const TString& scope, const TActorId& sender, const std::exception& e,
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h
index 8adae6b1d8d..0a34d68f845 100644
--- a/ydb/core/kqp/kqp_impl.h
+++ b/ydb/core/kqp/kqp_impl.h
@@ -52,10 +52,11 @@ IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& servic
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const NKikimrConfig::TTableServiceConfig& serviceConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TString& userToken,
- TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine);
+ TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId = {});
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {});
+ TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
+ NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {});
struct TKqpWorkerSettings {
TString Cluster;
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 18cebe401fd..cfb8f81be38 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -16,6 +16,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/cputime.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/sys_view/service/sysview_service.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
@@ -29,6 +30,9 @@
#include <util/string/printf.h>
#include <util/string/escape.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
+#include <library/cpp/actors/wilson/wilson_trace.h>
+
LWTRACE_USING(KQP_PROVIDER);
namespace NKikimr {
@@ -83,6 +87,7 @@ struct TKqpQueryState {
TString UserToken;
NLWTrace::TOrbit Orbit;
+ NWilson::TSpan KqpSessionSpan;
TString TxId; // User tx
bool Commit = false;
@@ -312,6 +317,10 @@ public:
YQL_ENSURE(queryRequest.HasAction());
auto action = queryRequest.GetAction();
+ auto id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>());
+ LOG_I("Wilson Tracing started, id: " + std::to_string(id.GetTraceId()));
+ QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
+
LWTRACK(KqpSessionQueryRequest,
QueryState->Orbit,
queryRequest.GetDatabase(),
@@ -426,7 +435,8 @@ public:
auto compileRequestActor = CreateKqpCompileRequestActor(SelfId(), QueryState->UserToken, uid,
std::move(query), keepInCache, compileDeadline, Settings.DbCounters,
- QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit());
+ QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit(),
+ QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId());
TlsActivationContext->ExecutorThread.RegisterActor(compileRequestActor);
@@ -1001,6 +1011,9 @@ public:
if (QueryState) {
request.Orbit = std::move(QueryState->Orbit);
}
+ request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId();
+ LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());
+
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
(QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(),
RequestCounters);
@@ -1488,8 +1501,14 @@ public:
}
if (status == Ydb::StatusIds::SUCCESS) {
+ if (QueryState && QueryState->KqpSessionSpan) {
+ QueryState->KqpSessionSpan.EndOk();
+ }
LWTRACK(KqpSessionReplySuccess, QueryState->Orbit, record.GetArena() ? record.GetArena()->SpaceUsed() : 0);
} else {
+ if (QueryState && QueryState->KqpSessionSpan) {
+ QueryState->KqpSessionSpan.EndError(response.DebugString());
+ }
LWTRACK(KqpSessionReplyError, QueryState->Orbit, TStringBuilder() << status);
}
Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId);
@@ -1947,6 +1966,7 @@ private:
TActorId IdleTimerActorId;
ui32 IdleTimerId = 0;
std::optional<TSessionShutdownState> ShutdownState;
+
};
} // namespace
diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp
index 632b36b924a..34c23cd0a4c 100644
--- a/ydb/core/kqp/node/kqp_node.cpp
+++ b/ydb/core/kqp/node/kqp_node.cpp
@@ -14,10 +14,13 @@
#include <ydb/core/kqp/common/kqp_resolve.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
+#include <ydb/core/base/wilson.h>
+
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <util/string/join.h>
@@ -117,6 +120,8 @@ private:
}
void HandleWork(TEvKqpNode::TEvStartKqpTasksRequest::TPtr& ev) {
+ NWilson::TSpan sendTasksSpan(TWilsonKqp::KqpNodeSendTasks, NWilson::TTraceId(ev->TraceId), "KqpNode.SendTasks", NWilson::EFlags::AUTO_END);
+
auto& msg = ev->Get()->Record;
auto requester = ev->Sender;
@@ -314,12 +319,12 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters);
+ CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateAsyncIoFactory(),
- nullptr, runtimeSettings, memoryLimits);
+ nullptr, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
computeActor = CaFactory->CreateKqpComputeActor(request.Executer, txId, std::move(dqTask),
diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
index 7dfb1a294e6..94ff3791e6d 100644
--- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
+++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
@@ -3,6 +3,7 @@
#include "operation.h"
#include <ydb/core/util/pb.h>
+#include <ydb/core/base/wilson.h>
namespace NKikimr {
namespace NDataShard {
@@ -18,6 +19,7 @@ TDataShard::TTxProposeTransactionBase::TTxProposeTransactionBase(TDataShard *sel
, Kind(static_cast<EOperationKind>(Ev->Get()->GetTxKind()))
, TxId(Ev->Get()->GetTxId())
, Acked(!delayed)
+ , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END)
{
}
@@ -66,6 +68,10 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
TActorId target = Op ? Op->GetTarget() : Ev->Get()->GetSource();
ui64 cookie = Op ? Op->GetCookie() : Ev->Cookie;
+
+ if (ProposeTransactionSpan) {
+ ProposeTransactionSpan.EndOk();
+ }
ctx.Send(target, result.Release(), 0, cookie);
return true;
@@ -84,6 +90,10 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
// Unsuccessful operation parse.
if (op->IsAborted()) {
Y_VERIFY(op->Result());
+
+ if (ProposeTransactionSpan) {
+ ProposeTransactionSpan.EndError("Unsuccessful operation parse");
+ }
ctx.Send(op->GetTarget(), op->Result().Release());
return true;
}
@@ -158,6 +168,10 @@ void TDataShard::TTxProposeTransactionBase::Complete(const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"TTxProposeTransactionBase::Complete at " << Self->TabletID());
+ if (ProposeTransactionSpan) {
+ ProposeTransactionSpan.End();
+ }
+
if (Op) {
Y_VERIFY(!Op->GetExecutionPlan().empty());
if (!CompleteList.empty()) {
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index a73e24b1991..9447bf36e87 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -4,6 +4,8 @@
#include "datashard_impl.h"
#include "execution_unit_kind.h"
+#include <library/cpp/actors/wilson/wilson_span.h>
+
namespace NKikimr {
namespace NDataShard {
@@ -106,6 +108,7 @@ protected:
TInstant CommitStart;
bool Acked;
bool Rescheduled = false;
+ NWilson::TSpan ProposeTransactionSpan;
};
class TDataShard::TTxReadSet : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index c71f512809c..1925de10c38 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -8,6 +8,7 @@
#include "dq_compute_memory_quota.h"
#include <ydb/core/base/kikimr_issue.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/protos/services.pb.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
@@ -20,6 +21,7 @@
#include <ydb/library/yql/dq/actors/dq.h>
#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <util/generic/size_literals.h>
#include <util/string/join.h>
@@ -169,7 +171,8 @@ protected:
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
bool ownMemoryQuota = true, bool passExceptions = false,
- const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr)
+ const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr,
+ NWilson::TTraceId traceId = {})
: ExecuterId(executerId)
, TxId(txId)
, Task(std::move(task))
@@ -183,6 +186,7 @@ protected:
, MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr)
, Running(!Task.GetCreateSuspended())
, PassExceptions(passExceptions)
+ , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
{
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
@@ -195,7 +199,8 @@ protected:
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr)
+ const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr,
+ NWilson::TTraceId traceId = {})
: ExecuterId(executerId)
, TxId(txId)
, Task(task)
@@ -207,6 +212,7 @@ protected:
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, MemoryQuota(InitMemoryQuota())
, Running(!Task.GetCreateSuspended())
+ , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
{
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
@@ -571,6 +577,10 @@ protected:
}
IssuesToMessage(issues, record.MutableIssues());
+ if (ComputeActorSpan) {
+ ComputeActorSpan.End();
+ }
+
this->Send(ExecuterId, execEv.Release());
if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) {
@@ -1023,6 +1033,14 @@ protected:
<< "Timeout event from compute actor " << this->SelfId()
<< ", TxId: " << TxId << ", task: " << Task.GetId());
+ if (ComputeActorSpan) {
+ ComputeActorSpan.EndError(
+ TStringBuilder()
+ << "Timeout event from compute actor " << this->SelfId()
+ << ", TxId: " << TxId << ", task: " << Task.GetId()
+ );
+ }
+
this->Send(ExecuterId, abortEv.Release());
TerminateSources("timeout exceeded", false);
@@ -1115,6 +1133,11 @@ protected:
this->TerminateSources(issues, success);
if (ev->Sender != ExecuterId) {
+
+ if (ComputeActorSpan) {
+ ComputeActorSpan.End();
+ }
+
NActors::TActivationContext::Send(ev->Forward(ExecuterId));
}
@@ -1777,6 +1800,10 @@ protected:
CA_LOG_D("Send stats to executor actor " << ExecuterId << " TaskId: " << Task.GetId()
<< " Stats: " << dbgPrintStats());
+ if (ComputeActorSpan) {
+ ComputeActorSpan.End();
+ }
+
this->Send(ExecuterId, evState.release(), NActors::IEventHandle::FlagTrackDelivery);
LastSendStatsTime = now;
@@ -1825,6 +1852,7 @@ private:
bool Running = true;
TInstant LastSendStatsTime;
bool PassExceptions = false;
+ NWilson::TSpan ComputeActorSpan;
protected:
bool MonCountersProvided = false;
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage;