about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ssmike <ssmike@ydb.tech> 2023-07-24 15:36:43 +0300
committer ssmike <ssmike@ydb.tech> 2023-07-24 15:36:43 +0300
commit 441a59ffc7f7913545a87c67e8c66b4a4dfc11e5 (patch)
tree f4d8516dee8b961a6687d3e664ebe598e26016bd
parent c3f6b5b8a4c8e976c40f36f3ae4e36011ed24ab2 (diff)
download ydb-441a59ffc7f7913545a87c67e8c66b4a4dfc11e5.tar.gz
Remote fetch stats for iterator reads
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.cpp 3
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.h 3
-rw-r--r-- ydb/core/kqp/runtime/kqp_read_actor.cpp 27
-rw-r--r-- ydb/core/protos/tx_datashard.proto 2
-rw-r--r-- ydb/core/tx/datashard/datashard__read_iterator.cpp 68
5 files changed, 78 insertions, 25 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 26d1b4a266..bec70b79a1 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -787,6 +787,9 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
SentIteratorCancels = KqpGroup->GetCounter("IteratorReads/SentCancels", true);
CreatedIterators = KqpGroup->GetCounter("IteratorReads/Created", true);
ReadActorsCount = KqpGroup->GetCounter("IteratorReads/ReadActorCount", false);
+ ReadActorRemoteFetch = KqpGroup->GetCounter("IteratorReads/ReadActorRemoteFetch", true);
+ ReadActorRemoteFirstFetch = KqpGroup->GetCounter("IteratorReads/ReadActorRemoteFirstFetch", true);
+ ReadActorAbsentNodeId = KqpGroup->GetCounter("IteratorReads/AbsentNodeId", true);
StreamLookupActorsCount = KqpGroup->GetCounter("IteratorReads/StreamLookupActorCount", false);
ReadActorRetries = KqpGroup->GetCounter("IteratorReads/Retries", true);
DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true);
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index 07f2629609..7cfc101f01 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -384,6 +384,9 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr SentIteratorCancels;
::NMonitoring::TDynamicCounters::TCounterPtr CreatedIterators;
::NMonitoring::TDynamicCounters::TCounterPtr ReadActorsCount;
+ ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRemoteFirstFetch;
+ ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRemoteFetch;
+ ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorAbsentNodeId;
::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupActorsCount;
::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries;
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails;
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index b8c7c34d67..caaf1fc52b 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -131,6 +131,10 @@ public:
size_t RetryAttempt = 0;
size_t SuccessBatches = 0;
+ TMaybe<ui32> NodeId = {};
+ bool IsFirst = false;
+
+
TShardState(ui64 tabletId)
: TabletId(tabletId)
{
@@ -875,6 +879,11 @@ public:
Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
+ if (!FirstShardStarted) {
+ state->IsFirst = true;
+ }
+ FirstShardStarted = true;
+
if (auto delay = ShardTimeout()) {
TlsActivationContext->Schedule(*delay, new IEventHandle(SelfId(), SelfId(), new TEvRetryShard(id, Reads[id].LastSeqNo)));
}
@@ -905,6 +914,22 @@ public:
return;
}
+ if (!record.HasNodeId()) {
+ Counters->ReadActorAbsentNodeId->Inc();
+ } else if (record.GetNodeId() != SelfId().NodeId()) {
+ auto* state = Reads[id].Shard;
+ if (!state->NodeId) {
+ state->NodeId = record.GetNodeId();
+ CA_LOG_D("Node mismatch for tablet " << state->TabletId << " " << *state->NodeId << " != SelfId: " << SelfId().NodeId());
+ if (state->IsFirst) {
+ Counters->ReadActorRemoteFirstFetch->Inc();
+ }
+ Counters->ReadActorRemoteFetch->Inc();
+ }
+ } else {
+ CA_LOG_T("Node match for tablet " << Reads[id].Shard->TabletId);
+ }
+
Counters->DataShardIteratorMessages->Inc();
if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
Counters->DataShardIteratorFails->Inc();
@@ -1402,6 +1427,8 @@ private:
NActors::TActorId PipeCacheId;
size_t TotalRetries = 0;
+
+ bool FirstShardStarted = false;
};
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 6b1a4fffb1..3d2dd9ad3b 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1726,6 +1726,8 @@ message TEvReadResult {
optional uint64 RowCount = 12;
+ optional uint32 NodeId = 13;
+
// Data for the possibly partial result
oneof ReadResult {
TArrowBatch ArrowBatch = 900;
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 29b502d665..81e84575e2 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -996,6 +996,13 @@ private:
}
};
+std::unique_ptr<TEvDataShard::TEvReadResult> MakeEvReadResult(ui32 nodeId) {
+ auto result = std::make_unique<TEvDataShard::TEvReadResult>();
+ result->Record.SetNodeId(nodeId);
+ return result;
+}
+
+
const NHPTimer::STime TReader::MaxCyclesPerIteration =
/* 10ms */ (NHPTimer::GetCyclesPerSecond() + 99) / 100;
@@ -1214,7 +1221,7 @@ public:
Y_VERIFY(ok, "Unexpected failure to attach a blocked operation");
}
Reader.reset();
- Result.reset(new TEvDataShard::TEvReadResult());
+ Result = MakeEvReadResult(ctx.SelfID.NodeId());
return EExecutionStatus::Continue;
}
@@ -1242,7 +1249,7 @@ public:
AddDependency(it->second);
// Make sure current incomplete result will not be sent
- Result.reset(new TEvDataShard::TEvReadResult());
+ Result = MakeEvReadResult(ctx.SelfID.NodeId());
return EExecutionStatus::Continue;
}
@@ -1256,7 +1263,7 @@ public:
SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true);
// Make sure current incomplete result will not be sent
- Result.reset(new TEvDataShard::TEvReadResult());
+ Result = MakeEvReadResult(ctx.SelfID.NodeId());
return EExecutionStatus::Reschedule;
}
@@ -1309,7 +1316,7 @@ public:
auto& state = *it->second;
Y_VERIFY(state.State == TReadIteratorState::EState::Init);
- Result.reset(new TEvDataShard::TEvReadResult());
+ Result = MakeEvReadResult(ctx.SelfID.NodeId());
const auto& record = Request->Record;
@@ -1454,7 +1461,7 @@ public:
if (!Result) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TReadOperation::Execute() finished without Result, aborting");
- Result.reset(new TEvDataShard::TEvReadResult());
+ Result = MakeEvReadResult(ctx.SelfID.NodeId());
SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
Result->Record.SetReadId(readId.ReadId);
Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
@@ -1822,7 +1829,8 @@ public:
}
ReplyError(
Ydb::StatusIds::INTERNAL_ERROR,
- TStringBuilder() << "Failed to sync follower: " << errMessage);
+ TStringBuilder() << "Failed to sync follower: " << errMessage,
+ ctx.SelfID.NodeId());
return true;
}
}
@@ -1846,7 +1854,8 @@ public:
Ydb::StatusIds::BAD_REQUEST,
TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId
<< ", tableId: " << state.PathId.LocalPathId
- << ", from wrong owner: " << Self->GetPathOwnerId());
+ << ", from wrong owner: " << Self->GetPathOwnerId(),
+ ctx.SelfID.NodeId());
return true;
}
@@ -1855,7 +1864,8 @@ public:
if (it == Self->TableInfos.end()) {
ReplyError(
Ydb::StatusIds::NOT_FOUND,
- TStringBuilder() << "Unknown table id: " << tableId);
+ TStringBuilder() << "Unknown table id: " << tableId,
+ ctx.SelfID.NodeId());
return true;
}
@@ -1863,14 +1873,16 @@ public:
if (userTableInfo->IsBackup) {
ReplyError(
Ydb::StatusIds::BAD_REQUEST,
- "Can't read from a backup table");
+ "Can't read from a backup table",
+ ctx.SelfID.NodeId());
return true;
}
if (!Self->IsMvccEnabled()) {
ReplyError(
Ydb::StatusIds::UNSUPPORTED,
- "Cannot use read iterators without mvcc");
+ "Cannot use read iterators without mvcc",
+ ctx.SelfID.NodeId());
return true;
}
@@ -1940,7 +1952,8 @@ public:
TStringBuilder() << "Table id " << tableId << " has no snapshot at "
<< state.ReadVersion << " shard " << Self->TabletID()
<< " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
- << (Self->IsFollower() ? " RO replica" : ""));
+ << (Self->IsFollower() ? " RO replica" : ""),
+ ctx.SelfID.NodeId());
return true;
}
}
@@ -1950,26 +1963,30 @@ public:
if (Self->IsFollower()) {
ReplyError(
Ydb::StatusIds::UNSUPPORTED,
- "Followers don't support system table reads");
+ "Followers don't support system table reads",
+ ctx.SelfID.NodeId());
return true;
}
if (!state.IsHeadRead) {
ReplyError(
Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Cannot read system table using snapshot " << state.ReadVersion);
+ TStringBuilder() << "Cannot read system table using snapshot " << state.ReadVersion,
+ ctx.SelfID.NodeId());
return true;
}
if (record.GetTableId().GetTableId() >= TDataShard::Schema::MinLocalTid) {
ReplyError(
Ydb::StatusIds::BAD_REQUEST,
- "Cannot read from user tables using system tables");
+ "Cannot read from user tables using system tables",
+ ctx.SelfID.NodeId());
return true;
}
if (record.GetResultFormat() != NKikimrTxDataShard::CELLVEC) {
ReplyError(
Ydb::StatusIds::UNSUPPORTED,
TStringBuilder() << "Unsupported result format "
- << (int)record.GetResultFormat() << " when reading from system tables");
+ << (int)record.GetResultFormat() << " when reading from system tables",
+ ctx.SelfID.NodeId());
return true;
}
if (record.GetTableId().HasSchemaVersion()) {
@@ -1977,7 +1994,8 @@ public:
Ydb::StatusIds::BAD_REQUEST,
TStringBuilder() << "Cannot request system table at shard " << record.GetTableId().GetOwnerId()
<< ", localTid: " << record.GetTableId().GetTableId()
- << ", with schema: " << record.GetTableId().GetSchemaVersion());
+ << ", with schema: " << record.GetTableId().GetSchemaVersion(),
+ ctx.SelfID.NodeId());
return true;
}
@@ -2056,8 +2074,8 @@ public:
}
}
- void ReplyError(Ydb::StatusIds::StatusCode code, const TString& message) {
- Reply = std::make_unique<TEvDataShard::TEvReadResult>();
+ void ReplyError(Ydb::StatusIds::StatusCode code, const TString& message, ui32 nodeId) {
+ Reply = MakeEvReadResult(nodeId);
SetStatusError(Reply->Record, code, message);
Reply->Record.SetReadId(ReadId.ReadId);
}
@@ -2139,7 +2157,7 @@ public:
<< Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId
<< ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery);
- Result.reset(new TEvDataShard::TEvReadResult());
+ Result = MakeEvReadResult(ctx.SelfID.NodeId());
const auto& tableId = state.PathId.LocalPathId;
if (state.PathId.OwnerId == Self->GetPathOwnerId()) {
@@ -2338,7 +2356,7 @@ public:
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TTxReadContinue::Execute() finished without Result, aborting");
- Result.reset(new TEvDataShard::TEvReadResult());
+ Result = MakeEvReadResult(ctx.SelfID.NodeId());
SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
Result->Record.SetReadId(readId.ReadId);
Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
@@ -2399,7 +2417,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
auto* request = ev->Get();
const auto& record = request->Record;
if (Y_UNLIKELY(!record.HasReadId())) {
- std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ auto result = MakeEvReadResult(ctx.SelfID.NodeId());
SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing ReadId");
ctx.Send(ev->Sender, result.release());
return;
@@ -2414,7 +2432,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
}
auto replyWithError = [&] (auto code, const auto& msg) {
- std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ auto result = MakeEvReadResult(ctx.SelfID.NodeId());
SetStatusError(
result->Record,
code,
@@ -2560,7 +2578,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
{
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck: " << record);
- std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ auto result = MakeEvReadResult(ctx.SelfID.NodeId());
SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing mandatory fields in TEvReadAck");
if (record.HasReadId())
result->Record.SetReadId(record.GetReadId());
@@ -2594,7 +2612,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
<< ", current seqNo# " << state.SeqNo;
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, issueStr);
- std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ auto result = MakeEvReadResult(ctx.SelfID.NodeId());
SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, issueStr);
result->Record.SetReadId(readId.ReadId);
SendViaSession(state.SessionId, readId.Sender, SelfId(), result.release());
@@ -2669,7 +2687,7 @@ void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TStr
IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds());
}
- std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ auto result = MakeEvReadResult(ctx.SelfID.NodeId());
SetStatusError(result->Record, code, issue);
result->Record.SetReadId(iterator.first.ReadId);
result->Record.SetSeqNo(state->SeqNo + 1);