aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2023-05-03 12:58:34 +0300
committeralexnick <alexnick@ydb.tech>2023-05-03 12:58:34 +0300
commit6d2b034c88f51bebae7f6dff359e503174d39ffd (patch)
tree22c1f12d725a5f304678bef0cd46bdb2710da4f0
parentd61cfa00fcd6078cabd02c099e257ec215c496c5 (diff)
downloadydb-6d2b034c88f51bebae7f6dff359e503174d39ffd.tar.gz
fix for deadlock
fix for deadlock
-rw-r--r--ydb/core/persqueue/mirrorer.cpp11
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp57
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h74
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp191
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp13
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp26
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h2
8 files changed, 226 insertions, 152 deletions
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp
index 3798fea118..739cfdd161 100644
--- a/ydb/core/persqueue/mirrorer.cpp
+++ b/ydb/core/persqueue/mirrorer.cpp
@@ -89,6 +89,8 @@ void TMirrorer::StartInit(const TActorContext& ctx) {
void TMirrorer::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " killed");
+ if (ReadSession)
+ ReadSession->Close();
ReadSession = nullptr;
PartitionStream = nullptr;
CredentialsProvider = nullptr;
@@ -441,7 +443,9 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont
Queue.pop_back();
}
}
-
+ if (ReadSession) {
+ ReadSession->Close();
+ }
ReadSession.reset();
PartitionStream.Reset();
@@ -459,6 +463,9 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont
});
try {
+ if (ReadSession) {
+ ReadSession->Close();
+ }
ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT, log);
} catch(...) {
ProcessError(ctx, TStringBuilder() << "got an exception during the creation read session: " << CurrentExceptionMessage());
@@ -514,6 +521,8 @@ void TMirrorer::AddMessagesToQueue(TVector<TPersQueueReadEvent::TDataReceivedEve
void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) {
LastInitStageTimestamp = ctx.Now();
+ if (ReadSession)
+ ReadSession->Close();
ReadSession = nullptr;
PartitionStream = nullptr;
ReadFuturesInFlight = 0;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
index 7e2c26b66c..21179740da 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
@@ -407,6 +407,10 @@ public:
}
}
+ bool IsClosed() {
+ return Closed;
+ }
+
protected:
const TSettings& Settings;
TWaiter Waiter;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index 5b3abb3b48..85c6e87589 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -57,7 +57,14 @@ TReadSession::TReadSession(const TReadSessionSettings& settings,
}
TReadSession::~TReadSession() {
- Abort(EStatus::ABORTED, "Aborted");
+ {
+ TDeferredActions<true> deferred;
+ NYql::TIssues issues;
+ issues.AddIssue("Aborted");
+ EventsQueue->Close(TSessionClosedEvent(EStatus::ABORTED, std::move(issues)), deferred);
+ }
+
+ Abort();
ClearAllEvents();
if (Tracker) {
@@ -76,7 +83,6 @@ Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest TReadSession::MakeClus
}
void TReadSession::Start() {
- ErrorHandler = MakeIntrusive<TErrorHandler<true>>(weak_from_this());
Tracker = std::make_shared<TImplTracker>();
EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, weak_from_this(), Tracker);
@@ -107,7 +113,11 @@ bool TReadSession::ValidateSettings() {
}
if (issues) {
- Abort(EStatus::BAD_REQUEST, MakeIssueWithSubIssues("Invalid read session settings", issues));
+ {
+ TDeferredActions<true> deferred;
+ EventsQueue->Close(TSessionClosedEvent(EStatus::BAD_REQUEST, MakeIssueWithSubIssues("Invalid read session settings", issues)), deferred);
+ }
+ Abort();
return false;
} else {
return true;
@@ -124,7 +134,7 @@ void TReadSession::StartClusterDiscovery() {
ClusterDiscoveryDelayContext = nullptr;
}
- auto extractor = [errorHandler = ErrorHandler, self = weak_from_this()]
+ auto extractor = [self = weak_from_this()]
(google::protobuf::Any* any, TPlainStatus status) mutable {
auto selfShared = self.lock();
if (!selfShared) {
@@ -203,7 +213,6 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) {
Log,
subclient->CreateReadSessionConnectionProcessorFactory(),
EventsQueue,
- ErrorHandler,
context,
partitionStreamIdStart++,
clusterSessionsCount,
@@ -417,12 +426,13 @@ bool TReadSession::Close(TDuration timeout) {
return result;
}
-void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions<true>& deferred) {
+void TReadSession::AbortImpl(TDeferredActions<true>&) {
+
Y_VERIFY(Lock.IsLocked());
if (!Aborting) {
+ Y_VERIFY(EventsQueue->IsClosed());
Aborting = true;
- LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
if (ClusterDiscoveryDelayContext) {
ClusterDiscoveryDelayContext->Cancel();
ClusterDiscoveryDelayContext.reset();
@@ -436,14 +446,16 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions<
sessionInfo.Session->Abort();
}
}
- EventsQueue->Close(std::move(closeEvent), deferred);
}
}
void TReadSession::AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred) {
Y_VERIFY(Lock.IsLocked());
+ auto closeEvent = TSessionClosedEvent(statusCode, std::move(issues));
+ LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
- AbortImpl(TSessionClosedEvent(statusCode, std::move(issues)), deferred);
+ EventsQueue->Close(std::move(closeEvent), deferred);
+ AbortImpl(deferred);
}
void TReadSession::AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred) {
@@ -451,23 +463,14 @@ void TReadSession::AbortImpl(EStatus statusCode, const TString& message, TDeferr
NYql::TIssues issues;
issues.AddIssue(message);
- AbortImpl(statusCode, std::move(issues), deferred);
-}
-void TReadSession::Abort(EStatus statusCode, NYql::TIssues&& issues) {
- Abort(TSessionClosedEvent(statusCode, std::move(issues)));
-}
-
-void TReadSession::Abort(EStatus statusCode, const TString& message) {
- NYql::TIssues issues;
- issues.AddIssue(message);
- Abort(statusCode, std::move(issues));
+ AbortImpl(statusCode, std::move(issues), deferred);
}
-void TReadSession::Abort(TSessionClosedEvent&& closeEvent) {
+void TReadSession::Abort() {
TDeferredActions<true> deferred;
with_lock (Lock) {
- AbortImpl(std::move(closeEvent), deferred);
+ AbortImpl(EStatus::ABORTED, "Aborted", deferred);
}
}
@@ -480,11 +483,19 @@ NThreading::TFuture<void> TReadSession::WaitEvent() {
}
TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) {
- return EventsQueue->GetEvents(block, maxEventsCount, maxByteSize);
+ auto res = EventsQueue->GetEvents(block, maxEventsCount, maxByteSize);
+ if (EventsQueue->IsClosed()) {
+ Abort();
+ }
+ return res;
}
TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxByteSize) {
- return EventsQueue->GetEvent(block, maxByteSize);
+ auto res = EventsQueue->GetEvent(block, maxByteSize);
+ if (EventsQueue->IsClosed()) {
+ Abort();
+ }
+ return res;
}
void TReadSession::StopReadingData() {
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index d3fa055a6c..c4acb6fcab 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -149,11 +149,11 @@ public:
void DeferReadFromProcessor(const typename IProcessor<UseMigrationProtocol>::TPtr& processor, TServerMessage<UseMigrationProtocol>* dst, typename IProcessor<UseMigrationProtocol>::TReadCallback callback);
void DeferStartExecutorTask(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction task);
- void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent);
- void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, NYql::TIssues&& issues);
- void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, const TString& message);
- void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status);
- void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status);
+ void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent);
+ void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, NYql::TIssues&& issues);
+ void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, const TString& message);
+ void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status);
+ void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status);
void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session);
void DeferSignalWaiter(TWaiter&& waiter);
void DeferDestroyDecompressionInfos(std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>>&& infos);
@@ -178,13 +178,12 @@ private:
std::vector<std::pair<typename IAExecutor<UseMigrationProtocol>::TPtr, typename IAExecutor<UseMigrationProtocol>::TFunction>> ExecutorsTasks;
// Abort session.
- typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler;
TMaybe<TASessionClosedEvent<UseMigrationProtocol>> SessionClosedEvent;
// Waiters.
std::vector<TWaiter> Waiters;
- // Reconnection.
+ // Reconnection and abort.
std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session;
TPlainStatus ReconnectionStatus;
@@ -569,13 +568,11 @@ public:
ui64 partitionId,
ui64 assignId,
ui64 readOffset,
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession,
- typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler)
+ std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession)
: Key{topicPath, cluster, partitionId}
, AssignId(assignId)
, FirstNotReadOffset(readOffset)
, Session(std::move(parentSession))
- , ErrorHandler(std::move(errorHandler))
{
TAPartitionStream<true>::PartitionStreamId = partitionStreamId;
TAPartitionStream<true>::TopicPath = std::move(topicPath);
@@ -591,13 +588,11 @@ public:
i64 partitionId,
i64 assignId,
i64 readOffset,
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession,
- typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler)
+ std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession)
: Key{topicPath, "", static_cast<ui64>(partitionId)}
, AssignId(static_cast<ui64>(assignId))
, FirstNotReadOffset(static_cast<ui64>(readOffset))
, Session(std::move(parentSession))
- , ErrorHandler(std::move(errorHandler))
{
TAPartitionStream<false>::PartitionSessionId = partitionStreamId;
TAPartitionStream<false>::TopicPath = std::move(topicPath);
@@ -667,9 +662,6 @@ public:
TReadSessionEventsQueue<UseMigrationProtocol>* queue,
TDeferredActions<UseMigrationProtocol>& deferred);
- const typename IErrorHandler<UseMigrationProtocol>::TPtr& GetErrorHandler() const {
- return ErrorHandler;
- }
ui64 GetMaxReadOffset() const {
return MaxReadOffset;
@@ -740,7 +732,6 @@ private:
ui64 AssignId;
ui64 FirstNotReadOffset;
std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session;
- typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler;
TRawPartitionStreamEventQueue<UseMigrationProtocol> EventsQueue;
ui64 MaxReadOffset = 0;
ui64 MaxCommittedOffset = 0;
@@ -779,9 +770,11 @@ public:
GetEvent(bool block = false,
size_t maxByteSize = std::numeric_limits<size_t>::max());
- void Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
+ bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
TWaiter waiter;
with_lock (TParent::Mutex) {
+ if (TParent::Closed)
+ return false;
TParent::CloseEvent = event;
TParent::Closed = true;
waiter = TWaiter(TParent::Waiter.ExtractPromise(), this);
@@ -789,8 +782,8 @@ public:
TReadSessionEventInfo<UseMigrationProtocol> info(event);
ApplyHandler(info, deferred);
-
- waiter.Signal();
+ deferred.DeferSignalWaiter(std::move(waiter));
+ return true;
}
bool TryApplyCallbackToEventImpl(typename TParent::TEvent& event,
@@ -802,14 +795,14 @@ public:
void GetDataEventCallbackSettings(size_t& maxMessagesBytes);
- // Push usual event.
- void PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+ // Push usual event. Returns false if queue is closed
+ bool PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
TDeferredActions<UseMigrationProtocol>& deferred);
- // Push data event.
- void PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
+ // Push data event. Returns false if queue is closed
+ bool PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
size_t batch,
size_t message,
TDataDecompressionInfoPtr<UseMigrationProtocol> parent,
@@ -946,7 +939,6 @@ public:
const TLog& log,
std::shared_ptr<IReadSessionConnectionProcessorFactory<UseMigrationProtocol>> connectionFactory,
std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> eventsQueue,
- typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler,
NGrpc::IQueueClientContextPtr clientContext,
ui64 partitionStreamIdStart,
ui64 partitionStreamIdStep,
@@ -961,9 +953,8 @@ public:
, PartitionStreamIdStep(partitionStreamIdStep)
, ConnectionFactory(std::move(connectionFactory))
, EventsQueue(std::move(eventsQueue))
- , ErrorHandler(std::move(errorHandler))
, ClientContext(std::move(clientContext))
- , CookieMapping(ErrorHandler)
+ , CookieMapping()
, ReadSizeBudget(GetCompressedDataSizeLimit())
, ReadSizeServerDelta(0)
, Tracker(std::move(tracker))
@@ -990,6 +981,21 @@ public:
void Abort();
void AbortImpl();
void Close(std::function<void()> callback);
+ void AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent);
+
+ void AbortSession(EStatus statusCode, NYql::TIssues&& issues) {
+ AbortSession(TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues)));
+ }
+
+ void AbortSession(EStatus statusCode, const TString& message) {
+ NYql::TIssues issues;
+ issues.AddIssue(message);
+ AbortSession(statusCode, std::move(issues));
+ }
+
+ void AbortSession(TPlainStatus&& status) {
+ AbortSession(TASessionClosedEvent<UseMigrationProtocol>(std::move(status)));
+ }
bool Reconnect(const TPlainStatus& status);
@@ -1106,8 +1112,7 @@ private:
size_t UncommittedMessagesLeft = 0;
};
- explicit TPartitionCookieMapping(typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler)
- : ErrorHandler(std::move(errorHandler))
+ explicit TPartitionCookieMapping()
{
}
@@ -1132,7 +1137,6 @@ private:
THashMap<typename TCookie::TKey, typename TCookie::TPtr, typename TCookie::TKey::THash> Cookies;
THashMap<std::pair<ui64, ui64>, typename TCookie::TPtr> UncommittedOffsetToCookie; // (Partition stream id, Offset) -> Cookie.
THashMultiMap<ui64, typename TCookie::TPtr> PartitionStreamIdToCookie;
- typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler;
size_t CommitInflight = 0; // Commit inflight to server.
};
@@ -1158,7 +1162,6 @@ private:
ui64 PartitionStreamIdStep;
std::shared_ptr<IReadSessionConnectionProcessorFactory<UseMigrationProtocol>> ConnectionFactory;
std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> EventsQueue;
- typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler;
NGrpc::IQueueClientContextPtr ClientContext; // Common client context.
NGrpc::IQueueClientContextPtr ConnectContext;
NGrpc::IQueueClientContextPtr ConnectTimeoutContext;
@@ -1258,7 +1261,7 @@ public:
void StopReadingData() override;
void ResumeReadingData() override;
- void Abort(TSessionClosedEvent&& closeEvent);
+ void Abort();
void ClearAllEvents();
@@ -1276,11 +1279,7 @@ private:
void RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions<true>& deferred);
void CreateClusterSessionsImpl(TDeferredActions<true>& deferred);
-
- // Shutdown.
- void Abort(EStatus statusCode, NYql::TIssues&& issues);
- void Abort(EStatus statusCode, const TString& message);
-
+ void AbortImpl(TDeferredActions<true>& deferred);
void AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions<true>& deferred);
void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred);
void AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred);
@@ -1298,7 +1297,6 @@ private:
TLog Log;
std::shared_ptr<TPersQueueClient::TImpl> Client;
std::shared_ptr<TGRpcConnectionsImpl> Connections;
- typename IErrorHandler<true>::TPtr ErrorHandler;
TDbDriverStatePtr DbDriverState;
TAdaptiveLock Lock;
std::shared_ptr<TReadSessionEventsQueue<true>> EventsQueue;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index b2d24a177c..53c0561f60 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -48,22 +48,6 @@ void MakeCountersNotNull(TReaderCounters& counters);
template <typename TReaderCounters>
bool HasNullCounters(TReaderCounters& counters);
-template <bool UseMigrationProtocol>
-class TErrorHandler : public IErrorHandler<UseMigrationProtocol> {
- using TReadSession = typename std::conditional_t<UseMigrationProtocol,
- NPersQueue::TReadSession,
- NTopic::TReadSession>;
-public:
- TErrorHandler(std::weak_ptr<TReadSession> session)
- : Session(std::move(session))
- {
- }
-
- void AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) override;
-
-private:
- std::weak_ptr<TReadSession> Session;
-};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TPartitionStreamImpl
@@ -234,7 +218,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Start() {
Settings.DecompressionExecutor_->Start();
Settings.EventHandlers_.HandlersExecutor_->Start();
if (!Reconnect(TPlainStatus())) {
- ErrorHandler->AbortSession(EStatus::ABORTED, "Driver is stopping");
+ AbortSession(EStatus::ABORTED, "Driver is stopping");
}
}
@@ -355,7 +339,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReco
Processor = nullptr;
RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again.
- deferred.DeferReconnection(this->shared_from_this(), ErrorHandler, std::move(status));
+ deferred.DeferReconnection(this->shared_from_this(), std::move(status));
}
template<bool UseMigrationProtocol>
@@ -380,7 +364,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnectTimeout(const
TStringBuilder description;
description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone;
if (!Reconnect(TPlainStatus(EStatus::TIMEOUT, description))) {
- ErrorHandler->AbortSession(EStatus::TIMEOUT, description);
+ AbortSession(EStatus::TIMEOUT, description);
}
}
@@ -414,7 +398,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnect(
if (!st.Ok()) {
++*Settings.Counters_->Errors;
if (!Reconnect(st)) {
- ErrorHandler->AbortSession(
+ AbortSession(
st.Status, MakeIssueWithSubIssues(TStringBuilder() << "Failed to establish connection to server \""
<< st.Endpoint << "\" ( cluster " << ClusterName
<< "). Attempts done: " << ConnectionAttemptsDone,
@@ -629,16 +613,20 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
CookieMapping.RemoveMapping(GetPartitionStreamId(partitionStream));
PartitionStreams.erase(partitionStream->GetAssignId());
+ bool pushRes = true;
if constexpr (UseMigrationProtocol) {
- EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
+ pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
TClosedEvent(partitionStream, TClosedEvent::EReason::DestroyConfirmedByUser),
deferred);
} else {
- EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
+ pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
TClosedEvent(partitionStream, TClosedEvent::EReason::StopConfirmedByUser),
deferred);
}
-
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
TClientMessage<UseMigrationProtocol> req;
if constexpr (UseMigrationProtocol) {
@@ -890,7 +878,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnReadDone(NGrpc::TGrp
++*Settings.Counters_->Errors;
if (!Reconnect(errorStatus)) {
- ErrorHandler->AbortSession(std::move(errorStatus));
+ AbortSession(std::move(errorStatus));
}
}
}
@@ -1010,8 +998,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
NextPartitionStreamId, msg.topic().path(), msg.cluster(),
msg.partition() + 1, // Group.
msg.partition(), // Partition.
- msg.assign_id(), msg.read_offset(), weak_from_this(),
- ErrorHandler);
+ msg.assign_id(), msg.read_offset(), weak_from_this());
NextPartitionStreamId += PartitionStreamIdStep;
// Renew partition stream.
@@ -1019,19 +1006,27 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
PartitionStreams[partitionStream->GetAssignId()];
if (currentPartitionStream) {
CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId());
- EventsQueue->PushEvent(
+ bool pushRes = EventsQueue->PushEvent(
currentPartitionStream, weak_from_this(),
TReadSessionEvent::TPartitionStreamClosedEvent(
currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
currentPartitionStream = partitionStream;
// Send event to user.
- EventsQueue->PushEvent(
+ bool pushRes = EventsQueue->PushEvent(
partitionStream, weak_from_this(),
TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset()),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
template <>
@@ -1046,19 +1041,26 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
return;
}
TIntrusivePtr<TPartitionStreamImpl<true>> partitionStream = partitionStreamIt->second;
+ bool pushRes = true;
if (msg.forceful_release()) {
PartitionStreams.erase(msg.assign_id());
CookieMapping.RemoveMapping(partitionStream->GetPartitionStreamId());
- EventsQueue->PushEvent(partitionStream, weak_from_this(),
+ pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(),
TReadSessionEvent::TPartitionStreamClosedEvent(
partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost),
deferred);
} else {
- EventsQueue->PushEvent(
+ pushRes = EventsQueue->PushEvent(
partitionStream, weak_from_this(),
TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset()),
deferred);
}
+
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
+
}
template <>
@@ -1079,10 +1081,14 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
}
}
for (auto& [id, partitionStream] : partitionStreams) {
- EventsQueue->PushEvent(
+ bool pushRes = EventsQueue->PushEvent(
partitionStream, weak_from_this(),
TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset()),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
for (const auto& rangeProto : msg.offset_ranges()) {
@@ -1090,10 +1096,14 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
if (partitionStreamIt != PartitionStreams.end()) {
auto partitionStream = partitionStreamIt->second;
partitionStream->UpdateMaxCommittedOffset(rangeProto.end_offset());
- EventsQueue->PushEvent(
+ bool pushRes = EventsQueue->PushEvent(
partitionStream, weak_from_this(),
TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset()),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
}
}
@@ -1109,12 +1119,16 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
if (partitionStreamIt == PartitionStreams.end()) {
return;
}
- EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(),
TReadSessionEvent::TPartitionStreamStatusEvent(
partitionStreamIt->second, msg.committed_offset(),
0, // TODO: support read offset in status
msg.end_offset(), TInstant::MilliSeconds(msg.write_watermark_ms())),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
//////////////
@@ -1227,25 +1241,33 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>(
NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(),
msg.partition_session().partition_session_id(), msg.committed_offset(),
- weak_from_this(), ErrorHandler);
+ weak_from_this());
NextPartitionStreamId += PartitionStreamIdStep;
// Renew partition stream.
TIntrusivePtr<TPartitionStreamImpl<false>>& currentPartitionStream = PartitionStreams[partitionStream->GetAssignId()];
if (currentPartitionStream) {
- EventsQueue->PushEvent(
+ bool pushRes = EventsQueue->PushEvent(
currentPartitionStream, weak_from_this(),
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent(
currentPartitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
currentPartitionStream = partitionStream;
// Send event to user.
- EventsQueue->PushEvent(partitionStream, weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(),
NTopic::TReadSessionEvent::TStartPartitionSessionEvent(
partitionStream, msg.committed_offset(), msg.partition_offsets().end()),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
template <>
@@ -1260,18 +1282,23 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
return;
}
TIntrusivePtr<TPartitionStreamImpl<false>> partitionStream = partitionStreamIt->second;
+ bool pushRes = true;
if (!msg.graceful()) {
PartitionStreams.erase(msg.partition_session_id());
- EventsQueue->PushEvent(partitionStream, weak_from_this(),
+ pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(),
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent(
partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
deferred);
} else {
- EventsQueue->PushEvent(
+ pushRes = EventsQueue->PushEvent(
partitionStream, weak_from_this(),
NTopic::TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset()),
deferred);
}
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
template <>
@@ -1288,10 +1315,14 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
if (partitionStreamIt != PartitionStreams.end()) {
auto partitionStream = partitionStreamIt->second;
partitionStream->UpdateMaxCommittedOffset(rangeProto.committed_offset());
- EventsQueue->PushEvent(partitionStream, weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(),
NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent(
partitionStream, rangeProto.committed_offset()),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
}
}
@@ -1307,7 +1338,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
if (partitionStreamIt == PartitionStreams.end()) {
return;
}
- EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(),
NTopic::TReadSessionEvent::TPartitionSessionStatusEvent(
partitionStreamIt->second, msg.committed_offset(),
0, // TODO: support read offset in status
@@ -1315,6 +1346,10 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(
msg.write_time_high_watermark()))),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
template <>
@@ -1368,9 +1403,13 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DestroyAllPartitionStr
>;
for (auto&& [key, partitionStream] : PartitionStreams) {
- EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
TClosedEvent(std::move(partitionStream), TClosedEvent::EReason::ConnectionLost),
deferred);
+ if (!pushRes) {
+ AbortImpl();
+ return;
+ }
}
PartitionStreams.clear();
CookieMapping.ClearMapping();
@@ -1449,6 +1488,15 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Abort() {
}
template<bool UseMigrationProtocol>
+void TSingleClusterReadSessionImpl<UseMigrationProtocol>::AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) {
+ TDeferredActions<UseMigrationProtocol> deferred;
+ LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing session to cluster: " << closeEvent.DebugString());
+
+ EventsQueue->Close(closeEvent, deferred);
+}
+
+
+template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::AbortImpl() {
Y_VERIFY(Lock.IsLocked());
@@ -1732,16 +1780,16 @@ TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue(
}
template <bool UseMigrationProtocol>
-void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
+bool TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> /*session*/,
typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
TDeferredActions<UseMigrationProtocol>& deferred)
{
- if (TParent::Closed) {
- return;
- }
-
with_lock (TParent::Mutex) {
+ if (TParent::Closed) {
+ return false;
+ }
+ //TODO: check session closed event and return false
using TClosedEvent = std::conditional_t<
UseMigrationProtocol,
NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
@@ -1757,6 +1805,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPar
SignalReadyEventsImpl(stream, deferred);
}
+ return true;
}
template <bool UseMigrationProtocol>
@@ -1788,19 +1837,19 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalEventImpl(
}
template <bool UseMigrationProtocol>
-void TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+bool TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
size_t batch,
size_t message,
TDataDecompressionInfoPtr<UseMigrationProtocol> parent,
std::atomic<bool>& ready)
{
- if (this->Closed) {
- return;
- }
-
with_lock (TParent::Mutex) {
+ if (this->Closed) {
+ return false;
+ }
partitionStream->InsertDataEvent(batch, message, parent, ready);
}
+ return true;
}
template <bool UseMigrationProtocol>
@@ -1990,6 +2039,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvent(bool block, size_t maxBy
if (TParent::HasEventsImpl()) {
eventInfo = GetEventImpl(maxByteSize, accumulator);
}
+
} while (block && !eventInfo);
}
@@ -2241,11 +2291,15 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double
task.Add(CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, size, estimatedDecompressedSize);
- session->GetEventsQueue()->PushDataEvent(partitionStream,
+ bool pushRes = session->GetEventsQueue()->PushDataEvent(partitionStream,
CurrentDecompressingMessage.first,
CurrentDecompressingMessage.second,
TDataDecompressionInfo::shared_from_this(),
ReadyThresholds.back().Ready);
+ if (!pushRes) {
+ session->AbortImpl();
+ return;
+ }
}
++CurrentDecompressingMessage.second;
@@ -2528,32 +2582,31 @@ void TDeferredActions<UseMigrationProtocol>::DeferStartExecutorTask(const typena
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) {
- ErrorHandler = errorHandler;
+void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) {
+ Session = session;
SessionClosedEvent.ConstructInPlace(std::move(closeEvent));
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, NYql::TIssues&& issues) {
- DeferAbortSession(errorHandler, TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues)));
+void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, NYql::TIssues&& issues) {
+ DeferAbortSession(session, TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues)));
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, const TString& message) {
+void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, const TString& message) {
NYql::TIssues issues;
issues.AddIssue(message);
- DeferAbortSession(errorHandler, statusCode, std::move(issues));
+ DeferAbortSession(session, statusCode, std::move(issues));
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status) {
- DeferAbortSession(errorHandler, TASessionClosedEvent<UseMigrationProtocol>(std::move(status)));
+void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status) {
+ DeferAbortSession(session, TASessionClosedEvent<UseMigrationProtocol>(std::move(status)));
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status) {
+void TDeferredActions<UseMigrationProtocol>::DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status) {
Session = std::move(session);
- ErrorHandler = errorHandler;
ReconnectionStatus = std::move(status);
}
@@ -2609,17 +2662,16 @@ void TDeferredActions<UseMigrationProtocol>::StartExecutorTasks() {
template<bool UseMigrationProtocol>
void TDeferredActions<UseMigrationProtocol>::AbortSession() {
if (SessionClosedEvent) {
- Y_ASSERT(ErrorHandler);
- ErrorHandler->AbortSession(std::move(*SessionClosedEvent));
+ Y_VERIFY(Session);
+ Session->AbortSession(std::move(*SessionClosedEvent));
}
}
template<bool UseMigrationProtocol>
void TDeferredActions<UseMigrationProtocol>::Reconnect() {
if (Session) {
- Y_ASSERT(ErrorHandler);
if (!Session->Reconnect(ReconnectionStatus)) {
- ErrorHandler->AbortSession(std::move(ReconnectionStatus));
+ Session->AbortSession(std::move(ReconnectionStatus));
}
}
}
@@ -2631,13 +2683,6 @@ void TDeferredActions<UseMigrationProtocol>::SignalWaiters() {
}
}
-template<bool UseMigrationProtocol>
-void TErrorHandler<UseMigrationProtocol>::AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) {
- if (auto session = Session.lock()) {
- session->Abort(std::move(closeEvent));
- }
-}
-
#define HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100})
template <typename TReaderCounters>
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
index 2a71b05897..84bce37637 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
@@ -450,9 +450,6 @@ public:
using IReadSessionConnectionProcessorFactory = ISessionConnectionProcessorFactory<Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>;
using TMockProcessorFactory = ::TMockProcessorFactory<Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>;
- struct TMockErrorHandler : public IErrorHandler<true> {
- MOCK_METHOD(void, AbortSession, (TSessionClosedEvent&& closeEvent), (override));
- };
struct TFakeContext : public NGrpc::IQueueClientContext {
IQueueClientContextPtr CreateContext() override {
@@ -499,7 +496,6 @@ public:
TString ClusterName = "cluster";
TLog Log = CreateLogBackend("cerr");
std::shared_ptr<TReadSessionEventsQueue<true>> EventsQueue;
- TIntrusivePtr<testing::StrictMock<TMockErrorHandler>> MockErrorHandler = MakeIntrusive<testing::StrictMock<TMockErrorHandler>>();
std::shared_ptr<TFakeContext> FakeContext = std::make_shared<TFakeContext>();
std::shared_ptr<TMockProcessorFactory> MockProcessorFactory = std::make_shared<TMockProcessorFactory>();
TIntrusivePtr<TMockReadSessionProcessor> MockProcessor = MakeIntrusive<TMockReadSessionProcessor>();
@@ -585,7 +581,6 @@ TReadSessionImplTestSetup::TReadSessionImplTestSetup() {
Mock::AllowLeak(MockProcessor.Get());
Mock::AllowLeak(MockProcessorFactory.get());
- Mock::AllowLeak(MockErrorHandler.Get());
}
TReadSessionImplTestSetup::~TReadSessionImplTestSetup() noexcept(false) {
@@ -625,7 +620,6 @@ TSingleClusterReadSessionImpl<true>* TReadSessionImplTestSetup::GetSession() {
Log,
MockProcessorFactory,
GetEventsQueue(),
- MockErrorHandler,
FakeContext,
PartitionIdStart, PartitionIdStep);
}
@@ -1013,12 +1007,14 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
setup.MockProcessorFactory->FailCreation();
});
- EXPECT_CALL(*setup.MockErrorHandler, AbortSession(_));
+ //EXPECT_CALL(*setup.MockErrorHandler, AbortSession(_));
setup.GetSession()->Start();
setup.MockProcessorFactory->Wait();
+ UNIT_ASSERT(setup.GetEventsQueue()->IsClosed());
}
+
Y_UNIT_TEST(StopsRetryAfterFailedAttempt) {
StopsRetryAfterFailedAttemptImpl(false);
}
@@ -1882,8 +1878,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
1ull,
1ull,
0ull,
- session,
- nullptr);
+ session);
TPartitionData<true> message;
Ydb::PersQueue::V1::MigrationStreamingReadServerMessage_DataBatch_Batch* batch =
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
index f2b7545b9c..8868748dcc 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
@@ -43,7 +43,6 @@ TReadSession::~TReadSession() {
}
void TReadSession::Start() {
- ErrorHandler = MakeIntrusive<NPersQueue::TErrorHandler<false>>(weak_from_this());
Tracker = std::make_shared<NPersQueue::TImplTracker>();
EventsQueue = std::make_shared<NPersQueue::TReadSessionEventsQueue<false>>(Settings, weak_from_this(), Tracker);
@@ -85,7 +84,6 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>
Log,
Client->CreateReadSessionConnectionProcessorFactory(),
EventsQueue,
- ErrorHandler,
context,
1, 1,
Tracker);
@@ -120,11 +118,19 @@ NThreading::TFuture<void> TReadSession::WaitEvent() {
}
TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) {
- return EventsQueue->GetEvents(block, maxEventsCount, maxByteSize);
+ auto res = EventsQueue->GetEvents(block, maxEventsCount, maxByteSize);
+ if (EventsQueue->IsClosed()) {
+ Abort(EStatus::ABORTED, "Aborted");
+ }
+ return res;
}
TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxByteSize) {
- return EventsQueue->GetEvent(block, maxByteSize);
+ auto res = EventsQueue->GetEvent(block, maxByteSize);
+ if (EventsQueue->IsClosed()) {
+ Abort(EStatus::ABORTED, "Aborted");
+ }
+ return res;
}
bool TReadSession::Close(TDuration timeout) {
@@ -300,21 +306,27 @@ void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) {
}
}
-void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDeferredActions<false>& deferred) {
+
+void TReadSession::AbortImpl(NPersQueue::TDeferredActions<false>&) {
Y_VERIFY(Lock.IsLocked());
if (!Aborting) {
Aborting = true;
- LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
if (DumpCountersContext) {
DumpCountersContext->Cancel();
DumpCountersContext.reset();
}
Session->Abort();
- EventsQueue->Close(std::move(closeEvent), deferred);
}
}
+void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDeferredActions<false>& deferred) {
+ LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
+
+ EventsQueue->Close(std::move(closeEvent), deferred);
+ AbortImpl(deferred);
+}
+
void TReadSession::AbortImpl(EStatus statusCode, NYql::TIssues&& issues, NPersQueue::TDeferredActions<false>& deferred) {
Y_VERIFY(Lock.IsLocked());
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
index f58fa2786e..e49bc9bdd6 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
@@ -107,6 +107,7 @@ private:
void Abort(EStatus statusCode, NYql::TIssues&& issues);
void Abort(EStatus statusCode, const TString& message);
+ void AbortImpl(NPersQueue::TDeferredActions<false>& deferred);
void AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDeferredActions<false>& deferred);
void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, NPersQueue::TDeferredActions<false>& deferred);
void AbortImpl(EStatus statusCode, const TString& message, NPersQueue::TDeferredActions<false>& deferred);
@@ -118,7 +119,6 @@ private:
TLog Log;
std::shared_ptr<TTopicClient::TImpl> Client;
std::shared_ptr<TGRpcConnectionsImpl> Connections;
- NPersQueue::IErrorHandler<false>::TPtr ErrorHandler;
TDbDriverStatePtr DbDriverState;
TAdaptiveLock Lock;
std::shared_ptr<NPersQueue::TImplTracker> Tracker;