aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-04-07 12:23:37 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-04-07 12:23:37 +0300
commitfd191c37beaf0651cc6f10f4cc76e03b23b8f85c (patch)
treed90733e52307eaf14982a14a81581db995af12a8
parent21b23100b7762bb785ac36bf473389453124377a (diff)
downloadydb-fd191c37beaf0651cc6f10f4cc76e03b23b8f85c.tar.gz
add logging and signals, correct config defaults
improve stats
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp70
-rw-r--r--ydb/core/tx/conveyor/service/service.cpp20
-rw-r--r--ydb/core/tx/conveyor/service/service.h2
-rw-r--r--ydb/core/tx/conveyor/usage/config.h2
4 files changed, 53 insertions, 41 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 8c17c5b83b9..648e6b0179e 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -92,6 +92,7 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
+ auto g = Stats.MakeGuard("processing");
ScanActorId = ctx.SelfID;
TimeoutActorId = CreateLongTimer(ctx, Deadline - TInstant::Now(),
@@ -111,6 +112,7 @@ public:
private:
STATEFN(StateScan) {
+ auto g = Stats.MakeGuard("processing");
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpCompute::TEvScanDataAck, HandleScan);
hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, HandleScan);
@@ -158,9 +160,12 @@ private:
}
void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN) << SelfId();
- Stats.TaskProcessed();
+ auto g = Stats.MakeGuard("task_result");
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ "Scan " << ScanActorId << " got ScanDataAck" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
if (ev->Get()->GetErrorMessage()) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)
+ << "Scan " << ScanActorId << " got finished error " << ev->Get()->GetErrorMessage() << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId;
DataTasksProcessor->Stop();
SendScanError(ev->Get()->GetErrorMessage());
Finish();
@@ -173,7 +178,7 @@ private:
}
void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) {
- Stats.TaskProcessed();
+ auto g = Stats.MakeGuard("ack");
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " got ScanDataAck"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
@@ -194,7 +199,7 @@ private:
}
void HandleScan(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) {
- auto g = Stats.MakeGuard("EvResult");
+ auto g = Stats.MakeGuard("blob");
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " blobs response:"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
@@ -366,7 +371,6 @@ private:
// * or there is an in-flight blob read or ScanData message for which
// we will get a reply and will be able to proceed further
if (!ScanIterator || InFlightScanDataMessages != 0 || InFlightReads != 0) {
- Stats.StartWait();
return;
}
}
@@ -375,7 +379,6 @@ private:
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " is hanging"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
- Stats.StartWait();
}
void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) {
@@ -646,24 +649,15 @@ private:
TBlobStats MissBlobs;
THashMap<TString, TDuration> GuardedDurations;
THashMap<TString, TInstant> StartGuards;
- std::optional<TInstant> FirstReceived;
- TInstant LastWaitStart;
- TDuration WaitReceive;
+ THashMap<TString, TInstant> SectionFirst;
+ THashMap<TString, TInstant> SectionLast;
public:
- void StartWait() {
- Y_VERIFY_DEBUG(!LastWaitStart);
- LastWaitStart = Now();
- }
-
TString DebugString() const {
+ const TInstant now = TInstant::Now();
TStringBuilder sb;
sb << "SCAN_STATS;";
sb << "start=" << StartInstant << ";";
- if (FirstReceived) {
- sb << "frw=" << *FirstReceived - StartInstant << ";";
- }
- sb << "srw=" << WaitReceive << ";";
sb << "d=" << FinishInstant - StartInstant << ";";
if (RequestsCount) {
sb << "req:{count=" << RequestsCount << ";bytes=" << RequestedBytes << ";bytes_avg=" << RequestedBytes / RequestsCount << "};";
@@ -672,11 +666,28 @@ private:
} else {
sb << "NO_REQUESTS;";
}
+ std::map<ui32, std::vector<TString>> points;
+ for (auto&& i : SectionFirst) {
+ points[(i.second - StartInstant).MilliSeconds()].emplace_back("f_" + i.first);
+ }
+ for (auto&& i : SectionLast) {
+ auto it = StartGuards.find(i.first);
+ if (it != StartGuards.end()) {
+ points[(now - StartInstant).MilliSeconds()].emplace_back("l_" + i.first);
+ } else {
+ points[(i.second - StartInstant).MilliSeconds()].emplace_back("l_" + i.first);
+ }
+ }
+ sb << "tline:(";
+ for (auto&& i : points) {
+ sb << Sprintf("%0.3f", 0.001 * i.first) << ":" << JoinSeq(",", i.second) << ";";
+ }
+ sb << ");";
for (auto&& i : GuardedDurations) {
auto it = StartGuards.find(i.first);
TDuration delta;
if (it != StartGuards.end()) {
- delta = Now() - it->second;
+ delta = now - it->second;
}
sb << i.first << "=" << i.second + delta << ";";
}
@@ -693,12 +704,17 @@ private:
: Owner(owner)
, SectionName(sectionName)
{
+ if (!Owner.SectionFirst.contains(SectionName)) {
+ Owner.SectionFirst.emplace(SectionName, Start);
+ }
Y_VERIFY(Owner.StartGuards.emplace(SectionName, Start).second);
}
~TGuard() {
- Owner.GuardedDurations[SectionName] += Now() - Start;
+ const TInstant finish = TInstant::Now();
+ Owner.GuardedDurations[SectionName] += finish - Start;
Owner.StartGuards.erase(SectionName);
+ Owner.SectionLast[SectionName] = finish;
}
};
@@ -715,21 +731,7 @@ private:
}
}
- void TaskProcessed() {
- if (LastWaitStart) {
- WaitReceive += Now() - LastWaitStart;
- LastWaitStart = TInstant::Zero();
- }
- }
-
void BlobReceived(const NBlobCache::TBlobRange& br, const bool fromCache, const TInstant replyInstant) {
- if (!FirstReceived) {
- FirstReceived = Now();
- }
- if (LastWaitStart) {
- WaitReceive += Now() - LastWaitStart;
- LastWaitStart = TInstant::Zero();
- }
auto it = StartBlobRequest.find(br);
Y_VERIFY(it != StartBlobRequest.end());
const TDuration d = replyInstant - it->second;
diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp
index 710f372276b..45e030ef46e 100644
--- a/ydb/core/tx/conveyor/service/service.cpp
+++ b/ydb/core/tx/conveyor/service/service.cpp
@@ -12,25 +12,29 @@ TDistributor::TDistributor(const TConfig& config, const TString& conveyorName, T
: Config(config)
, ConveyorName(conveyorName)
, WaitingQueueSize(conveyorSignals->GetCounter("WaitingQueueSize"))
+ , WaitingQueueSizeLimit(conveyorSignals->GetCounter("WaitingQueueSizeLimit"))
, WorkersCount(conveyorSignals->GetCounter("WorkersCount"))
, WorkersCountLimit(conveyorSignals->GetCounter("WorkersCountLimit"))
, IncomingRate(conveyorSignals->GetCounter("Incoming", true))
- , SolutionsRate(conveyorSignals->GetCounter("Solved", true)) {
+ , SolutionsRate(conveyorSignals->GetCounter("Solved", true))
+ , OverlimitRate(conveyorSignals->GetCounter("Overlimit", true))
+{
}
void TDistributor::Bootstrap() {
- ALS_NOTICE(NKikimrServices::TX_CONVEYOR) << "conveyor registered: " << SelfId();
+ const ui32 workersCount = Config.GetWorkersCountDef(NKqp::TStagePredictor::GetUsableThreads());
+ ALS_NOTICE(NKikimrServices::TX_CONVEYOR) << "action=conveyor_registered;actor_id=" << SelfId() << ";workers_count=" << workersCount << ";limit=" << Config.GetQueueSizeLimit();
TServiceOperator::Register(Config);
- for (ui32 i = 0; i < Config.GetWorkersCountDef(NKqp::TStagePredictor::GetUsableThreads()); ++i) {
+ for (ui32 i = 0; i < workersCount; ++i) {
Workers.emplace_back(Register(new TWorker()));
}
WorkersCountLimit->Set(Workers.size());
+ WaitingQueueSizeLimit->Set(Config.GetQueueSizeLimit());
Become(&TDistributor::StateMain);
}
void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) {
- ALS_DEBUG(NKikimrServices::TX_CONVEYOR) << "action=processed;owner=" << ev->Get()->GetOwnerId() << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
SolutionsRate->Inc();
if (Waiting.size()) {
Send(ev->Sender, new TEvInternal::TEvNewTask(Waiting.front()));
@@ -38,13 +42,15 @@ void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) {
} else {
Workers.emplace_back(ev->Sender);
}
- if (!*ev->Get()) {
+ if (ev->Get()->GetErrorMessage()) {
+ ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "action=on_error;owner=" << ev->Get()->GetOwnerId() << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
Send(ev->Get()->GetOwnerId(), new TEvExecution::TEvTaskProcessedResult(ev->Get()->GetErrorMessage()));
} else {
Send(ev->Get()->GetOwnerId(), new TEvExecution::TEvTaskProcessedResult(ev->Get()->GetResult()));
}
WaitingQueueSize->Set(Waiting.size());
WorkersCount->Set(Workers.size());
+ ALS_DEBUG(NKikimrServices::TX_CONVEYOR) << "action=processed;owner=" << ev->Get()->GetOwnerId() << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
}
void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
@@ -56,8 +62,10 @@ void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
} else if (Waiting.size() < Config.GetQueueSizeLimit()) {
Waiting.emplace_back(ev->Get()->GetTask(), ev->Sender);
} else {
+ ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "action=overlimit;sender=" << ev->Sender << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
+ OverlimitRate->Inc();
Send(ev->Sender, new TEvExecution::TEvTaskProcessedResult("scan conveyor overloaded (" +
- ::ToString(Waiting.size()) + " > " + ::ToString(Config.GetQueueSizeLimit()) + ")"));
+ ::ToString(Waiting.size()) + " >= " + ::ToString(Config.GetQueueSizeLimit()) + ")"));
}
WaitingQueueSize->Set(Waiting.size());
WorkersCount->Set(Workers.size());
diff --git a/ydb/core/tx/conveyor/service/service.h b/ydb/core/tx/conveyor/service/service.h
index 149cab231af..5e9d49a625a 100644
--- a/ydb/core/tx/conveyor/service/service.h
+++ b/ydb/core/tx/conveyor/service/service.h
@@ -14,10 +14,12 @@ private:
std::vector<TActorId> Workers;
std::deque<TWorkerTask> Waiting;
const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSize;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSizeLimit;
const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCount;
const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCountLimit;
const ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRate;
const ::NMonitoring::TDynamicCounters::TCounterPtr SolutionsRate;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr OverlimitRate;
void HandleMain(TEvExecution::TEvNewTask::TPtr& ev);
void HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev);
diff --git a/ydb/core/tx/conveyor/usage/config.h b/ydb/core/tx/conveyor/usage/config.h
index 0afbe53cefe..dbce08107ce 100644
--- a/ydb/core/tx/conveyor/usage/config.h
+++ b/ydb/core/tx/conveyor/usage/config.h
@@ -7,7 +7,7 @@ namespace NKikimr::NConveyor {
class TConfig {
private:
YDB_OPT(ui32, WorkersCount);
- YDB_READONLY(ui32, QueueSizeLimit, 256);
+ YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024);
YDB_READONLY_FLAG(Enabled, true);
public:
bool DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config);