aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqyryq <qyryq@ydb.tech>2024-04-19 16:49:50 +0300
committerGitHub <noreply@github.com>2024-04-19 16:49:50 +0300
commit5a7fff3fd7b08999fbbf508417ef3b027bb21ecd (patch)
tree7007e0026d8b5a96ffd701e8771797080679aac2
parente5efcb1959b8e9703afccca7421f9f11123c6a3c (diff)
downloadydb-5a7fff3fd7b08999fbbf508417ef3b027bb21ecd.tar.gz
ydb_persqueue_public: cancel previous client context on reconnect (#3927)
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp17
1 files changed, 10 insertions, 7 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp
index ced65e048a..25efd7790b 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp
@@ -445,18 +445,19 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
}
++ConnectionGeneration;
auto subclient = Client->GetClientForEndpoint(endpoint);
- connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory();
- auto clientContext = subclient->CreateContext();
- ConnectionFactory = connectionFactory;
- ClientContext = std::move(clientContext);
- ServerMessage = std::make_shared<TServerMessage>();
-
- if (!ClientContext) {
+ auto clientContext = subclient->CreateContext();
+ if (!clientContext) {
AbortImpl();
// Grpc and WriteSession is closing right now.
return;
}
+ auto prevClientContext = std::exchange(ClientContext, clientContext);
+
+ ServerMessage = std::make_shared<TServerMessage>();
+
+ connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory();
+ ConnectionFactory = connectionFactory;
connectContext = ClientContext->CreateContext();
if (delay)
@@ -477,8 +478,10 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
if (prevConnectDelayContext)
Cancel(prevConnectDelayContext);
Cancel(prevConnectTimeoutContext);
+ Cancel(prevClientContext);
Y_ASSERT(connectContext);
Y_ASSERT(connectTimeoutContext);
+
reqSettings = TRpcRequestSettings::Make(Settings);
connectCallback = [cbContext = SelfContext,