diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-12-21 20:01:00 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-12-21 20:01:00 +0300 |
commit | edc52b72be24334865c06097f3f274f88df7b3aa (patch) | |
tree | e57b0298e2e690af1a9585d0bcda8914a48bf64e | |
parent | 48f3e49f4b8f6684b3610c940c97c236ed1e0104 (diff) | |
download | ydb-edc52b72be24334865c06097f3f274f88df7b3aa.tar.gz |
add read iterator metrics
-rw-r--r-- | ydb/core/protos/counters_datashard.proto | 31 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 124 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 5 |
5 files changed, 140 insertions, 23 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index de095f9a4b..0a1e1dd451 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -22,6 +22,7 @@ enum ESimpleCounters { COUNTER_MVCC_ENABLED = 12 [(CounterOpts) = {Name: "MvccEnabled"}]; COUNTER_CHANGE_QUEUE_SIZE = 13 [(CounterOpts) = {Name: "ChangeQueueSize"}]; COUNTER_READ_ITERATORS_WAITING = 14 [(CounterOpts) = {Name: "ReadIteratorsWaiting"}]; + COUNTER_READ_ITERATORS_COUNT = 15 [(CounterOpts) = {Name: "ReadIteratorsCount"}]; } enum ECumulativeCounters { @@ -115,6 +116,12 @@ enum ECumulativeCounters { COUNTER_TX_COMPACT_BORROWED = 87 [(CounterOpts) = {Name: "TxCompactBorrowed"}]; COUNTER_CHANGE_RECORDS_ENQUEUED = 88 [(CounterOpts) = {Name: "ChangeRecordsEnqueued"}]; COUNTER_CHANGE_RECORDS_REMOVED = 89 [(CounterOpts) = {Name: "ChangeRecordsRemoved"}]; + COUNTER_READ_ITERATOR_NO_QUOTA = 90 [(CounterOpts) = {Name: "ReadIteratorNoQuota"}]; + COUNTER_READ_ITERATOR_MAX_ROWS_REACHED = 91 [(CounterOpts) = {Name: "ReadIteratorMaxRowsReached"}]; + COUNTER_READ_ITERATOR_MAX_TIME_REACHED = 92 [(CounterOpts) = {Name: "ReadIteratorMaxTimeReached"}]; + COUNTER_READ_ITERATOR_ROWS_READ = 93 [(CounterOpts) = {Name: "ReadIteratorRowsRead"}]; + COUNTER_READ_ITERATOR_BYTES_READ = 94 [(CounterOpts) = {Name: "ReadIteratorBytesRead"}]; + COUNTER_READ_ITERATOR_CANCEL = 95 [(CounterOpts) = {Name: "ReadIteratorCancel"}]; } enum EPercentileCounters { @@ -340,6 +347,30 @@ enum EPercentileCounters { Ranges: { Value: 15000 Name: "15000"}, Ranges: { Value: 30000 Name: "30000"} }]; + + COUNTER_READ_ITERATOR_LIFETIME_MS = 18 [(CounterOpts) = { + Name: "ReadIteratorLifetimeMs", + Ranges: { Value: 0 Name: "0"}, + Ranges: { Value: 1 Name: "1"}, + Ranges: { Value: 2 Name: "2"}, + Ranges: { Value: 5 Name: "5"}, + Ranges: { Value: 10 Name: "10"}, + Ranges: { Value: 25 Name: "25"}, + Ranges: { Value: 50 Name: "50"}, + Ranges: { Value: 125 Name: "125"}, + Ranges: { Value: 250 Name: "250"}, + Ranges: { Value: 500 Name: "500"}, + Ranges: { Value: 1000 Name: "1000"}, + }]; + + COUNTER_READ_ITERATOR_ITERATION_LATENCY_MS = 19 [(CounterOpts) = { + Name: "ReadIteratorIterationLatencyMs", + Ranges: { Value: 0 Name: "0"}, + Ranges: { Value: 1 Name: "1"}, + Ranges: { Value: 2 Name: "2"}, + Ranges: { Value: 5 Name: "5"}, + Ranges: { Value: 10 Name: "10"}, + }]; } enum ETxTypes { diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 6b35b2b68e..ac9412df64 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -226,6 +226,7 @@ class TReader { const TReadIteratorState& State; IBlockBuilder& BlockBuilder; const TShortTableInfo& TableInfo; + const TMonotonic StartTs; TDataShard* Self; std::vector<NScheme::TTypeInfo> ColumnTypes; @@ -238,7 +239,7 @@ class TReader { ui64 BytesInResult = 0; - bool HadInvisibleRowSkips_ = false; + ui64 InvisibleRowSkips = 0; bool HadInconsistentResult_ = false; NHPTimer::STime StartTime; @@ -259,10 +260,12 @@ public: TReader(TReadIteratorState& state, IBlockBuilder& blockBuilder, const TShortTableInfo& tableInfo, + TMonotonic ts, TDataShard* self) : State(state) , BlockBuilder(blockBuilder) , TableInfo(tableInfo) + , StartTs(ts) , Self(self) , FirstUnprocessedQuery(State.FirstUnprocessedQuery) { @@ -351,7 +354,7 @@ public: NTable::TSelectStats stats; auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); RowsSinceLastCheck += 1 + stats.InvisibleRowSkips; - HadInvisibleRowSkips_ |= stats.InvisibleRowSkips > 0; + InvisibleRowSkips += stats.InvisibleRowSkips; if (ready == NTable::EReady::Page) { return EReadStatus::NeedData; } @@ -455,21 +458,58 @@ public: return false; } - void FillResult(TEvDataShard::TEvReadResult& result) { + void FillResult(TEvDataShard::TEvReadResult& result, TDataShard& datashard, TReadIteratorState& state) { auto& record = result.Record; record.MutableStatus()->SetCode(Ydb::StatusIds::SUCCESS); + auto now = AppData()->MonotonicTimeProvider->Now(); + auto delta = now - StartTs; + datashard.IncCounter(COUNTER_READ_ITERATOR_ITERATION_LATENCY_MS, delta.MilliSeconds()); + + // note that in all metrics below we treat key prefix read as key read + // and not as range read + const bool isKeysRequest = !State.Request->Keys.empty(); + if (HasUnreadQueries()) { if (OutOfQuota()) { + datashard.IncCounter(COUNTER_READ_ITERATOR_NO_QUOTA); record.SetLimitReached(true); + } else if (HasMaxRowsInResult()) { + datashard.IncCounter(COUNTER_READ_ITERATOR_MAX_ROWS_REACHED); + } else { + datashard.IncCounter(COUNTER_READ_ITERATOR_MAX_TIME_REACHED); } } else { + state.IsFinished = true; record.SetFinished(true); + auto fullDelta = now - State.StartTs; + datashard.IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, fullDelta.MilliSeconds()); + + if (isKeysRequest) { + datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_ROW, State.Request->Keys.size()); + datashard.IncCounter(COUNTER_SELECT_ROWS_PER_REQUEST, State.Request->Keys.size()); + } else { + datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE, State.Request->Ranges.size()); + } } + if (!isKeysRequest) + datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROW_SKIPS, InvisibleRowSkips); + BytesInResult = BlockBuilder.Bytes(); + if (BytesInResult) { + datashard.IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead); + datashard.IncCounter(COUNTER_READ_ITERATOR_BYTES_READ, BytesInResult); + if (isKeysRequest) { + // backward compatibility + datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_ROW_BYTES, BytesInResult); + } else { + // backward compatibility + datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROWS, RowsRead); + datashard.IncCounter(COUNTER_RANGE_READ_ROWS_PER_REQUEST, RowsRead); + datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_BYTES, BytesInResult); + } - if (BytesInResult = BlockBuilder.Bytes()) { switch (State.Format) { case NKikimrTxDataShard::ARROW: { auto& arrowBuilder = static_cast<NArrow::TArrowBatchBuilder&>(BlockBuilder); @@ -516,7 +556,7 @@ public: } ui64 GetRowsRead() const { return RowsRead; } - bool HadInvisibleRowSkips() const { return HadInvisibleRowSkips_; } + bool HadInvisibleRowSkips() const { return InvisibleRowSkips > 0; } bool HadInconsistentResult() const { return HadInconsistentResult_; } private: @@ -573,7 +613,7 @@ private: BlockBuilder.AddRow(TDbTupleRef(), rowValues); ++RowsRead; - HadInvisibleRowSkips_ |= iter->Stats.InvisibleRowSkips > 0; + InvisibleRowSkips += iter->Stats.InvisibleRowSkips; RowsSinceLastCheck += 1 + ResetRowStats(iter->Stats); if (ShouldStop()) { return EReadStatus::StoppedByLimit; @@ -582,7 +622,7 @@ private: // last iteration to Page or Gone also might have deleted or invisible rows RowsSinceLastCheck += ResetRowStats(iter->Stats); - HadInvisibleRowSkips_ |= iter->Stats.InvisibleRowSkips > 0; + InvisibleRowSkips += iter->Stats.InvisibleRowSkips; // TODO: consider restart when Page and too few data read // (how much is too few, less than user's limit?) @@ -1182,7 +1222,7 @@ public: << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries() << ", firstUnprocessed# " << state.FirstUnprocessedQuery); - Reader->FillResult(*Result); + Reader->FillResult(*Result, *Self, state); Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); } @@ -1299,7 +1339,13 @@ private: Y_ASSERT(Result); - Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self)); + Reader.reset(new TReader( + state, + *BlockBuilder, + TableInfo, + AppData()->MonotonicTimeProvider->Now(), + Self)); + return Reader->Read(txc, ctx); } @@ -1693,7 +1739,13 @@ public: TDataShardLocksDb locksDb(*Self, txc); TSetupSysLocks guardLocks(state.LockId, state.LockNodeId, *Self, &locksDb); - Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self)); + Reader.reset(new TReader( + state, + *BlockBuilder, + TableInfo, + AppData()->MonotonicTimeProvider->Now(), + Self)); + if (Reader->Read(txc, ctx)) { SendResult(txc, ctx); return true; @@ -1787,7 +1839,7 @@ public: << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries() << ", firstUnprocessed# " << state.FirstUnprocessedQuery); - Reader->FillResult(*Result); + Reader->FillResult(*Result, *Self, state); Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); if (Reader->HasUnreadQueries()) { @@ -2024,7 +2076,12 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct sessionId = ev->InterconnectSession; } - ReadIterators.emplace(readId, new TReadIteratorState(sessionId, isHeadRead)); + ReadIterators.emplace( + readId, + new TReadIteratorState(sessionId, isHeadRead, AppData()->MonotonicTimeProvider->Now())); + + SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size()); + Executor()->Execute(new TTxReadViaPipeline(this, ev), ctx); } @@ -2125,15 +2182,33 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record); TReadIteratorId readId(ev->Sender, record.GetReadId()); - DeleteReadIterator(readId); + auto it = ReadIterators.find(readId); + if (it == ReadIterators.end()) + return; + + const auto& state = it->second; + if (!state->IsFinished) { + auto now = AppData()->MonotonicTimeProvider->Now(); + auto delta = now - state->StartTs; + IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds()); + IncCounter(COUNTER_READ_ITERATOR_CANCEL); + } + + DeleteReadIterator(it); } void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " CancelReadIterators #" << ReadIterators.size()); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " CancelReadIterators#" << ReadIterators.size()); + auto now = AppData()->MonotonicTimeProvider->Now(); for (const auto& iterator: ReadIterators) { const auto& readIteratorId = iterator.first; + const auto& state = iterator.second; + if (!state->IsFinished) { + auto delta = now - state->StartTs; + IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds()); + } std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); SetStatusError(result->Record, code, issue); @@ -2145,12 +2220,8 @@ void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TStr ReadIterators.clear(); ReadIteratorSessions.clear(); -} -void TDataShard::DeleteReadIterator(const TReadIteratorId& readId) { - auto it = ReadIterators.find(readId); - if (it != ReadIterators.end()) - DeleteReadIterator(it); + SetCounter(COUNTER_READ_ITERATORS_COUNT, 0); } void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) { @@ -2163,6 +2234,7 @@ void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) { } } ReadIterators.erase(it); + SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size()); } void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx) { @@ -2174,13 +2246,25 @@ void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, cons LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " closed session# " << sessionId << ", iterators# " << session.Iterators.size()); + auto now = AppData()->MonotonicTimeProvider->Now(); for (const auto& readId: session.Iterators) { // we don't send anything to client, because it's up // to client to detect disconnect - ReadIterators.erase(readId); + auto it = ReadIterators.find(readId); + if (it == ReadIterators.end()) + continue; + + const auto& state = it->second; + if (!state->IsFinished) { + auto delta = now - state->StartTs; + IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds()); + } + + ReadIterators.erase(it); } ReadIteratorSessions.erase(itSession); + SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size()); } } // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index f23838a66a..1ec662e9e7 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1732,7 +1732,6 @@ public: bool CheckChangesQueueOverflow() const; - void DeleteReadIterator(const TReadIteratorId& readId); void DeleteReadIterator(TReadIteratorsMap::iterator it); void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx); void ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx); diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index dc39356ec0..261b67f43d 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -2803,7 +2803,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) { Y_UNIT_TEST_SUITE(DataShardReadIteratorState) { Y_UNIT_TEST(ShouldCalculateQuota) { - NDataShard::TReadIteratorState state({}, false); + NDataShard::TReadIteratorState state({}, false, {}); state.Quota.Rows = 100; state.Quota.Bytes = 1000; state.ConsumeSeqNo(10, 100); // seqno1 diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index 6d984c50d9..478753baa4 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -64,9 +64,10 @@ struct TReadIteratorState { }; public: - TReadIteratorState(const TActorId& sessionId, bool isHeadRead) + TReadIteratorState(const TActorId& sessionId, bool isHeadRead, TMonotonic ts) : IsHeadRead(isHeadRead) , SessionId(sessionId) + , StartTs(ts) {} bool IsExhausted() const { return State == EState::Exhausted; } @@ -189,6 +190,8 @@ public: TQuota AckedReads; TActorId SessionId; + TMonotonic StartTs; + bool IsFinished = false; // note that we send SeqNo's starting from 1 ui64 SeqNo = 0; |