aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-09-21 10:36:29 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-09-21 10:36:29 +0300
commit510ee3d03d00a5081280a750f14a220c18a91807 (patch)
treea813a5fb5a5876028da81cd164d296db0e085619
parentf559e0402b83a2857f0c911f5256451d5b25031d (diff)
downloadydb-510ee3d03d00a5081280a750f14a220c18a91807.tar.gz
span with profile
-rw-r--r--library/cpp/actors/core/log.h10
-rw-r--r--library/cpp/actors/wilson/CMakeLists.txt3
-rw-r--r--library/cpp/actors/wilson/wilson_event.cpp4
-rw-r--r--library/cpp/actors/wilson/wilson_profile_span.cpp162
-rw-r--r--library/cpp/actors/wilson/wilson_profile_span.h70
-rw-r--r--library/cpp/actors/wilson/wilson_trace.cpp4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h2
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp32
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp13
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h6
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h6
11 files changed, 298 insertions, 14 deletions
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index 60469f5193..e7cb9392a3 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -32,6 +32,16 @@
0ull) \
)
+#define IS_EMERG_LOG_ENABLED(component) IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NActors::NLog::PRI_EMERG, component)
+#define IS_ALERT_LOG_ENABLED(component) IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NActors::NLog::PRI_ALERT, component)
+#define IS_CRIT_LOG_ENABLED(component) IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NActors::NLog::PRI_CRIT, component)
+#define IS_ERROR_LOG_ENABLED(component) IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NActors::NLog::PRI_ERROR, component)
+#define IS_WARN_LOG_ENABLED(component) IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NActors::NLog::PRI_WARN, component)
+#define IS_NOTICE_LOG_ENABLED(component) IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NActors::NLog::PRI_NOTICE, component)
+#define IS_INFO_LOG_ENABLED(component) IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NActors::NLog::PRI_INFO, component)
+#define IS_DEBUG_LOG_ENABLED(component) IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NActors::NLog::PRI_DEBUG, component)
+#define IS_TRACE_LOG_ENABLED(component) IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NActors::NLog::PRI_TRACE, component)
+
#define LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, ...) \
do { \
::NActors::NLog::TSettings* mSettings = static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings()); \
diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt
index 74661ec192..91f380ee05 100644
--- a/library/cpp/actors/wilson/CMakeLists.txt
+++ b/library/cpp/actors/wilson/CMakeLists.txt
@@ -17,6 +17,9 @@ target_link_libraries(cpp-actors-wilson PUBLIC
actors-wilson-protos
)
target_sources(cpp-actors-wilson PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_event.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_span.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_profile_span.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_trace.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_uploader.cpp
)
diff --git a/library/cpp/actors/wilson/wilson_event.cpp b/library/cpp/actors/wilson/wilson_event.cpp
new file mode 100644
index 0000000000..ad51550d91
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_event.cpp
@@ -0,0 +1,4 @@
+#include "wilson_event.h"
+
+namespace NWilson {
+}
diff --git a/library/cpp/actors/wilson/wilson_profile_span.cpp b/library/cpp/actors/wilson/wilson_profile_span.cpp
new file mode 100644
index 0000000000..1748b28068
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_profile_span.cpp
@@ -0,0 +1,162 @@
+#include "wilson_profile_span.h"
+#include <library/cpp/json/writer/json.h>
+
+namespace NWilson {
+
+void TProfileSpan::AddMax(const TString& eventId, const TString& /*info*/) {
+ if (!Enabled) {
+ return;
+ }
+ auto it = PairInstances.find(eventId);
+ if (it == PairInstances.end()) {
+ PairInstances.emplace(eventId, TMinMaxPair::BuildMax(Now()));
+ } else {
+ it->second.AddMax(Now());
+ }
+}
+
+void TProfileSpan::AddMin(const TString& eventId, const TString& /*info*/) {
+ if (!Enabled) {
+ return;
+ }
+ auto it = PairInstances.find(eventId);
+ if (it == PairInstances.end()) {
+ PairInstances.emplace(eventId, TMinMaxPair::BuildMin(Now()));
+ } else {
+ it->second.AddMin(Now());
+ }
+}
+
+TString TProfileSpan::ProfileToString() const {
+ if (!Enabled) {
+ return "DISABLED";
+ }
+ TStringBuilder sb;
+ sb << "----FullDuration = " << Now() - StartTime << ";";
+ sb << "----Durations:";
+ FlushNoGuards();
+ {
+ NJsonWriter::TBuf sout;
+ ResultTimes.InsertValue("--current_guards_count", CurrentJsonPath.size());
+ ResultTimes.InsertValue("--duration", (Now() - StartTime).MicroSeconds() * 0.000001);
+ sout.WriteJsonValue(&ResultTimes, true, EFloatToStringMode::PREC_POINT_DIGITS, 6);
+ sb << sout.Str();
+ }
+ sb << ";";
+ sb << "----Pairs:{";
+ for (auto&& i : PairInstances) {
+ sb << i.first << ":" << i.second.ToString() << ";";
+ }
+ sb << "}";
+ return sb;
+}
+
+void TProfileSpan::FlushNoGuards() const {
+ if (!Enabled) {
+ return;
+ }
+ if (CurrentJsonPath.empty()) {
+ NJson::TJsonValue* currentNodeOutside;
+ if (!ResultTimes.GetValuePointer("--outside_duration", &currentNodeOutside)) {
+ currentNodeOutside = &ResultTimes.InsertValue("--outside_duration", 0);
+ currentNodeOutside->SetType(NJson::JSON_DOUBLE);
+ }
+ currentNodeOutside->SetValue(currentNodeOutside->GetDoubleRobust() + (Now() - LastNoGuards).MicroSeconds() * 0.000001);
+ LastNoGuards = Now();
+ }
+}
+
+NWilson::TProfileSpan::TMinMaxPair TProfileSpan::TMinMaxPair::BuildMin(const TInstant value) {
+ TMinMaxPair result;
+ result.MinMinInstance = value;
+ result.MaxMinInstance = value;
+ return result;
+}
+
+NWilson::TProfileSpan::TMinMaxPair TProfileSpan::TMinMaxPair::BuildMax(const TInstant value) {
+ TMinMaxPair result;
+ result.MaxInstance = value;
+ return result;
+}
+
+void TProfileSpan::TMinMaxPair::AddMax(const TInstant instance) {
+ if (!MaxInstance) {
+ MaxInstance = instance;
+ } else {
+ MaxInstance = Max(*MaxInstance, instance);
+ }
+}
+
+void TProfileSpan::TMinMaxPair::AddMin(const TInstant instance) {
+ if (!MinMinInstance) {
+ MinMinInstance = instance;
+ } else {
+ MinMinInstance = Min(*MinMinInstance, instance);
+ }
+ if (!MaxMinInstance) {
+ MaxMinInstance = instance;
+ } else {
+ MaxMinInstance = Max(*MaxMinInstance, instance);
+ }
+}
+
+TString TProfileSpan::TMinMaxPair::ToString() const {
+ TStringBuilder sb;
+ sb << "[";
+ if (MinMinInstance) {
+ sb << MinMinInstance->MicroSeconds();
+ } else {
+ sb << "UNDEFINED";
+ }
+ sb << "-";
+ if (MaxMinInstance) {
+ sb << MaxMinInstance->MicroSeconds();
+ } else {
+ sb << "UNDEFINED";
+ }
+ sb << ",";
+ if (MaxInstance) {
+ sb << MaxInstance->MicroSeconds();
+ } else {
+ sb << "UNDEFINED";
+ }
+ if (MaxInstance && MinMinInstance) {
+ sb << ",";
+ sb << *MaxInstance - *MaxMinInstance << "-" << *MaxInstance - *MinMinInstance;
+ }
+ sb << "]";
+ return sb;
+}
+
+TProfileSpan::TGuard::~TGuard() {
+ if (!Owner.Enabled) {
+ return;
+ }
+ Y_VERIFY(CurrentNodeDuration->IsDouble());
+ CurrentNodeDuration->SetValue((Now() - Start).MicroSeconds() * 0.000001 + CurrentNodeDuration->GetDoubleRobust());
+ Y_VERIFY(Owner.CurrentJsonPath.size());
+ Owner.CurrentJsonPath.pop_back();
+ if (Owner.CurrentJsonPath.empty()) {
+ Owner.LastNoGuards = Now();
+ }
+}
+
+TProfileSpan::TGuard::TGuard(const TString& event, TProfileSpan& owner, const TString& /*info*/)
+ : Owner(owner) {
+ if (!Owner.Enabled) {
+ return;
+ }
+ Owner.FlushNoGuards();
+ NJson::TJsonValue* currentNode = Owner.CurrentJsonPath.empty() ? &Owner.ResultTimes : Owner.CurrentJsonPath.back();
+ NJson::TJsonValue* currentNodeParent;
+ if (!currentNode->GetValuePointer(event, &currentNodeParent)) {
+ currentNodeParent = &currentNode->InsertValue(event, NJson::JSON_MAP);
+ }
+ Owner.CurrentJsonPath.emplace_back(currentNodeParent);
+ if (!currentNodeParent->GetValuePointer("--duration", &CurrentNodeDuration)) {
+ CurrentNodeDuration = &currentNodeParent->InsertValue("--duration", 0);
+ CurrentNodeDuration->SetType(NJson::JSON_DOUBLE);
+ }
+}
+
+} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_profile_span.h b/library/cpp/actors/wilson/wilson_profile_span.h
new file mode 100644
index 0000000000..20bde7857c
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_profile_span.h
@@ -0,0 +1,70 @@
+#pragma once
+#include "wilson_span.h"
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NWilson {
+
+class TProfileSpan: public TSpan {
+private:
+ using TBase = TSpan;
+ class TMinMaxPair {
+ private:
+ std::optional<TInstant> MinMinInstance;
+ std::optional<TInstant> MaxMinInstance;
+ std::optional<TInstant> MaxInstance;
+ public:
+ static TMinMaxPair BuildMin(const TInstant value);
+ static TMinMaxPair BuildMax(const TInstant value);
+ void AddMax(const TInstant instance);
+ void AddMin(const TInstant instance);
+ TString ToString() const;
+ };
+ mutable NJson::TJsonValue ResultTimes;
+ std::map<TString, TMinMaxPair> PairInstances;
+ std::vector<NJson::TJsonValue*> CurrentJsonPath;
+ mutable TInstant LastNoGuards = Now();
+ const TInstant StartTime = Now();
+ bool Enabled = true;
+
+ void FlushNoGuards() const;
+ TProfileSpan() = default;
+public:
+ using TBase::TBase;
+ TString ProfileToString() const;
+
+ TProfileSpan& SetEnabled(const bool value) {
+ Enabled = value;
+ return *this;
+ }
+
+ class TGuard {
+ private:
+ TProfileSpan& Owner;
+ const TInstant Start = Now();
+ NJson::TJsonValue* CurrentNodeDuration;
+ public:
+ TGuard(const TString& event, TProfileSpan& owner, const TString& info);
+ ~TGuard();
+ };
+
+ template <class TEventId, class T = TString>
+ TGuard StartStackTimeGuard(const TEventId event, const T& info = Default<T>()) {
+ return TGuard(::ToString(event), *this, ::ToString(info));
+ }
+
+ template <class TEventId, class T = TString>
+ void AddMin(const TEventId event, const T& info = Default<T>()) {
+ AddMin(::ToString(event), ::ToString(info));
+ }
+
+ template <class TEventId, class T = TString>
+ void AddMax(const TEventId event, const T& info = Default<T>()) {
+ AddMax(::ToString(event), ::ToString(info));
+ }
+
+ void AddMin(const TString& eventId, const TString& info);
+ void AddMax(const TString& eventId, const TString& info);
+
+};
+
+} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_trace.cpp b/library/cpp/actors/wilson/wilson_trace.cpp
new file mode 100644
index 0000000000..73bed31da3
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_trace.cpp
@@ -0,0 +1,4 @@
+#include "wilson_trace.h"
+
+namespace NWilson {
+}
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index e69f3f66c0..466953906d 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -49,7 +49,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons
NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
- const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {});
+ const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index ed720761fe..76c8aa87fb 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/kqp/runtime/kqp_channel_storage.h>
#include <ydb/core/kqp/runtime/kqp_tasks_runner.h>
#include <ydb/core/kqp/common/kqp_resolve.h>
@@ -19,6 +20,7 @@
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/wilson/wilson_profile_span.h>
#include <util/generic/deque.h>
@@ -122,8 +124,10 @@ public:
, Snapshot(snapshot)
, ShardsScanningPolicy(shardsScanningPolicy)
, Counters(counters)
- , InFlightShards(ShardsScanningPolicy)
+ , KqpComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, TBase::ComputeActorSpan.GetTraceId(), "KqpScanActor", NWilson::EFlags::AUTO_END)
+ , InFlightShards(ShardsScanningPolicy, KqpComputeActorSpan)
{
+ KqpComputeActorSpan.SetEnabled(IS_DEBUG_LOG_ENABLED(NKikimrServices::KQP_COMPUTE));
YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString());
YQL_ENSURE(!Meta.GetReads().empty());
YQL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView);
@@ -191,8 +195,8 @@ public:
STFUNC(StateFunc) {
try {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqpCompute::TEvCostData, HandleExecute)
- hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
+ hFunc(TEvKqpCompute::TEvCostData, HandleExecute);
+ hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
hFunc(TEvKqpCompute::TEvScanData, HandleExecute);
hFunc(TEvKqpCompute::TEvScanError, HandleExecute);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandleExecute);
@@ -353,6 +357,7 @@ private:
}
void StartCostsRequest(TShardCostsState::TPtr state) {
+ KqpComputeActorSpan.AddMin("Costs");
TSmallVec<TSerializedTableRange> serializedTableRanges = TShardCostsState::BuildSerializedTableRanges(state->GetReadData());
THolder<TEvDataShard::TEvKqpScan> ev = BuildEvKqpScan(state->GetScanId(), 0, serializedTableRanges);
ev->Record.SetCostDataOnly(true);
@@ -361,6 +366,7 @@ private:
}
void HandleExecute(TEvKqpCompute::TEvCostData::TPtr& ev) {
+ KqpComputeActorSpan.AddMax("Costs");
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* read = nullptr;
TSmallVec<TSerializedTableRange> ranges;
Y_VERIFY(InFlightShards.ProcessCostReply(ev, read, ranges));
@@ -440,6 +446,8 @@ private:
}
void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) {
+ auto gTime = KqpComputeActorSpan.StartStackTimeGuard("ProcessPendingScanDataItem");
+
auto& msg = *ev->Get();
auto state = GetShardState(msg, ev->Sender);
@@ -497,10 +505,7 @@ private:
Stats.CompleteShard(state);
StopReadChunk(*state);
CA_LOG_T("TRACE:" << InFlightShards.TraceToString() << ":" << CalculateFreeSpace());
-
- if (!StartTableScan()) {
- stopFinally = true;
- }
+ stopFinally = !StartTableScan();
}
if (stopFinally) {
ScanData->Finish();
@@ -508,6 +513,7 @@ private:
CA_LOG_D("EVLOGKQP(scans_count:" << ScansCounter << ";max_in_flight:" << MaxInFlight << ")"
<< Endl << InFlightShards.GetDurationStats()
<< Endl << InFlightShards.StatisticsToString()
+ << KqpComputeActorSpan.ProfileToString()
);
if (ScanData->BasicStats) {
ScanData->BasicStats->AffectedShards = InFlightShards.GetAffectedShards().size();
@@ -542,7 +548,10 @@ private:
if (state->State == EShardState::Running || state->State == EShardState::PostRunning) {
ProcessPendingScanDataItem(ev, enqueuedAt);
- DoExecute();
+ {
+ auto gTime = KqpComputeActorSpan.StartStackTimeGuard("DoExecute");
+ DoExecute();
+ }
} else {
TerminateExpiredScan(ev->Sender, "Cancel expired scan");
}
@@ -964,7 +973,10 @@ private:
void EnqueueResolveShard(TShardState::TPtr state) {
CA_LOG_D("Enqueue for resolve " << state->TabletId << " chunk " << state->ScannerIdx);
YQL_ENSURE(StopReadChunk(*state));
- DoExecute();
+ {
+ auto gTime = KqpComputeActorSpan.StartStackTimeGuard("DoExecute");
+ DoExecute();
+ }
PendingResolveShards.emplace_back(*state);
if (PendingResolveShards.size() == 1) {
ResolveNextShard();
@@ -972,6 +984,7 @@ private:
}
bool StopReadChunk(const TShardState& state) {
+ auto gTime = KqpComputeActorSpan.StartStackTimeGuard("StopReadChunk");
CA_LOG_D("Unlink from tablet " << state.TabletId << " chunk " << state.ScannerIdx << " and stop reading from it.");
const ui64 tabletId = state.TabletId;
const ui32 scannerIdx = state.ScannerIdx;
@@ -1167,6 +1180,7 @@ private:
std::deque<TShardState> PendingShards;
std::deque<TShardState> PendingResolveShards;
+ NWilson::TProfileSpan KqpComputeActorSpan;
TInFlightShards InFlightShards;
ui32 ScansCounter = 0;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
index 02065e56d5..5a6cb6efe2 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
@@ -1,4 +1,5 @@
#include "kqp_scan_compute_manager.h"
+#include <ydb/core/base/wilson.h>
#include <util/string/builder.h>
namespace NKikimr::NKqp::NComputeActor {
@@ -93,6 +94,10 @@ void TInFlightShards::ClearAll() {
AllocatedGenerations.clear();
StatesByIndex.clear();
NeedAckStates.clear();
+ if (CostsDataSpan) {
+ CostsDataSpan->End();
+ CostsDataSpan.Destroy();
+ }
}
bool TInFlightShards::ProcessCostReply(TEvKqpCompute::TEvCostData::TPtr ev, const TShardCostsState::TReadData*& readData, TSmallVec<TSerializedTableRange>& result) {
@@ -106,10 +111,18 @@ bool TInFlightShards::ProcessCostReply(TEvKqpCompute::TEvCostData::TPtr ev, cons
}
CostRequestsByShardId.erase(it->second->GetShardId());
CostRequestsByScanId.erase(it);
+ if (CostRequestsByScanId.empty()) {
+ Y_VERIFY(CostsDataSpan);
+ CostsDataSpan->End();
+ CostsDataSpan.Destroy();
+ }
return true;
}
TShardCostsState::TPtr TInFlightShards::PrepareCostRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read) {
+ if (!CostsDataSpan) {
+ CostsDataSpan = MakeHolder<NWilson::TSpan>(NKikimr::TWilsonKqp::ComputeActor, KqpProfileSpan.GetTraceId(), "Costs");
+ }
const ui32 scanId = CostRequestsByScanId.size() + 1;
auto costsState = std::make_shared<TShardCostsState>(scanId, &read);
Y_VERIFY(CostRequestsByScanId.emplace(scanId, costsState).second);
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
index c1741fd0ec..14dd5a02a7 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -2,6 +2,7 @@
#include "kqp_compute_actor.h"
#include "kqp_compute_state.h"
#include "kqp_scan_compute_stat.h"
+#include <library/cpp/actors/wilson/wilson_profile_span.h>
namespace NKikimr::NKqp::NComputeActor {
@@ -20,9 +21,12 @@ private:
std::map<ui64, TShardCostsState::TPtr> CostRequestsByShardId;
const TShardsScanningPolicy& ScanningPolicy;
bool IsActiveFlag = true;
+ NWilson::TProfileSpan& KqpProfileSpan;
+ THolder<NWilson::TSpan> CostsDataSpan;
public:
- TInFlightShards(const TShardsScanningPolicy& scanningPolicy)
+ TInFlightShards(const TShardsScanningPolicy& scanningPolicy, NWilson::TProfileSpan& kqpProfileSpan)
: ScanningPolicy(scanningPolicy)
+ , KqpProfileSpan(kqpProfileSpan)
{
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index eab5bcf8a0..d4c5702b67 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -190,9 +190,9 @@ protected:
, MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr)
, WatermarksTracker(this->SelfId(), TxId, Task.GetId())
, DqComputeActorMetrics(taskCounters)
+ , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
, Running(!Task.GetCreateSuspended())
, PassExceptions(passExceptions)
- , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
{
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
@@ -220,8 +220,8 @@ protected:
, MemoryQuota(InitMemoryQuota())
, WatermarksTracker(this->SelfId(), TxId, Task.GetId())
, DqComputeActorMetrics(taskCounters)
- , Running(!Task.GetCreateSuspended())
, ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
+ , Running(!Task.GetCreateSuspended())
{
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
@@ -2003,11 +2003,11 @@ protected:
THolder<TDqMemoryQuota> MemoryQuota;
TDqComputeActorWatermarks WatermarksTracker;
TDqComputeActorMetrics DqComputeActorMetrics;
+ NWilson::TSpan ComputeActorSpan;
private:
bool Running = true;
TInstant LastSendStatsTime;
bool PassExceptions = false;
- NWilson::TSpan ComputeActorSpan;
protected:
bool MonCountersProvided = false;
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage;