diff options
author | ssmike <ssmike@ydb.tech> | 2023-07-24 15:36:43 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-07-24 15:36:43 +0300 |
commit | 441a59ffc7f7913545a87c67e8c66b4a4dfc11e5 (patch) | |
tree | f4d8516dee8b961a6687d3e664ebe598e26016bd | |
parent | c3f6b5b8a4c8e976c40f36f3ae4e36011ed24ab2 (diff) | |
download | ydb-441a59ffc7f7913545a87c67e8c66b4a4dfc11e5.tar.gz |
Remote fetch stats for iterator reads
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 27 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 68 |
5 files changed, 78 insertions, 25 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 26d1b4a266..bec70b79a1 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -787,6 +787,9 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co SentIteratorCancels = KqpGroup->GetCounter("IteratorReads/SentCancels", true); CreatedIterators = KqpGroup->GetCounter("IteratorReads/Created", true); ReadActorsCount = KqpGroup->GetCounter("IteratorReads/ReadActorCount", false); + ReadActorRemoteFetch = KqpGroup->GetCounter("IteratorReads/ReadActorRemoteFetch", true); + ReadActorRemoteFirstFetch = KqpGroup->GetCounter("IteratorReads/ReadActorRemoteFirstFetch", true); + ReadActorAbsentNodeId = KqpGroup->GetCounter("IteratorReads/AbsentNodeId", true); StreamLookupActorsCount = KqpGroup->GetCounter("IteratorReads/StreamLookupActorCount", false); ReadActorRetries = KqpGroup->GetCounter("IteratorReads/Retries", true); DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 07f2629609..7cfc101f01 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -384,6 +384,9 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr SentIteratorCancels; ::NMonitoring::TDynamicCounters::TCounterPtr CreatedIterators; ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorsCount; + ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRemoteFirstFetch; + ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRemoteFetch; + ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorAbsentNodeId; ::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupActorsCount; ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries; ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails; diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index b8c7c34d67..caaf1fc52b 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -131,6 +131,10 @@ public: size_t RetryAttempt = 0; size_t SuccessBatches = 0; + TMaybe<ui32> NodeId = {}; + bool IsFirst = false; + + TShardState(ui64 tabletId) : TabletId(tabletId) { @@ -875,6 +879,11 @@ public: Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), IEventHandle::FlagTrackDelivery); + if (!FirstShardStarted) { + state->IsFirst = true; + } + FirstShardStarted = true; + if (auto delay = ShardTimeout()) { TlsActivationContext->Schedule(*delay, new IEventHandle(SelfId(), SelfId(), new TEvRetryShard(id, Reads[id].LastSeqNo))); } @@ -905,6 +914,22 @@ public: return; } + if (!record.HasNodeId()) { + Counters->ReadActorAbsentNodeId->Inc(); + } else if (record.GetNodeId() != SelfId().NodeId()) { + auto* state = Reads[id].Shard; + if (!state->NodeId) { + state->NodeId = record.GetNodeId(); + CA_LOG_D("Node mismatch for tablet " << state->TabletId << " " << *state->NodeId << " != SelfId: " << SelfId().NodeId()); + if (state->IsFirst) { + Counters->ReadActorRemoteFirstFetch->Inc(); + } + Counters->ReadActorRemoteFetch->Inc(); + } + } else { + CA_LOG_T("Node match for tablet " << Reads[id].Shard->TabletId); + } + Counters->DataShardIteratorMessages->Inc(); if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { Counters->DataShardIteratorFails->Inc(); @@ -1402,6 +1427,8 @@ private: NActors::TActorId PipeCacheId; size_t TotalRetries = 0; + + bool FirstShardStarted = false; }; diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 6b1a4fffb1..3d2dd9ad3b 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1726,6 +1726,8 @@ message TEvReadResult { optional uint64 RowCount = 12; + optional uint32 NodeId = 13; + // Data for the possibly partial result oneof ReadResult { TArrowBatch ArrowBatch = 900; diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 29b502d665..81e84575e2 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -996,6 +996,13 @@ private: } }; +std::unique_ptr<TEvDataShard::TEvReadResult> MakeEvReadResult(ui32 nodeId) { + auto result = std::make_unique<TEvDataShard::TEvReadResult>(); + result->Record.SetNodeId(nodeId); + return result; +} + + const NHPTimer::STime TReader::MaxCyclesPerIteration = /* 10ms */ (NHPTimer::GetCyclesPerSecond() + 99) / 100; @@ -1214,7 +1221,7 @@ public: Y_VERIFY(ok, "Unexpected failure to attach a blocked operation"); } Reader.reset(); - Result.reset(new TEvDataShard::TEvReadResult()); + Result = MakeEvReadResult(ctx.SelfID.NodeId()); return EExecutionStatus::Continue; } @@ -1242,7 +1249,7 @@ public: AddDependency(it->second); // Make sure current incomplete result will not be sent - Result.reset(new TEvDataShard::TEvReadResult()); + Result = MakeEvReadResult(ctx.SelfID.NodeId()); return EExecutionStatus::Continue; } @@ -1256,7 +1263,7 @@ public: SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true); // Make sure current incomplete result will not be sent - Result.reset(new TEvDataShard::TEvReadResult()); + Result = MakeEvReadResult(ctx.SelfID.NodeId()); return EExecutionStatus::Reschedule; } @@ -1309,7 +1316,7 @@ public: auto& state = *it->second; Y_VERIFY(state.State == TReadIteratorState::EState::Init); - Result.reset(new TEvDataShard::TEvReadResult()); + Result = MakeEvReadResult(ctx.SelfID.NodeId()); const auto& record = Request->Record; @@ -1454,7 +1461,7 @@ public: if (!Result) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TReadOperation::Execute() finished without Result, aborting"); - Result.reset(new TEvDataShard::TEvReadResult()); + Result = MakeEvReadResult(ctx.SelfID.NodeId()); SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); @@ -1822,7 +1829,8 @@ public: } ReplyError( Ydb::StatusIds::INTERNAL_ERROR, - TStringBuilder() << "Failed to sync follower: " << errMessage); + TStringBuilder() << "Failed to sync follower: " << errMessage, + ctx.SelfID.NodeId()); return true; } } @@ -1846,7 +1854,8 @@ public: Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId << ", tableId: " << state.PathId.LocalPathId - << ", from wrong owner: " << Self->GetPathOwnerId()); + << ", from wrong owner: " << Self->GetPathOwnerId(), + ctx.SelfID.NodeId()); return true; } @@ -1855,7 +1864,8 @@ public: if (it == Self->TableInfos.end()) { ReplyError( Ydb::StatusIds::NOT_FOUND, - TStringBuilder() << "Unknown table id: " << tableId); + TStringBuilder() << "Unknown table id: " << tableId, + ctx.SelfID.NodeId()); return true; } @@ -1863,14 +1873,16 @@ public: if (userTableInfo->IsBackup) { ReplyError( Ydb::StatusIds::BAD_REQUEST, - "Can't read from a backup table"); + "Can't read from a backup table", + ctx.SelfID.NodeId()); return true; } if (!Self->IsMvccEnabled()) { ReplyError( Ydb::StatusIds::UNSUPPORTED, - "Cannot use read iterators without mvcc"); + "Cannot use read iterators without mvcc", + ctx.SelfID.NodeId()); return true; } @@ -1940,7 +1952,8 @@ public: TStringBuilder() << "Table id " << tableId << " has no snapshot at " << state.ReadVersion << " shard " << Self->TabletID() << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() - << (Self->IsFollower() ? " RO replica" : "")); + << (Self->IsFollower() ? " RO replica" : ""), + ctx.SelfID.NodeId()); return true; } } @@ -1950,26 +1963,30 @@ public: if (Self->IsFollower()) { ReplyError( Ydb::StatusIds::UNSUPPORTED, - "Followers don't support system table reads"); + "Followers don't support system table reads", + ctx.SelfID.NodeId()); return true; } if (!state.IsHeadRead) { ReplyError( Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Cannot read system table using snapshot " << state.ReadVersion); + TStringBuilder() << "Cannot read system table using snapshot " << state.ReadVersion, + ctx.SelfID.NodeId()); return true; } if (record.GetTableId().GetTableId() >= TDataShard::Schema::MinLocalTid) { ReplyError( Ydb::StatusIds::BAD_REQUEST, - "Cannot read from user tables using system tables"); + "Cannot read from user tables using system tables", + ctx.SelfID.NodeId()); return true; } if (record.GetResultFormat() != NKikimrTxDataShard::CELLVEC) { ReplyError( Ydb::StatusIds::UNSUPPORTED, TStringBuilder() << "Unsupported result format " - << (int)record.GetResultFormat() << " when reading from system tables"); + << (int)record.GetResultFormat() << " when reading from system tables", + ctx.SelfID.NodeId()); return true; } if (record.GetTableId().HasSchemaVersion()) { @@ -1977,7 +1994,8 @@ public: Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Cannot request system table at shard " << record.GetTableId().GetOwnerId() << ", localTid: " << record.GetTableId().GetTableId() - << ", with schema: " << record.GetTableId().GetSchemaVersion()); + << ", with schema: " << record.GetTableId().GetSchemaVersion(), + ctx.SelfID.NodeId()); return true; } @@ -2056,8 +2074,8 @@ public: } } - void ReplyError(Ydb::StatusIds::StatusCode code, const TString& message) { - Reply = std::make_unique<TEvDataShard::TEvReadResult>(); + void ReplyError(Ydb::StatusIds::StatusCode code, const TString& message, ui32 nodeId) { + Reply = MakeEvReadResult(nodeId); SetStatusError(Reply->Record, code, message); Reply->Record.SetReadId(ReadId.ReadId); } @@ -2139,7 +2157,7 @@ public: << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId << ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery); - Result.reset(new TEvDataShard::TEvReadResult()); + Result = MakeEvReadResult(ctx.SelfID.NodeId()); const auto& tableId = state.PathId.LocalPathId; if (state.PathId.OwnerId == Self->GetPathOwnerId()) { @@ -2338,7 +2356,7 @@ public: LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TTxReadContinue::Execute() finished without Result, aborting"); - Result.reset(new TEvDataShard::TEvReadResult()); + Result = MakeEvReadResult(ctx.SelfID.NodeId()); SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); @@ -2399,7 +2417,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct auto* request = ev->Get(); const auto& record = request->Record; if (Y_UNLIKELY(!record.HasReadId())) { - std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + auto result = MakeEvReadResult(ctx.SelfID.NodeId()); SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing ReadId"); ctx.Send(ev->Sender, result.release()); return; @@ -2414,7 +2432,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct } auto replyWithError = [&] (auto code, const auto& msg) { - std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + auto result = MakeEvReadResult(ctx.SelfID.NodeId()); SetStatusError( result->Record, code, @@ -2560,7 +2578,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck: " << record); - std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + auto result = MakeEvReadResult(ctx.SelfID.NodeId()); SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing mandatory fields in TEvReadAck"); if (record.HasReadId()) result->Record.SetReadId(record.GetReadId()); @@ -2594,7 +2612,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& << ", current seqNo# " << state.SeqNo; LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, issueStr); - std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + auto result = MakeEvReadResult(ctx.SelfID.NodeId()); SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, issueStr); result->Record.SetReadId(readId.ReadId); SendViaSession(state.SessionId, readId.Sender, SelfId(), result.release()); @@ -2669,7 +2687,7 @@ void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TStr IncCounter(COUNTER_READ_ITERATOR_LIFETIME_MS, delta.MilliSeconds()); } - std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + auto result = MakeEvReadResult(ctx.SelfID.NodeId()); SetStatusError(result->Record, code, issue); result->Record.SetReadId(iterator.first.ReadId); result->Record.SetSeqNo(state->SeqNo + 1); |