aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2024-11-26 13:06:57 +0300
committerGitHub <noreply@github.com>2024-11-26 13:06:57 +0300
commitbf8b612434cc18322ce33d127a269434606cdac8 (patch)
treeb4c4164629aa80ffed1ffd3516fc8b3792375f99
parentf1aa57bba470e90832ca25c102c348e36145a766 (diff)
downloadydb-bf8b612434cc18322ce33d127a269434606cdac8.tar.gz
YQ-3904 Remove TEvSessionClosed and check generation (#11930)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/coordinator.cpp22
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp29
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.cpp14
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp42
-rw-r--r--ydb/library/yql/dq/actors/common/retry_queue.cpp20
-rw-r--r--ydb/library/yql/dq/actors/common/retry_queue.h17
-rw-r--r--ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp9
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp2
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp20
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp2
-rw-r--r--ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp6
12 files changed, 102 insertions, 83 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
index cc5bcf5da9..385fad09b7 100644
--- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
@@ -38,8 +38,20 @@ struct TCoordinatorMetrics {
::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode;
};
+struct TEvPrivate {
+ enum EEv : ui32 {
+ EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvPrintState = EvBegin,
+ EvEnd
+ };
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
+ struct TEvPrintState : public NActors::TEventLocal<TEvPrintState, EvPrintState> {};
+};
+
class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
+ const ui64 PrintStatePeriodSec = 300;
+
struct TPartitionKey {
TString Endpoint;
TString Database;
@@ -167,6 +179,7 @@ public:
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev);
+ void Handle(TEvPrivate::TEvPrintState::TPtr&);
STRICT_STFUNC(
StateFunc, {
@@ -176,6 +189,7 @@ public:
hFunc(NActors::TEvents::TEvUndelivered, Handle);
hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, Handle);
hFunc(NFq::TEvRowDispatcher::TEvCoordinatorRequest, Handle);
+ hFunc(TEvPrivate::TEvPrintState, Handle);
})
private:
@@ -209,6 +223,7 @@ TActorCoordinator::TActorCoordinator(
void TActorCoordinator::Bootstrap() {
Become(&TActorCoordinator::StateFunc);
Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
+ Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState());
LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId());
auto nodeGroup = Metrics.Counters->GetSubgroup("node", ToString(SelfId().NodeId()));
Metrics.IsActive = nodeGroup->GetCounter("IsActive");
@@ -432,8 +447,6 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC
LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << readActorId);
Send(readActorId, response.release(), IEventHandle::FlagTrackDelivery, request.Cookie);
- PrintInternalState();
-
return true;
}
@@ -447,6 +460,11 @@ void TActorCoordinator::UpdatePendingReadActors() {
}
}
+void TActorCoordinator::Handle(TEvPrivate::TEvPrintState::TPtr&) {
+ Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState());
+ PrintInternalState();
+}
+
} // namespace
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index 337ceea8ec..19d2302188 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -355,7 +355,6 @@ public:
void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev);
void Handle(const TEvPrivate::TEvTryConnect::TPtr&);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
- void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
void Handle(const NMon::TEvHttpInfo::TPtr&);
@@ -385,7 +384,6 @@ public:
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
hFunc(TEvPrivate::TEvTryConnect, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
- hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
@@ -472,9 +470,15 @@ void TRowDispatcher::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TP
}
void TRowDispatcher::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
- LOG_ROW_DISPATCHER_DEBUG("TEvUndelivered, ev: " << ev->Get()->ToString() << ", reason " << ev->Get()->Reason);
- for (auto& [actorId, consumer] : Consumers) {
- consumer->EventsQueue.HandleUndelivered(ev);
+ LOG_ROW_DISPATCHER_DEBUG("TEvUndelivered, from " << ev->Sender << ", reason " << ev->Get()->Reason);
+ for (auto& [key, consumer] : Consumers) {
+ if (ev->Cookie != consumer->Generation) { // Several partitions in one read_actor have different Generation.
+ continue;
+ }
+ if (consumer->EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) {
+ DeleteConsumer(key);
+ break;
+ }
}
}
@@ -761,7 +765,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) {
auto consumerIt = Consumers.find(key);
if (consumerIt == Consumers.end()) {
- LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer )DeleteConsumer, " << " read actor id " << key.ReadActorId << " part id " << key.PartitionId);
+ LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer) DeleteConsumer, " << " read actor id " << key.ReadActorId << " part id " << key.PartitionId);
return;
}
@@ -782,7 +786,7 @@ void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) {
Y_ENSURE(sessionInfo.Consumers.count(consumer->ReadActorId));
sessionInfo.Consumers.erase(consumer->ReadActorId);
if (sessionInfo.Consumers.empty()) {
- LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill");
+ LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << consumerIt->second->TopicSessionId);
topicSessionInfo.Sessions.erase(consumerIt->second->TopicSessionId);
Send(consumerIt->second->TopicSessionId, new NActors::TEvents::TEvPoisonPill());
if (topicSessionInfo.Sessions.empty()) {
@@ -794,17 +798,6 @@ void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) {
Metrics.ClientsCount->Set(Consumers.size());
}
-void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) {
- LOG_ROW_DISPATCHER_WARN("Session closed, event queue id " << ev->Get()->EventQueueId);
- for (auto& [consumerKey, consumer] : Consumers) {
- if (consumer->EventQueueId != ev->Get()->EventQueueId) {
- continue;
- }
- DeleteConsumer(consumerKey);
- break;
- }
-}
-
void TRowDispatcher::Handle(const TEvPrivate::TEvTryConnect::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("TEvTryConnect to node id " << ev->Get()->NodeId);
NodesTracker.TryConnect(ev->Get()->NodeId);
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index 1585571132..0680991ea9 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -179,6 +179,7 @@ private:
bool InflightReconnect = false;
TDuration ReconnectPeriod;
const TString TopicPath;
+ const TString TopicPathPartition;
const TString Endpoint;
const TString Database;
NActors::TActorId RowDispatcherActorId;
@@ -317,6 +318,7 @@ TTopicSession::TTopicSession(
const NYql::IPqGateway::TPtr& pqGateway,
ui64 maxBufferSize)
: TopicPath(topicPath)
+ , TopicPathPartition(TStringBuilder() << topicPath << "/" << partitionId)
, Endpoint(endpoint)
, Database(database)
, RowDispatcherActorId(rowDispatcherActorId)
@@ -336,7 +338,7 @@ void TTopicSession::Bootstrap() {
Become(&TTopicSession::StateFunc);
Metrics.Init(Counters, TopicPath, PartitionId);
LogPrefix = LogPrefix + " " + SelfId().ToString() + " ";
- LOG_ROW_DISPATCHER_DEBUG("Bootstrap " << ", PartitionId " << PartitionId
+ LOG_ROW_DISPATCHER_DEBUG("Bootstrap " << TopicPathPartition
<< ", Timeout " << Config.GetTimeoutBeforeStartSessionSec() << " sec, StatusPeriod " << Config.GetSendStatusPeriodSec() << " sec");
Y_ENSURE(Config.GetSendStatusPeriodSec() > 0);
Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatisticToReadActor());
@@ -401,7 +403,7 @@ NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const N
topicReadSettings.AppendPartitionIds(PartitionId);
TInstant minTime = GetMinStartingMessageTimestamp();
- LOG_ROW_DISPATCHER_INFO("Create topic session, Path " << TopicPath
+ LOG_ROW_DISPATCHER_INFO("Create topic session, Path " << TopicPathPartition
<< ", StartingMessageTimestamp " << minTime
<< ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer());
@@ -504,7 +506,7 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&)
void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) {
Metrics.ReconnectRate->Inc();
TInstant minTime = GetMinStartingMessageTimestamp();
- LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, Path " << TopicPath
+ LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, " << TopicPathPartition
<< ", StartingMessageTimestamp " << minTime
<< ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer());
StopReadSession();
@@ -582,7 +584,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE
}
void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClosedEvent& ev) {
- TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPath << "\" was closed: " << ev.DebugString();
+ TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPathPartition << "\" was closed: " << ev.DebugString();
LOG_ROW_DISPATCHER_DEBUG(message);
NYql::TIssues issues;
issues.AddIssue(message);
@@ -856,8 +858,8 @@ void TTopicSession::AddDataToClient(TClientsInfo& info, ui64 offset, const TStri
}
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
- LOG_ROW_DISPATCHER_DEBUG("TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
- " partitionId " << ev->Get()->Record.GetPartitionId());
+ LOG_ROW_DISPATCHER_DEBUG("TEvStopSession from " << ev->Sender << " topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
+ " partitionId " << ev->Get()->Record.GetPartitionId() << " clients count " << Clients.size());
auto it = Clients.find(ev->Sender);
if (it == Clients.end()) {
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
index fe4d2ed035..2d30d74b84 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
@@ -145,11 +145,11 @@ public:
Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release()));
}
- void MockMessageBatch(ui64 partitionId, TActorId topicSessionId, TActorId readActorId) {
+ void MockMessageBatch(ui64 partitionId, TActorId topicSessionId, TActorId readActorId, ui64 generation) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvMessageBatch>();
event->Record.SetPartitionId(partitionId);
event->ReadActorId = readActorId;
- Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release(), 0, 1));
+ Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release(), 0, generation));
}
void MockSessionError(ui64 partitionId, TActorId topicSessionId, TActorId readActorId) {
@@ -159,10 +159,15 @@ public:
Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release()));
}
- void MockGetNextBatch(ui64 partitionId, TActorId readActorId) {
+ void MockGetNextBatch(ui64 partitionId, TActorId readActorId, ui64 generation) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
event->Record.SetPartitionId(partitionId);
- Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
+ Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, generation));
+ }
+
+ void MockUndelivered(TActorId readActorId, ui64 generation) {
+ auto event = std::make_unique<NActors::TEvents::TEvUndelivered>(0, NActors::TEvents::TEvUndelivered::ReasonActorUnknown);
+ Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, generation));
}
void ExpectStartSession(NActors::TActorId actorId) {
@@ -210,14 +215,14 @@ public:
return actorId;
}
- void ProcessData(NActors::TActorId readActorId, ui64 partId, NActors::TActorId topicSessionActorId) {
+ void ProcessData(NActors::TActorId readActorId, ui64 partId, NActors::TActorId topicSessionActorId, ui64 generation = 1) {
MockNewDataArrived(partId, topicSessionActorId, readActorId);
ExpectNewDataArrived(readActorId, partId);
- MockGetNextBatch(partId, readActorId);
+ MockGetNextBatch(partId, readActorId, generation);
ExpectGetNextBatch(topicSessionActorId, partId);
- MockMessageBatch(partId, topicSessionActorId, readActorId);
+ MockMessageBatch(partId, topicSessionActorId, readActorId, generation);
ExpectMessageBatch(readActorId);
}
@@ -351,7 +356,7 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
ExpectStopSession(topicSession4, PartitionId1);
// Ignore data after StopSession
- MockMessageBatch(PartitionId1, topicSession4, ReadActorId2);
+ MockMessageBatch(PartitionId1, topicSession4, ReadActorId2, 1);
}
Y_UNIT_TEST_F(ReinitConsumerIfNewGeneration, TFixture) {
@@ -368,6 +373,27 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
MockAddSession(Source1, PartitionId0, ReadActorId1, 2);
ExpectStartSessionAck(ReadActorId1, 2);
}
+
+ Y_UNIT_TEST_F(HandleTEvUndelivered, TFixture) {
+ MockAddSession(Source1, PartitionId0, ReadActorId1, 1);
+ auto topicSession1 = ExpectRegisterTopicSession();
+ ExpectStartSessionAck(ReadActorId1, 1);
+ ExpectStartSession(topicSession1);
+
+ MockAddSession(Source1, PartitionId1, ReadActorId1, 2);
+ auto topicSession2 = ExpectRegisterTopicSession();
+ ExpectStartSessionAck(ReadActorId1, 2);
+ ExpectStartSession(topicSession2);
+
+ ProcessData(ReadActorId1, PartitionId0, topicSession1, 1);
+ ProcessData(ReadActorId1, PartitionId1, topicSession2, 2);
+
+ MockUndelivered(ReadActorId1, 2);
+ ExpectStopSession(topicSession2, PartitionId1);
+
+ MockUndelivered(ReadActorId1, 1);
+ ExpectStopSession(topicSession1, PartitionId0);
+ }
}
}
diff --git a/ydb/library/yql/dq/actors/common/retry_queue.cpp b/ydb/library/yql/dq/actors/common/retry_queue.cpp
index 49820b1585..dc02020a99 100644
--- a/ydb/library/yql/dq/actors/common/retry_queue.cpp
+++ b/ydb/library/yql/dq/actors/common/retry_queue.cpp
@@ -64,22 +64,20 @@ void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) {
}
}
-bool TRetryEventsQueue::HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
- if (ev->Sender == RecipientId && ev->Get()->Reason == NActors::TEvents::TEvUndelivered::Disconnected) {
+TRetryEventsQueue::ESessionState TRetryEventsQueue::HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
+ if (ev->Sender != RecipientId) {
+ return ESessionState::WrongSession;
+ }
+ if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::Disconnected) {
Connected = false;
ScheduleRetry();
- return true;
+ return ESessionState::Disconnected;
}
- if (ev->Sender == RecipientId && ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) {
- if (KeepAlive) {
- NActors::TActivationContext::Send(
- new NActors::IEventHandle(SelfId, SelfId, new TEvRetryQueuePrivate::TEvSessionClosed(EventQueueId), 0, 0));
- }
- return true;
+ if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) {
+ return ESessionState::SessionClosed;
}
-
- return false;
+ return ESessionState::Disconnected;
}
void TRetryEventsQueue::Retry() {
diff --git a/ydb/library/yql/dq/actors/common/retry_queue.h b/ydb/library/yql/dq/actors/common/retry_queue.h
index 69f7d35332..4ebf0a7353 100644
--- a/ydb/library/yql/dq/actors/common/retry_queue.h
+++ b/ydb/library/yql/dq/actors/common/retry_queue.h
@@ -19,7 +19,6 @@ struct TEvRetryQueuePrivate {
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvRetry = EvBegin,
EvHeartbeat,
- EvSessionClosed, // recipientId does not exist anymore
EvEnd
};
@@ -41,14 +40,6 @@ struct TEvRetryQueuePrivate {
const ui64 EventQueueId;
};
-
- struct TEvSessionClosed : NActors::TEventLocal<TEvSessionClosed, EvSessionClosed> {
- explicit TEvSessionClosed(ui64 eventQueueId)
- : EventQueueId(eventQueueId)
- { }
- const ui64 EventQueueId;
- };
-
static constexpr size_t UNCONFIRMED_EVENTS_COUNT_LIMIT = 10000;
};
@@ -71,6 +62,12 @@ concept TProtobufEventWithTransportMeta = TProtobufEvent<T> && THasTransportMeta
class TRetryEventsQueue {
public:
+ enum class ESessionState{
+ WrongSession, // event RecipientId != Sender (event is not from this queue)
+ Disconnected,
+ SessionClosed // recipientId does not exist anymore
+ };
+
class IRetryableEvent : public TSimpleRefCount<IRetryableEvent> {
public:
using TPtr = TIntrusivePtr<IRetryableEvent>;
@@ -148,7 +145,7 @@ public:
void OnNewRecipientId(const NActors::TActorId& recipientId, bool unsubscribe = true, bool connected = false);
void HandleNodeConnected(ui32 nodeId);
void HandleNodeDisconnected(ui32 nodeId);
- bool HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev);
+ ESessionState HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Retry();
bool Heartbeat();
diff --git a/ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp b/ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp
index cee3eb8053..82184d982b 100644
--- a/ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp
+++ b/ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp
@@ -57,10 +57,6 @@ public:
}
}
- void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ) {
- Send(ClientEdgeActorId, new TEvPrivate::TEvDisconnect());
- }
-
void Handle(const TEvPrivate::TEvSend::TPtr& ) {
EventsQueue.Send(new TEvDqCompute::TEvInjectCheckpoint());
}
@@ -74,13 +70,14 @@ public:
}
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
- EventsQueue.HandleUndelivered(ev);
+ if (EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) {
+ Send(ClientEdgeActorId, new TEvPrivate::TEvDisconnect());
+ }
}
STRICT_STFUNC(StateFunc,
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
- hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
hFunc(TEvPrivate::TEvSend, Handle);
hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index 11c7feca17..7b3a62aead 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -407,7 +407,7 @@ void TDqComputeActorCheckpoints::Handle(NActors::TEvInterconnect::TEvNodeConnect
void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
LOG_D("Handle undelivered");
- if (!EventsQueue.HandleUndelivered(ev)) {
+ if (EventsQueue.HandleUndelivered(ev) != NYql::NDq::TRetryEventsQueue::ESessionState::WrongSession) {
LOG_E("TEvUndelivered: " << ev->Get()->SourceType);
}
}
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
index 6abb7c04a0..81b1820193 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
@@ -232,7 +232,6 @@ public:
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
- void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
void Handle(NActors::TEvents::TEvPong::TPtr& ev);
void Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr&);
void Handle(TEvPrivate::TEvPrintState::TPtr&);
@@ -253,7 +252,6 @@ public:
hFunc(NActors::TEvents::TEvUndelivered, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
- hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
hFunc(TEvPrivate::TEvPrintState, Handle);
hFunc(TEvPrivate::TEvProcessState, Handle);
@@ -690,10 +688,13 @@ void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::
}
void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
- SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString());
+ SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString());
Counters.Undelivered++;
for (auto& [partitionId, sessionInfo] : Sessions) {
- sessionInfo.EventsQueue.HandleUndelivered(ev);
+ if (sessionInfo.EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) {
+ ReInit(TStringBuilder() << "Session closed, partition id " << sessionInfo.PartitionId);
+ break;
+ }
}
if (CoordinatorActorId && *CoordinatorActorId == ev->Sender) {
@@ -759,17 +760,6 @@ std::pair<NUdf::TUnboxedValuePod, i64> TDqPqRdReadActor::CreateItem(const TStrin
return std::make_pair(item, usedSpace);
}
-void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) {
- Counters.SessionClosed++;
- if (State != EState::STARTED) {
- if (!Sessions.empty()) {
- Stop(TStringBuilder() << "Internal error: wrong state on TEvSessionClosed, session size " << Sessions.size() << " state " << static_cast<ui64>(State));
- }
- return;
- }
- ReInit(TStringBuilder() << "Session closed, event queue id " << ev->Get()->EventQueueId);
-}
-
void TDqPqRdReadActor::Handle(NActors::TEvents::TEvPong::TPtr& ev) {
SRC_LOG_T("TEvPong from " << ev->Sender);
Counters.Pong++;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp
index 0c2ede403d..6bee49604b 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp
@@ -419,7 +419,7 @@ private:
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
LOG_T("TS3ReadActor", "Handle undelivered FileQueue ");
- if (!FileQueueEvents.HandleUndelivered(ev)) {
+ if (FileQueueEvents.HandleUndelivered(ev) != NYql::NDq::TRetryEventsQueue::ESessionState::WrongSession) {
TIssues issues{TIssue{TStringBuilder() << "FileQueue was lost"}};
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::UNAVAILABLE));
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index adf840443f..2163816fc4 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1831,7 +1831,7 @@ private:
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
LOG_T("TS3StreamReadActor", "Handle undelivered FileQueue ");
- if (!FileQueueEvents.HandleUndelivered(ev)) {
+ if (FileQueueEvents.HandleUndelivered(ev) != NYql::NDq::TRetryEventsQueue::ESessionState::WrongSession) {
TIssues issues{TIssue{TStringBuilder() << "FileQueue was lost"}};
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::UNAVAILABLE));
}
diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
index 27f9f785b3..83fc68b55d 100644
--- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
+++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
@@ -419,10 +419,8 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
MockCoordinatorChanged(Coordinator2Id);
auto req = ExpectCoordinatorRequest(Coordinator2Id);
- CaSetup->Execute([&](TFakeActor& actor) {
- auto event = new NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed(0);
- CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, LocalRowDispatcherId, event));
- });
+ MockUndelivered();
+
MockCoordinatorResult(RowDispatcher1, req->Cookie);
MockCoordinatorResult(RowDispatcher1, req->Cookie);
ExpectStartSession(2, RowDispatcher1, 2);