aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-29 17:38:02 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-29 17:38:02 +0300
commit47548c73c08693b554c78ee16069c628553abb8b (patch)
tree84f767d8d3579c7388b9a46322f020b23b7476da
parente3ec0b86507a1915e9ac47235efab317582fea67 (diff)
downloadydb-47548c73c08693b554c78ee16069c628553abb8b.tar.gz
provide logging context into bottom functions on stack
new signals
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp113
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.h3
-rw-r--r--ydb/core/tx/columnshard/counters/scan.cpp8
-rw-r--r--ydb/core/tx/columnshard/counters/scan.h10
4 files changed, 55 insertions, 79 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 073d771917b..6a1d76291d6 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -96,9 +96,7 @@ public:
void Bootstrap(const TActorContext& ctx) {
auto g = Stats.MakeGuard("processing");
ScanActorId = ctx.SelfID;
-
- TimeoutActorId = CreateLongTimer(ctx, Deadline - TInstant::Now(),
- new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup));
+ Schedule(Deadline, new TEvents::TEvWakeup);
Y_VERIFY(!ScanIterator);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(MakeTasksProcessor(), ScanCountersPool);
@@ -120,6 +118,9 @@ private:
STATEFN(StateScan) {
auto g = Stats.MakeGuard("processing");
+ TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN)
+ ("SelfId", SelfId())("TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen)
+ );
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpCompute::TEvScanDataAck, HandleScan);
hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, HandleScan);
@@ -163,9 +164,7 @@ private:
for (auto&& s : i.second) {
size += s.Size;
}
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " blobs request:" << i.first << "/" << i.second.size() << "/" << size
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ ACFL_DEBUG("event", "ReadNextBlob")("blob_id", i.first)("ranges_count", i.second.size())("size", size);
Stats.RequestSent(i.second);
Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(i.second), std::move(readOpts)));
}
@@ -174,14 +173,12 @@ private:
void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
auto g = Stats.MakeGuard("task_result");
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " got ScanDataAck" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
if (ev->Get()->GetErrorMessage()) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)
- << "Scan " << ScanActorId << " got finished error " << ev->Get()->GetErrorMessage() << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId;
+ ACFL_DEBUG("event", "TEvTaskProcessedResult")("error", ev->Get()->GetErrorMessage());
SendScanError(ev->Get()->GetErrorMessage());
Finish();
} else {
+ ACFL_DEBUG("event", "TEvTaskProcessedResult");
auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult());
Y_VERIFY_DEBUG(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()));
ScanIterator->Apply(t);
@@ -191,10 +188,6 @@ private:
void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) {
auto g = Stats.MakeGuard("ack");
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " got ScanDataAck"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
- << " freeSpace: " << ev->Get()->FreeSpace << " " << ChunksLimiter.DebugString());
if (!ComputeActorId) {
ComputeActorId = ev->Sender;
@@ -203,28 +196,22 @@ private:
Y_VERIFY(ev->Get()->Generation == ScanGen);
ChunksLimiter = TChunksLimiter(ev->Get()->FreeSpace, ev->Get()->MaxChunksCount);
+ ACFL_DEBUG("event", "TEvScanDataAck")("info", ChunksLimiter.DebugString());
ContinueProcessing();
}
void HandleScan(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) {
auto g = Stats.MakeGuard("blob");
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " blobs response:"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ ACFL_DEBUG("event", "TEvReadBlobRangeResult");
--InFlightReads;
auto& event = *ev->Get();
const auto& blobRange = event.BlobRange;
- ScanCountersPool.Aggregations->RemoveFlightReadInfo(blobRange.Size);
Stats.BlobReceived(blobRange, event.FromCache, event.ConstructTime);
if (event.Status != NKikimrProto::EReplyStatus::OK) {
TString strStatus = NKikimrProto::EReplyStatus_Name(event.Status);
- LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " got TEvReadBlobRangeResult error"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
- << " blob: " << ev->Get()->BlobRange
- << " status: " << strStatus);
+ ACFL_WARN("event", "TEvReadBlobRangeResult")("error", strStatus)("blob", ev->Get()->BlobRange);
SendScanError(strStatus);
return Finish();
}
@@ -234,11 +221,7 @@ private:
InFlightReadBytes -= blobRange.Size;
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " got TEvReadBlobRangeResult"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
- << " blob: " << ev->Get()->BlobRange << ";"
- << ChunksLimiter.DebugString());
+ ACFL_TRACE("event", "TEvReadBlobRangeResult")("blob", ev->Get()->BlobRange)("chunks_limiter", ChunksLimiter.DebugString());
if (ScanIterator) {
{
@@ -250,33 +233,27 @@ private:
}
// Returns true if it was able to produce new batch
- bool ProduceResults() {
+ bool ProduceResults() noexcept {
auto g = Stats.MakeGuard("ProduceResults");
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " producing result: start"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ TLogContextGuard gLogging(NActors::TLogContextBuilder::Build()("method", "produce result"));
+
+ ACFL_DEBUG("stage", "start")("iterator", ScanIterator->DebugString());
Y_VERIFY(!Finished);
Y_VERIFY(ScanIterator);
if (!ChunksLimiter.HasMore()) {
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " producing result: bytes limit exhausted"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ ACFL_DEBUG("stage", "bytes limit exhausted")("limit", ChunksLimiter.DebugString());
return false;
}
if (ScanIterator->Finished()) {
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " producing result: scan iterator is finished"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ ACFL_DEBUG("stage", "scan iterator is finished")("iterator", ScanIterator->DebugString());
return false;
}
auto result = ScanIterator->GetBatch();
if (!result.ErrorString.empty()) {
- LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " producing result: got error '" << result.ErrorString
- << "' txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", result.ErrorString);
SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size()));
ScanIterator.reset();
@@ -287,28 +264,22 @@ private:
if (ResultYqlSchema.empty() && DataFormat != NKikimrTxDataShard::EScanDataFormat::ARROW) {
ResultYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetResultYqlSchema();
}
- if (!result.ResultBatch) {
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " producing result: no data is ready yet"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+
+ auto& batch = result.ResultBatch;
+ if (!batch) {
+ ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
return false;
}
- auto& batch = result.ResultBatch;
int numRows = batch->num_rows();
int numColumns = batch->num_columns();
if (!numRows) {
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " producing result: got empty batch"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ ACFL_DEBUG("stage", "got empty batch")("iterator", ScanIterator->DebugString());
return true;
}
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " producing result: got ready result"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
- << " blob (" << numColumns << " columns, " << numRows << " rows)"
- << " format: " << NKikimrTxDataShard::EScanDataFormat_Name(DataFormat));
+ ACFL_DEBUG("stage", "ready result")("iterator", ScanIterator->DebugString())("format", NKikimrTxDataShard::EScanDataFormat_Name(DataFormat))
+ ("columns", numColumns)("rows", numRows);
switch (DataFormat) {
case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED:
@@ -334,17 +305,13 @@ private:
Y_VERIFY(numRows == 0, "Got non-empty result batch without last key");
}
SendResult(false, false);
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " producing result: finished"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ ACFL_DEBUG("stage", "finished")("iterator", ScanIterator->DebugString());
return true;
}
void ContinueProcessingStep() {
if (!ScanIterator) {
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " iterator is not initialized"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ ACFL_DEBUG("event", "ContinueProcessingStep")("stage", "iterator is not initialized");
return;
}
@@ -376,6 +343,7 @@ private:
return;
}
}
+ ScanCountersPool.Hanging->Add(1);
Y_VERIFY_DEBUG(false);
// The loop has finished without any progress!
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
@@ -383,7 +351,7 @@ private:
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
}
- void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) {
+ void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) noexcept {
auto& msg = ev->Get()->Record;
TString reason = ev->Get()->GetIssues().ToOneLineString();
@@ -424,7 +392,6 @@ private:
"Scan " << ScanActorId << " guard execution timeout"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
- TimeoutActorId = {};
Finish();
}
@@ -551,12 +518,8 @@ private:
}
void Finish() {
- if (TimeoutActorId) {
- Send(TimeoutActorId, new TEvents::TEvPoison);
- }
-
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " finished");
+ "Scan " << ScanActorId << " finished for tablet " << TabletId);
Send(ColumnShardActorId, new TEvPrivate::TEvReadFinished(RequestCookie, TxId));
ReportStats();
@@ -594,7 +557,6 @@ private:
const TInstant Deadline;
TConcreteScanCounters ScanCountersPool;
- TActorId TimeoutActorId;
TMaybe<TString> AbortReason;
TChunksLimiter ChunksLimiter;
@@ -609,10 +571,12 @@ private:
ui64 Bytes = 0;
TDuration ReadingDurationSum;
TDuration ReadingDurationMax;
- NMonitoring::THistogramPtr DurationsCounter;
+ NMonitoring::THistogramPtr BlobDurationsCounter;
+ NMonitoring::THistogramPtr ByteDurationsCounter;
public:
- TBlobStats(const NMonitoring::THistogramPtr durationsCounter)
- : DurationsCounter(durationsCounter)
+ TBlobStats(const NMonitoring::THistogramPtr blobDurationsCounter, const NMonitoring::THistogramPtr byteDurationsCounter)
+ : BlobDurationsCounter(blobDurationsCounter)
+ , ByteDurationsCounter(byteDurationsCounter)
{
}
@@ -621,7 +585,8 @@ private:
ReadingDurationMax = Max(ReadingDurationMax, d);
++PartsCount;
Bytes += br.Size;
- DurationsCounter->Collect(d.MilliSeconds());
+ BlobDurationsCounter->Collect(d.MilliSeconds());
+ ByteDurationsCounter->Collect((i64)d.MilliSeconds(), br.Size);
}
TString DebugString() const {
TStringBuilder sb;
@@ -653,8 +618,8 @@ private:
public:
TScanStats(const TConcreteScanCounters& counters)
- : CacheBlobs(counters.HistogramCacheBlobsDuration)
- , MissBlobs(counters.HistogramMissCacheBlobsDuration)
+ : CacheBlobs(counters.HistogramCacheBlobsCountDuration, counters.HistogramCacheBlobBytesDuration)
+ , MissBlobs(counters.HistogramMissCacheBlobsCountDuration, counters.HistogramMissCacheBlobBytesDuration)
{
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index 3353ad31262..37e86afe291 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -32,6 +32,9 @@ public:
virtual NOlap::TPartialReadResult GetBatch() = 0;
virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); }
virtual size_t ReadyResultsCount() const = 0;
+ virtual TString DebugString() const {
+ return "NO_DATA";
+ }
};
}
diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp
index cdfee85a6f6..ec06d0ccdf8 100644
--- a/ydb/core/tx/columnshard/counters/scan.cpp
+++ b/ydb/core/tx/columnshard/counters/scan.cpp
@@ -30,9 +30,13 @@ TScanCounters::TScanCounters(const TString& module)
, TwoPhasesFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesFilterUsefulBytes"))
, TwoPhasesPostFilterFetchedBytes(TBase::GetDeriviative("TwoPhasesPostFilterFetchedBytes"))
, TwoPhasesPostFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesPostFilterUsefulBytes"))
- , HistogramCacheBlobsDuration(TBase::GetHistogram("CacheBlobsDurationMs", NMonitoring::ExponentialHistogram(12, 2)))
- , HistogramMissCacheBlobsDuration(TBase::GetHistogram("MissCacheBlobsDurationMs", NMonitoring::ExponentialHistogram(12, 2)))
+ , Hanging(TBase::GetDeriviative("Hanging"))
+
+ , HistogramCacheBlobsCountDuration(TBase::GetHistogram("CacheBlobsCountDurationMs", NMonitoring::ExponentialHistogram(13, 2)))
+ , HistogramMissCacheBlobsCountDuration(TBase::GetHistogram("MissCacheBlobsCountDurationMs", NMonitoring::ExponentialHistogram(13, 2)))
+ , HistogramCacheBlobBytesDuration(TBase::GetHistogram("CacheBlobBytesDurationMs", NMonitoring::ExponentialHistogram(13, 2)))
+ , HistogramMissCacheBlobBytesDuration(TBase::GetHistogram("MissCacheBlobBytesDurationMs", NMonitoring::ExponentialHistogram(13, 2)))
{
}
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index a28672c8413..1e1597e533a 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -116,15 +116,19 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterFetchedBytes;
NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterUsefulBytes;
- NMonitoring::THistogramPtr HistogramCacheBlobsDuration;
- NMonitoring::THistogramPtr HistogramMissCacheBlobsDuration;
+ NMonitoring::TDynamicCounters::TCounterPtr Hanging;
+
+ NMonitoring::THistogramPtr HistogramCacheBlobsCountDuration;
+ NMonitoring::THistogramPtr HistogramMissCacheBlobsCountDuration;
+ NMonitoring::THistogramPtr HistogramCacheBlobBytesDuration;
+ NMonitoring::THistogramPtr HistogramMissCacheBlobBytesDuration;
TScanCounters(const TString& module = "Scan");
void OnProcessingOverloaded() {
ProcessingOverload->Add(1);
}
- void OnReadingOverloaded() {
+ void OnReadingOverloaded() const {
ReadingOverload->Add(1);
}