aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-02-07 14:46:05 +0300
committereivanov89 <eivanov89@ydb.tech>2023-02-07 14:46:05 +0300
commitda52d09c630930d5714319c800c74bfda8cc3fb0 (patch)
tree8fc4b2bfdd8f0ad5e0c829a4d0a8e6d20bde5777
parentbb37a8b74e61bb96576eca047448e51db75669ca (diff)
downloadydb-da52d09c630930d5714319c800c74bfda8cc3fb0.tar.gz
PR from branch users/eivanov89/-read-iterator-logs
add COUNTER_READ_ITERATORS_EXHAUSTED_COUNT cleanup read iterator logging
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp47
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h8
3 files changed, 47 insertions, 9 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index 1b3ac6a39a..a43c735c47 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -23,6 +23,7 @@ enum ESimpleCounters {
COUNTER_CHANGE_QUEUE_SIZE = 13 [(CounterOpts) = {Name: "ChangeQueueSize"}];
COUNTER_READ_ITERATORS_WAITING = 14 [(CounterOpts) = {Name: "ReadIteratorsWaiting"}];
COUNTER_READ_ITERATORS_COUNT = 15 [(CounterOpts) = {Name: "ReadIteratorsCount"}];
+ COUNTER_READ_ITERATORS_EXHAUSTED_COUNT = 16 [(CounterOpts) = {Name: "ReadIteratorsExhaustedCount"}];
}
enum ECumulativeCounters {
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index dc61aa88d3..dfbbce3655 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -476,6 +476,10 @@ public:
|| FirstUnprocessedQuery < State.Request->Ranges.size();
}
+ size_t GetQueriesCount() const {
+ return State.Request->Keys.size() + State.Request->Ranges.size();
+ }
+
void UpdateCycles() {
GetTimeFast(&EndTime);
}
@@ -602,6 +606,7 @@ public:
}
ui64 GetRowsRead() const { return RowsRead; }
+ ui64 GetBytesRead() const { return BytesInResult > 0 ? BytesInResult : BlockBuilder.Bytes(); }
bool HadInvisibleRowSkips() const { return InvisibleRowSkips > 0; }
bool HadInconsistentResult() const { return HadInconsistentResult_; }
@@ -1317,7 +1322,7 @@ public:
auto it = Self->ReadIterators.find(readId);
if (it == Self->ReadIterators.end()) {
// the one who removed the iterator should have reply to user
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " has been invalidated before TReadOperation::SendResult()");
return;
}
@@ -1356,7 +1361,11 @@ public:
Y_ASSERT(BlockBuilder);
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries()
+ << " sends rowCount# " << Reader->GetRowsRead() << ", bytes# " << Reader->GetBytesRead()
+ << ", quota rows left# " << (state.Quota.Rows - Reader->GetRowsRead())
+ << ", quota bytes left# " << (state.Quota.Bytes - Reader->GetBytesRead())
+ << ", hasUnreadQueries# " << Reader->HasUnreadQueries()
+ << ", total queries# " << Reader->GetQueriesCount()
<< ", firstUnprocessed# " << state.FirstUnprocessedQuery);
Reader->FillResult(*Result, state);
@@ -1394,9 +1403,13 @@ public:
ctx.Send(
Self->SelfId(),
new TEvDataShard::TEvReadContinue(Sender, Request->Record.GetReadId()));
+ } else {
+ Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
+ << " read iterator# " << readId << " exhausted");
}
} else {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " finished in read");
Self->DeleteReadIterator(it);
}
@@ -1870,7 +1883,7 @@ public:
Y_ASSERT(Result);
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
<< " ReadContinue: reader# " << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId
<< ", FirstUnprocessedQuery# " << state.FirstUnprocessedQuery);
@@ -2005,7 +2018,11 @@ public:
Y_ASSERT(BlockBuilder);
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " readContinue iterator# " << readId
- << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries()
+ << " sends rowCount# " << Reader->GetRowsRead() << ", bytes# " << Reader->GetBytesRead()
+ << ", quota rows left# " << (state.Quota.Rows - Reader->GetRowsRead())
+ << ", quota bytes left# " << (state.Quota.Bytes - Reader->GetBytesRead())
+ << ", hasUnreadQueries# " << Reader->HasUnreadQueries()
+ << ", total queries# " << Reader->GetQueriesCount()
<< ", firstUnprocessed# " << state.FirstUnprocessedQuery);
Reader->FillResult(*Result, state);
@@ -2020,8 +2037,9 @@ public:
Self->SelfId(),
new TEvDataShard::TEvReadContinue(request->Reader, request->ReadId));
} else {
+ Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
- << " Read quota exhausted for " << request->Reader << "," << request->ReadId);
+ << " read iterator# " << readId << " exhausted");
}
} else {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
@@ -2328,13 +2346,14 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
record.GetMaxBytes());
if (wasExhausted && !state.IsExhausted()) {
+ DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
ctx.Send(
SelfId(),
new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
}
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck: " << record
- << ", " << (wasExhausted ? "read continued" : "quota updated")
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck for read iterator# " << readId
+ << ": " << record << ", " << (wasExhausted ? "read continued" : "quota updated")
<< ", bytesLeft# " << state.Quota.Bytes << ", rowsLeft# " << state.Quota.Rows);
}
@@ -2343,7 +2362,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte
if (!record.HasReadId())
return;
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record);
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record);
TReadIteratorId readId(ev->Sender, record.GetReadId());
auto it = ReadIterators.find(readId);
@@ -2386,6 +2405,7 @@ void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TStr
ReadIteratorSessions.clear();
SetCounter(COUNTER_READ_ITERATORS_COUNT, 0);
+ SetCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT, 0);
}
void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) {
@@ -2397,6 +2417,9 @@ void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) {
session.Iterators.erase(it->first);
}
}
+ if (state->IsExhausted()) {
+ DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
+ }
ReadIterators.erase(it);
SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());
}
@@ -2411,6 +2434,7 @@ void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, cons
<< " closed session# " << sessionId << ", iterators# " << session.Iterators.size());
auto now = AppData()->MonotonicTimeProvider->Now();
+ ui64 exhaustedCount = 0;
for (const auto& readId: session.Iterators) {
// we don't send anything to client, because it's up
// to client to detect disconnect
@@ -2424,11 +2448,16 @@ void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, cons
IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds());
}
+ if (state->IsExhausted()) {
+ ++exhaustedCount;
+ }
+
ReadIterators.erase(it);
}
ReadIteratorSessions.erase(itSession);
SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());
+ DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT, exhaustedCount);
}
} // NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index ef53f5e38e..f16916b80f 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1360,6 +1360,14 @@ public:
TabletCounters->Simple()[counter].Set(num);
}
+ void DecCounter(NDataShard::ESimpleCounters counter, ui64 num = 1) const {
+ TabletCounters->Simple()[counter].Sub(num);
+ }
+
+ void IncCounter(NDataShard::ESimpleCounters counter, ui64 num = 1) const {
+ TabletCounters->Simple()[counter].Add(num);
+ }
+
void IncCounter(NDataShard::ECumulativeCounters counter, ui64 num = 1) const {
TabletCounters->Cumulative()[counter].Increment(num);
}