diff options
4 files changed, 44 insertions, 13 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 502d384f6e0..e8b31a58b5a 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -710,6 +710,9 @@ TString TRowDispatcher::GetInternalState() { } for (auto& [readActorId, consumer] : sessionInfo.Consumers) { + if (!consumer->Partitions.contains(key.PartitionId)) { + continue; + } const auto& partition = consumer->Partitions[key.PartitionId]; str << " " << consumer->QueryId << " " << LeftPad(readActorId, 32) << " unread bytes " << toHuman(consumer->Stat.QueuedBytes) << " (" << leftPad(consumer->Stat.QueuedRows) << " rows) " @@ -913,7 +916,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) { } LWPROBE(StopSession, ev->Sender.ToString(), it->second->QueryId, ev->Get()->Record.ByteSizeLong()); - LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " query id " << it->second->QueryId); + LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession from " << ev->Sender << " topic " << ev->Get()->Record.GetSource().GetTopicPath() << " query id " << it->second->QueryId); if (!CheckSession(it->second, ev)) { return; } @@ -942,8 +945,11 @@ void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) { partitionId}; TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey]; TSessionInfo& sessionInfo = topicSessionInfo.Sessions[partition.TopicSessionId]; - Y_ENSURE(sessionInfo.Consumers.count(consumer->ReadActorId)); - sessionInfo.Consumers.erase(consumer->ReadActorId); + if (!sessionInfo.Consumers.contains(consumer->ReadActorId)) { + LOG_ROW_DISPATCHER_ERROR("Wrong readActorId " << consumer->ReadActorId << ", no such consumer"); + } else { + sessionInfo.Consumers.erase(consumer->ReadActorId); + } if (sessionInfo.Consumers.empty()) { LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId); topicSessionInfo.Sessions.erase(partition.TopicSessionId); @@ -989,10 +995,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) } LWPROBE(NewDataArrived, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong()); LOG_ROW_DISPATCHER_TRACE("Forward TEvNewDataArrived from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId); - auto& partition = it->second->Partitions[ev->Get()->Record.GetPartitionId()]; - partition.PendingNewDataArrived = true; - it->second->Counters.NewDataArrived++; - it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation); + auto consumerInfoPtr = it->second; + auto partitionIt = consumerInfoPtr->Partitions.find(ev->Get()->Record.GetPartitionId()); + if (partitionIt == consumerInfoPtr->Partitions.end()) { + // Ignore TEvNewDataArrived because read actor now read others partitions. + return; + } + partitionIt->second.PendingNewDataArrived = true; + consumerInfoPtr->Counters.NewDataArrived++; + consumerInfoPtr->EventsQueue.Send(ev->Release().Release(), it->second->Generation); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) { @@ -1004,10 +1015,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) { LWPROBE(MessageBatch, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong()); LOG_ROW_DISPATCHER_TRACE("Forward TEvMessageBatch from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId); Metrics.RowsSent->Add(ev->Get()->Record.MessagesSize()); - auto& partition = it->second->Partitions[ev->Get()->Record.GetPartitionId()]; - partition.PendingGetNextBatch = false; - it->second->Counters.MessageBatch++; - it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation); + auto consumerInfoPtr = it->second; + auto partitionIt = consumerInfoPtr->Partitions.find(ev->Get()->Record.GetPartitionId()); + if (partitionIt == consumerInfoPtr->Partitions.end()) { + // Ignore TEvMessageBatch because read actor now read others partitions. + return; + } + partitionIt->second.PendingGetNextBatch = false; + consumerInfoPtr->Counters.MessageBatch++; + consumerInfoPtr->EventsQueue.Send(ev->Release().Release(), it->second->Generation); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index 340195f7328..88151dc0e80 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -449,6 +449,18 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { MockNoSession(ReadActorId3); ExpectStopSession(topicSessionId); } + + Y_UNIT_TEST_F(IgnoreWrongPartitionId, TFixture) { + MockAddSession(Source1, {PartitionId0}, ReadActorId1); + auto topicSessionId = ExpectRegisterTopicSession(); + ExpectStartSessionAck(ReadActorId1); + ExpectStartSession(topicSessionId); + + MockNewDataArrived(PartitionId1, topicSessionId, ReadActorId1); + + MockStopSession(Source1, ReadActorId1); + ExpectStopSession(topicSessionId); + } } } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index e0dc13b4def..dacd6946ada 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -636,7 +636,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev } auto partitionIt = session->Partitions.find(ev->Get()->Record.GetPartitionId()); if (partitionIt == session->Partitions.end()) { - SRC_LOG_E("TEvNewDataArrived: wrong partition id " << ev->Get()->Record.GetPartitionId()); + SRC_LOG_E("Received TEvNewDataArrived from " << ev->Sender << " with wrong partition id " << ev->Get()->Record.GetPartitionId()); Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "No partition with id " << ev->Get()->Record.GetPartitionId())}); return; } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp index ba9b9825f24..f4c4162a9f2 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp @@ -73,9 +73,12 @@ void TDqPqReadActorBase::LoadState(const TSourceState& state) { minStartingMessageTs = Min(minStartingMessageTs, TInstant::MilliSeconds(stateProto.GetStartingMessageTimestampMs())); ingressBytes += stateProto.GetIngressBytes(); } + TStringStream str; + str << "SessionId: " << GetSessionId() << " Restoring offset: "; for (const auto& [key, value] : PartitionToOffset) { - SRC_LOG_D("SessionId: " << GetSessionId() << " Restoring offset: cluster " << key.first << ", partition id " << key.second << ", offset: " << value); + str << "{" << key.first << "," << key.second << "," << value << "},"; } + SRC_LOG_D(str.Str()); StartingMessageTimestamp = minStartingMessageTs; IngressStats.Bytes += ingressBytes; IngressStats.Chunks++; |
