author     ivanmorozov <ivanmorozov@yandex-team.com>   2023-04-07 12:23:37 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>   2023-04-07 12:23:37 +0300
commit     fd191c37beaf0651cc6f10f4cc76e03b23b8f85c (patch)
tree       d90733e52307eaf14982a14a81581db995af12a8
parent     21b23100b7762bb785ac36bf473389453124377a (diff)
download   ydb-fd191c37beaf0651cc6f10f4cc76e03b23b8f85c.tar.gz
add logging and signals, correct config defaults
improve stats
-rw-r--r--  ydb/core/tx/columnshard/columnshard__scan.cpp   70
-rw-r--r--  ydb/core/tx/conveyor/service/service.cpp        20
-rw-r--r--  ydb/core/tx/conveyor/service/service.h           2
-rw-r--r--  ydb/core/tx/conveyor/usage/config.h              2
4 files changed, 53 insertions, 41 deletions
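
Editor's note: the stats rework below replaces the scan's single wait timer (StartWait/TaskProcessed) with named RAII section guards. Each handler opens a section with Stats.MakeGuard("name"), and the guard records the section's first entry, last exit, and accumulated duration. The following is a minimal self-contained sketch of that pattern, modeling only the members visible in the diff; the real TScanStats in columnshard__scan.cpp carries more state and verification.

    // Minimal sketch of the section-guard pattern this commit introduces.
    // Only the members visible in the diff are modeled here.
    #include <util/datetime/base.h>
    #include <util/generic/hash.h>
    #include <util/generic/string.h>

    class TScanStats {
    private:
        THashMap<TString, TDuration> GuardedDurations;
        THashMap<TString, TInstant> StartGuards;
        THashMap<TString, TInstant> SectionFirst;
        THashMap<TString, TInstant> SectionLast;
    public:
        class TGuard {
        private:
            TScanStats& Owner;
            const TString SectionName;
            const TInstant Start = TInstant::Now();
        public:
            TGuard(TScanStats& owner, const TString& sectionName)
                : Owner(owner)
                , SectionName(sectionName) {
                // The first entry into a section is remembered once.
                if (!Owner.SectionFirst.contains(SectionName)) {
                    Owner.SectionFirst.emplace(SectionName, Start);
                }
                Owner.StartGuards.emplace(SectionName, Start);
            }
            ~TGuard() {
                const TInstant finish = TInstant::Now();
                // Accumulate time spent inside the section; remember its last exit.
                Owner.GuardedDurations[SectionName] += finish - Start;
                Owner.StartGuards.erase(SectionName);
                Owner.SectionLast[SectionName] = finish;
            }
        };

        TGuard MakeGuard(const TString& sectionName) {
            return TGuard(*this, sectionName);
        }
    };

A handler then needs only auto g = Stats.MakeGuard("ack"); on its first line, and its wall time is attributed to that section even on early return.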
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 8c17c5b83b9..648e6b0179e 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -92,6 +92,7 @@ public:
     }
 
     void Bootstrap(const TActorContext& ctx) {
+        auto g = Stats.MakeGuard("processing");
         ScanActorId = ctx.SelfID;
 
         TimeoutActorId = CreateLongTimer(ctx, Deadline - TInstant::Now(),
@@ -111,6 +112,7 @@ public:
 
 private:
     STATEFN(StateScan) {
+        auto g = Stats.MakeGuard("processing");
         switch (ev->GetTypeRewrite()) {
             hFunc(TEvKqpCompute::TEvScanDataAck, HandleScan);
             hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, HandleScan);
@@ -158,9 +160,12 @@ private:
     }
 
     void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
-        ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN) << SelfId();
-        Stats.TaskProcessed();
+        auto g = Stats.MakeGuard("task_result");
+        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+            "Scan " << ScanActorId << " got ScanDataAck" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
         if (ev->Get()->GetErrorMessage()) {
+            ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)
+                << "Scan " << ScanActorId << " got finished error " << ev->Get()->GetErrorMessage() << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId;
             DataTasksProcessor->Stop();
             SendScanError(ev->Get()->GetErrorMessage());
             Finish();
@@ -173,7 +178,7 @@ private:
     }
 
     void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) {
-        Stats.TaskProcessed();
+        auto g = Stats.MakeGuard("ack");
         LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
             "Scan " << ScanActorId << " got ScanDataAck"
             << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
@@ -194,7 +199,7 @@ private:
     }
 
     void HandleScan(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) {
-        auto g = Stats.MakeGuard("EvResult");
+        auto g = Stats.MakeGuard("blob");
         LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
             "Scan " << ScanActorId << " blobs response:"
             << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
@@ -366,7 +371,6 @@ private:
         // * or there is an in-flight blob read or ScanData message for which
         //   we will get a reply and will be able to proceed futher
         if (!ScanIterator || InFlightScanDataMessages != 0 || InFlightReads != 0) {
-            Stats.StartWait();
            return;
        }
    }
@@ -375,7 +379,6 @@ private:
        LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
            "Scan " << ScanActorId << " is hanging"
            << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
-        Stats.StartWait();
    }
 
    void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) {
@@ -646,24 +649,15 @@ private:
        TBlobStats MissBlobs;
 
        THashMap<TString, TDuration> GuardedDurations;
        THashMap<TString, TInstant> StartGuards;
-        std::optional<TInstant> FirstReceived;
-        TInstant LastWaitStart;
-        TDuration WaitReceive;
+        THashMap<TString, TInstant> SectionFirst;
+        THashMap<TString, TInstant> SectionLast;
    public:
-        void StartWait() {
-            Y_VERIFY_DEBUG(!LastWaitStart);
-            LastWaitStart = Now();
-        }
-
        TString DebugString() const {
+            const TInstant now = TInstant::Now();
            TStringBuilder sb;
            sb << "SCAN_STATS;";
            sb << "start=" << StartInstant << ";";
-            if (FirstReceived) {
-                sb << "frw=" << *FirstReceived - StartInstant << ";";
-            }
-            sb << "srw=" << WaitReceive << ";";
            sb << "d=" << FinishInstant - StartInstant << ";";
            if (RequestsCount) {
                sb << "req:{count=" << RequestsCount << ";bytes=" << RequestedBytes << ";bytes_avg=" << RequestedBytes / RequestsCount << "};";
@@ -672,11 +666,28 @@ private:
            } else {
                sb << "NO_REQUESTS;";
            }
+            std::map<ui32, std::vector<TString>> points;
+            for (auto&& i : SectionFirst) {
+                points[(i.second - StartInstant).MilliSeconds()].emplace_back("f_" + i.first);
+            }
+            for (auto&& i : SectionLast) {
+                auto it = StartGuards.find(i.first);
+                if (it != StartGuards.end()) {
+                    points[(now - StartInstant).MilliSeconds()].emplace_back("l_" + i.first);
+                } else {
+                    points[(i.second - StartInstant).MilliSeconds()].emplace_back("l_" + i.first);
+                }
+            }
+            sb << "tline:(";
+            for (auto&& i : points) {
+                sb << Sprintf("%0.3f", 0.001 * i.first) << ":" << JoinSeq(",", i.second) << ";";
+            }
+            sb << ");";
            for (auto&& i : GuardedDurations) {
                auto it = StartGuards.find(i.first);
                TDuration delta;
                if (it != StartGuards.end()) {
-                    delta = Now() - it->second;
+                    delta = now - it->second;
                }
                sb << i.first << "=" << i.second + delta << ";";
            }
@@ -693,12 +704,17 @@ private:
                : Owner(owner)
                , SectionName(sectionName)
            {
+                if (!Owner.SectionFirst.contains(SectionName)) {
+                    Owner.SectionFirst.emplace(SectionName, Start);
+                }
                Y_VERIFY(Owner.StartGuards.emplace(SectionName, Start).second);
            }
 
            ~TGuard() {
-                Owner.GuardedDurations[SectionName] += Now() - Start;
+                const TInstant finish = TInstant::Now();
+                Owner.GuardedDurations[SectionName] += finish - Start;
                Owner.StartGuards.erase(SectionName);
+                Owner.SectionLast[SectionName] = finish;
            }
        };
@@ -715,21 +731,7 @@ private:
            }
        }
 
-        void TaskProcessed() {
-            if (LastWaitStart) {
-                WaitReceive += Now() - LastWaitStart;
-                LastWaitStart = TInstant::Zero();
-            }
-        }
-
        void BlobReceived(const NBlobCache::TBlobRange& br, const bool fromCache, const TInstant replyInstant) {
-            if (!FirstReceived) {
-                FirstReceived = Now();
-            }
-            if (LastWaitStart) {
-                WaitReceive += Now() - LastWaitStart;
-                LastWaitStart = TInstant::Zero();
-            }
            auto it = StartBlobRequest.find(br);
            Y_VERIFY(it != StartBlobRequest.end());
            const TDuration d = replyInstant - it->second;
diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp
index 710f372276b..45e030ef46e 100644
--- a/ydb/core/tx/conveyor/service/service.cpp
+++ b/ydb/core/tx/conveyor/service/service.cpp
@@ -12,25 +12,29 @@ TDistributor::TDistributor(const TConfig& config, const TString& conveyorName, T
    : Config(config)
    , ConveyorName(conveyorName)
    , WaitingQueueSize(conveyorSignals->GetCounter("WaitingQueueSize"))
+    , WaitingQueueSizeLimit(conveyorSignals->GetCounter("WaitingQueueSizeLimit"))
    , WorkersCount(conveyorSignals->GetCounter("WorkersCount"))
    , WorkersCountLimit(conveyorSignals->GetCounter("WorkersCountLimit"))
    , IncomingRate(conveyorSignals->GetCounter("Incoming", true))
-    , SolutionsRate(conveyorSignals->GetCounter("Solved", true)) {
+    , SolutionsRate(conveyorSignals->GetCounter("Solved", true))
+    , OverlimitRate(conveyorSignals->GetCounter("Overlimit", true))
+{
 }
 
 void TDistributor::Bootstrap() {
-    ALS_NOTICE(NKikimrServices::TX_CONVEYOR) << "conveyor registered: " << SelfId();
+    const ui32 workersCount = Config.GetWorkersCountDef(NKqp::TStagePredictor::GetUsableThreads());
+    ALS_NOTICE(NKikimrServices::TX_CONVEYOR) << "action=conveyor_registered;actor_id=" << SelfId() << ";workers_count=" << workersCount << ";limit=" << Config.GetQueueSizeLimit();
    TServiceOperator::Register(Config);
-    for (ui32 i = 0; i < Config.GetWorkersCountDef(NKqp::TStagePredictor::GetUsableThreads()); ++i) {
+    for (ui32 i = 0; i < workersCount; ++i) {
        Workers.emplace_back(Register(new TWorker()));
    }
    WorkersCountLimit->Set(Workers.size());
+    WaitingQueueSizeLimit->Set(Config.GetQueueSizeLimit());
    Become(&TDistributor::StateMain);
 }
 
 void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) {
-    ALS_DEBUG(NKikimrServices::TX_CONVEYOR) << "action=processed;owner=" << ev->Get()->GetOwnerId() << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
    SolutionsRate->Inc();
    if (Waiting.size()) {
        Send(ev->Sender, new TEvInternal::TEvNewTask(Waiting.front()));
@@ -38,13 +42,15 @@ void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) {
    } else {
        Workers.emplace_back(ev->Sender);
    }
-    if (!*ev->Get()) {
+    if (ev->Get()->GetErrorMessage()) {
+        ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "action=on_error;owner=" << ev->Get()->GetOwnerId() << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
        Send(ev->Get()->GetOwnerId(), new TEvExecution::TEvTaskProcessedResult(ev->Get()->GetErrorMessage()));
    } else {
        Send(ev->Get()->GetOwnerId(), new TEvExecution::TEvTaskProcessedResult(ev->Get()->GetResult()));
    }
    WaitingQueueSize->Set(Waiting.size());
    WorkersCount->Set(Workers.size());
+    ALS_DEBUG(NKikimrServices::TX_CONVEYOR) << "action=processed;owner=" << ev->Get()->GetOwnerId() << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
 }
 
 void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
@@ -56,8 +62,10 @@ void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
    } else if (Waiting.size() < Config.GetQueueSizeLimit()) {
        Waiting.emplace_back(ev->Get()->GetTask(), ev->Sender);
    } else {
+        ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "action=overlimit;sender=" << ev->Sender << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
+        OverlimitRate->Inc();
        Send(ev->Sender, new TEvExecution::TEvTaskProcessedResult("scan conveyor overloaded (" +
-            ::ToString(Waiting.size()) + " > " + ::ToString(Config.GetQueueSizeLimit()) + ")"));
+            ::ToString(Waiting.size()) + " >= " + ::ToString(Config.GetQueueSizeLimit()) + ")"));
    }
    WaitingQueueSize->Set(Waiting.size());
    WorkersCount->Set(Workers.size());
diff --git a/ydb/core/tx/conveyor/service/service.h b/ydb/core/tx/conveyor/service/service.h
index 149cab231af..5e9d49a625a 100644
--- a/ydb/core/tx/conveyor/service/service.h
+++ b/ydb/core/tx/conveyor/service/service.h
@@ -14,10 +14,12 @@ private:
    std::vector<TActorId> Workers;
    std::deque<TWorkerTask> Waiting;
    const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSize;
+    const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSizeLimit;
    const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCount;
    const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCountLimit;
    const ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRate;
    const ::NMonitoring::TDynamicCounters::TCounterPtr SolutionsRate;
+    const ::NMonitoring::TDynamicCounters::TCounterPtr OverlimitRate;
 
    void HandleMain(TEvExecution::TEvNewTask::TPtr& ev);
    void HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev);
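
Editor's note: the conveyor side of the commit is mostly observability. The queue and worker limits are now exported as gauges next to the current values, an Overlimit rate counts rejections, and the rejection message says >= because a task is refused once the queue has reached the limit, not only after exceeding it. A toy model of that admission rule (illustrative types, not the real YDB API):

    #include <cstddef>
    #include <deque>
    #include <optional>
    #include <string>

    struct TToyConveyor {
        std::deque<std::string> Waiting;
        std::size_t QueueSizeLimit = 256 * 1024; // the new default from config.h
        std::size_t OverlimitCount = 0;          // stands in for the Overlimit rate counter

        // Returns an error message on rejection, nothing on acceptance.
        std::optional<std::string> Enqueue(std::string task) {
            if (Waiting.size() < QueueSizeLimit) {
                // Strictly below the limit: the task is queued for a free worker.
                Waiting.push_back(std::move(task));
                return std::nullopt;
            }
            // At (or above) the limit: count the rejection and fail the task back.
            ++OverlimitCount;
            return "scan conveyor overloaded (" + std::to_string(Waiting.size()) +
                   " >= " + std::to_string(QueueSizeLimit) + ")";
        }
    };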
diff --git a/ydb/core/tx/conveyor/usage/config.h b/ydb/core/tx/conveyor/usage/config.h
index 0afbe53cefe..dbce08107ce 100644
--- a/ydb/core/tx/conveyor/usage/config.h
+++ b/ydb/core/tx/conveyor/usage/config.h
@@ -7,7 +7,7 @@ namespace NKikimr::NConveyor {
 class TConfig {
 private:
    YDB_OPT(ui32, WorkersCount);
-    YDB_READONLY(ui32, QueueSizeLimit, 256);
+    YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024);
    YDB_READONLY_FLAG(Enabled, true);
 public:
    bool DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config);
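
Editor's note: the default queue limit therefore grows from 256 to 256 * 1024 = 262144 queued tasks. YDB_READONLY is the repository's accessor-generating macro; as a simplified assumption (the real definition lives in ydb's accessor library and differs in detail), it declares a private member with a public getter, roughly:

    #include <util/system/types.h>

    // Simplified approximation (an assumption) of what
    // YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024) expands to inside TConfig.
    class TConfigSketch {
    private:
        ui32 QueueSizeLimit = 256 * 1024; // 262144 tasks by default
    public:
        ui32 GetQueueSizeLimit() const {
            return QueueSizeLimit;
        }
    };

Callers like TDistributor above thus see GetQueueSizeLimit() == 262144 unless DeserializeFromProto overrides the value from NKikimrConfig::TConveyorConfig.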