diff options
4 files changed, 110 insertions, 31 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h index 593a3ed1cc0..3f1a0d68c77 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h +++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h @@ -155,6 +155,7 @@ struct TEvRowDispatcher { NFq::NRowDispatcherProto::TEvSessionError, EEv::EvSessionError> { TEvSessionError() = default; NActors::TActorId ReadActorId; + bool IsFatalError = false; // session is no longer valid if true (need to send TEvPoisonPill). }; struct TEvSessionStatistic : public NActors::TEventLocal<TEvSessionStatistic, EEv::EvSessionStatistic> { diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 1348d741724..3562b4632c6 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -948,7 +948,7 @@ void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) { } const auto& consumer = consumerIt->second; - LOG_ROW_DISPATCHER_DEBUG("DeleteConsumer, readActorId " << readActorId << " query id " << consumer->QueryId); + LOG_ROW_DISPATCHER_DEBUG("DeleteConsumer, readActorId " << readActorId << " query id " << consumer->QueryId << ", partitions size " << consumer->Partitions.size()); for (auto& [partitionId, partition] : consumer->Partitions) { auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>(); *event->Record.MutableSource() = consumer->SourceParams; @@ -960,17 +960,20 @@ void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) { consumer->SourceParams.GetDatabase(), consumer->SourceParams.GetTopicPath(), partitionId}; - TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey]; - TSessionInfo& sessionInfo = topicSessionInfo.Sessions[partition.TopicSessionId]; - if (!sessionInfo.Consumers.erase(consumer->ReadActorId)) { - LOG_ROW_DISPATCHER_ERROR("Wrong readActorId " << consumer->ReadActorId << ", no such consumer"); - } - if (sessionInfo.Consumers.empty()) { - LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId); - topicSessionInfo.Sessions.erase(partition.TopicSessionId); - Send(partition.TopicSessionId, new NActors::TEvents::TEvPoisonPill()); - if (topicSessionInfo.Sessions.empty()) { - TopicSessions.erase(topicKey); + auto sessionIt = TopicSessions.find(topicKey); + if (sessionIt != TopicSessions.end()) { + TTopicSessionInfo& topicSessionInfo = sessionIt->second; + TSessionInfo& sessionInfo = topicSessionInfo.Sessions[partition.TopicSessionId]; + if (!sessionInfo.Consumers.erase(consumer->ReadActorId)) { + LOG_ROW_DISPATCHER_ERROR("Wrong readActorId " << consumer->ReadActorId << ", no such consumer"); + } + if (sessionInfo.Consumers.empty()) { + LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId); + topicSessionInfo.Sessions.erase(partition.TopicSessionId); + Send(partition.TopicSessionId, new NActors::TEvents::TEvPoisonPill()); + if (topicSessionInfo.Sessions.empty()) { + TopicSessions.erase(sessionIt); + } } } } @@ -1050,8 +1053,36 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) { LWPROBE(SessionError, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong()); ++*Metrics.ErrorsCount; LOG_ROW_DISPATCHER_TRACE("Forward TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId); + + if (ev->Get()->IsFatalError) { + auto consumerIt = Consumers.find(ev->Get()->ReadActorId); + if (consumerIt == Consumers.end()) { + LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer) DeleteConsumer, " << " read actor id " << ev->Get()->ReadActorId); + return; + } + const auto& consumer = consumerIt->second; + TTopicSessionKey topicKey{ + consumer->SourceParams.GetReadGroup(), + consumer->SourceParams.GetEndpoint(), + consumer->SourceParams.GetDatabase(), + consumer->SourceParams.GetTopicPath(), + ev->Get()->Record.GetPartitionId()}; + + auto sessionIt = TopicSessions.find(topicKey); + if (sessionIt != TopicSessions.end()) { + TTopicSessionInfo& topicSessionInfo = sessionIt->second; + if (topicSessionInfo.Sessions.erase(ev->Sender)) { + LOG_ROW_DISPATCHER_WARN("Fatal session error, remove session " << ev->Sender); + Send(ev->Sender, new NActors::TEvents::TEvPoisonPill()); + if (topicSessionInfo.Sessions.empty()) { + TopicSessions.erase(sessionIt); + } + } + } + } + auto readActorId = ev->Get()->ReadActorId; it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation); - DeleteConsumer(ev->Get()->ReadActorId); + DeleteConsumer(readActorId); } void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) { diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 16e1245297e..4150fb7925c 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -158,7 +158,7 @@ private: } void OnClientError(TStatus status) override { - Self.SendSessionError(ReadActorId, status); + Self.SendSessionError(ReadActorId, status, false); } void StartClientSession() override { @@ -325,7 +325,7 @@ private: void SendStatistics(); bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev); TMaybe<ui64> GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings); - void SendSessionError(TActorId readActorId, TStatus status); + void SendSessionError(TActorId readActorId, TStatus status, bool isFatalError); void RestartSessionIfOldestClient(const TClientsInfo& info); void RefreshParsers(); @@ -753,7 +753,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { } if (auto status = formatIt->second->AddClient(clientInfo); status.IsFail()) { - SendSessionError(clientInfo->ReadActorId, status); + SendSessionError(clientInfo->ReadActorId, status, false); return; } @@ -828,7 +828,7 @@ void TTopicSession::FatalError(const TStatus& status) { for (auto& [readActorId, info] : Clients) { LOG_ROW_DISPATCHER_DEBUG("Send TEvSessionError to " << readActorId); - SendSessionError(readActorId, status); + SendSessionError(readActorId, status, true); } StopReadSession(); Become(&TTopicSession::ErrorState); @@ -839,12 +839,13 @@ void TTopicSession::ThrowFatalError(const TStatus& status) { ythrow yexception() << "FatalError: " << status.GetErrorMessage(); } -void TTopicSession::SendSessionError(TActorId readActorId, TStatus status) { +void TTopicSession::SendSessionError(TActorId readActorId, TStatus status, bool isFatalError) { LOG_ROW_DISPATCHER_WARN("SendSessionError to " << readActorId << ", status: " << status.GetErrorMessage()); auto event = std::make_unique<TEvRowDispatcher::TEvSessionError>(); event->Record.SetStatusCode(status.GetStatus()); NYql::IssuesToMessage(status.GetErrorDescription(), event->Record.MutableIssues()); event->ReadActorId = readActorId; + event->IsFatalError = isFatalError; Send(RowDispatcherActorId, event.release()); } @@ -926,14 +927,14 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& auto it = Clients.find(ev->Sender); if (it != Clients.end()) { LOG_ROW_DISPATCHER_ERROR("Such a client already exists"); - SendSessionError(ev->Sender, TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Client with id " << ev->Sender << " already exists")); + SendSessionError(ev->Sender, TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Client with id " << ev->Sender << " already exists"), false); return false; } const auto& source = ev->Get()->Record.GetSource(); if (!Config.GetWithoutConsumer() && ConsumerName && ConsumerName != source.GetConsumerName()) { LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << source.GetConsumerName() << ", send error"); - SendSessionError(ev->Sender, TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")")); + SendSessionError(ev->Sender, TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")"), false); return false; } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index cb0f1caa186..b2568b981fc 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -176,9 +176,11 @@ public: Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release(), 0, generation)); } - void MockSessionError(TActorId topicSessionId, TActorId readActorId) { + void MockSessionError(TActorId topicSessionId, TActorId readActorId, ui32 partitionId, bool isFatalError = false) { auto event = std::make_unique<NFq::TEvRowDispatcher::TEvSessionError>(); event->ReadActorId = readActorId; + event->IsFatalError = isFatalError; + event->Record.SetPartitionId(partitionId); Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release())); } @@ -199,6 +201,11 @@ public: UNIT_ASSERT(eventHolder.Get() != nullptr); } + void ExpectPoisonPill(NActors::TActorId actorId) { + auto eventHolder = Runtime.GrabEdgeEvent<NActors::TEvents::TEvPoisonPill>(actorId); + UNIT_ASSERT(eventHolder.Get() != nullptr); + } + void ExpectStopSession(NActors::TActorId actorId) { auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvStopSession>(actorId); UNIT_ASSERT(eventHolder.Get() != nullptr); @@ -263,8 +270,8 @@ public: NYql::NPq::NProto::TDqPqTopicSource Source2 = BuildPqTopicSourceSettings("Endpoint2", "Database1", "topic", "connection_id1"); NYql::NPq::NProto::TDqPqTopicSource Source1Connection2 = BuildPqTopicSourceSettings("Endpoint1", "Database1", "topic", "connection_id2"); - ui32 PartitionId0 = 0; - ui32 PartitionId1 = 1; + ui32 PartitionId0 = 100; + ui32 PartitionId1 = 101; }; Y_UNIT_TEST_SUITE(RowDispatcherTests) { @@ -293,10 +300,10 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { ProcessData(ReadActorId1, PartitionId0, topicSessionId); ProcessData(ReadActorId2, PartitionId0, topicSessionId); - MockSessionError(topicSessionId, ReadActorId1); + MockSessionError(topicSessionId, ReadActorId1, PartitionId0); ExpectSessionError(ReadActorId1); - MockSessionError(topicSessionId, ReadActorId2); + MockSessionError(topicSessionId, ReadActorId2, PartitionId0); ExpectSessionError(ReadActorId2); } @@ -306,7 +313,7 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { ExpectStartSessionAck(ReadActorId1); ExpectStartSession(topicSessionId); - MockSessionError(topicSessionId, ReadActorId1); + MockSessionError(topicSessionId, ReadActorId1, PartitionId0); ExpectSessionError(ReadActorId1); } @@ -357,15 +364,11 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { ProcessData(ReadActorId2, PartitionId0, topicSession3); ProcessData(ReadActorId2, PartitionId1, topicSession4); - MockSessionError(topicSession1, ReadActorId1); + MockSessionError(topicSession1, ReadActorId1, PartitionId0); ExpectSessionError(ReadActorId1); - ProcessData(ReadActorId1, PartitionId1, topicSession2); ProcessData(ReadActorId2, PartitionId0, topicSession3); ProcessData(ReadActorId2, PartitionId1, topicSession4); - - MockStopSession(Source1, ReadActorId1); - ExpectStopSession(topicSession2); MockStopSession(Source2, ReadActorId2); ExpectStopSession(topicSession3); @@ -466,6 +469,49 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { MockStopSession(Source1, ReadActorId1); ExpectStopSession(topicSessionId); } + + Y_UNIT_TEST_F(SessionFatalError, TFixture) { + MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId1); + auto session0 = ExpectRegisterTopicSession(); + auto session1 = ExpectRegisterTopicSession(); + ExpectStartSessionAck(ReadActorId1); + ExpectStartSession(session0); + ExpectStartSession(session1); + + MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId2); + ExpectStartSessionAck(ReadActorId2); + ExpectStartSession(session0); + ExpectStartSession(session1); + + MockSessionError(session0, ReadActorId1, PartitionId0, true); // consumer (ReadActorId1) deleted + ExpectSessionError(ReadActorId1); + ExpectPoisonPill(session0); + ExpectStopSession(session1); + + // 1 topic session / 1 consumer (ReadActorId2) + + ProcessData(ReadActorId2, PartitionId1, session1); // still working + + MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId1); + auto new_session0 = ExpectRegisterTopicSession(); + ExpectStartSession(new_session0); + ExpectStartSession(session1); + + // 2 topic session / 2 consumer + + MockSessionError(session0, ReadActorId2, PartitionId0, true); // late event, delete ReadActorId2 consumer + ExpectSessionError(ReadActorId2); + + // 2 topic session / 1 consumer + + MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId2); + ExpectStartSession(new_session0); + ExpectStartSession(session1); + ProcessData(ReadActorId1, PartitionId0, new_session0); + ProcessData(ReadActorId2, PartitionId0, new_session0); + ProcessData(ReadActorId1, PartitionId1, session1); + ProcessData(ReadActorId2, PartitionId1, session1); + } } } |
