diff options
author | qyryq <qyryq@ydb.tech> | 2024-04-19 16:49:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-19 16:49:50 +0300 |
commit | 5a7fff3fd7b08999fbbf508417ef3b027bb21ecd (patch) | |
tree | 7007e0026d8b5a96ffd701e8771797080679aac2 | |
parent | e5efcb1959b8e9703afccca7421f9f11123c6a3c (diff) | |
download | ydb-5a7fff3fd7b08999fbbf508417ef3b027bb21ecd.tar.gz |
ydb_persqueue_public: cancel previous client context on reconnect (#3927)
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp index ced65e048a..25efd7790b 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/write_session_impl.cpp @@ -445,18 +445,19 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin } ++ConnectionGeneration; auto subclient = Client->GetClientForEndpoint(endpoint); - connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory(); - auto clientContext = subclient->CreateContext(); - ConnectionFactory = connectionFactory; - ClientContext = std::move(clientContext); - ServerMessage = std::make_shared<TServerMessage>(); - - if (!ClientContext) { + auto clientContext = subclient->CreateContext(); + if (!clientContext) { AbortImpl(); // Grpc and WriteSession is closing right now. return; } + auto prevClientContext = std::exchange(ClientContext, clientContext); + + ServerMessage = std::make_shared<TServerMessage>(); + + connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory(); + ConnectionFactory = connectionFactory; connectContext = ClientContext->CreateContext(); if (delay) @@ -477,8 +478,10 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin if (prevConnectDelayContext) Cancel(prevConnectDelayContext); Cancel(prevConnectTimeoutContext); + Cancel(prevClientContext); Y_ASSERT(connectContext); Y_ASSERT(connectTimeoutContext); + reqSettings = TRpcRequestSettings::Make(Settings); connectCallback = [cbContext = SelfContext, |