diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-29 17:38:02 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-29 17:38:02 +0300 |
commit | 47548c73c08693b554c78ee16069c628553abb8b (patch) | |
tree | 84f767d8d3579c7388b9a46322f020b23b7476da | |
parent | e3ec0b86507a1915e9ac47235efab317582fea67 (diff) | |
download | ydb-47548c73c08693b554c78ee16069c628553abb8b.tar.gz |
provide logging context into bottom functions on stack
new signals
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 113 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/scan.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/scan.h | 10 |
4 files changed, 55 insertions, 79 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 073d771917b..6a1d76291d6 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -96,9 +96,7 @@ public: void Bootstrap(const TActorContext& ctx) { auto g = Stats.MakeGuard("processing"); ScanActorId = ctx.SelfID; - - TimeoutActorId = CreateLongTimer(ctx, Deadline - TInstant::Now(), - new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup)); + Schedule(Deadline, new TEvents::TEvWakeup); Y_VERIFY(!ScanIterator); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(MakeTasksProcessor(), ScanCountersPool); @@ -120,6 +118,9 @@ private: STATEFN(StateScan) { auto g = Stats.MakeGuard("processing"); + TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN) + ("SelfId", SelfId())("TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen) + ); switch (ev->GetTypeRewrite()) { hFunc(TEvKqpCompute::TEvScanDataAck, HandleScan); hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, HandleScan); @@ -163,9 +164,7 @@ private: for (auto&& s : i.second) { size += s.Size; } - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " blobs request:" << i.first << "/" << i.second.size() << "/" << size - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + ACFL_DEBUG("event", "ReadNextBlob")("blob_id", i.first)("ranges_count", i.second.size())("size", size); Stats.RequestSent(i.second); Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(i.second), std::move(readOpts))); } @@ -174,14 +173,12 @@ private: void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) { auto g = Stats.MakeGuard("task_result"); - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " got ScanDataAck" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); if (ev->Get()->GetErrorMessage()) { - ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN) - << "Scan " << ScanActorId << " got finished error " << ev->Get()->GetErrorMessage() << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId; + ACFL_DEBUG("event", "TEvTaskProcessedResult")("error", ev->Get()->GetErrorMessage()); SendScanError(ev->Get()->GetErrorMessage()); Finish(); } else { + ACFL_DEBUG("event", "TEvTaskProcessedResult"); auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()); Y_VERIFY_DEBUG(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult())); ScanIterator->Apply(t); @@ -191,10 +188,6 @@ private: void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) { auto g = Stats.MakeGuard("ack"); - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " got ScanDataAck" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId - << " freeSpace: " << ev->Get()->FreeSpace << " " << ChunksLimiter.DebugString()); if (!ComputeActorId) { ComputeActorId = ev->Sender; @@ -203,28 +196,22 @@ private: Y_VERIFY(ev->Get()->Generation == ScanGen); ChunksLimiter = TChunksLimiter(ev->Get()->FreeSpace, ev->Get()->MaxChunksCount); + ACFL_DEBUG("event", "TEvScanDataAck")("info", ChunksLimiter.DebugString()); ContinueProcessing(); } void HandleScan(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) { auto g = Stats.MakeGuard("blob"); - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " blobs response:" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + ACFL_DEBUG("event", "TEvReadBlobRangeResult"); --InFlightReads; auto& event = *ev->Get(); const auto& blobRange = event.BlobRange; - ScanCountersPool.Aggregations->RemoveFlightReadInfo(blobRange.Size); Stats.BlobReceived(blobRange, event.FromCache, event.ConstructTime); if (event.Status != NKikimrProto::EReplyStatus::OK) { TString strStatus = NKikimrProto::EReplyStatus_Name(event.Status); - LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " got TEvReadBlobRangeResult error" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId - << " blob: " << ev->Get()->BlobRange - << " status: " << strStatus); + ACFL_WARN("event", "TEvReadBlobRangeResult")("error", strStatus)("blob", ev->Get()->BlobRange); SendScanError(strStatus); return Finish(); } @@ -234,11 +221,7 @@ private: InFlightReadBytes -= blobRange.Size; - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " got TEvReadBlobRangeResult" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId - << " blob: " << ev->Get()->BlobRange << ";" - << ChunksLimiter.DebugString()); + ACFL_TRACE("event", "TEvReadBlobRangeResult")("blob", ev->Get()->BlobRange)("chunks_limiter", ChunksLimiter.DebugString()); if (ScanIterator) { { @@ -250,33 +233,27 @@ private: } // Returns true if it was able to produce new batch - bool ProduceResults() { + bool ProduceResults() noexcept { auto g = Stats.MakeGuard("ProduceResults"); - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " producing result: start" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + TLogContextGuard gLogging(NActors::TLogContextBuilder::Build()("method", "produce result")); + + ACFL_DEBUG("stage", "start")("iterator", ScanIterator->DebugString()); Y_VERIFY(!Finished); Y_VERIFY(ScanIterator); if (!ChunksLimiter.HasMore()) { - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " producing result: bytes limit exhausted" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + ACFL_DEBUG("stage", "bytes limit exhausted")("limit", ChunksLimiter.DebugString()); return false; } if (ScanIterator->Finished()) { - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " producing result: scan iterator is finished" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + ACFL_DEBUG("stage", "scan iterator is finished")("iterator", ScanIterator->DebugString()); return false; } auto result = ScanIterator->GetBatch(); if (!result.ErrorString.empty()) { - LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " producing result: got error '" << result.ErrorString - << "' txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", result.ErrorString); SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size())); ScanIterator.reset(); @@ -287,28 +264,22 @@ private: if (ResultYqlSchema.empty() && DataFormat != NKikimrTxDataShard::EScanDataFormat::ARROW) { ResultYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetResultYqlSchema(); } - if (!result.ResultBatch) { - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " producing result: no data is ready yet" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + + auto& batch = result.ResultBatch; + if (!batch) { + ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString()); return false; } - auto& batch = result.ResultBatch; int numRows = batch->num_rows(); int numColumns = batch->num_columns(); if (!numRows) { - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " producing result: got empty batch" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + ACFL_DEBUG("stage", "got empty batch")("iterator", ScanIterator->DebugString()); return true; } - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " producing result: got ready result" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId - << " blob (" << numColumns << " columns, " << numRows << " rows)" - << " format: " << NKikimrTxDataShard::EScanDataFormat_Name(DataFormat)); + ACFL_DEBUG("stage", "ready result")("iterator", ScanIterator->DebugString())("format", NKikimrTxDataShard::EScanDataFormat_Name(DataFormat)) + ("columns", numColumns)("rows", numRows); switch (DataFormat) { case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: @@ -334,17 +305,13 @@ private: Y_VERIFY(numRows == 0, "Got non-empty result batch without last key"); } SendResult(false, false); - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " producing result: finished" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + ACFL_DEBUG("stage", "finished")("iterator", ScanIterator->DebugString()); return true; } void ContinueProcessingStep() { if (!ScanIterator) { - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " iterator is not initialized" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + ACFL_DEBUG("event", "ContinueProcessingStep")("stage", "iterator is not initialized"); return; } @@ -376,6 +343,7 @@ private: return; } } + ScanCountersPool.Hanging->Add(1); Y_VERIFY_DEBUG(false); // The loop has finished without any progress! LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, @@ -383,7 +351,7 @@ private: << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); } - void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) { + void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) noexcept { auto& msg = ev->Get()->Record; TString reason = ev->Get()->GetIssues().ToOneLineString(); @@ -424,7 +392,6 @@ private: "Scan " << ScanActorId << " guard execution timeout" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); - TimeoutActorId = {}; Finish(); } @@ -551,12 +518,8 @@ private: } void Finish() { - if (TimeoutActorId) { - Send(TimeoutActorId, new TEvents::TEvPoison); - } - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " finished"); + "Scan " << ScanActorId << " finished for tablet " << TabletId); Send(ColumnShardActorId, new TEvPrivate::TEvReadFinished(RequestCookie, TxId)); ReportStats(); @@ -594,7 +557,6 @@ private: const TInstant Deadline; TConcreteScanCounters ScanCountersPool; - TActorId TimeoutActorId; TMaybe<TString> AbortReason; TChunksLimiter ChunksLimiter; @@ -609,10 +571,12 @@ private: ui64 Bytes = 0; TDuration ReadingDurationSum; TDuration ReadingDurationMax; - NMonitoring::THistogramPtr DurationsCounter; + NMonitoring::THistogramPtr BlobDurationsCounter; + NMonitoring::THistogramPtr ByteDurationsCounter; public: - TBlobStats(const NMonitoring::THistogramPtr durationsCounter) - : DurationsCounter(durationsCounter) + TBlobStats(const NMonitoring::THistogramPtr blobDurationsCounter, const NMonitoring::THistogramPtr byteDurationsCounter) + : BlobDurationsCounter(blobDurationsCounter) + , ByteDurationsCounter(byteDurationsCounter) { } @@ -621,7 +585,8 @@ private: ReadingDurationMax = Max(ReadingDurationMax, d); ++PartsCount; Bytes += br.Size; - DurationsCounter->Collect(d.MilliSeconds()); + BlobDurationsCounter->Collect(d.MilliSeconds()); + ByteDurationsCounter->Collect((i64)d.MilliSeconds(), br.Size); } TString DebugString() const { TStringBuilder sb; @@ -653,8 +618,8 @@ private: public: TScanStats(const TConcreteScanCounters& counters) - : CacheBlobs(counters.HistogramCacheBlobsDuration) - , MissBlobs(counters.HistogramMissCacheBlobsDuration) + : CacheBlobs(counters.HistogramCacheBlobsCountDuration, counters.HistogramCacheBlobBytesDuration) + , MissBlobs(counters.HistogramMissCacheBlobsCountDuration, counters.HistogramMissCacheBlobBytesDuration) { } diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index 3353ad31262..37e86afe291 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -32,6 +32,9 @@ public: virtual NOlap::TPartialReadResult GetBatch() = 0; virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); } virtual size_t ReadyResultsCount() const = 0; + virtual TString DebugString() const { + return "NO_DATA"; + } }; } diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp index cdfee85a6f6..ec06d0ccdf8 100644 --- a/ydb/core/tx/columnshard/counters/scan.cpp +++ b/ydb/core/tx/columnshard/counters/scan.cpp @@ -30,9 +30,13 @@ TScanCounters::TScanCounters(const TString& module) , TwoPhasesFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesFilterUsefulBytes")) , TwoPhasesPostFilterFetchedBytes(TBase::GetDeriviative("TwoPhasesPostFilterFetchedBytes")) , TwoPhasesPostFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesPostFilterUsefulBytes")) - , HistogramCacheBlobsDuration(TBase::GetHistogram("CacheBlobsDurationMs", NMonitoring::ExponentialHistogram(12, 2))) - , HistogramMissCacheBlobsDuration(TBase::GetHistogram("MissCacheBlobsDurationMs", NMonitoring::ExponentialHistogram(12, 2))) + , Hanging(TBase::GetDeriviative("Hanging")) + + , HistogramCacheBlobsCountDuration(TBase::GetHistogram("CacheBlobsCountDurationMs", NMonitoring::ExponentialHistogram(13, 2))) + , HistogramMissCacheBlobsCountDuration(TBase::GetHistogram("MissCacheBlobsCountDurationMs", NMonitoring::ExponentialHistogram(13, 2))) + , HistogramCacheBlobBytesDuration(TBase::GetHistogram("CacheBlobBytesDurationMs", NMonitoring::ExponentialHistogram(13, 2))) + , HistogramMissCacheBlobBytesDuration(TBase::GetHistogram("MissCacheBlobBytesDurationMs", NMonitoring::ExponentialHistogram(13, 2))) { } diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index a28672c8413..1e1597e533a 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -116,15 +116,19 @@ public: NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterFetchedBytes; NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterUsefulBytes; - NMonitoring::THistogramPtr HistogramCacheBlobsDuration; - NMonitoring::THistogramPtr HistogramMissCacheBlobsDuration; + NMonitoring::TDynamicCounters::TCounterPtr Hanging; + + NMonitoring::THistogramPtr HistogramCacheBlobsCountDuration; + NMonitoring::THistogramPtr HistogramMissCacheBlobsCountDuration; + NMonitoring::THistogramPtr HistogramCacheBlobBytesDuration; + NMonitoring::THistogramPtr HistogramMissCacheBlobBytesDuration; TScanCounters(const TString& module = "Scan"); void OnProcessingOverloaded() { ProcessingOverload->Add(1); } - void OnReadingOverloaded() { + void OnReadingOverloaded() const { ReadingOverload->Add(1); } |