diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2024-11-26 13:06:57 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-26 13:06:57 +0300 |
commit | bf8b612434cc18322ce33d127a269434606cdac8 (patch) | |
tree | b4c4164629aa80ffed1ffd3516fc8b3792375f99 | |
parent | f1aa57bba470e90832ca25c102c348e36145a766 (diff) | |
download | ydb-bf8b612434cc18322ce33d127a269434606cdac8.tar.gz |
YQ-3904 Remove TEvSessionClosed and check generation (#11930)
12 files changed, 102 insertions, 83 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index cc5bcf5da9..385fad09b7 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -38,8 +38,20 @@ struct TCoordinatorMetrics { ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode; }; +struct TEvPrivate { + enum EEv : ui32 { + EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvPrintState = EvBegin, + EvEnd + }; + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + struct TEvPrintState : public NActors::TEventLocal<TEvPrintState, EvPrintState> {}; +}; + class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> { + const ui64 PrintStatePeriodSec = 300; + struct TPartitionKey { TString Endpoint; TString Database; @@ -167,6 +179,7 @@ public: void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev); + void Handle(TEvPrivate::TEvPrintState::TPtr&); STRICT_STFUNC( StateFunc, { @@ -176,6 +189,7 @@ public: hFunc(NActors::TEvents::TEvUndelivered, Handle); hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, Handle); hFunc(NFq::TEvRowDispatcher::TEvCoordinatorRequest, Handle); + hFunc(TEvPrivate::TEvPrintState, Handle); }) private: @@ -209,6 +223,7 @@ TActorCoordinator::TActorCoordinator( void TActorCoordinator::Bootstrap() { Become(&TActorCoordinator::StateFunc); Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); + Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId()); auto nodeGroup = Metrics.Counters->GetSubgroup("node", ToString(SelfId().NodeId())); Metrics.IsActive = nodeGroup->GetCounter("IsActive"); @@ -432,8 +447,6 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << readActorId); Send(readActorId, response.release(), IEventHandle::FlagTrackDelivery, request.Cookie); - PrintInternalState(); - return true; } @@ -447,6 +460,11 @@ void TActorCoordinator::UpdatePendingReadActors() { } } +void TActorCoordinator::Handle(TEvPrivate::TEvPrintState::TPtr&) { + Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); + PrintInternalState(); +} + } // namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 337ceea8ec..19d2302188 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -355,7 +355,6 @@ public: void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev); void Handle(const TEvPrivate::TEvTryConnect::TPtr&); void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&); - void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&); void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&); void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&); void Handle(const NMon::TEvHttpInfo::TPtr&); @@ -385,7 +384,6 @@ public: hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle); hFunc(TEvPrivate::TEvTryConnect, Handle); hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle); - hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle); hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle); hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle); hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle); @@ -472,9 +470,15 @@ void TRowDispatcher::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TP } void TRowDispatcher::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { - LOG_ROW_DISPATCHER_DEBUG("TEvUndelivered, ev: " << ev->Get()->ToString() << ", reason " << ev->Get()->Reason); - for (auto& [actorId, consumer] : Consumers) { - consumer->EventsQueue.HandleUndelivered(ev); + LOG_ROW_DISPATCHER_DEBUG("TEvUndelivered, from " << ev->Sender << ", reason " << ev->Get()->Reason); + for (auto& [key, consumer] : Consumers) { + if (ev->Cookie != consumer->Generation) { // Several partitions in one read_actor have different Generation. + continue; + } + if (consumer->EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) { + DeleteConsumer(key); + break; + } } } @@ -761,7 +765,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) { void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) { auto consumerIt = Consumers.find(key); if (consumerIt == Consumers.end()) { - LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer )DeleteConsumer, " << " read actor id " << key.ReadActorId << " part id " << key.PartitionId); + LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer) DeleteConsumer, " << " read actor id " << key.ReadActorId << " part id " << key.PartitionId); return; } @@ -782,7 +786,7 @@ void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) { Y_ENSURE(sessionInfo.Consumers.count(consumer->ReadActorId)); sessionInfo.Consumers.erase(consumer->ReadActorId); if (sessionInfo.Consumers.empty()) { - LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill"); + LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << consumerIt->second->TopicSessionId); topicSessionInfo.Sessions.erase(consumerIt->second->TopicSessionId); Send(consumerIt->second->TopicSessionId, new NActors::TEvents::TEvPoisonPill()); if (topicSessionInfo.Sessions.empty()) { @@ -794,17 +798,6 @@ void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) { Metrics.ClientsCount->Set(Consumers.size()); } -void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) { - LOG_ROW_DISPATCHER_WARN("Session closed, event queue id " << ev->Get()->EventQueueId); - for (auto& [consumerKey, consumer] : Consumers) { - if (consumer->EventQueueId != ev->Get()->EventQueueId) { - continue; - } - DeleteConsumer(consumerKey); - break; - } -} - void TRowDispatcher::Handle(const TEvPrivate::TEvTryConnect::TPtr& ev) { LOG_ROW_DISPATCHER_TRACE("TEvTryConnect to node id " << ev->Get()->NodeId); NodesTracker.TryConnect(ev->Get()->NodeId); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 1585571132..0680991ea9 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -179,6 +179,7 @@ private: bool InflightReconnect = false; TDuration ReconnectPeriod; const TString TopicPath; + const TString TopicPathPartition; const TString Endpoint; const TString Database; NActors::TActorId RowDispatcherActorId; @@ -317,6 +318,7 @@ TTopicSession::TTopicSession( const NYql::IPqGateway::TPtr& pqGateway, ui64 maxBufferSize) : TopicPath(topicPath) + , TopicPathPartition(TStringBuilder() << topicPath << "/" << partitionId) , Endpoint(endpoint) , Database(database) , RowDispatcherActorId(rowDispatcherActorId) @@ -336,7 +338,7 @@ void TTopicSession::Bootstrap() { Become(&TTopicSession::StateFunc); Metrics.Init(Counters, TopicPath, PartitionId); LogPrefix = LogPrefix + " " + SelfId().ToString() + " "; - LOG_ROW_DISPATCHER_DEBUG("Bootstrap " << ", PartitionId " << PartitionId + LOG_ROW_DISPATCHER_DEBUG("Bootstrap " << TopicPathPartition << ", Timeout " << Config.GetTimeoutBeforeStartSessionSec() << " sec, StatusPeriod " << Config.GetSendStatusPeriodSec() << " sec"); Y_ENSURE(Config.GetSendStatusPeriodSec() > 0); Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatisticToReadActor()); @@ -401,7 +403,7 @@ NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const N topicReadSettings.AppendPartitionIds(PartitionId); TInstant minTime = GetMinStartingMessageTimestamp(); - LOG_ROW_DISPATCHER_INFO("Create topic session, Path " << TopicPath + LOG_ROW_DISPATCHER_INFO("Create topic session, Path " << TopicPathPartition << ", StartingMessageTimestamp " << minTime << ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer()); @@ -504,7 +506,7 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&) void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) { Metrics.ReconnectRate->Inc(); TInstant minTime = GetMinStartingMessageTimestamp(); - LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, Path " << TopicPath + LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, " << TopicPathPartition << ", StartingMessageTimestamp " << minTime << ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer()); StopReadSession(); @@ -582,7 +584,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE } void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClosedEvent& ev) { - TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPath << "\" was closed: " << ev.DebugString(); + TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPathPartition << "\" was closed: " << ev.DebugString(); LOG_ROW_DISPATCHER_DEBUG(message); NYql::TIssues issues; issues.AddIssue(message); @@ -856,8 +858,8 @@ void TTopicSession::AddDataToClient(TClientsInfo& info, ui64 offset, const TStri } void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) { - LOG_ROW_DISPATCHER_DEBUG("TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << - " partitionId " << ev->Get()->Record.GetPartitionId()); + LOG_ROW_DISPATCHER_DEBUG("TEvStopSession from " << ev->Sender << " topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << + " partitionId " << ev->Get()->Record.GetPartitionId() << " clients count " << Clients.size()); auto it = Clients.find(ev->Sender); if (it == Clients.end()) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index fe4d2ed035..2d30d74b84 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -145,11 +145,11 @@ public: Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release())); } - void MockMessageBatch(ui64 partitionId, TActorId topicSessionId, TActorId readActorId) { + void MockMessageBatch(ui64 partitionId, TActorId topicSessionId, TActorId readActorId, ui64 generation) { auto event = std::make_unique<NFq::TEvRowDispatcher::TEvMessageBatch>(); event->Record.SetPartitionId(partitionId); event->ReadActorId = readActorId; - Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release(), 0, 1)); + Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release(), 0, generation)); } void MockSessionError(ui64 partitionId, TActorId topicSessionId, TActorId readActorId) { @@ -159,10 +159,15 @@ public: Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release())); } - void MockGetNextBatch(ui64 partitionId, TActorId readActorId) { + void MockGetNextBatch(ui64 partitionId, TActorId readActorId, ui64 generation) { auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>(); event->Record.SetPartitionId(partitionId); - Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1)); + Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, generation)); + } + + void MockUndelivered(TActorId readActorId, ui64 generation) { + auto event = std::make_unique<NActors::TEvents::TEvUndelivered>(0, NActors::TEvents::TEvUndelivered::ReasonActorUnknown); + Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, generation)); } void ExpectStartSession(NActors::TActorId actorId) { @@ -210,14 +215,14 @@ public: return actorId; } - void ProcessData(NActors::TActorId readActorId, ui64 partId, NActors::TActorId topicSessionActorId) { + void ProcessData(NActors::TActorId readActorId, ui64 partId, NActors::TActorId topicSessionActorId, ui64 generation = 1) { MockNewDataArrived(partId, topicSessionActorId, readActorId); ExpectNewDataArrived(readActorId, partId); - MockGetNextBatch(partId, readActorId); + MockGetNextBatch(partId, readActorId, generation); ExpectGetNextBatch(topicSessionActorId, partId); - MockMessageBatch(partId, topicSessionActorId, readActorId); + MockMessageBatch(partId, topicSessionActorId, readActorId, generation); ExpectMessageBatch(readActorId); } @@ -351,7 +356,7 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { ExpectStopSession(topicSession4, PartitionId1); // Ignore data after StopSession - MockMessageBatch(PartitionId1, topicSession4, ReadActorId2); + MockMessageBatch(PartitionId1, topicSession4, ReadActorId2, 1); } Y_UNIT_TEST_F(ReinitConsumerIfNewGeneration, TFixture) { @@ -368,6 +373,27 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { MockAddSession(Source1, PartitionId0, ReadActorId1, 2); ExpectStartSessionAck(ReadActorId1, 2); } + + Y_UNIT_TEST_F(HandleTEvUndelivered, TFixture) { + MockAddSession(Source1, PartitionId0, ReadActorId1, 1); + auto topicSession1 = ExpectRegisterTopicSession(); + ExpectStartSessionAck(ReadActorId1, 1); + ExpectStartSession(topicSession1); + + MockAddSession(Source1, PartitionId1, ReadActorId1, 2); + auto topicSession2 = ExpectRegisterTopicSession(); + ExpectStartSessionAck(ReadActorId1, 2); + ExpectStartSession(topicSession2); + + ProcessData(ReadActorId1, PartitionId0, topicSession1, 1); + ProcessData(ReadActorId1, PartitionId1, topicSession2, 2); + + MockUndelivered(ReadActorId1, 2); + ExpectStopSession(topicSession2, PartitionId1); + + MockUndelivered(ReadActorId1, 1); + ExpectStopSession(topicSession1, PartitionId0); + } } } diff --git a/ydb/library/yql/dq/actors/common/retry_queue.cpp b/ydb/library/yql/dq/actors/common/retry_queue.cpp index 49820b1585..dc02020a99 100644 --- a/ydb/library/yql/dq/actors/common/retry_queue.cpp +++ b/ydb/library/yql/dq/actors/common/retry_queue.cpp @@ -64,22 +64,20 @@ void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) { } } -bool TRetryEventsQueue::HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { - if (ev->Sender == RecipientId && ev->Get()->Reason == NActors::TEvents::TEvUndelivered::Disconnected) { +TRetryEventsQueue::ESessionState TRetryEventsQueue::HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { + if (ev->Sender != RecipientId) { + return ESessionState::WrongSession; + } + if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::Disconnected) { Connected = false; ScheduleRetry(); - return true; + return ESessionState::Disconnected; } - if (ev->Sender == RecipientId && ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { - if (KeepAlive) { - NActors::TActivationContext::Send( - new NActors::IEventHandle(SelfId, SelfId, new TEvRetryQueuePrivate::TEvSessionClosed(EventQueueId), 0, 0)); - } - return true; + if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { + return ESessionState::SessionClosed; } - - return false; + return ESessionState::Disconnected; } void TRetryEventsQueue::Retry() { diff --git a/ydb/library/yql/dq/actors/common/retry_queue.h b/ydb/library/yql/dq/actors/common/retry_queue.h index 69f7d35332..4ebf0a7353 100644 --- a/ydb/library/yql/dq/actors/common/retry_queue.h +++ b/ydb/library/yql/dq/actors/common/retry_queue.h @@ -19,7 +19,6 @@ struct TEvRetryQueuePrivate { EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvRetry = EvBegin, EvHeartbeat, - EvSessionClosed, // recipientId does not exist anymore EvEnd }; @@ -41,14 +40,6 @@ struct TEvRetryQueuePrivate { const ui64 EventQueueId; }; - - struct TEvSessionClosed : NActors::TEventLocal<TEvSessionClosed, EvSessionClosed> { - explicit TEvSessionClosed(ui64 eventQueueId) - : EventQueueId(eventQueueId) - { } - const ui64 EventQueueId; - }; - static constexpr size_t UNCONFIRMED_EVENTS_COUNT_LIMIT = 10000; }; @@ -71,6 +62,12 @@ concept TProtobufEventWithTransportMeta = TProtobufEvent<T> && THasTransportMeta class TRetryEventsQueue { public: + enum class ESessionState{ + WrongSession, // event RecipientId != Sender (event is not from this queue) + Disconnected, + SessionClosed // recipientId does not exist anymore + }; + class IRetryableEvent : public TSimpleRefCount<IRetryableEvent> { public: using TPtr = TIntrusivePtr<IRetryableEvent>; @@ -148,7 +145,7 @@ public: void OnNewRecipientId(const NActors::TActorId& recipientId, bool unsubscribe = true, bool connected = false); void HandleNodeConnected(ui32 nodeId); void HandleNodeDisconnected(ui32 nodeId); - bool HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev); + ESessionState HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev); void Retry(); bool Heartbeat(); diff --git a/ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp b/ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp index cee3eb8053..82184d982b 100644 --- a/ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp +++ b/ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp @@ -57,10 +57,6 @@ public: } } - void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ) { - Send(ClientEdgeActorId, new TEvPrivate::TEvDisconnect()); - } - void Handle(const TEvPrivate::TEvSend::TPtr& ) { EventsQueue.Send(new TEvDqCompute::TEvInjectCheckpoint()); } @@ -74,13 +70,14 @@ public: } void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { - EventsQueue.HandleUndelivered(ev); + if (EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) { + Send(ClientEdgeActorId, new TEvPrivate::TEvDisconnect()); + } } STRICT_STFUNC(StateFunc, hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle); hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle); - hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle); hFunc(TEvPrivate::TEvSend, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index 11c7feca17..7b3a62aead 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -407,7 +407,7 @@ void TDqComputeActorCheckpoints::Handle(NActors::TEvInterconnect::TEvNodeConnect void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { LOG_D("Handle undelivered"); - if (!EventsQueue.HandleUndelivered(ev)) { + if (EventsQueue.HandleUndelivered(ev) != NYql::NDq::TRetryEventsQueue::ESessionState::WrongSession) { LOG_E("TEvUndelivered: " << ev->Get()->SourceType); } } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 6abb7c04a0..81b1820193 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -232,7 +232,6 @@ public: void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev); void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&); void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&); - void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&); void Handle(NActors::TEvents::TEvPong::TPtr& ev); void Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr&); void Handle(TEvPrivate::TEvPrintState::TPtr&); @@ -253,7 +252,6 @@ public: hFunc(NActors::TEvents::TEvUndelivered, Handle); hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle); hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle); - hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle); hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle); hFunc(TEvPrivate::TEvPrintState, Handle); hFunc(TEvPrivate::TEvProcessState, Handle); @@ -690,10 +688,13 @@ void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected:: } void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { - SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString()); + SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString()); Counters.Undelivered++; for (auto& [partitionId, sessionInfo] : Sessions) { - sessionInfo.EventsQueue.HandleUndelivered(ev); + if (sessionInfo.EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) { + ReInit(TStringBuilder() << "Session closed, partition id " << sessionInfo.PartitionId); + break; + } } if (CoordinatorActorId && *CoordinatorActorId == ev->Sender) { @@ -759,17 +760,6 @@ std::pair<NUdf::TUnboxedValuePod, i64> TDqPqRdReadActor::CreateItem(const TStrin return std::make_pair(item, usedSpace); } -void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) { - Counters.SessionClosed++; - if (State != EState::STARTED) { - if (!Sessions.empty()) { - Stop(TStringBuilder() << "Internal error: wrong state on TEvSessionClosed, session size " << Sessions.size() << " state " << static_cast<ui64>(State)); - } - return; - } - ReInit(TStringBuilder() << "Session closed, event queue id " << ev->Get()->EventQueueId); -} - void TDqPqRdReadActor::Handle(NActors::TEvents::TEvPong::TPtr& ev) { SRC_LOG_T("TEvPong from " << ev->Sender); Counters.Pong++; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp index 0c2ede403d..6bee49604b 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp @@ -419,7 +419,7 @@ private: void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { LOG_T("TS3ReadActor", "Handle undelivered FileQueue "); - if (!FileQueueEvents.HandleUndelivered(ev)) { + if (FileQueueEvents.HandleUndelivered(ev) != NYql::NDq::TRetryEventsQueue::ESessionState::WrongSession) { TIssues issues{TIssue{TStringBuilder() << "FileQueue was lost"}}; Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::UNAVAILABLE)); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index adf840443f..2163816fc4 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1831,7 +1831,7 @@ private: void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { LOG_T("TS3StreamReadActor", "Handle undelivered FileQueue "); - if (!FileQueueEvents.HandleUndelivered(ev)) { + if (FileQueueEvents.HandleUndelivered(ev) != NYql::NDq::TRetryEventsQueue::ESessionState::WrongSession) { TIssues issues{TIssue{TStringBuilder() << "FileQueue was lost"}}; Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::UNAVAILABLE)); } diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index 27f9f785b3..83fc68b55d 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -419,10 +419,8 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { MockCoordinatorChanged(Coordinator2Id); auto req = ExpectCoordinatorRequest(Coordinator2Id); - CaSetup->Execute([&](TFakeActor& actor) { - auto event = new NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed(0); - CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, LocalRowDispatcherId, event)); - }); + MockUndelivered(); + MockCoordinatorResult(RowDispatcher1, req->Cookie); MockCoordinatorResult(RowDispatcher1, req->Cookie); ExpectStartSession(2, RowDispatcher1, 2); |