summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp38
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp12
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp2
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp5
4 files changed, 44 insertions, 13 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index 502d384f6e0..e8b31a58b5a 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -710,6 +710,9 @@ TString TRowDispatcher::GetInternalState() {
}
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
+ if (!consumer->Partitions.contains(key.PartitionId)) {
+ continue;
+ }
const auto& partition = consumer->Partitions[key.PartitionId];
str << " " << consumer->QueryId << " " << LeftPad(readActorId, 32) << " unread bytes "
<< toHuman(consumer->Stat.QueuedBytes) << " (" << leftPad(consumer->Stat.QueuedRows) << " rows) "
@@ -913,7 +916,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
}
LWPROBE(StopSession, ev->Sender.ToString(), it->second->QueryId, ev->Get()->Record.ByteSizeLong());
- LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " query id " << it->second->QueryId);
+ LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession from " << ev->Sender << " topic " << ev->Get()->Record.GetSource().GetTopicPath() << " query id " << it->second->QueryId);
if (!CheckSession(it->second, ev)) {
return;
}
@@ -942,8 +945,11 @@ void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) {
partitionId};
TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
TSessionInfo& sessionInfo = topicSessionInfo.Sessions[partition.TopicSessionId];
- Y_ENSURE(sessionInfo.Consumers.count(consumer->ReadActorId));
- sessionInfo.Consumers.erase(consumer->ReadActorId);
+ if (!sessionInfo.Consumers.contains(consumer->ReadActorId)) {
+ LOG_ROW_DISPATCHER_ERROR("Wrong readActorId " << consumer->ReadActorId << ", no such consumer");
+ } else {
+ sessionInfo.Consumers.erase(consumer->ReadActorId);
+ }
if (sessionInfo.Consumers.empty()) {
LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId);
topicSessionInfo.Sessions.erase(partition.TopicSessionId);
@@ -989,10 +995,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev)
}
LWPROBE(NewDataArrived, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
LOG_ROW_DISPATCHER_TRACE("Forward TEvNewDataArrived from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId);
- auto& partition = it->second->Partitions[ev->Get()->Record.GetPartitionId()];
- partition.PendingNewDataArrived = true;
- it->second->Counters.NewDataArrived++;
- it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
+ auto consumerInfoPtr = it->second;
+ auto partitionIt = consumerInfoPtr->Partitions.find(ev->Get()->Record.GetPartitionId());
+ if (partitionIt == consumerInfoPtr->Partitions.end()) {
+ // Ignore TEvNewDataArrived because read actor now read others partitions.
+ return;
+ }
+ partitionIt->second.PendingNewDataArrived = true;
+ consumerInfoPtr->Counters.NewDataArrived++;
+ consumerInfoPtr->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
}
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
@@ -1004,10 +1015,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
LWPROBE(MessageBatch, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
LOG_ROW_DISPATCHER_TRACE("Forward TEvMessageBatch from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId);
Metrics.RowsSent->Add(ev->Get()->Record.MessagesSize());
- auto& partition = it->second->Partitions[ev->Get()->Record.GetPartitionId()];
- partition.PendingGetNextBatch = false;
- it->second->Counters.MessageBatch++;
- it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
+ auto consumerInfoPtr = it->second;
+ auto partitionIt = consumerInfoPtr->Partitions.find(ev->Get()->Record.GetPartitionId());
+ if (partitionIt == consumerInfoPtr->Partitions.end()) {
+ // Ignore TEvMessageBatch because read actor now read others partitions.
+ return;
+ }
+ partitionIt->second.PendingGetNextBatch = false;
+ consumerInfoPtr->Counters.MessageBatch++;
+ consumerInfoPtr->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
}
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) {
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
index 340195f7328..88151dc0e80 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
@@ -449,6 +449,18 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
MockNoSession(ReadActorId3);
ExpectStopSession(topicSessionId);
}
+
+ Y_UNIT_TEST_F(IgnoreWrongPartitionId, TFixture) {
+ MockAddSession(Source1, {PartitionId0}, ReadActorId1);
+ auto topicSessionId = ExpectRegisterTopicSession();
+ ExpectStartSessionAck(ReadActorId1);
+ ExpectStartSession(topicSessionId);
+
+ MockNewDataArrived(PartitionId1, topicSessionId, ReadActorId1);
+
+ MockStopSession(Source1, ReadActorId1);
+ ExpectStopSession(topicSessionId);
+ }
}
}
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
index e0dc13b4def..dacd6946ada 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
@@ -636,7 +636,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
}
auto partitionIt = session->Partitions.find(ev->Get()->Record.GetPartitionId());
if (partitionIt == session->Partitions.end()) {
- SRC_LOG_E("TEvNewDataArrived: wrong partition id " << ev->Get()->Record.GetPartitionId());
+ SRC_LOG_E("Received TEvNewDataArrived from " << ev->Sender << " with wrong partition id " << ev->Get()->Record.GetPartitionId());
Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "No partition with id " << ev->Get()->Record.GetPartitionId())});
return;
}
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp
index ba9b9825f24..f4c4162a9f2 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp
@@ -73,9 +73,12 @@ void TDqPqReadActorBase::LoadState(const TSourceState& state) {
minStartingMessageTs = Min(minStartingMessageTs, TInstant::MilliSeconds(stateProto.GetStartingMessageTimestampMs()));
ingressBytes += stateProto.GetIngressBytes();
}
+ TStringStream str;
+ str << "SessionId: " << GetSessionId() << " Restoring offset: ";
for (const auto& [key, value] : PartitionToOffset) {
- SRC_LOG_D("SessionId: " << GetSessionId() << " Restoring offset: cluster " << key.first << ", partition id " << key.second << ", offset: " << value);
+ str << "{" << key.first << "," << key.second << "," << value << "},";
}
+ SRC_LOG_D(str.Str());
StartingMessageTimestamp = minStartingMessageTs;
IngressStats.Bytes += ingressBytes;
IngressStats.Chunks++;