summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/fq/libs/row_dispatcher/events/data_plane.h1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp57
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.cpp15
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp68
4 files changed, 110 insertions, 31 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h
index 593a3ed1cc0..3f1a0d68c77 100644
--- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h
+++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h
@@ -155,6 +155,7 @@ struct TEvRowDispatcher {
NFq::NRowDispatcherProto::TEvSessionError, EEv::EvSessionError> {
TEvSessionError() = default;
NActors::TActorId ReadActorId;
+ bool IsFatalError = false; // session is no longer valid if true (need to send TEvPoisonPill).
};
struct TEvSessionStatistic : public NActors::TEventLocal<TEvSessionStatistic, EEv::EvSessionStatistic> {
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index 1348d741724..3562b4632c6 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -948,7 +948,7 @@ void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) {
}
const auto& consumer = consumerIt->second;
- LOG_ROW_DISPATCHER_DEBUG("DeleteConsumer, readActorId " << readActorId << " query id " << consumer->QueryId);
+ LOG_ROW_DISPATCHER_DEBUG("DeleteConsumer, readActorId " << readActorId << " query id " << consumer->QueryId << ", partitions size " << consumer->Partitions.size());
for (auto& [partitionId, partition] : consumer->Partitions) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
*event->Record.MutableSource() = consumer->SourceParams;
@@ -960,17 +960,20 @@ void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) {
consumer->SourceParams.GetDatabase(),
consumer->SourceParams.GetTopicPath(),
partitionId};
- TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
- TSessionInfo& sessionInfo = topicSessionInfo.Sessions[partition.TopicSessionId];
- if (!sessionInfo.Consumers.erase(consumer->ReadActorId)) {
- LOG_ROW_DISPATCHER_ERROR("Wrong readActorId " << consumer->ReadActorId << ", no such consumer");
- }
- if (sessionInfo.Consumers.empty()) {
- LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId);
- topicSessionInfo.Sessions.erase(partition.TopicSessionId);
- Send(partition.TopicSessionId, new NActors::TEvents::TEvPoisonPill());
- if (topicSessionInfo.Sessions.empty()) {
- TopicSessions.erase(topicKey);
+ auto sessionIt = TopicSessions.find(topicKey);
+ if (sessionIt != TopicSessions.end()) {
+ TTopicSessionInfo& topicSessionInfo = sessionIt->second;
+ TSessionInfo& sessionInfo = topicSessionInfo.Sessions[partition.TopicSessionId];
+ if (!sessionInfo.Consumers.erase(consumer->ReadActorId)) {
+ LOG_ROW_DISPATCHER_ERROR("Wrong readActorId " << consumer->ReadActorId << ", no such consumer");
+ }
+ if (sessionInfo.Consumers.empty()) {
+ LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId);
+ topicSessionInfo.Sessions.erase(partition.TopicSessionId);
+ Send(partition.TopicSessionId, new NActors::TEvents::TEvPoisonPill());
+ if (topicSessionInfo.Sessions.empty()) {
+ TopicSessions.erase(sessionIt);
+ }
}
}
}
@@ -1050,8 +1053,36 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) {
LWPROBE(SessionError, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
++*Metrics.ErrorsCount;
LOG_ROW_DISPATCHER_TRACE("Forward TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId);
+
+ if (ev->Get()->IsFatalError) {
+ auto consumerIt = Consumers.find(ev->Get()->ReadActorId);
+ if (consumerIt == Consumers.end()) {
+ LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer) DeleteConsumer, " << " read actor id " << ev->Get()->ReadActorId);
+ return;
+ }
+ const auto& consumer = consumerIt->second;
+ TTopicSessionKey topicKey{
+ consumer->SourceParams.GetReadGroup(),
+ consumer->SourceParams.GetEndpoint(),
+ consumer->SourceParams.GetDatabase(),
+ consumer->SourceParams.GetTopicPath(),
+ ev->Get()->Record.GetPartitionId()};
+
+ auto sessionIt = TopicSessions.find(topicKey);
+ if (sessionIt != TopicSessions.end()) {
+ TTopicSessionInfo& topicSessionInfo = sessionIt->second;
+ if (topicSessionInfo.Sessions.erase(ev->Sender)) {
+ LOG_ROW_DISPATCHER_WARN("Fatal session error, remove session " << ev->Sender);
+ Send(ev->Sender, new NActors::TEvents::TEvPoisonPill());
+ if (topicSessionInfo.Sessions.empty()) {
+ TopicSessions.erase(sessionIt);
+ }
+ }
+ }
+ }
+ auto readActorId = ev->Get()->ReadActorId;
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
- DeleteConsumer(ev->Get()->ReadActorId);
+ DeleteConsumer(readActorId);
}
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index 16e1245297e..4150fb7925c 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -158,7 +158,7 @@ private:
}
void OnClientError(TStatus status) override {
- Self.SendSessionError(ReadActorId, status);
+ Self.SendSessionError(ReadActorId, status, false);
}
void StartClientSession() override {
@@ -325,7 +325,7 @@ private:
void SendStatistics();
bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev);
TMaybe<ui64> GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings);
- void SendSessionError(TActorId readActorId, TStatus status);
+ void SendSessionError(TActorId readActorId, TStatus status, bool isFatalError);
void RestartSessionIfOldestClient(const TClientsInfo& info);
void RefreshParsers();
@@ -753,7 +753,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
}
if (auto status = formatIt->second->AddClient(clientInfo); status.IsFail()) {
- SendSessionError(clientInfo->ReadActorId, status);
+ SendSessionError(clientInfo->ReadActorId, status, false);
return;
}
@@ -828,7 +828,7 @@ void TTopicSession::FatalError(const TStatus& status) {
for (auto& [readActorId, info] : Clients) {
LOG_ROW_DISPATCHER_DEBUG("Send TEvSessionError to " << readActorId);
- SendSessionError(readActorId, status);
+ SendSessionError(readActorId, status, true);
}
StopReadSession();
Become(&TTopicSession::ErrorState);
@@ -839,12 +839,13 @@ void TTopicSession::ThrowFatalError(const TStatus& status) {
ythrow yexception() << "FatalError: " << status.GetErrorMessage();
}
-void TTopicSession::SendSessionError(TActorId readActorId, TStatus status) {
+void TTopicSession::SendSessionError(TActorId readActorId, TStatus status, bool isFatalError) {
LOG_ROW_DISPATCHER_WARN("SendSessionError to " << readActorId << ", status: " << status.GetErrorMessage());
auto event = std::make_unique<TEvRowDispatcher::TEvSessionError>();
event->Record.SetStatusCode(status.GetStatus());
NYql::IssuesToMessage(status.GetErrorDescription(), event->Record.MutableIssues());
event->ReadActorId = readActorId;
+ event->IsFatalError = isFatalError;
Send(RowDispatcherActorId, event.release());
}
@@ -926,14 +927,14 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr&
auto it = Clients.find(ev->Sender);
if (it != Clients.end()) {
LOG_ROW_DISPATCHER_ERROR("Such a client already exists");
- SendSessionError(ev->Sender, TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Client with id " << ev->Sender << " already exists"));
+ SendSessionError(ev->Sender, TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Client with id " << ev->Sender << " already exists"), false);
return false;
}
const auto& source = ev->Get()->Record.GetSource();
if (!Config.GetWithoutConsumer() && ConsumerName && ConsumerName != source.GetConsumerName()) {
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << source.GetConsumerName() << ", send error");
- SendSessionError(ev->Sender, TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")"));
+ SendSessionError(ev->Sender, TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")"), false);
return false;
}
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
index cb0f1caa186..b2568b981fc 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
@@ -176,9 +176,11 @@ public:
Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release(), 0, generation));
}
- void MockSessionError(TActorId topicSessionId, TActorId readActorId) {
+ void MockSessionError(TActorId topicSessionId, TActorId readActorId, ui32 partitionId, bool isFatalError = false) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvSessionError>();
event->ReadActorId = readActorId;
+ event->IsFatalError = isFatalError;
+ event->Record.SetPartitionId(partitionId);
Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release()));
}
@@ -199,6 +201,11 @@ public:
UNIT_ASSERT(eventHolder.Get() != nullptr);
}
+ void ExpectPoisonPill(NActors::TActorId actorId) {
+ auto eventHolder = Runtime.GrabEdgeEvent<NActors::TEvents::TEvPoisonPill>(actorId);
+ UNIT_ASSERT(eventHolder.Get() != nullptr);
+ }
+
void ExpectStopSession(NActors::TActorId actorId) {
auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvStopSession>(actorId);
UNIT_ASSERT(eventHolder.Get() != nullptr);
@@ -263,8 +270,8 @@ public:
NYql::NPq::NProto::TDqPqTopicSource Source2 = BuildPqTopicSourceSettings("Endpoint2", "Database1", "topic", "connection_id1");
NYql::NPq::NProto::TDqPqTopicSource Source1Connection2 = BuildPqTopicSourceSettings("Endpoint1", "Database1", "topic", "connection_id2");
- ui32 PartitionId0 = 0;
- ui32 PartitionId1 = 1;
+ ui32 PartitionId0 = 100;
+ ui32 PartitionId1 = 101;
};
Y_UNIT_TEST_SUITE(RowDispatcherTests) {
@@ -293,10 +300,10 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
ProcessData(ReadActorId1, PartitionId0, topicSessionId);
ProcessData(ReadActorId2, PartitionId0, topicSessionId);
- MockSessionError(topicSessionId, ReadActorId1);
+ MockSessionError(topicSessionId, ReadActorId1, PartitionId0);
ExpectSessionError(ReadActorId1);
- MockSessionError(topicSessionId, ReadActorId2);
+ MockSessionError(topicSessionId, ReadActorId2, PartitionId0);
ExpectSessionError(ReadActorId2);
}
@@ -306,7 +313,7 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
ExpectStartSessionAck(ReadActorId1);
ExpectStartSession(topicSessionId);
- MockSessionError(topicSessionId, ReadActorId1);
+ MockSessionError(topicSessionId, ReadActorId1, PartitionId0);
ExpectSessionError(ReadActorId1);
}
@@ -357,15 +364,11 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
ProcessData(ReadActorId2, PartitionId0, topicSession3);
ProcessData(ReadActorId2, PartitionId1, topicSession4);
- MockSessionError(topicSession1, ReadActorId1);
+ MockSessionError(topicSession1, ReadActorId1, PartitionId0);
ExpectSessionError(ReadActorId1);
- ProcessData(ReadActorId1, PartitionId1, topicSession2);
ProcessData(ReadActorId2, PartitionId0, topicSession3);
ProcessData(ReadActorId2, PartitionId1, topicSession4);
-
- MockStopSession(Source1, ReadActorId1);
- ExpectStopSession(topicSession2);
MockStopSession(Source2, ReadActorId2);
ExpectStopSession(topicSession3);
@@ -466,6 +469,49 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
MockStopSession(Source1, ReadActorId1);
ExpectStopSession(topicSessionId);
}
+
+ Y_UNIT_TEST_F(SessionFatalError, TFixture) {
+ MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId1);
+ auto session0 = ExpectRegisterTopicSession();
+ auto session1 = ExpectRegisterTopicSession();
+ ExpectStartSessionAck(ReadActorId1);
+ ExpectStartSession(session0);
+ ExpectStartSession(session1);
+
+ MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId2);
+ ExpectStartSessionAck(ReadActorId2);
+ ExpectStartSession(session0);
+ ExpectStartSession(session1);
+
+ MockSessionError(session0, ReadActorId1, PartitionId0, true); // consumer (ReadActorId1) deleted
+ ExpectSessionError(ReadActorId1);
+ ExpectPoisonPill(session0);
+ ExpectStopSession(session1);
+
+ // 1 topic session / 1 consumer (ReadActorId2)
+
+ ProcessData(ReadActorId2, PartitionId1, session1); // still working
+
+ MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId1);
+ auto new_session0 = ExpectRegisterTopicSession();
+ ExpectStartSession(new_session0);
+ ExpectStartSession(session1);
+
+ // 2 topic session / 2 consumer
+
+ MockSessionError(session0, ReadActorId2, PartitionId0, true); // late event, delete ReadActorId2 consumer
+ ExpectSessionError(ReadActorId2);
+
+ // 2 topic session / 1 consumer
+
+ MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId2);
+ ExpectStartSession(new_session0);
+ ExpectStartSession(session1);
+ ProcessData(ReadActorId1, PartitionId0, new_session0);
+ ProcessData(ReadActorId2, PartitionId0, new_session0);
+ ProcessData(ReadActorId1, PartitionId1, session1);
+ ProcessData(ReadActorId2, PartitionId1, session1);
+ }
}
}