diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-02-07 14:46:05 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-02-07 14:46:05 +0300 |
commit | da52d09c630930d5714319c800c74bfda8cc3fb0 (patch) | |
tree | 8fc4b2bfdd8f0ad5e0c829a4d0a8e6d20bde5777 | |
parent | bb37a8b74e61bb96576eca047448e51db75669ca (diff) | |
download | ydb-da52d09c630930d5714319c800c74bfda8cc3fb0.tar.gz |
PR from branch users/eivanov89/-read-iterator-logs
add COUNTER_READ_ITERATORS_EXHAUSTED_COUNT
cleanup read iterator logging
-rw-r--r-- | ydb/core/protos/counters_datashard.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 47 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 8 |
3 files changed, 47 insertions, 9 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 1b3ac6a39a..a43c735c47 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -23,6 +23,7 @@ enum ESimpleCounters { COUNTER_CHANGE_QUEUE_SIZE = 13 [(CounterOpts) = {Name: "ChangeQueueSize"}]; COUNTER_READ_ITERATORS_WAITING = 14 [(CounterOpts) = {Name: "ReadIteratorsWaiting"}]; COUNTER_READ_ITERATORS_COUNT = 15 [(CounterOpts) = {Name: "ReadIteratorsCount"}]; + COUNTER_READ_ITERATORS_EXHAUSTED_COUNT = 16 [(CounterOpts) = {Name: "ReadIteratorsExhaustedCount"}]; } enum ECumulativeCounters { diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index dc61aa88d3..dfbbce3655 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -476,6 +476,10 @@ public: || FirstUnprocessedQuery < State.Request->Ranges.size(); } + size_t GetQueriesCount() const { + return State.Request->Keys.size() + State.Request->Ranges.size(); + } + void UpdateCycles() { GetTimeFast(&EndTime); } @@ -602,6 +606,7 @@ public: } ui64 GetRowsRead() const { return RowsRead; } + ui64 GetBytesRead() const { return BytesInResult > 0 ? BytesInResult : BlockBuilder.Bytes(); } bool HadInvisibleRowSkips() const { return InvisibleRowSkips > 0; } bool HadInconsistentResult() const { return HadInconsistentResult_; } @@ -1317,7 +1322,7 @@ public: auto it = Self->ReadIterators.find(readId); if (it == Self->ReadIterators.end()) { // the one who removed the iterator should have reply to user - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " has been invalidated before TReadOperation::SendResult()"); return; } @@ -1356,7 +1361,11 @@ public: Y_ASSERT(BlockBuilder); LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries() + << " sends rowCount# " << Reader->GetRowsRead() << ", bytes# " << Reader->GetBytesRead() + << ", quota rows left# " << (state.Quota.Rows - Reader->GetRowsRead()) + << ", quota bytes left# " << (state.Quota.Bytes - Reader->GetBytesRead()) + << ", hasUnreadQueries# " << Reader->HasUnreadQueries() + << ", total queries# " << Reader->GetQueriesCount() << ", firstUnprocessed# " << state.FirstUnprocessedQuery); Reader->FillResult(*Result, state); @@ -1394,9 +1403,13 @@ public: ctx.Send( Self->SelfId(), new TEvDataShard::TEvReadContinue(Sender, Request->Record.GetReadId())); + } else { + Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() + << " read iterator# " << readId << " exhausted"); } } else { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " finished in read"); Self->DeleteReadIterator(it); } @@ -1870,7 +1883,7 @@ public: Y_ASSERT(Result); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue: reader# " << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId << ", FirstUnprocessedQuery# " << state.FirstUnprocessedQuery); @@ -2005,7 +2018,11 @@ public: Y_ASSERT(BlockBuilder); LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " readContinue iterator# " << readId - << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries() + << " sends rowCount# " << Reader->GetRowsRead() << ", bytes# " << Reader->GetBytesRead() + << ", quota rows left# " << (state.Quota.Rows - Reader->GetRowsRead()) + << ", quota bytes left# " << (state.Quota.Bytes - Reader->GetBytesRead()) + << ", hasUnreadQueries# " << Reader->HasUnreadQueries() + << ", total queries# " << Reader->GetQueriesCount() << ", firstUnprocessed# " << state.FirstUnprocessedQuery); Reader->FillResult(*Result, state); @@ -2020,8 +2037,9 @@ public: Self->SelfId(), new TEvDataShard::TEvReadContinue(request->Reader, request->ReadId)); } else { + Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() - << " Read quota exhausted for " << request->Reader << "," << request->ReadId); + << " read iterator# " << readId << " exhausted"); } } else { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId @@ -2328,13 +2346,14 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& record.GetMaxBytes()); if (wasExhausted && !state.IsExhausted()) { + DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT); ctx.Send( SelfId(), new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId())); } - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck: " << record - << ", " << (wasExhausted ? "read continued" : "quota updated") + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck for read iterator# " << readId + << ": " << record << ", " << (wasExhausted ? "read continued" : "quota updated") << ", bytesLeft# " << state.Quota.Bytes << ", rowsLeft# " << state.Quota.Rows); } @@ -2343,7 +2362,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte if (!record.HasReadId()) return; - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record); + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record); TReadIteratorId readId(ev->Sender, record.GetReadId()); auto it = ReadIterators.find(readId); @@ -2386,6 +2405,7 @@ void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TStr ReadIteratorSessions.clear(); SetCounter(COUNTER_READ_ITERATORS_COUNT, 0); + SetCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT, 0); } void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) { @@ -2397,6 +2417,9 @@ void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) { session.Iterators.erase(it->first); } } + if (state->IsExhausted()) { + DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT); + } ReadIterators.erase(it); SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size()); } @@ -2411,6 +2434,7 @@ void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, cons << " closed session# " << sessionId << ", iterators# " << session.Iterators.size()); auto now = AppData()->MonotonicTimeProvider->Now(); + ui64 exhaustedCount = 0; for (const auto& readId: session.Iterators) { // we don't send anything to client, because it's up // to client to detect disconnect @@ -2424,11 +2448,16 @@ void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, cons IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds()); } + if (state->IsExhausted()) { + ++exhaustedCount; + } + ReadIterators.erase(it); } ReadIteratorSessions.erase(itSession); SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size()); + DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT, exhaustedCount); } } // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index ef53f5e38e..f16916b80f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1360,6 +1360,14 @@ public: TabletCounters->Simple()[counter].Set(num); } + void DecCounter(NDataShard::ESimpleCounters counter, ui64 num = 1) const { + TabletCounters->Simple()[counter].Sub(num); + } + + void IncCounter(NDataShard::ESimpleCounters counter, ui64 num = 1) const { + TabletCounters->Simple()[counter].Add(num); + } + void IncCounter(NDataShard::ECumulativeCounters counter, ui64 num = 1) const { TabletCounters->Cumulative()[counter].Increment(num); } |