diff options
author | alexnick <alexnick@ydb.tech> | 2023-05-03 12:58:34 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2023-05-03 12:58:34 +0300 |
commit | 6d2b034c88f51bebae7f6dff359e503174d39ffd (patch) | |
tree | 22c1f12d725a5f304678bef0cd46bdb2710da4f0 | |
parent | d61cfa00fcd6078cabd02c099e257ec215c496c5 (diff) | |
download | ydb-6d2b034c88f51bebae7f6dff359e503174d39ffd.tar.gz |
fix for deadlock
fix for deadlock
8 files changed, 226 insertions, 152 deletions
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp index 3798fea118..739cfdd161 100644 --- a/ydb/core/persqueue/mirrorer.cpp +++ b/ydb/core/persqueue/mirrorer.cpp @@ -89,6 +89,8 @@ void TMirrorer::StartInit(const TActorContext& ctx) { void TMirrorer::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) { LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " killed"); + if (ReadSession) + ReadSession->Close(); ReadSession = nullptr; PartitionStream = nullptr; CredentialsProvider = nullptr; @@ -441,7 +443,9 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont Queue.pop_back(); } } - + if (ReadSession) { + ReadSession->Close(); + } ReadSession.reset(); PartitionStream.Reset(); @@ -459,6 +463,9 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont }); try { + if (ReadSession) { + ReadSession->Close(); + } ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT, log); } catch(...) { ProcessError(ctx, TStringBuilder() << "got an exception during the creation read session: " << CurrentExceptionMessage()); @@ -514,6 +521,8 @@ void TMirrorer::AddMessagesToQueue(TVector<TPersQueueReadEvent::TDataReceivedEve void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) { LastInitStageTimestamp = ctx.Now(); + if (ReadSession) + ReadSession->Close(); ReadSession = nullptr; PartitionStream = nullptr; ReadFuturesInFlight = 0; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h index 7e2c26b66c..21179740da 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h @@ -407,6 +407,10 @@ public: } } + bool IsClosed() { + return Closed; + } + protected: const TSettings& Settings; TWaiter Waiter; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index 5b3abb3b48..85c6e87589 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -57,7 +57,14 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, } TReadSession::~TReadSession() { - Abort(EStatus::ABORTED, "Aborted"); + { + TDeferredActions<true> deferred; + NYql::TIssues issues; + issues.AddIssue("Aborted"); + EventsQueue->Close(TSessionClosedEvent(EStatus::ABORTED, std::move(issues)), deferred); + } + + Abort(); ClearAllEvents(); if (Tracker) { @@ -76,7 +83,6 @@ Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest TReadSession::MakeClus } void TReadSession::Start() { - ErrorHandler = MakeIntrusive<TErrorHandler<true>>(weak_from_this()); Tracker = std::make_shared<TImplTracker>(); EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, weak_from_this(), Tracker); @@ -107,7 +113,11 @@ bool TReadSession::ValidateSettings() { } if (issues) { - Abort(EStatus::BAD_REQUEST, MakeIssueWithSubIssues("Invalid read session settings", issues)); + { + TDeferredActions<true> deferred; + EventsQueue->Close(TSessionClosedEvent(EStatus::BAD_REQUEST, MakeIssueWithSubIssues("Invalid read session settings", issues)), deferred); + } + Abort(); return false; } else { return true; @@ -124,7 +134,7 @@ void TReadSession::StartClusterDiscovery() { ClusterDiscoveryDelayContext = nullptr; } - auto extractor = [errorHandler = ErrorHandler, self = weak_from_this()] + auto extractor = [self = weak_from_this()] (google::protobuf::Any* any, TPlainStatus status) mutable { auto selfShared = self.lock(); if (!selfShared) { @@ -203,7 +213,6 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) { Log, subclient->CreateReadSessionConnectionProcessorFactory(), EventsQueue, - ErrorHandler, context, partitionStreamIdStart++, clusterSessionsCount, @@ -417,12 +426,13 @@ bool TReadSession::Close(TDuration timeout) { return result; } -void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions<true>& deferred) { +void TReadSession::AbortImpl(TDeferredActions<true>&) { + Y_VERIFY(Lock.IsLocked()); if (!Aborting) { + Y_VERIFY(EventsQueue->IsClosed()); Aborting = true; - LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); if (ClusterDiscoveryDelayContext) { ClusterDiscoveryDelayContext->Cancel(); ClusterDiscoveryDelayContext.reset(); @@ -436,14 +446,16 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions< sessionInfo.Session->Abort(); } } - EventsQueue->Close(std::move(closeEvent), deferred); } } void TReadSession::AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred) { Y_VERIFY(Lock.IsLocked()); + auto closeEvent = TSessionClosedEvent(statusCode, std::move(issues)); + LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); - AbortImpl(TSessionClosedEvent(statusCode, std::move(issues)), deferred); + EventsQueue->Close(std::move(closeEvent), deferred); + AbortImpl(deferred); } void TReadSession::AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred) { @@ -451,23 +463,14 @@ void TReadSession::AbortImpl(EStatus statusCode, const TString& message, TDeferr NYql::TIssues issues; issues.AddIssue(message); - AbortImpl(statusCode, std::move(issues), deferred); -} -void TReadSession::Abort(EStatus statusCode, NYql::TIssues&& issues) { - Abort(TSessionClosedEvent(statusCode, std::move(issues))); -} - -void TReadSession::Abort(EStatus statusCode, const TString& message) { - NYql::TIssues issues; - issues.AddIssue(message); - Abort(statusCode, std::move(issues)); + AbortImpl(statusCode, std::move(issues), deferred); } -void TReadSession::Abort(TSessionClosedEvent&& closeEvent) { +void TReadSession::Abort() { TDeferredActions<true> deferred; with_lock (Lock) { - AbortImpl(std::move(closeEvent), deferred); + AbortImpl(EStatus::ABORTED, "Aborted", deferred); } } @@ -480,11 +483,19 @@ NThreading::TFuture<void> TReadSession::WaitEvent() { } TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) { - return EventsQueue->GetEvents(block, maxEventsCount, maxByteSize); + auto res = EventsQueue->GetEvents(block, maxEventsCount, maxByteSize); + if (EventsQueue->IsClosed()) { + Abort(); + } + return res; } TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxByteSize) { - return EventsQueue->GetEvent(block, maxByteSize); + auto res = EventsQueue->GetEvent(block, maxByteSize); + if (EventsQueue->IsClosed()) { + Abort(); + } + return res; } void TReadSession::StopReadingData() { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index d3fa055a6c..c4acb6fcab 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -149,11 +149,11 @@ public: void DeferReadFromProcessor(const typename IProcessor<UseMigrationProtocol>::TPtr& processor, TServerMessage<UseMigrationProtocol>* dst, typename IProcessor<UseMigrationProtocol>::TReadCallback callback); void DeferStartExecutorTask(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction task); - void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent); - void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, NYql::TIssues&& issues); - void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, const TString& message); - void DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status); - void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status); + void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent); + void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, NYql::TIssues&& issues); + void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, const TString& message); + void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status); + void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status); void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session); void DeferSignalWaiter(TWaiter&& waiter); void DeferDestroyDecompressionInfos(std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>>&& infos); @@ -178,13 +178,12 @@ private: std::vector<std::pair<typename IAExecutor<UseMigrationProtocol>::TPtr, typename IAExecutor<UseMigrationProtocol>::TFunction>> ExecutorsTasks; // Abort session. - typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler; TMaybe<TASessionClosedEvent<UseMigrationProtocol>> SessionClosedEvent; // Waiters. std::vector<TWaiter> Waiters; - // Reconnection. + // Reconnection and abort. std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; TPlainStatus ReconnectionStatus; @@ -569,13 +568,11 @@ public: ui64 partitionId, ui64 assignId, ui64 readOffset, - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession, - typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler) + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession) : Key{topicPath, cluster, partitionId} , AssignId(assignId) , FirstNotReadOffset(readOffset) , Session(std::move(parentSession)) - , ErrorHandler(std::move(errorHandler)) { TAPartitionStream<true>::PartitionStreamId = partitionStreamId; TAPartitionStream<true>::TopicPath = std::move(topicPath); @@ -591,13 +588,11 @@ public: i64 partitionId, i64 assignId, i64 readOffset, - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession, - typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler) + std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession) : Key{topicPath, "", static_cast<ui64>(partitionId)} , AssignId(static_cast<ui64>(assignId)) , FirstNotReadOffset(static_cast<ui64>(readOffset)) , Session(std::move(parentSession)) - , ErrorHandler(std::move(errorHandler)) { TAPartitionStream<false>::PartitionSessionId = partitionStreamId; TAPartitionStream<false>::TopicPath = std::move(topicPath); @@ -667,9 +662,6 @@ public: TReadSessionEventsQueue<UseMigrationProtocol>* queue, TDeferredActions<UseMigrationProtocol>& deferred); - const typename IErrorHandler<UseMigrationProtocol>::TPtr& GetErrorHandler() const { - return ErrorHandler; - } ui64 GetMaxReadOffset() const { return MaxReadOffset; @@ -740,7 +732,6 @@ private: ui64 AssignId; ui64 FirstNotReadOffset; std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; - typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler; TRawPartitionStreamEventQueue<UseMigrationProtocol> EventsQueue; ui64 MaxReadOffset = 0; ui64 MaxCommittedOffset = 0; @@ -779,9 +770,11 @@ public: GetEvent(bool block = false, size_t maxByteSize = std::numeric_limits<size_t>::max()); - void Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) { + bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) { TWaiter waiter; with_lock (TParent::Mutex) { + if (TParent::Closed) + return false; TParent::CloseEvent = event; TParent::Closed = true; waiter = TWaiter(TParent::Waiter.ExtractPromise(), this); @@ -789,8 +782,8 @@ public: TReadSessionEventInfo<UseMigrationProtocol> info(event); ApplyHandler(info, deferred); - - waiter.Signal(); + deferred.DeferSignalWaiter(std::move(waiter)); + return true; } bool TryApplyCallbackToEventImpl(typename TParent::TEvent& event, @@ -802,14 +795,14 @@ public: void GetDataEventCallbackSettings(size_t& maxMessagesBytes); - // Push usual event. - void PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + // Push usual event. Returns false if queue is closed + bool PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event, TDeferredActions<UseMigrationProtocol>& deferred); - // Push data event. - void PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, + // Push data event. Returns false if queue is closed + bool PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, size_t batch, size_t message, TDataDecompressionInfoPtr<UseMigrationProtocol> parent, @@ -946,7 +939,6 @@ public: const TLog& log, std::shared_ptr<IReadSessionConnectionProcessorFactory<UseMigrationProtocol>> connectionFactory, std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> eventsQueue, - typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler, NGrpc::IQueueClientContextPtr clientContext, ui64 partitionStreamIdStart, ui64 partitionStreamIdStep, @@ -961,9 +953,8 @@ public: , PartitionStreamIdStep(partitionStreamIdStep) , ConnectionFactory(std::move(connectionFactory)) , EventsQueue(std::move(eventsQueue)) - , ErrorHandler(std::move(errorHandler)) , ClientContext(std::move(clientContext)) - , CookieMapping(ErrorHandler) + , CookieMapping() , ReadSizeBudget(GetCompressedDataSizeLimit()) , ReadSizeServerDelta(0) , Tracker(std::move(tracker)) @@ -990,6 +981,21 @@ public: void Abort(); void AbortImpl(); void Close(std::function<void()> callback); + void AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent); + + void AbortSession(EStatus statusCode, NYql::TIssues&& issues) { + AbortSession(TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues))); + } + + void AbortSession(EStatus statusCode, const TString& message) { + NYql::TIssues issues; + issues.AddIssue(message); + AbortSession(statusCode, std::move(issues)); + } + + void AbortSession(TPlainStatus&& status) { + AbortSession(TASessionClosedEvent<UseMigrationProtocol>(std::move(status))); + } bool Reconnect(const TPlainStatus& status); @@ -1106,8 +1112,7 @@ private: size_t UncommittedMessagesLeft = 0; }; - explicit TPartitionCookieMapping(typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler) - : ErrorHandler(std::move(errorHandler)) + explicit TPartitionCookieMapping() { } @@ -1132,7 +1137,6 @@ private: THashMap<typename TCookie::TKey, typename TCookie::TPtr, typename TCookie::TKey::THash> Cookies; THashMap<std::pair<ui64, ui64>, typename TCookie::TPtr> UncommittedOffsetToCookie; // (Partition stream id, Offset) -> Cookie. THashMultiMap<ui64, typename TCookie::TPtr> PartitionStreamIdToCookie; - typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler; size_t CommitInflight = 0; // Commit inflight to server. }; @@ -1158,7 +1162,6 @@ private: ui64 PartitionStreamIdStep; std::shared_ptr<IReadSessionConnectionProcessorFactory<UseMigrationProtocol>> ConnectionFactory; std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> EventsQueue; - typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler; NGrpc::IQueueClientContextPtr ClientContext; // Common client context. NGrpc::IQueueClientContextPtr ConnectContext; NGrpc::IQueueClientContextPtr ConnectTimeoutContext; @@ -1258,7 +1261,7 @@ public: void StopReadingData() override; void ResumeReadingData() override; - void Abort(TSessionClosedEvent&& closeEvent); + void Abort(); void ClearAllEvents(); @@ -1276,11 +1279,7 @@ private: void RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions<true>& deferred); void CreateClusterSessionsImpl(TDeferredActions<true>& deferred); - - // Shutdown. - void Abort(EStatus statusCode, NYql::TIssues&& issues); - void Abort(EStatus statusCode, const TString& message); - + void AbortImpl(TDeferredActions<true>& deferred); void AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions<true>& deferred); void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred); void AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred); @@ -1298,7 +1297,6 @@ private: TLog Log; std::shared_ptr<TPersQueueClient::TImpl> Client; std::shared_ptr<TGRpcConnectionsImpl> Connections; - typename IErrorHandler<true>::TPtr ErrorHandler; TDbDriverStatePtr DbDriverState; TAdaptiveLock Lock; std::shared_ptr<TReadSessionEventsQueue<true>> EventsQueue; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index b2d24a177c..53c0561f60 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -48,22 +48,6 @@ void MakeCountersNotNull(TReaderCounters& counters); template <typename TReaderCounters> bool HasNullCounters(TReaderCounters& counters); -template <bool UseMigrationProtocol> -class TErrorHandler : public IErrorHandler<UseMigrationProtocol> { - using TReadSession = typename std::conditional_t<UseMigrationProtocol, - NPersQueue::TReadSession, - NTopic::TReadSession>; -public: - TErrorHandler(std::weak_ptr<TReadSession> session) - : Session(std::move(session)) - { - } - - void AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) override; - -private: - std::weak_ptr<TReadSession> Session; -}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TPartitionStreamImpl @@ -234,7 +218,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Start() { Settings.DecompressionExecutor_->Start(); Settings.EventHandlers_.HandlersExecutor_->Start(); if (!Reconnect(TPlainStatus())) { - ErrorHandler->AbortSession(EStatus::ABORTED, "Driver is stopping"); + AbortSession(EStatus::ABORTED, "Driver is stopping"); } } @@ -355,7 +339,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReco Processor = nullptr; RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again. - deferred.DeferReconnection(this->shared_from_this(), ErrorHandler, std::move(status)); + deferred.DeferReconnection(this->shared_from_this(), std::move(status)); } template<bool UseMigrationProtocol> @@ -380,7 +364,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnectTimeout(const TStringBuilder description; description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone; if (!Reconnect(TPlainStatus(EStatus::TIMEOUT, description))) { - ErrorHandler->AbortSession(EStatus::TIMEOUT, description); + AbortSession(EStatus::TIMEOUT, description); } } @@ -414,7 +398,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnect( if (!st.Ok()) { ++*Settings.Counters_->Errors; if (!Reconnect(st)) { - ErrorHandler->AbortSession( + AbortSession( st.Status, MakeIssueWithSubIssues(TStringBuilder() << "Failed to establish connection to server \"" << st.Endpoint << "\" ( cluster " << ClusterName << "). Attempts done: " << ConnectionAttemptsDone, @@ -629,16 +613,20 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream CookieMapping.RemoveMapping(GetPartitionStreamId(partitionStream)); PartitionStreams.erase(partitionStream->GetAssignId()); + bool pushRes = true; if constexpr (UseMigrationProtocol) { - EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), TClosedEvent(partitionStream, TClosedEvent::EReason::DestroyConfirmedByUser), deferred); } else { - EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), TClosedEvent(partitionStream, TClosedEvent::EReason::StopConfirmedByUser), deferred); } - + if (!pushRes) { + AbortImpl(); + return; + } TClientMessage<UseMigrationProtocol> req; if constexpr (UseMigrationProtocol) { @@ -890,7 +878,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnReadDone(NGrpc::TGrp ++*Settings.Counters_->Errors; if (!Reconnect(errorStatus)) { - ErrorHandler->AbortSession(std::move(errorStatus)); + AbortSession(std::move(errorStatus)); } } } @@ -1010,8 +998,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( NextPartitionStreamId, msg.topic().path(), msg.cluster(), msg.partition() + 1, // Group. msg.partition(), // Partition. - msg.assign_id(), msg.read_offset(), weak_from_this(), - ErrorHandler); + msg.assign_id(), msg.read_offset(), weak_from_this()); NextPartitionStreamId += PartitionStreamIdStep; // Renew partition stream. @@ -1019,19 +1006,27 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( PartitionStreams[partitionStream->GetAssignId()]; if (currentPartitionStream) { CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId()); - EventsQueue->PushEvent( + bool pushRes = EventsQueue->PushEvent( currentPartitionStream, weak_from_this(), TReadSessionEvent::TPartitionStreamClosedEvent( currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } currentPartitionStream = partitionStream; // Send event to user. - EventsQueue->PushEvent( + bool pushRes = EventsQueue->PushEvent( partitionStream, weak_from_this(), TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset()), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } template <> @@ -1046,19 +1041,26 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( return; } TIntrusivePtr<TPartitionStreamImpl<true>> partitionStream = partitionStreamIt->second; + bool pushRes = true; if (msg.forceful_release()) { PartitionStreams.erase(msg.assign_id()); CookieMapping.RemoveMapping(partitionStream->GetPartitionStreamId()); - EventsQueue->PushEvent(partitionStream, weak_from_this(), + pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(), TReadSessionEvent::TPartitionStreamClosedEvent( partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost), deferred); } else { - EventsQueue->PushEvent( + pushRes = EventsQueue->PushEvent( partitionStream, weak_from_this(), TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset()), deferred); } + + if (!pushRes) { + AbortImpl(); + return; + } + } template <> @@ -1079,10 +1081,14 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( } } for (auto& [id, partitionStream] : partitionStreams) { - EventsQueue->PushEvent( + bool pushRes = EventsQueue->PushEvent( partitionStream, weak_from_this(), TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset()), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } for (const auto& rangeProto : msg.offset_ranges()) { @@ -1090,10 +1096,14 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( if (partitionStreamIt != PartitionStreams.end()) { auto partitionStream = partitionStreamIt->second; partitionStream->UpdateMaxCommittedOffset(rangeProto.end_offset()); - EventsQueue->PushEvent( + bool pushRes = EventsQueue->PushEvent( partitionStream, weak_from_this(), TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset()), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } } } @@ -1109,12 +1119,16 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( if (partitionStreamIt == PartitionStreams.end()) { return; } - EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(), TReadSessionEvent::TPartitionStreamStatusEvent( partitionStreamIt->second, msg.committed_offset(), 0, // TODO: support read offset in status msg.end_offset(), TInstant::MilliSeconds(msg.write_watermark_ms())), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } ////////////// @@ -1227,25 +1241,33 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>( NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(), msg.partition_session().partition_session_id(), msg.committed_offset(), - weak_from_this(), ErrorHandler); + weak_from_this()); NextPartitionStreamId += PartitionStreamIdStep; // Renew partition stream. TIntrusivePtr<TPartitionStreamImpl<false>>& currentPartitionStream = PartitionStreams[partitionStream->GetAssignId()]; if (currentPartitionStream) { - EventsQueue->PushEvent( + bool pushRes = EventsQueue->PushEvent( currentPartitionStream, weak_from_this(), NTopic::TReadSessionEvent::TPartitionSessionClosedEvent( currentPartitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } currentPartitionStream = partitionStream; // Send event to user. - EventsQueue->PushEvent(partitionStream, weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(), NTopic::TReadSessionEvent::TStartPartitionSessionEvent( partitionStream, msg.committed_offset(), msg.partition_offsets().end()), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } template <> @@ -1260,18 +1282,23 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( return; } TIntrusivePtr<TPartitionStreamImpl<false>> partitionStream = partitionStreamIt->second; + bool pushRes = true; if (!msg.graceful()) { PartitionStreams.erase(msg.partition_session_id()); - EventsQueue->PushEvent(partitionStream, weak_from_this(), + pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(), NTopic::TReadSessionEvent::TPartitionSessionClosedEvent( partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost), deferred); } else { - EventsQueue->PushEvent( + pushRes = EventsQueue->PushEvent( partitionStream, weak_from_this(), NTopic::TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset()), deferred); } + if (!pushRes) { + AbortImpl(); + return; + } } template <> @@ -1288,10 +1315,14 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( if (partitionStreamIt != PartitionStreams.end()) { auto partitionStream = partitionStreamIt->second; partitionStream->UpdateMaxCommittedOffset(rangeProto.committed_offset()); - EventsQueue->PushEvent(partitionStream, weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(), NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent( partitionStream, rangeProto.committed_offset()), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } } } @@ -1307,7 +1338,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( if (partitionStreamIt == PartitionStreams.end()) { return; } - EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(), NTopic::TReadSessionEvent::TPartitionSessionStatusEvent( partitionStreamIt->second, msg.committed_offset(), 0, // TODO: support read offset in status @@ -1315,6 +1346,10 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds( msg.write_time_high_watermark()))), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } template <> @@ -1368,9 +1403,13 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DestroyAllPartitionStr >; for (auto&& [key, partitionStream] : PartitionStreams) { - EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), TClosedEvent(std::move(partitionStream), TClosedEvent::EReason::ConnectionLost), deferred); + if (!pushRes) { + AbortImpl(); + return; + } } PartitionStreams.clear(); CookieMapping.ClearMapping(); @@ -1449,6 +1488,15 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Abort() { } template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) { + TDeferredActions<UseMigrationProtocol> deferred; + LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing session to cluster: " << closeEvent.DebugString()); + + EventsQueue->Close(closeEvent, deferred); +} + + +template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::AbortImpl() { Y_VERIFY(Lock.IsLocked()); @@ -1732,16 +1780,16 @@ TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue( } template <bool UseMigrationProtocol> -void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, +bool TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> /*session*/, typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event, TDeferredActions<UseMigrationProtocol>& deferred) { - if (TParent::Closed) { - return; - } - with_lock (TParent::Mutex) { + if (TParent::Closed) { + return false; + } + //TODO: check session closed event and return false using TClosedEvent = std::conditional_t< UseMigrationProtocol, NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, @@ -1757,6 +1805,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPar SignalReadyEventsImpl(stream, deferred); } + return true; } template <bool UseMigrationProtocol> @@ -1788,19 +1837,19 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalEventImpl( } template <bool UseMigrationProtocol> -void TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, +bool TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, size_t batch, size_t message, TDataDecompressionInfoPtr<UseMigrationProtocol> parent, std::atomic<bool>& ready) { - if (this->Closed) { - return; - } - with_lock (TParent::Mutex) { + if (this->Closed) { + return false; + } partitionStream->InsertDataEvent(batch, message, parent, ready); } + return true; } template <bool UseMigrationProtocol> @@ -1990,6 +2039,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvent(bool block, size_t maxBy if (TParent::HasEventsImpl()) { eventInfo = GetEventImpl(maxByteSize, accumulator); } + } while (block && !eventInfo); } @@ -2241,11 +2291,15 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double task.Add(CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, size, estimatedDecompressedSize); - session->GetEventsQueue()->PushDataEvent(partitionStream, + bool pushRes = session->GetEventsQueue()->PushDataEvent(partitionStream, CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, TDataDecompressionInfo::shared_from_this(), ReadyThresholds.back().Ready); + if (!pushRes) { + session->AbortImpl(); + return; + } } ++CurrentDecompressingMessage.second; @@ -2528,32 +2582,31 @@ void TDeferredActions<UseMigrationProtocol>::DeferStartExecutorTask(const typena } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) { - ErrorHandler = errorHandler; +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) { + Session = session; SessionClosedEvent.ConstructInPlace(std::move(closeEvent)); } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, NYql::TIssues&& issues) { - DeferAbortSession(errorHandler, TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues))); +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, NYql::TIssues&& issues) { + DeferAbortSession(session, TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues))); } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, EStatus statusCode, const TString& message) { +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, const TString& message) { NYql::TIssues issues; issues.AddIssue(message); - DeferAbortSession(errorHandler, statusCode, std::move(issues)); + DeferAbortSession(session, statusCode, std::move(issues)); } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status) { - DeferAbortSession(errorHandler, TASessionClosedEvent<UseMigrationProtocol>(std::move(status))); +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status) { + DeferAbortSession(session, TASessionClosedEvent<UseMigrationProtocol>(std::move(status))); } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, const typename IErrorHandler<UseMigrationProtocol>::TPtr& errorHandler, TPlainStatus&& status) { +void TDeferredActions<UseMigrationProtocol>::DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status) { Session = std::move(session); - ErrorHandler = errorHandler; ReconnectionStatus = std::move(status); } @@ -2609,17 +2662,16 @@ void TDeferredActions<UseMigrationProtocol>::StartExecutorTasks() { template<bool UseMigrationProtocol> void TDeferredActions<UseMigrationProtocol>::AbortSession() { if (SessionClosedEvent) { - Y_ASSERT(ErrorHandler); - ErrorHandler->AbortSession(std::move(*SessionClosedEvent)); + Y_VERIFY(Session); + Session->AbortSession(std::move(*SessionClosedEvent)); } } template<bool UseMigrationProtocol> void TDeferredActions<UseMigrationProtocol>::Reconnect() { if (Session) { - Y_ASSERT(ErrorHandler); if (!Session->Reconnect(ReconnectionStatus)) { - ErrorHandler->AbortSession(std::move(ReconnectionStatus)); + Session->AbortSession(std::move(ReconnectionStatus)); } } } @@ -2631,13 +2683,6 @@ void TDeferredActions<UseMigrationProtocol>::SignalWaiters() { } } -template<bool UseMigrationProtocol> -void TErrorHandler<UseMigrationProtocol>::AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) { - if (auto session = Session.lock()) { - session->Abort(std::move(closeEvent)); - } -} - #define HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) template <typename TReaderCounters> diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 2a71b05897..84bce37637 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -450,9 +450,6 @@ public: using IReadSessionConnectionProcessorFactory = ISessionConnectionProcessorFactory<Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; using TMockProcessorFactory = ::TMockProcessorFactory<Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; - struct TMockErrorHandler : public IErrorHandler<true> { - MOCK_METHOD(void, AbortSession, (TSessionClosedEvent&& closeEvent), (override)); - }; struct TFakeContext : public NGrpc::IQueueClientContext { IQueueClientContextPtr CreateContext() override { @@ -499,7 +496,6 @@ public: TString ClusterName = "cluster"; TLog Log = CreateLogBackend("cerr"); std::shared_ptr<TReadSessionEventsQueue<true>> EventsQueue; - TIntrusivePtr<testing::StrictMock<TMockErrorHandler>> MockErrorHandler = MakeIntrusive<testing::StrictMock<TMockErrorHandler>>(); std::shared_ptr<TFakeContext> FakeContext = std::make_shared<TFakeContext>(); std::shared_ptr<TMockProcessorFactory> MockProcessorFactory = std::make_shared<TMockProcessorFactory>(); TIntrusivePtr<TMockReadSessionProcessor> MockProcessor = MakeIntrusive<TMockReadSessionProcessor>(); @@ -585,7 +581,6 @@ TReadSessionImplTestSetup::TReadSessionImplTestSetup() { Mock::AllowLeak(MockProcessor.Get()); Mock::AllowLeak(MockProcessorFactory.get()); - Mock::AllowLeak(MockErrorHandler.Get()); } TReadSessionImplTestSetup::~TReadSessionImplTestSetup() noexcept(false) { @@ -625,7 +620,6 @@ TSingleClusterReadSessionImpl<true>* TReadSessionImplTestSetup::GetSession() { Log, MockProcessorFactory, GetEventsQueue(), - MockErrorHandler, FakeContext, PartitionIdStart, PartitionIdStep); } @@ -1013,12 +1007,14 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { setup.MockProcessorFactory->FailCreation(); }); - EXPECT_CALL(*setup.MockErrorHandler, AbortSession(_)); + //EXPECT_CALL(*setup.MockErrorHandler, AbortSession(_)); setup.GetSession()->Start(); setup.MockProcessorFactory->Wait(); + UNIT_ASSERT(setup.GetEventsQueue()->IsClosed()); } + Y_UNIT_TEST(StopsRetryAfterFailedAttempt) { StopsRetryAfterFailedAttemptImpl(false); } @@ -1882,8 +1878,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { 1ull, 1ull, 0ull, - session, - nullptr); + session); TPartitionData<true> message; Ydb::PersQueue::V1::MigrationStreamingReadServerMessage_DataBatch_Batch* batch = diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index f2b7545b9c..8868748dcc 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -43,7 +43,6 @@ TReadSession::~TReadSession() { } void TReadSession::Start() { - ErrorHandler = MakeIntrusive<NPersQueue::TErrorHandler<false>>(weak_from_this()); Tracker = std::make_shared<NPersQueue::TImplTracker>(); EventsQueue = std::make_shared<NPersQueue::TReadSessionEventsQueue<false>>(Settings, weak_from_this(), Tracker); @@ -85,7 +84,6 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false> Log, Client->CreateReadSessionConnectionProcessorFactory(), EventsQueue, - ErrorHandler, context, 1, 1, Tracker); @@ -120,11 +118,19 @@ NThreading::TFuture<void> TReadSession::WaitEvent() { } TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) { - return EventsQueue->GetEvents(block, maxEventsCount, maxByteSize); + auto res = EventsQueue->GetEvents(block, maxEventsCount, maxByteSize); + if (EventsQueue->IsClosed()) { + Abort(EStatus::ABORTED, "Aborted"); + } + return res; } TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxByteSize) { - return EventsQueue->GetEvent(block, maxByteSize); + auto res = EventsQueue->GetEvent(block, maxByteSize); + if (EventsQueue->IsClosed()) { + Abort(EStatus::ABORTED, "Aborted"); + } + return res; } bool TReadSession::Close(TDuration timeout) { @@ -300,21 +306,27 @@ void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) { } } -void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDeferredActions<false>& deferred) { + +void TReadSession::AbortImpl(NPersQueue::TDeferredActions<false>&) { Y_VERIFY(Lock.IsLocked()); if (!Aborting) { Aborting = true; - LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); if (DumpCountersContext) { DumpCountersContext->Cancel(); DumpCountersContext.reset(); } Session->Abort(); - EventsQueue->Close(std::move(closeEvent), deferred); } } +void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDeferredActions<false>& deferred) { + LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); + + EventsQueue->Close(std::move(closeEvent), deferred); + AbortImpl(deferred); +} + void TReadSession::AbortImpl(EStatus statusCode, NYql::TIssues&& issues, NPersQueue::TDeferredActions<false>& deferred) { Y_VERIFY(Lock.IsLocked()); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h index f58fa2786e..e49bc9bdd6 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h @@ -107,6 +107,7 @@ private: void Abort(EStatus statusCode, NYql::TIssues&& issues); void Abort(EStatus statusCode, const TString& message); + void AbortImpl(NPersQueue::TDeferredActions<false>& deferred); void AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDeferredActions<false>& deferred); void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, NPersQueue::TDeferredActions<false>& deferred); void AbortImpl(EStatus statusCode, const TString& message, NPersQueue::TDeferredActions<false>& deferred); @@ -118,7 +119,6 @@ private: TLog Log; std::shared_ptr<TTopicClient::TImpl> Client; std::shared_ptr<TGRpcConnectionsImpl> Connections; - NPersQueue::IErrorHandler<false>::TPtr ErrorHandler; TDbDriverStatePtr DbDriverState; TAdaptiveLock Lock; std::shared_ptr<NPersQueue::TImplTracker> Tracker; |