aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author eivanov89 <eivanov89@ydb.tech> 2022-12-21 20:01:00 +0300
committer eivanov89 <eivanov89@ydb.tech> 2022-12-21 20:01:00 +0300
commit edc52b72be24334865c06097f3f274f88df7b3aa (patch)
tree e57b0298e2e690af1a9585d0bcda8914a48bf64e
parent 48f3e49f4b8f6684b3610c940c97c236ed1e0104 (diff)
download ydb-edc52b72be24334865c06097f3f274f88df7b3aa.tar.gz
add read iterator metrics
-rw-r--r-- ydb/core/protos/counters_datashard.proto 31
-rw-r--r-- ydb/core/tx/datashard/datashard__read_iterator.cpp 124
-rw-r--r-- ydb/core/tx/datashard/datashard_impl.h 1
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_read_iterator.cpp 2
-rw-r--r-- ydb/core/tx/datashard/read_iterator.h 5
5 files changed, 140 insertions, 23 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index de095f9a4b..0a1e1dd451 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -22,6 +22,7 @@ enum ESimpleCounters {
COUNTER_MVCC_ENABLED = 12 [(CounterOpts) = {Name: "MvccEnabled"}];
COUNTER_CHANGE_QUEUE_SIZE = 13 [(CounterOpts) = {Name: "ChangeQueueSize"}];
COUNTER_READ_ITERATORS_WAITING = 14 [(CounterOpts) = {Name: "ReadIteratorsWaiting"}];
+ COUNTER_READ_ITERATORS_COUNT = 15 [(CounterOpts) = {Name: "ReadIteratorsCount"}];
}
enum ECumulativeCounters {
@@ -115,6 +116,12 @@ enum ECumulativeCounters {
COUNTER_TX_COMPACT_BORROWED = 87 [(CounterOpts) = {Name: "TxCompactBorrowed"}];
COUNTER_CHANGE_RECORDS_ENQUEUED = 88 [(CounterOpts) = {Name: "ChangeRecordsEnqueued"}];
COUNTER_CHANGE_RECORDS_REMOVED = 89 [(CounterOpts) = {Name: "ChangeRecordsRemoved"}];
+ COUNTER_READ_ITERATOR_NO_QUOTA = 90 [(CounterOpts) = {Name: "ReadIteratorNoQuota"}];
+ COUNTER_READ_ITERATOR_MAX_ROWS_REACHED = 91 [(CounterOpts) = {Name: "ReadIteratorMaxRowsReached"}];
+ COUNTER_READ_ITERATOR_MAX_TIME_REACHED = 92 [(CounterOpts) = {Name: "ReadIteratorMaxTimeReached"}];
+ COUNTER_READ_ITERATOR_ROWS_READ = 93 [(CounterOpts) = {Name: "ReadIteratorRowsRead"}];
+ COUNTER_READ_ITERATOR_BYTES_READ = 94 [(CounterOpts) = {Name: "ReadIteratorBytesRead"}];
+ COUNTER_READ_ITERATOR_CANCEL = 95 [(CounterOpts) = {Name: "ReadIteratorCancel"}];
}
enum EPercentileCounters {
@@ -340,6 +347,30 @@ enum EPercentileCounters {
Ranges: { Value: 15000 Name: "15000"},
Ranges: { Value: 30000 Name: "30000"}
}];
+
+ COUNTER_READ_ITERATOR_LIFETIME_MS = 18 [(CounterOpts) = {
+ Name: "ReadIteratorLifetimeMs",
+ Ranges: { Value: 0 Name: "0"},
+ Ranges: { Value: 1 Name: "1"},
+ Ranges: { Value: 2 Name: "2"},
+ Ranges: { Value: 5 Name: "5"},
+ Ranges: { Value: 10 Name: "10"},
+ Ranges: { Value: 25 Name: "25"},
+ Ranges: { Value: 50 Name: "50"},
+ Ranges: { Value: 125 Name: "125"},
+ Ranges: { Value: 250 Name: "250"},
+ Ranges: { Value: 500 Name: "500"},
+ Ranges: { Value: 1000 Name: "1000"},
+ }];
+
+ COUNTER_READ_ITERATOR_ITERATION_LATENCY_MS = 19 [(CounterOpts) = {
+ Name: "ReadIteratorIterationLatencyMs",
+ Ranges: { Value: 0 Name: "0"},
+ Ranges: { Value: 1 Name: "1"},
+ Ranges: { Value: 2 Name: "2"},
+ Ranges: { Value: 5 Name: "5"},
+ Ranges: { Value: 10 Name: "10"},
+ }];
}
enum ETxTypes {
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 6b35b2b68e..ac9412df64 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -226,6 +226,7 @@ class TReader {
const TReadIteratorState& State;
IBlockBuilder& BlockBuilder;
const TShortTableInfo& TableInfo;
+ const TMonotonic StartTs;
TDataShard* Self;
std::vector<NScheme::TTypeInfo> ColumnTypes;
@@ -238,7 +239,7 @@ class TReader {
ui64 BytesInResult = 0;
- bool HadInvisibleRowSkips_ = false;
+ ui64 InvisibleRowSkips = 0;
bool HadInconsistentResult_ = false;
NHPTimer::STime StartTime;
@@ -259,10 +260,12 @@ public:
TReader(TReadIteratorState& state,
IBlockBuilder& blockBuilder,
const TShortTableInfo& tableInfo,
+ TMonotonic ts,
TDataShard* self)
: State(state)
, BlockBuilder(blockBuilder)
, TableInfo(tableInfo)
+ , StartTs(ts)
, Self(self)
, FirstUnprocessedQuery(State.FirstUnprocessedQuery)
{
@@ -351,7 +354,7 @@ public:
NTable::TSelectStats stats;
auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
RowsSinceLastCheck += 1 + stats.InvisibleRowSkips;
- HadInvisibleRowSkips_ |= stats.InvisibleRowSkips > 0;
+ InvisibleRowSkips += stats.InvisibleRowSkips;
if (ready == NTable::EReady::Page) {
return EReadStatus::NeedData;
}
@@ -455,21 +458,58 @@ public:
return false;
}
- void FillResult(TEvDataShard::TEvReadResult& result) {
+ void FillResult(TEvDataShard::TEvReadResult& result, TDataShard& datashard, TReadIteratorState& state) {
auto& record = result.Record;
record.MutableStatus()->SetCode(Ydb::StatusIds::SUCCESS);
+ auto now = AppData()->MonotonicTimeProvider->Now();
+ auto delta = now - StartTs;
+ datashard.IncCounter(COUNTER_READ_ITERATOR_ITERATION_LATENCY_MS, delta.MilliSeconds());
+
+ // note that in all metrics below we treat key prefix read as key read
+ // and not as range read
+ const bool isKeysRequest = !State.Request->Keys.empty();
+
if (HasUnreadQueries()) {
if (OutOfQuota()) {
+ datashard.IncCounter(COUNTER_READ_ITERATOR_NO_QUOTA);
record.SetLimitReached(true);
+ } else if (HasMaxRowsInResult()) {
+ datashard.IncCounter(COUNTER_READ_ITERATOR_MAX_ROWS_REACHED);
+ } else {
+ datashard.IncCounter(COUNTER_READ_ITERATOR_MAX_TIME_REACHED);
}
} else {
+ state.IsFinished = true;
record.SetFinished(true);
+ auto fullDelta = now - State.StartTs;
+ datashard.IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, fullDelta.MilliSeconds());
+
+ if (isKeysRequest) {
+ datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_ROW, State.Request->Keys.size());
+ datashard.IncCounter(COUNTER_SELECT_ROWS_PER_REQUEST, State.Request->Keys.size());
+ } else {
+ datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE, State.Request->Ranges.size());
+ }
}
+ if (!isKeysRequest)
+ datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROW_SKIPS, InvisibleRowSkips);
+
BytesInResult = BlockBuilder.Bytes();
+ if (BytesInResult) {
+ datashard.IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead);
+ datashard.IncCounter(COUNTER_READ_ITERATOR_BYTES_READ, BytesInResult);
+ if (isKeysRequest) {
+ // backward compatibility
+ datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_ROW_BYTES, BytesInResult);
+ } else {
+ // backward compatibility
+ datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROWS, RowsRead);
+ datashard.IncCounter(COUNTER_RANGE_READ_ROWS_PER_REQUEST, RowsRead);
+ datashard.IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_BYTES, BytesInResult);
+ }
- if (BytesInResult = BlockBuilder.Bytes()) {
switch (State.Format) {
case NKikimrTxDataShard::ARROW: {
auto& arrowBuilder = static_cast<NArrow::TArrowBatchBuilder&>(BlockBuilder);
@@ -516,7 +556,7 @@ public:
}
ui64 GetRowsRead() const { return RowsRead; }
- bool HadInvisibleRowSkips() const { return HadInvisibleRowSkips_; }
+ bool HadInvisibleRowSkips() const { return InvisibleRowSkips > 0; }
bool HadInconsistentResult() const { return HadInconsistentResult_; }
private:
@@ -573,7 +613,7 @@ private:
BlockBuilder.AddRow(TDbTupleRef(), rowValues);
++RowsRead;
- HadInvisibleRowSkips_ |= iter->Stats.InvisibleRowSkips > 0;
+ InvisibleRowSkips += iter->Stats.InvisibleRowSkips;
RowsSinceLastCheck += 1 + ResetRowStats(iter->Stats);
if (ShouldStop()) {
return EReadStatus::StoppedByLimit;
@@ -582,7 +622,7 @@ private:
// last iteration to Page or Gone also might have deleted or invisible rows
RowsSinceLastCheck += ResetRowStats(iter->Stats);
- HadInvisibleRowSkips_ |= iter->Stats.InvisibleRowSkips > 0;
+ InvisibleRowSkips += iter->Stats.InvisibleRowSkips;
// TODO: consider restart when Page and too few data read
// (how much is too few, less than user's limit?)
@@ -1182,7 +1222,7 @@ public:
<< " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries()
<< ", firstUnprocessed# " << state.FirstUnprocessedQuery);
- Reader->FillResult(*Result);
+ Reader->FillResult(*Result, *Self, state);
Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
}
@@ -1299,7 +1339,13 @@ private:
Y_ASSERT(Result);
- Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self));
+ Reader.reset(new TReader(
+ state,
+ *BlockBuilder,
+ TableInfo,
+ AppData()->MonotonicTimeProvider->Now(),
+ Self));
+
return Reader->Read(txc, ctx);
}
@@ -1693,7 +1739,13 @@ public:
TDataShardLocksDb locksDb(*Self, txc);
TSetupSysLocks guardLocks(state.LockId, state.LockNodeId, *Self, &locksDb);
- Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self));
+ Reader.reset(new TReader(
+ state,
+ *BlockBuilder,
+ TableInfo,
+ AppData()->MonotonicTimeProvider->Now(),
+ Self));
+
if (Reader->Read(txc, ctx)) {
SendResult(txc, ctx);
return true;
@@ -1787,7 +1839,7 @@ public:
<< " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries()
<< ", firstUnprocessed# " << state.FirstUnprocessedQuery);
- Reader->FillResult(*Result);
+ Reader->FillResult(*Result, *Self, state);
Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
if (Reader->HasUnreadQueries()) {
@@ -2024,7 +2076,12 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
sessionId = ev->InterconnectSession;
}
- ReadIterators.emplace(readId, new TReadIteratorState(sessionId, isHeadRead));
+ ReadIterators.emplace(
+ readId,
+ new TReadIteratorState(sessionId, isHeadRead, AppData()->MonotonicTimeProvider->Now()));
+
+ SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());
+
Executor()->Execute(new TTxReadViaPipeline(this, ev), ctx);
}
@@ -2125,15 +2182,33 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record);
TReadIteratorId readId(ev->Sender, record.GetReadId());
- DeleteReadIterator(readId);
+ auto it = ReadIterators.find(readId);
+ if (it == ReadIterators.end())
+ return;
+
+ const auto& state = it->second;
+ if (!state->IsFinished) {
+ auto now = AppData()->MonotonicTimeProvider->Now();
+ auto delta = now - state->StartTs;
+ IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds());
+ IncCounter(COUNTER_READ_ITERATOR_CANCEL);
+ }
+
+ DeleteReadIterator(it);
}
void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " CancelReadIterators #" << ReadIterators.size());
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " CancelReadIterators#" << ReadIterators.size());
+ auto now = AppData()->MonotonicTimeProvider->Now();
for (const auto& iterator: ReadIterators) {
const auto& readIteratorId = iterator.first;
+
const auto& state = iterator.second;
+ if (!state->IsFinished) {
+ auto delta = now - state->StartTs;
+ IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds());
+ }
std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
SetStatusError(result->Record, code, issue);
@@ -2145,12 +2220,8 @@ void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TStr
ReadIterators.clear();
ReadIteratorSessions.clear();
-}
-void TDataShard::DeleteReadIterator(const TReadIteratorId& readId) {
- auto it = ReadIterators.find(readId);
- if (it != ReadIterators.end())
- DeleteReadIterator(it);
+ SetCounter(COUNTER_READ_ITERATORS_COUNT, 0);
}
void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) {
@@ -2163,6 +2234,7 @@ void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) {
}
}
ReadIterators.erase(it);
+ SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());
}
void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx) {
@@ -2174,13 +2246,25 @@ void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, cons
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " closed session# " << sessionId << ", iterators# " << session.Iterators.size());
+ auto now = AppData()->MonotonicTimeProvider->Now();
for (const auto& readId: session.Iterators) {
// we don't send anything to client, because it's up
// to client to detect disconnect
- ReadIterators.erase(readId);
+ auto it = ReadIterators.find(readId);
+ if (it == ReadIterators.end())
+ continue;
+
+ const auto& state = it->second;
+ if (!state->IsFinished) {
+ auto delta = now - state->StartTs;
+ IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds());
+ }
+
+ ReadIterators.erase(it);
}
ReadIteratorSessions.erase(itSession);
+ SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());
}
} // NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index f23838a66a..1ec662e9e7 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1732,7 +1732,6 @@ public:
bool CheckChangesQueueOverflow() const;
- void DeleteReadIterator(const TReadIteratorId& readId);
void DeleteReadIterator(TReadIteratorsMap::iterator it);
void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx);
void ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx);
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index dc39356ec0..261b67f43d 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -2803,7 +2803,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
Y_UNIT_TEST(ShouldCalculateQuota) {
- NDataShard::TReadIteratorState state({}, false);
+ NDataShard::TReadIteratorState state({}, false, {});
state.Quota.Rows = 100;
state.Quota.Bytes = 1000;
state.ConsumeSeqNo(10, 100); // seqno1
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 6d984c50d9..478753baa4 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -64,9 +64,10 @@ struct TReadIteratorState {
};
public:
- TReadIteratorState(const TActorId& sessionId, bool isHeadRead)
+ TReadIteratorState(const TActorId& sessionId, bool isHeadRead, TMonotonic ts)
: IsHeadRead(isHeadRead)
, SessionId(sessionId)
+ , StartTs(ts)
{}
bool IsExhausted() const { return State == EState::Exhausted; }
@@ -189,6 +190,8 @@ public:
TQuota AckedReads;
TActorId SessionId;
+ TMonotonic StartTs;
+ bool IsFinished = false;
// note that we send SeqNo's starting from 1
ui64 SeqNo = 0;