diff options
author | shmel1k <shmel1k@ydb.tech> | 2022-08-01 14:02:49 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2022-08-01 14:02:49 +0300 |
commit | 0ae3ab76a37cd9766a4bc1c402021ddfda25c163 (patch) | |
tree | 9c8a1ee3e48b3b1550abb79fe1b2fee5b0d07391 | |
parent | 5cbf4c3a664cee3d79b1b449b252fd28c373ee98 (diff) | |
download | ydb-0ae3ab76a37cd9766a4bc1c402021ddfda25c163.tar.gz |
[] add write session close fix
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp | 34 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp | 4 |
2 files changed, 31 insertions, 7 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index eae5146e8a..8eb1bfc4e9 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -344,13 +344,20 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions bool TReadSession::Close(TDuration timeout) { Log.Write(TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); - with_lock (Lock) { - Cancel(ClusterDiscoveryDelayContext); - Cancel(DumpCountersContext); - } // Log final counters. DumpCountersToLog(); + with_lock (Lock) { + if (ClusterDiscoveryDelayContext) { + ClusterDiscoveryDelayContext->Cancel(); + ClusterDiscoveryDelayContext.reset(); + } + if (DumpCountersContext) { + DumpCountersContext->Cancel(); + DumpCountersContext.reset(); + } + } + std::vector<TSingleClusterReadSessionImpl::TPtr> sessions; NThreading::TPromise<bool> promise = NThreading::NewPromise<bool>(); std::shared_ptr<std::atomic<size_t>> count = std::make_shared<std::atomic<size_t>>(0); @@ -429,8 +436,14 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions& if (!Aborting) { Aborting = true; Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); - Cancel(ClusterDiscoveryDelayContext); - Cancel(DumpCountersContext); + if (ClusterDiscoveryDelayContext) { + ClusterDiscoveryDelayContext->Cancel(); + ClusterDiscoveryDelayContext.reset(); + } + if (DumpCountersContext) { + DumpCountersContext->Cancel(); + DumpCountersContext.reset(); + } for (auto& [cluster, sessionInfo] : ClusterSessions) { if (sessionInfo.Session) { sessionInfo.Session->Abort(); @@ -1423,6 +1436,11 @@ void TSingleClusterReadSessionImpl::Abort() { Cancel(ConnectTimeoutContext); Cancel(ConnectDelayContext); + if (ClientContext) { + ClientContext->Cancel(); + ClientContext.reset(); + } + if (Processor) { Processor->Cancel(); } @@ -1463,6 +1481,10 @@ void TSingleClusterReadSessionImpl::CallCloseCallbackImpl() { CloseCallback = {}; } Aborting = true; // So abort call will have no effect. + if (ClientContext) { + ClientContext->Cancel(); + ClientContext.reset(); + } } void TSingleClusterReadSessionImpl::StopReadingData() { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index 90d6731028..20e7318421 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -1214,9 +1214,11 @@ void TWriteSession::AbortImpl() { Cancel(ConnectContext); Cancel(ConnectTimeoutContext); Cancel(ConnectDelayContext); - ///Cancel(ClientContext); if (Processor) Processor->Cancel(); + + Cancel(ClientContext); + ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. } } |