aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2022-08-01 14:02:49 +0300
committershmel1k <shmel1k@ydb.tech>2022-08-01 14:02:49 +0300
commit0ae3ab76a37cd9766a4bc1c402021ddfda25c163 (patch)
tree9c8a1ee3e48b3b1550abb79fe1b2fee5b0d07391
parent5cbf4c3a664cee3d79b1b449b252fd28c373ee98 (diff)
downloadydb-0ae3ab76a37cd9766a4bc1c402021ddfda25c163.tar.gz
[] add write session close fix
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp34
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp4
2 files changed, 31 insertions, 7 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index eae5146e8a..8eb1bfc4e9 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -344,13 +344,20 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions
bool TReadSession::Close(TDuration timeout) {
Log.Write(TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
- with_lock (Lock) {
- Cancel(ClusterDiscoveryDelayContext);
- Cancel(DumpCountersContext);
- }
// Log final counters.
DumpCountersToLog();
+ with_lock (Lock) {
+ if (ClusterDiscoveryDelayContext) {
+ ClusterDiscoveryDelayContext->Cancel();
+ ClusterDiscoveryDelayContext.reset();
+ }
+ if (DumpCountersContext) {
+ DumpCountersContext->Cancel();
+ DumpCountersContext.reset();
+ }
+ }
+
std::vector<TSingleClusterReadSessionImpl::TPtr> sessions;
NThreading::TPromise<bool> promise = NThreading::NewPromise<bool>();
std::shared_ptr<std::atomic<size_t>> count = std::make_shared<std::atomic<size_t>>(0);
@@ -429,8 +436,14 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions&
if (!Aborting) {
Aborting = true;
Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString());
- Cancel(ClusterDiscoveryDelayContext);
- Cancel(DumpCountersContext);
+ if (ClusterDiscoveryDelayContext) {
+ ClusterDiscoveryDelayContext->Cancel();
+ ClusterDiscoveryDelayContext.reset();
+ }
+ if (DumpCountersContext) {
+ DumpCountersContext->Cancel();
+ DumpCountersContext.reset();
+ }
for (auto& [cluster, sessionInfo] : ClusterSessions) {
if (sessionInfo.Session) {
sessionInfo.Session->Abort();
@@ -1423,6 +1436,11 @@ void TSingleClusterReadSessionImpl::Abort() {
Cancel(ConnectTimeoutContext);
Cancel(ConnectDelayContext);
+ if (ClientContext) {
+ ClientContext->Cancel();
+ ClientContext.reset();
+ }
+
if (Processor) {
Processor->Cancel();
}
@@ -1463,6 +1481,10 @@ void TSingleClusterReadSessionImpl::CallCloseCallbackImpl() {
CloseCallback = {};
}
Aborting = true; // So abort call will have no effect.
+ if (ClientContext) {
+ ClientContext->Cancel();
+ ClientContext.reset();
+ }
}
void TSingleClusterReadSessionImpl::StopReadingData() {
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index 90d6731028..20e7318421 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -1214,9 +1214,11 @@ void TWriteSession::AbortImpl() {
Cancel(ConnectContext);
Cancel(ConnectTimeoutContext);
Cancel(ConnectDelayContext);
- ///Cancel(ClientContext);
if (Processor)
Processor->Cancel();
+
+ Cancel(ClientContext);
+ ClientContext.reset(); // removes context from contexts set from underlying gRPC-client.
}
}