diff options
author | alexnick <alexnick@ydb.tech> | 2022-10-05 12:31:46 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-10-05 12:31:46 +0300 |
commit | 22a9f2946efed8beb7d3c68dbeb8c14d5f63edf9 (patch) | |
tree | 5f28a53222b2299093f3e699d963c28d1c27daf6 | |
parent | 0ce4feb0aeffff80b8f48ae565835263a8bbd7fa (diff) | |
download | ydb-22a9f2946efed8beb7d3c68dbeb8c14d5f63edf9.tar.gz |
fixed connection loss for topic-service
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h | 6 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp | 42 |
2 files changed, 28 insertions, 20 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 6228462c687..d41ea16dd80 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -1023,7 +1023,7 @@ public: , ClientContext(std::move(clientContext)) , CookieMapping(ErrorHandler) , ReadSizeBudget(GetCompressedDataSizeLimit()) - , ReadSizeServerDelta(GetCompressedDataSizeLimit()) + , ReadSizeServerDelta(0) { } @@ -1243,8 +1243,8 @@ private: bool Closing = false; std::function<void()> CloseCallback; std::atomic<int> DecompressionTasksInflight = 0; - ui64 ReadSizeBudget; - i64 ReadSizeServerDelta; + i64 ReadSizeBudget; + i64 ReadSizeServerDelta = 0; }; // High level class that manages several read session impls. diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index afa8aa982a5..d86e95b6d39 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -224,22 +224,27 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain WaitingReadResponse = false; ServerMessage = std::make_shared<TServerMessage<UseMigrationProtocol>>(); ++ConnectionGeneration; - if (RetryState) { + + ReadSizeBudget += ReadSizeServerDelta; + ReadSizeServerDelta = 0; + + if (!RetryState) { + RetryState = Settings.RetryPolicy_->CreateRetryState(); + } + if (!status.Ok()) { TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(status.Status); - if (nextDelay) { - delay = *nextDelay; - delayContext = ClientContext->CreateContext(); - if (!delayContext) { - return false; - } - Log.Write(TLOG_DEBUG, GetLogPrefix() - << "Reconnecting session to cluster " << ClusterName << " in " << delay); - } else { + if (!nextDelay) { + return false; + } + delay = *nextDelay; + delayContext = ClientContext->CreateContext(); + if (!delayContext) { return false; } - } else { - RetryState = Settings.RetryPolicy_->CreateRetryState(); } + + Log.Write(TLOG_DEBUG, GetLogPrefix() << "Reconnecting session to cluster " << ClusterName << " in " << delay); + ++ConnectionAttemptsDone; // Set new context @@ -350,7 +355,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnect( if (st.Ok()) { Processor = std::move(processor); - RetryState = nullptr; ConnectionAttemptsDone = 0; InitImpl(deferred); return; @@ -452,10 +456,12 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ContinueReadingDataImp if constexpr (UseMigrationProtocol) { req.mutable_read(); } else { - if (ReadSizeBudget == 0 || ReadSizeServerDelta <= 0) { + if (ReadSizeBudget <= 0 || ReadSizeServerDelta + ReadSizeBudget <= 0) { return; } req.mutable_read_request()->set_bytes_size(ReadSizeBudget); + ReadSizeServerDelta += ReadSizeBudget; + ReadSizeBudget = 0; } @@ -826,8 +832,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnReadDone(NGrpc::TGrp } if (!errorStatus.Ok()) { ++*Settings.Counters_->Errors; - // Explicitly create retry state to determine whether we should connect to server again. - RetryState = Settings.RetryPolicy_->CreateRetryState(); + if (!Reconnect(errorStatus)) { ErrorHandler->AbortSession(std::move(errorStatus)); } @@ -843,6 +848,8 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); + RetryState = nullptr; + // Successful init. Do nothing. ContinueReadingDataImpl(); } @@ -1054,6 +1061,8 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( TDeferredActions<false>& deferred) { // Assumes that we're under lock. Y_UNUSED(deferred); + RetryState = nullptr; + Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); // Successful init. Do nothing. @@ -1327,7 +1336,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed(i64 } if constexpr (!UseMigrationProtocol) { ReadSizeBudget += serverBytesSize; - ReadSizeServerDelta += serverBytesSize; } ContinueReadingDataImpl(); StartDecompressionTasksImpl(deferred); |