diff options
author | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-26 17:27:23 +0300 |
---|---|---|
committer | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-26 17:27:23 +0300 |
commit | 5ed0649ded71642e99fbfad64f95ccf0b8dd18fb (patch) | |
tree | e3b0f67baa07e87e52799649df544062c826af61 | |
parent | c0ec5c1dde9b0d33aba0d5e4e942fc0c64d99b30 (diff) | |
download | ydb-5ed0649ded71642e99fbfad64f95ccf0b8dd18fb.tar.gz |
Add support of Wilson tracing lib for kqp execution
add tracing to scan executer
add tracing to data and literal executer
add local traces
add scan executer coverage
22 files changed, 407 insertions, 68 deletions
diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp index c932560254d..dcd458be7c6 100644 --- a/library/cpp/actors/wilson/wilson_span.cpp +++ b/library/cpp/actors/wilson/wilson_span.cpp @@ -53,7 +53,9 @@ namespace NWilson { } void TSpan::Send() { - TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span))); + if (TlsActivationContext) { + TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span))); + } Data->Sent = true; } diff --git a/ydb/core/base/wilson.h b/ydb/core/base/wilson.h index 5b62bd51c91..47dd6076988 100644 --- a/ydb/core/base/wilson.h +++ b/ydb/core/base/wilson.h @@ -11,4 +11,40 @@ namespace NKikimr { }; }; + struct TWilsonKqp { + enum { + KqpSession = 10, + CompileRequest = 9, + CompileService = 8, + CompileActor = 7, + + KqpSessionCreateAndSendPropose = 9, + + ExecuterReadyState = 9, + ExecuterWaitResolveState = 9, + ExecuterTableResolve = 9, + ExecuterZombieState = 9, + + ComputeActor = 6, + + LiteralExecuter = 9, + LiteralExecuterPrepareTasks = 9, + LiteralExecuterRunTasks = 9, + + DataExecuter = 9, + DataExecuterPrepareState = 9, + DataExecuterPrepateTasks = 9, + DataExecuterExecuteState = 9, + DataExecuterSendTasksAndTxs = 9, + DataExecuterWaitSnapshotState = 9, + ProposeTransaction = 8, + + ScanExecuter = 9, + ScanExecuterExecuteState = 9, + ScanExecuterPrepareTasks = 9, + KqpPlanner = 8, + KqpNodeSendTasks = 7, + }; + }; + } // NKikimr diff --git a/ydb/core/kqp/CMakeLists.txt b/ydb/core/kqp/CMakeLists.txt index 041ed5ad1ae..7d8bf5d77c3 100644 --- a/ydb/core/kqp/CMakeLists.txt +++ b/ydb/core/kqp/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries(ydb-core-kqp PUBLIC cpp-actors-helpers cpp-digest-md5 cpp-string_utils-base64 + cpp-actors-wilson ydb-core-actorlib_impl ydb-core-base core-client-minikql_compile @@ -72,6 +73,7 @@ target_link_libraries(ydb-core-kqp.global PUBLIC cpp-actors-helpers cpp-digest-md5 cpp-string_utils-base64 + cpp-actors-wilson ydb-core-actorlib_impl ydb-core-base core-client-minikql_compile diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h index 05eee6cb38e..f7d18b682dd 100644 --- a/ydb/core/kqp/common/kqp_gateway.h +++ b/ydb/core/kqp/common/kqp_gateway.h @@ -8,6 +8,7 @@ #include <ydb/core/kqp/provider/yql_kikimr_gateway.h> #include <ydb/core/tx/long_tx_service/public/lock_handle.h> +#include <library/cpp/actors/wilson/wilson_trace.h> #include <library/cpp/actors/core/actorid.h> #include <library/cpp/lwtrace/shuttle.h> @@ -128,6 +129,7 @@ public: bool NeedTxId = true; NLWTrace::TOrbit Orbit; + NWilson::TTraceId TraceId; }; struct TExecPhysicalResult : public TGenericResult { diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index d9c57edc276..702918be8af 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -19,13 +19,14 @@ namespace NKqp { IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits); + const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, + NWilson::TTraceId traceId = {}); IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, - TIntrusivePtr<TKqpCounters> counters); + TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {}); namespace NComputeActor { diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 3c6d20918cc..aeb2cafe70a 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -35,8 +35,9 @@ public: TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) - : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, + NWilson::TTraceId traceId) + : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId)) , ComputeCtx(settings.StatsMode) { if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) { @@ -305,10 +306,11 @@ private: IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, + NWilson::TTraceId traceId) { return new TKqpComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory), - functionRegistry, settings, memoryLimits); + functionRegistry, settings, memoryLimits, std::move(traceId)); } } // namespace NKqp diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 8152d361782..1351c6a35b7 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -103,8 +103,9 @@ public: TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters) - : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters, + NWilson::TTraceId traceId) + : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId)) , ComputeCtx(settings.StatsMode) , Snapshot(snapshot) , Counters(counters) @@ -1045,10 +1046,11 @@ private: IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters, + NWilson::TTraceId traceId) { return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory), - functionRegistry, settings, memoryLimits, counters); + functionRegistry, settings, memoryLimits, counters, std::move(traceId)); } } // namespace NKqp diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index 2b8438bafba..4728b59d004 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -10,6 +10,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/base/wilson.h> #include <ydb/core/client/minikql_compile/db_key_resolver.h> #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/compute_actor/kqp_compute_actor.h> @@ -133,7 +134,7 @@ public: TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters) - : TBase(std::move(request), database, userToken, counters) + : TBase(std::move(request), database, userToken, counters, TWilsonKqp::DataExecuter, "DataExecuter") { YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); @@ -181,6 +182,10 @@ public: return; } + if (ExecuterTableResolveSpan) { + ExecuterTableResolveSpan.End(); + } + Execute(); } @@ -503,6 +508,11 @@ private: LOG_D("All shards prepared, become ExecuteState."); Become(&TKqpDataExecuter::ExecuteState); + if (ExecuterStateSpan) { + ExecuterStateSpan.End(); + ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END); + } + ExecutePlanned(); } @@ -1231,7 +1241,11 @@ private: ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0); } - Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward(ev, shardId, true)); + auto traceId = ExecuterSpan.GetTraceId(); + + LOG_D("ExecuteDatashardTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity())); + + Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward(ev, shardId, true), 0, 0, std::move(traceId)); auto result = ShardStates.emplace(shardId, std::move(shardState)); YQL_ENSURE(result.second); @@ -1263,7 +1277,7 @@ private: return false; }; - auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits); + auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits, ExecuterSpan.GetTraceId()); auto computeActorId = Register(computeActor); task.ComputeActorId = computeActorId; @@ -1274,6 +1288,7 @@ private: } void Execute() { + NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); RequestControls.Reqister(TlsActivationContext->AsActorContext()); @@ -1513,9 +1528,17 @@ private: LOG_T("Create temporary mvcc snapshot, ebcome WaitSnapshotState"); Become(&TKqpDataExecuter::WaitSnapshotState); + if (ExecuterStateSpan) { + ExecuterStateSpan.End(); + ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterWaitSnapshotState, ExecuterSpan.GetTraceId(), "WaitSnapshotState", NWilson::EFlags::AUTO_END); + } + return; } + if (prepareTasksSpan) { + prepareTasksSpan.End(); + } ContinueExecute(computeTasks, datashardTxs); } @@ -1579,9 +1602,17 @@ private: if (ImmediateTx) { LOG_T("Immediate tx, become ExecuteState"); Become(&TKqpDataExecuter::ExecuteState); + if (ExecuterStateSpan) { + ExecuterStateSpan.End(); + ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END); + } } else { LOG_T("Not immediate tx, become PrepareState"); Become(&TKqpDataExecuter::PrepareState); + if (ExecuterStateSpan) { + ExecuterStateSpan.End(); + ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterPrepareState, ExecuterSpan.GetTraceId(), "PrepareState", NWilson::EFlags::AUTO_END); + } } } @@ -1647,6 +1678,7 @@ private: LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem()); } + NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), datashardTxs.size()); // first, start compute tasks @@ -1705,6 +1737,10 @@ private: ExecuteDatashardTransaction(shardId, shardTx, lockTxId); } + if (sendTasksSpan) { + sendTasksSpan.End(); + } + LOG_I("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: " << ReadOnlyTx << ", datashardTxs: " << datashardTxs.size() @@ -1773,6 +1809,15 @@ private: } LWTRACK(KqpDataExecuterFinalize, ResponseEv->Orbit, TxId, LastShard, response.GetResult().ResultsSize(), response.ByteSize()); + + if (ExecuterStateSpan) { + ExecuterStateSpan.End(); + ExecuterStateSpan = {}; + } + + if (ExecuterSpan) { + ExecuterSpan.EndOk(); + } LOG_D("Sending response to: " << Target << ", results: " << Results.size()); Send(Target, ResponseEv.release()); diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index e1830b9f5b1..250299766d2 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -10,6 +10,7 @@ #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> +#include <ydb/core/base/wilson.h> #include <ydb/core/base/kikimr_issue.h> #include <ydb/core/protos/tx_datashard.pb.h> #include <ydb/core/kqp/executer/kqp_tasks_graph.h> @@ -26,6 +27,7 @@ #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> @@ -81,11 +83,12 @@ template <class TDerived, EExecType ExecType> class TKqpExecuterBase : public TActorBootstrapped<TDerived> { public: TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, - TKqpRequestCounters::TPtr counters) + TKqpRequestCounters::TPtr counters, ui64 spanVerbosity = 0, TString spanName = "no_name") : Request(std::move(request)) , Database(database) , UserToken(userToken) , Counters(counters) + , ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName) { ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(); ResponseEv->Orbit = std::move(Request.Orbit); @@ -104,6 +107,7 @@ public: LOG_T("Bootstrap done, become ReadyState"); this->Become(&TKqpExecuterBase::ReadyState); + ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterReadyState, ExecuterSpan.GetTraceId(), "ReadyState", NWilson::EFlags::AUTO_END); } void ReportEventElapsedTime() { @@ -166,6 +170,13 @@ protected: LOG_T("Got request, become WaitResolveState"); this->Become(&TDerived::WaitResolveState); + if (ExecuterStateSpan) { + ExecuterStateSpan.End(); + ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterWaitResolveState, ExecuterSpan.GetTraceId(), "WaitResolveState", NWilson::EFlags::AUTO_END); + } + + ExecuterTableResolveSpan = NWilson::TSpan(TWilsonKqp::ExecuterTableResolve, ExecuterStateSpan.GetTraceId(), "ExecuterTableResolve", NWilson::EFlags::AUTO_END); + StartResolveTime = now; if (Stats) { @@ -184,6 +195,11 @@ protected: InternalError(issues); } else if (statusCode == Ydb::StatusIds::TIMEOUT) { auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded"); + + if (ExecuterSpan) { + ExecuterSpan.EndError("timeout"); + } + this->Send(Target, abortEv.Release()); TerminateComputeActors(Ydb::StatusIds::TIMEOUT, "timeout"); @@ -199,10 +215,20 @@ protected: if (cancel) { auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::CANCELLED, "Request timeout exceeded"); + + if (ExecuterSpan) { + ExecuterSpan.EndError("timeout"); + } + this->Send(Target, abortEv.Release()); CancelAtActor = {}; } else { auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded"); + + if (ExecuterSpan) { + ExecuterSpan.EndError("timeout"); + } + this->Send(Target, abortEv.Release()); DeadlineActor = {}; } @@ -605,6 +631,10 @@ protected: LWTRACK(KqpBaseExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId); + if (ExecuterSpan) { + ExecuterSpan.EndError(response.DebugString()); + } + this->Send(Target, ResponseEv.release()); this->PassAway(); } @@ -638,6 +668,10 @@ protected: this->Send(this->SelfId(), new TEvents::TEvPoison); LOG_T("Terminate, become ZombieState"); this->Become(&TKqpExecuterBase::ZombieState); + if (ExecuterStateSpan) { + ExecuterStateSpan.End(); + ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterZombieState, ExecuterSpan.GetTraceId(), "ZombieState", NWilson::EFlags::AUTO_END); + } } else { IActor::PassAway(); } @@ -686,6 +720,9 @@ protected: TInstant LastResourceUsageUpdate; std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv; + NWilson::TSpan ExecuterSpan; + NWilson::TSpan ExecuterStateSpan; + NWilson::TSpan ExecuterTableResolveSpan; private: static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100); }; diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp index 5ca5367accd..0c663161f26 100644 --- a/ydb/core/kqp/executer/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp @@ -9,6 +9,8 @@ #include <ydb/core/kqp/prepare/kqp_query_plan.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/core/base/wilson.h> + namespace NKikimr { namespace NKqp { @@ -66,6 +68,7 @@ public: TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters) : Request(std::move(request)) , Counters(counters) + , LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter") { ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(); ResponseEv->Orbit = std::move(Request.Orbit); @@ -109,6 +112,7 @@ private: } void Handle(TEvKqpExecuter::TEvTxRequest::TPtr& ev) { + NWilson::TSpan prepareTasksSpan(TWilsonKqp::LiteralExecuterPrepareTasks, LiteralExecuterSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END); if (Stats) { Stats->StartTs = TInstant::Now(); } @@ -171,6 +175,12 @@ private: } }); + if (prepareTasksSpan) { + prepareTasksSpan.EndOk(); + } + + NWilson::TSpan runTasksSpan(TWilsonKqp::LiteralExecuterRunTasks, LiteralExecuterSpan.GetTraceId(), "RunTasks", NWilson::EFlags::AUTO_END); + // task runner settings NMiniKQL::TKqpComputeContextBase computeCtx; TDqTaskRunnerContext context = CreateTaskRunnerContext(&computeCtx, &alloc, &typeEnv); @@ -190,6 +200,10 @@ private: } } + if (runTasksSpan) { + runTasksSpan.End(); + } + Finalize(context, holderFactory); PassAway(); } @@ -349,6 +363,10 @@ private: LWTRACK(KqpLiteralExecuterFinalize, ResponseEv->Orbit, TxId); + if (LiteralExecuterSpan) { + LiteralExecuterSpan.EndOk(); + } + LOG_D("Sending response to: " << Target << ", results: " << Results.size()); Send(Target, ResponseEv.release()); } @@ -413,6 +431,10 @@ private: LWTRACK(KqpLiteralExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId); + if (LiteralExecuterSpan) { + LiteralExecuterSpan.EndError(response.DebugString()); + } + Send(Target, ResponseEv.release()); PassAway(); } @@ -438,6 +460,7 @@ private: TVector<TIntrusivePtr<IDqTaskRunner>> TaskRunners; std::unordered_map<ui64, ui32> TaskId2StageId; std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv; + NWilson::TSpan LiteralExecuterSpan; }; } // anonymous namespace diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp index 313c8b0779b..a2258184358 100644 --- a/ydb/core/kqp/executer/kqp_planner.cpp +++ b/ydb/core/kqp/executer/kqp_planner.cpp @@ -4,6 +4,7 @@ #include "kqp_shards_resolver.h" #include <ydb/core/base/appdata.h> +#include <ydb/core/base/wilson.h> #include <ydb/core/kqp/rm/kqp_rm.h> #include <ydb/core/kqp/rm/kqp_resource_estimation.h> @@ -26,7 +27,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, - bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath) + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId) : TxId(txId) , ExecuterId(executer) , Tasks(std::move(tasks)) @@ -40,6 +41,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: , EnableLlvm(enableLlvm) , WithSpilling(withSpilling) , RlPath(rlPath) + , KqpPlannerSpan(TWilsonKqp::KqpPlanner, std::move(traceId), "KqpPlanner") { if (!Database) { // a piece of magic for tests @@ -135,6 +137,11 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot LOG_E("Not enough resources to execute query locally and no information about other nodes"); auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, "Not enough resources to execute query locally and no information about other nodes"); + + if (KqpPlannerSpan) { + KqpPlannerSpan.EndError("Not enough resources to execute query locally and no information about other nodes"); + } + Send(ExecuterId, ev.Release()); PassAway(); return; @@ -154,6 +161,8 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot auto plan = planner->Plan(snapshot, std::move(est)); + long requestsCnt = 0; + if (!plan.empty()) { for (auto& group : plan) { auto ev = PrepareKqpNodeRequest(group.TaskIds); @@ -161,7 +170,8 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot auto target = MakeKqpNodeServiceID(group.NodeId); TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), - CalcSendMessageFlagsForNode(target.NodeId()))); + CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId())); + ++requestsCnt; } TVector<ui64> nodes; @@ -177,15 +187,26 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot auto target = MakeKqpNodeServiceID(nodeId); LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId); TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), - CalcSendMessageFlagsForNode(target.NodeId()))); + CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId())); + ++requestsCnt; } Y_VERIFY(ScanTasks.empty()); } else { auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, "Not enough resources to execute query"); + + if (KqpPlannerSpan) { + KqpPlannerSpan.EndError("Not enough resources to execute query"); + } + Send(ExecuterId, ev.Release()); } + if (KqpPlannerSpan) { + KqpPlannerSpan.Attribute("RequestsCnt", requestsCnt); + KqpPlannerSpan.EndOk(); + } + PassAway(); } @@ -200,8 +221,8 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho auto target = MakeKqpNodeServiceID(SelfId().NodeId()); LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId); - TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery)); - + TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery, 0, nullptr, KqpPlannerSpan.GetTraceId())); + long requestsCnt = 1; TVector<ui64> nodes; for (const auto& pair: ScanTasks) { @@ -220,10 +241,16 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho AddScansToKqpNodeRequest(ev, nodeId); auto target = MakeKqpNodeServiceID(nodeId); TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), - CalcSendMessageFlagsForNode(target.NodeId()))); + CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId())); + ++requestsCnt; } Y_VERIFY(ScanTasks.size() == 0); + if (KqpPlannerSpan) { + KqpPlannerSpan.Attribute("requestsCnt", requestsCnt); + KqpPlannerSpan.EndOk(); + } + PassAway(); } @@ -352,10 +379,10 @@ IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TMaybe<TString>& token, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, - bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath) + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId) { return new TKqpPlanner(txId, executer, std::move(tasks), std::move(scanTasks), snapshot, - database, token, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath); + database, token, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, std::move(traceId)); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer/kqp_planner.h b/ydb/core/kqp/executer/kqp_planner.h index ae1cb852c13..5c5cfeabb69 100644 --- a/ydb/core/kqp/executer/kqp_planner.h +++ b/ydb/core/kqp/executer/kqp_planner.h @@ -6,6 +6,7 @@ #include <ydb/core/kqp/rm/kqp_rm.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> @@ -35,7 +36,7 @@ public: THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, - bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath); + bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId); void Bootstrap(const TActorContext& ctx); @@ -71,12 +72,13 @@ private: const bool WithSpilling; const TMaybe<NKikimrKqp::TRlPath> RlPath; THashSet<ui32> TrackingNodes; + NWilson::TSpan KqpPlannerSpan; }; IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, - bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath); + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId = {}); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 228b36899c3..da8e800f5e2 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -44,7 +44,7 @@ public: TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters) - : TBase(std::move(request), database, userToken, counters) + : TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter") { YQL_ENSURE(Request.Transactions.size() == 1); YQL_ENSURE(Request.Locks.empty()); @@ -101,6 +101,10 @@ public: } } + if (ExecuterTableResolveSpan) { + ExecuterTableResolveSpan.End(); + } + if (shardIds.size() > 0) { LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")"); auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds)); @@ -636,6 +640,8 @@ private: NMiniKQL::TScopedAlloc alloc(TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators()); NMiniKQL::TTypeEnvironment typeEnv(alloc); + NWilson::TSpan prepareTasksSpan(TWilsonKqp::ScanExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END); + NMiniKQL::TMemoryUsageInfo memInfo("PrepareTasks"); NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo, &funcRegistry); @@ -812,6 +818,10 @@ private: return; } + if (prepareTasksSpan) { + prepareTasksSpan.End(); + } + LOG_D("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: true" << ", " << nScanTasks << " scan tasks on " << scanTasks.size() << " nodes" << ", totalShardScans: " << nShardScans << ", execType: Scan" @@ -820,6 +830,11 @@ private: ExecuteScanTx(std::move(computeTasks), std::move(scanTasks)); Become(&TKqpScanExecuter::ExecuteState); + if (ExecuterStateSpan) { + ExecuterStateSpan.End(); + ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ScanExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END); + } + } void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks) { @@ -838,7 +853,7 @@ private: auto planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks), std::move(scanTasks), Request.Snapshot, Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, - Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath); + Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan.GetTraceId()); RegisterWithSameMailbox(planner); } @@ -877,6 +892,10 @@ private: LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, Results.size()); + if (ExecuterSpan) { + ExecuterSpan.EndOk(); + } + LOG_D("Sending response to: " << Target); Send(Target, ResponseEv.release()); PassAway(); diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp index ee471926fb2..44f2a035f5e 100644 --- a/ydb/core/kqp/kqp_compile_actor.cpp +++ b/ydb/core/kqp/kqp_compile_actor.cpp @@ -3,6 +3,7 @@ #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> +#include <ydb/core/base/wilson.h> #include <ydb/core/client/minikql_compile/mkql_compile_service.h> #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/host/kqp_host.h> @@ -10,6 +11,7 @@ #include <ydb/library/yql/utils/actor_log/log.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/json/json_writer.h> #include <library/cpp/string_utils/base64/base64.h> @@ -46,7 +48,7 @@ public: TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TString& userToken, - TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine) + TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId) : Owner(owner) , ModuleResolverState(moduleResolverState) , Counters(counters) @@ -57,6 +59,7 @@ public: , Config(MakeIntrusive<TKikimrConfiguration>()) , CompilationTimeout(TDuration::MilliSeconds(serviceConfig.GetCompileTimeoutMs())) , RecompileWithNewEngine(recompileWithNewEngine) + , CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor") { Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), Query.Cluster, kqpSettings->Settings, false); @@ -74,6 +77,10 @@ public: Counters->ReportCompileStart(DbCounters); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "traceId: verbosity = " + << std::to_string(CompileActorSpan.GetTraceId().GetVerbosity()) << ", trace_id = " + << std::to_string(CompileActorSpan.GetTraceId().GetTraceId())); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Start compilation" << ", self: " << ctx.SelfID << ", cluster: " << Query.Cluster @@ -215,6 +222,10 @@ private: Counters->ReportCompileFinish(DbCounters); + if (CompileActorSpan) { + CompileActorSpan.End(); + } + Die(ctx); } @@ -390,6 +401,8 @@ private: TIntrusivePtr<IKqpHost::IAsyncQueryResult> AsyncCompileResult; std::shared_ptr<TKqpCompileResult> KqpCompileResult; std::optional<TString> ReplayMessage; + + NWilson::TSpan CompileActorSpan; }; void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) { @@ -405,10 +418,10 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TString& userToken, - TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine) + TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId) { return new TKqpCompileActor(owner, kqpSettings, serviceConfig, moduleResolverState, counters, uid, - std::move(query), userToken, dbCounters, recompileWithNewEngine); + std::move(query), userToken, dbCounters, recompileWithNewEngine, std::move(traceId)); } } // namespace NKqp diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp index 0d5f80ce409..a06a1a43c0d 100644 --- a/ydb/core/kqp/kqp_compile_request.cpp +++ b/ydb/core/kqp/kqp_compile_request.cpp @@ -4,10 +4,12 @@ #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> #include <ydb/core/kqp/common/kqp_lwtrace_probes.h> +#include <ydb/core/base/wilson.h> #include <util/string/escape.h> @@ -28,7 +30,8 @@ public: } TKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit) + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit, + NWilson::TTraceId traceId) : Owner(owner) , UserToken(userToken) , Uid(uid) @@ -36,7 +39,8 @@ public: , KeepInCache(keepInCache) , Deadline(deadline) , DbCounters(dbCounters) - , Orbit{std::move(orbit)} {} + , Orbit{std::move(orbit)} + , CompileRequestSpan(TWilsonKqp::CompileRequest, std::move(traceId), "CompileRequest") {} void Bootstrap(const TActorContext& ctx) { LWTRACK(KqpCompileRequestBootstrap, @@ -51,7 +55,7 @@ public: auto compileEv = MakeHolder<TEvKqp::TEvCompileRequest>(UserToken, Uid, std::move(query), KeepInCache, Deadline, DbCounters, std::move(Orbit)); - ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release()); + ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId()); Become(&TKqpCompileRequestActor::MainState); } @@ -66,12 +70,22 @@ public: const auto& stats = ev->Get()->Stats; if (compileResult->Status != Ydb::StatusIds::SUCCESS || !stats.GetFromCache()) { + + if (CompileRequestSpan) { + CompileRequestSpan.End(); + } + ctx.Send(Owner, ev->Release().Release()); Die(ctx); return; } if (!NavigateTables(*compileResult->PreparedQuery, compileResult->Query->Database, ctx)) { + + if (CompileRequestSpan) { + CompileRequestSpan.End(); + } + ctx.Send(Owner, ev->Release().Release()); Die(ctx); return; @@ -82,6 +96,11 @@ public: void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext &ctx) { if (ValidateTables(*ev->Get(), ctx)) { + + if (CompileRequestSpan) { + CompileRequestSpan.EndOk(); + } + ctx.Send(Owner, DeferredResponse.Release()); Die(ctx); return; @@ -95,7 +114,7 @@ public: auto recompileEv = MakeHolder<TEvKqp::TEvRecompileRequest>(UserToken, compileResult.Uid, compileResult.Query, Deadline, DbCounters, std::move(DeferredResponse->Orbit)); - ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release()); + ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId()); DeferredResponse.Reset(); } @@ -295,6 +314,11 @@ private: void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) { auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues), std::move(Orbit)); + + if (CompileRequestSpan) { + CompileRequestSpan.EndError(issues.ToOneLineString()); + } + ctx.Send(Owner, responseEv.Release()); Die(ctx); } @@ -312,13 +336,24 @@ private: THolder<TEvKqp::TEvCompileResponse> DeferredResponse; NLWTrace::TOrbit Orbit; + NWilson::TSpan CompileRequestSpan; }; IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit) + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit, + NWilson::TTraceId traceId) { - return new TKqpCompileRequestActor(owner, userToken, uid, std::move(query), keepInCache, deadline, dbCounters, std::move(orbit)); + return new TKqpCompileRequestActor( + owner, + userToken, + uid, + std::move(query), + keepInCache, + deadline, + dbCounters, + std::move(orbit), + std::move(traceId)); } } // namespace NKqp diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp index 2c1ee8e2f78..efba6222ffd 100644 --- a/ydb/core/kqp/kqp_compile_service.cpp +++ b/ydb/core/kqp/kqp_compile_service.cpp @@ -4,6 +4,7 @@ #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> +#include <ydb/core/base/wilson.h> #include <ydb/core/cms/console/console.h> #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/kqp/counters/kqp_counters.h> @@ -11,6 +12,7 @@ #include <ydb/library/aclib/aclib.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/cache/cache.h> @@ -187,7 +189,8 @@ private: struct TKqpCompileRequest { TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache, - const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}) + const TString& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, + NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}) : Sender(sender) , Query(std::move(query)) , Uid(uid) @@ -195,7 +198,8 @@ struct TKqpCompileRequest { , UserToken(userToken) , Deadline(deadline) , DbCounters(dbCounters) - , Orbit(std::move(orbit)) {} + , Orbit(std::move(orbit)) + , CompileServiceSpan(std::move(span)) {} TActorId Sender; TKqpQueryId Query; @@ -207,6 +211,7 @@ struct TKqpCompileRequest { TActorId CompileActor; NLWTrace::TOrbit Orbit; + NWilson::TSpan CompileServiceSpan; }; class TKqpRequestsQueue { @@ -415,13 +420,17 @@ private: } catch (const std::exception& e) { LogException("TEvCompileRequest", ev->Sender, e, ctx); - ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit)); + ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {}); } } void PerformRequest(TEvKqp::TEvCompileRequest::TPtr& ev, const TActorContext& ctx) { auto& request = *ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Perform request, TraceId.SpanIdPtr: " << ev->TraceId.GetSpanIdPtr()); + + NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, std::move(ev->TraceId), "CompileService"); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Received compile request" << ", sender: " << ev->Sender << ", queryUid: " << (request.Uid ? *request.Uid : "<empty>") @@ -447,8 +456,7 @@ private: << ", sender: " << ev->Sender << ", queryUid: " << *request.Uid); - - ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit)); + ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); return; } else { LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query" @@ -466,7 +474,7 @@ private: << ", queryUid: " << *request.Uid); NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid); - ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit)); + ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); return; } @@ -489,7 +497,7 @@ private: << ", sender: " << ev->Sender << ", queryUid: " << compileResult->Uid); - ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit)); + ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); return; } @@ -501,7 +509,8 @@ private: TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), - request.KeepInCache, request.UserToken, request.Deadline, dbCounters, std::move(ev->Get()->Orbit)); + request.KeepInCache, request.UserToken, request.Deadline, dbCounters, + std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(dbCounters); @@ -512,7 +521,7 @@ private: NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Exceeded maximum number of requests in compile service queue."); - ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit)); + ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); return; } @@ -529,7 +538,7 @@ private: } catch (const std::exception& e) { LogException("TEvRecompileRequest", ev->Sender, e, ctx); - ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit)); + ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {}); } } @@ -546,8 +555,12 @@ private: if (compileResult || request.Query) { Counters->ReportCompileRequestCompile(dbCounters); + NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); + TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, - true, request.UserToken, request.Deadline, dbCounters, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit()); + true, request.UserToken, request.Deadline, dbCounters, + ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), + std::move(CompileServiceSpan)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(dbCounters); @@ -558,7 +571,7 @@ private: NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Exceeded maximum number of requests in compile service queue."); - ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(ev->Get()->Orbit)); + ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); return; } } else { @@ -567,7 +580,10 @@ private: << ", queryUid: " << request.Uid); NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << request.Uid); - ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit)); + + NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); + + ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan)); return; } @@ -611,7 +627,7 @@ private: auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query); for (auto& request : requests) { LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString()); - Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit)); + Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit), std::move(request.CompileServiceSpan)); } } else { if (QueryCache.FindByUid(compileResult->Uid, false)) { @@ -620,11 +636,11 @@ private: } LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString()); - Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit)); + Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); } catch (const std::exception& e) { LogException("TEvCompileResponse", ev->Sender, e, ctx); - ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit)); + ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); } ProcessQueue(ctx); @@ -682,7 +698,7 @@ private: Counters->ReportCompileRequestTimeout(request->DbCounters); NYql::TIssue issue(NYql::TPosition(), "Compilation timed out."); - ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit)); + ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit), std::move(request->CompileServiceSpan)); } else { StartCompilation(std::move(*request), ctx); } @@ -695,7 +711,7 @@ private: bool recompileWithNewEngine = Config.GetForceNewEnginePercent() > 0; auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, ModuleResolverState, Counters, - request.Uid, request.Query, request.UserToken, request.DbCounters, recompileWithNewEngine); + request.Uid, request.Query, request.UserToken, request.DbCounters, recompileWithNewEngine, request.CompileServiceSpan.GetTraceId()); auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap, AppData(ctx)->UserPoolId); @@ -714,7 +730,7 @@ private: } void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult, - const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit) + const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) { const auto& query = compileResult->Query; LWTRACK(KqpCompileServiceReply, @@ -735,34 +751,38 @@ private: responseEv->ForceNewEngineLevel = Config.GetForceNewEngineLevel(); } + if (span) { + span.End(); + } + ctx.Send(sender, responseEv.Release()); } void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult, - const TActorContext& ctx, NLWTrace::TOrbit orbit) + const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) { NKqpProto::TKqpStatsCompile stats; stats.SetFromCache(true); LWTRACK(KqpCompileServiceReplyFromCache, orbit); - Reply(sender, compileResult, stats, ctx, std::move(orbit)); + Reply(sender, compileResult, stats, ctx, std::move(orbit), std::move(span)); } void ReplyError(const TActorId& sender, const TString& uid, Ydb::StatusIds::StatusCode status, - const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit) + const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) { LWTRACK(KqpCompileServiceReplyError, orbit); - Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit)); + Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span)); } void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message, - const TActorContext& ctx, NLWTrace::TOrbit orbit) + const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span) { NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Internal error during query compilation."); issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message)); LWTRACK(KqpCompileServiceReplyInternalError, orbit); - ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit)); + ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit), std::move(span)); } static void LogException(const TString& scope, const TActorId& sender, const std::exception& e, diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h index 8adae6b1d8d..0a34d68f845 100644 --- a/ydb/core/kqp/kqp_impl.h +++ b/ydb/core/kqp/kqp_impl.h @@ -52,10 +52,11 @@ IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& servic IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const NKikimrConfig::TTableServiceConfig& serviceConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TString& userToken, - TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine); + TKqpDbCountersPtr dbCounters, bool recompileWithNewEngine, NWilson::TTraceId traceId = {}); IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TString& userToken, const TMaybe<TString>& uid, - TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}); + TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, + NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {}); struct TKqpWorkerSettings { TString Cluster; diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 18cebe401fd..cfb8f81be38 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -16,6 +16,7 @@ #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/cputime.h> +#include <ydb/core/base/wilson.h> #include <ydb/core/protos/kqp.pb.h> #include <ydb/core/sys_view/service/sysview_service.h> #include <ydb/core/tx/tx_proxy/proxy.h> @@ -29,6 +30,9 @@ #include <util/string/printf.h> #include <util/string/escape.h> +#include <library/cpp/actors/wilson/wilson_span.h> +#include <library/cpp/actors/wilson/wilson_trace.h> + LWTRACE_USING(KQP_PROVIDER); namespace NKikimr { @@ -83,6 +87,7 @@ struct TKqpQueryState { TString UserToken; NLWTrace::TOrbit Orbit; + NWilson::TSpan KqpSessionSpan; TString TxId; // User tx bool Commit = false; @@ -312,6 +317,10 @@ public: YQL_ENSURE(queryRequest.HasAction()); auto action = queryRequest.GetAction(); + auto id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>()); + LOG_I("Wilson Tracing started, id: " + std::to_string(id.GetTraceId())); + QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END); + LWTRACK(KqpSessionQueryRequest, QueryState->Orbit, queryRequest.GetDatabase(), @@ -426,7 +435,8 @@ public: auto compileRequestActor = CreateKqpCompileRequestActor(SelfId(), QueryState->UserToken, uid, std::move(query), keepInCache, compileDeadline, Settings.DbCounters, - QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit()); + QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit(), + QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId()); TlsActivationContext->ExecutorThread.RegisterActor(compileRequestActor); @@ -1001,6 +1011,9 @@ public: if (QueryState) { request.Orbit = std::move(QueryState->Orbit); } + request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId(); + LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); + auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, (QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(), RequestCounters); @@ -1488,8 +1501,14 @@ public: } if (status == Ydb::StatusIds::SUCCESS) { + if (QueryState && QueryState->KqpSessionSpan) { + QueryState->KqpSessionSpan.EndOk(); + } LWTRACK(KqpSessionReplySuccess, QueryState->Orbit, record.GetArena() ? record.GetArena()->SpaceUsed() : 0); } else { + if (QueryState && QueryState->KqpSessionSpan) { + QueryState->KqpSessionSpan.EndError(response.DebugString()); + } LWTRACK(KqpSessionReplyError, QueryState->Orbit, TStringBuilder() << status); } Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId); @@ -1947,6 +1966,7 @@ private: TActorId IdleTimerActorId; ui32 IdleTimerId = 0; std::optional<TSessionShutdownState> ShutdownState; + }; } // namespace diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp index 632b36b924a..34c23cd0a4c 100644 --- a/ydb/core/kqp/node/kqp_node.cpp +++ b/ydb/core/kqp/node/kqp_node.cpp @@ -14,10 +14,13 @@ #include <ydb/core/kqp/common/kqp_resolve.h> #include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h> +#include <ydb/core/base/wilson.h> + #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <util/string/join.h> @@ -117,6 +120,8 @@ private: } void HandleWork(TEvKqpNode::TEvStartKqpTasksRequest::TPtr& ev) { + NWilson::TSpan sendTasksSpan(TWilsonKqp::KqpNodeSendTasks, NWilson::TTraceId(ev->TraceId), "KqpNode.SendTasks", NWilson::EFlags::AUTO_END); + auto& msg = ev->Get()->Record; auto requester = ev->Sender; @@ -314,12 +319,12 @@ private: IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask), - CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters); + CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { if (Y_LIKELY(!CaFactory)) { computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateAsyncIoFactory(), - nullptr, runtimeSettings, memoryLimits); + nullptr, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { computeActor = CaFactory->CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp index 7dfb1a294e6..94ff3791e6d 100644 --- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp +++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp @@ -3,6 +3,7 @@ #include "operation.h" #include <ydb/core/util/pb.h> +#include <ydb/core/base/wilson.h> namespace NKikimr { namespace NDataShard { @@ -18,6 +19,7 @@ TDataShard::TTxProposeTransactionBase::TTxProposeTransactionBase(TDataShard *sel , Kind(static_cast<EOperationKind>(Ev->Get()->GetTxKind())) , TxId(Ev->Get()->GetTxId()) , Acked(!delayed) + , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END) { } @@ -66,6 +68,10 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa TActorId target = Op ? Op->GetTarget() : Ev->Get()->GetSource(); ui64 cookie = Op ? Op->GetCookie() : Ev->Cookie; + + if (ProposeTransactionSpan) { + ProposeTransactionSpan.EndOk(); + } ctx.Send(target, result.Release(), 0, cookie); return true; @@ -84,6 +90,10 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa // Unsuccessful operation parse. if (op->IsAborted()) { Y_VERIFY(op->Result()); + + if (ProposeTransactionSpan) { + ProposeTransactionSpan.EndError("Unsuccessful operation parse"); + } ctx.Send(op->GetTarget(), op->Result().Release()); return true; } @@ -158,6 +168,10 @@ void TDataShard::TTxProposeTransactionBase::Complete(const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "TTxProposeTransactionBase::Complete at " << Self->TabletID()); + if (ProposeTransactionSpan) { + ProposeTransactionSpan.End(); + } + if (Op) { Y_VERIFY(!Op->GetExecutionPlan().empty()); if (!CompleteList.empty()) { diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h index a73e24b1991..9447bf36e87 100644 --- a/ydb/core/tx/datashard/datashard_txs.h +++ b/ydb/core/tx/datashard/datashard_txs.h @@ -4,6 +4,8 @@ #include "datashard_impl.h" #include "execution_unit_kind.h" +#include <library/cpp/actors/wilson/wilson_span.h> + namespace NKikimr { namespace NDataShard { @@ -106,6 +108,7 @@ protected: TInstant CommitStart; bool Acked; bool Rescheduled = false; + NWilson::TSpan ProposeTransactionSpan; }; class TDataShard::TTxReadSet : public NTabletFlatExecutor::TTransactionBase<TDataShard> { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index c71f512809c..1925de10c38 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -8,6 +8,7 @@ #include "dq_compute_memory_quota.h" #include <ydb/core/base/kikimr_issue.h> +#include <ydb/core/base/wilson.h> #include <ydb/core/protos/services.pb.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> @@ -20,6 +21,7 @@ #include <ydb/library/yql/dq/actors/dq.h> #include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <util/generic/size_literals.h> #include <util/string/join.h> @@ -169,7 +171,8 @@ protected: const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false, - const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr) + const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr, + NWilson::TTraceId traceId = {}) : ExecuterId(executerId) , TxId(txId) , Task(std::move(task)) @@ -183,6 +186,7 @@ protected: , MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr) , Running(!Task.GetCreateSuspended()) , PassExceptions(passExceptions) + , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor") { if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { BasicStats = std::make_unique<TBasicStats>(); @@ -195,7 +199,8 @@ protected: IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr) + const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr, + NWilson::TTraceId traceId = {}) : ExecuterId(executerId) , TxId(txId) , Task(task) @@ -207,6 +212,7 @@ protected: , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , MemoryQuota(InitMemoryQuota()) , Running(!Task.GetCreateSuspended()) + , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor") { if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { BasicStats = std::make_unique<TBasicStats>(); @@ -571,6 +577,10 @@ protected: } IssuesToMessage(issues, record.MutableIssues()); + if (ComputeActorSpan) { + ComputeActorSpan.End(); + } + this->Send(ExecuterId, execEv.Release()); if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) { @@ -1023,6 +1033,14 @@ protected: << "Timeout event from compute actor " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId()); + if (ComputeActorSpan) { + ComputeActorSpan.EndError( + TStringBuilder() + << "Timeout event from compute actor " << this->SelfId() + << ", TxId: " << TxId << ", task: " << Task.GetId() + ); + } + this->Send(ExecuterId, abortEv.Release()); TerminateSources("timeout exceeded", false); @@ -1115,6 +1133,11 @@ protected: this->TerminateSources(issues, success); if (ev->Sender != ExecuterId) { + + if (ComputeActorSpan) { + ComputeActorSpan.End(); + } + NActors::TActivationContext::Send(ev->Forward(ExecuterId)); } @@ -1777,6 +1800,10 @@ protected: CA_LOG_D("Send stats to executor actor " << ExecuterId << " TaskId: " << Task.GetId() << " Stats: " << dbgPrintStats()); + if (ComputeActorSpan) { + ComputeActorSpan.End(); + } + this->Send(ExecuterId, evState.release(), NActors::IEventHandle::FlagTrackDelivery); LastSendStatsTime = now; @@ -1825,6 +1852,7 @@ private: bool Running = true; TInstant LastSendStatsTime; bool PassExceptions = false; + NWilson::TSpan ComputeActorSpan; protected: bool MonCountersProvided = false; ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage; |