aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-10-05 12:31:46 +0300
committeralexnick <alexnick@ydb.tech>2022-10-05 12:31:46 +0300
commit22a9f2946efed8beb7d3c68dbeb8c14d5f63edf9 (patch)
tree5f28a53222b2299093f3e699d963c28d1c27daf6
parent0ce4feb0aeffff80b8f48ae565835263a8bbd7fa (diff)
downloadydb-22a9f2946efed8beb7d3c68dbeb8c14d5f63edf9.tar.gz
fixed connection loss for topic-service
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp42
2 files changed, 28 insertions, 20 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index 6228462c687..d41ea16dd80 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -1023,7 +1023,7 @@ public:
, ClientContext(std::move(clientContext))
, CookieMapping(ErrorHandler)
, ReadSizeBudget(GetCompressedDataSizeLimit())
- , ReadSizeServerDelta(GetCompressedDataSizeLimit())
+ , ReadSizeServerDelta(0)
{
}
@@ -1243,8 +1243,8 @@ private:
bool Closing = false;
std::function<void()> CloseCallback;
std::atomic<int> DecompressionTasksInflight = 0;
- ui64 ReadSizeBudget;
- i64 ReadSizeServerDelta;
+ i64 ReadSizeBudget;
+ i64 ReadSizeServerDelta = 0;
};
// High level class that manages several read session impls.
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index afa8aa982a5..d86e95b6d39 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -224,22 +224,27 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain
WaitingReadResponse = false;
ServerMessage = std::make_shared<TServerMessage<UseMigrationProtocol>>();
++ConnectionGeneration;
- if (RetryState) {
+
+ ReadSizeBudget += ReadSizeServerDelta;
+ ReadSizeServerDelta = 0;
+
+ if (!RetryState) {
+ RetryState = Settings.RetryPolicy_->CreateRetryState();
+ }
+ if (!status.Ok()) {
TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(status.Status);
- if (nextDelay) {
- delay = *nextDelay;
- delayContext = ClientContext->CreateContext();
- if (!delayContext) {
- return false;
- }
- Log.Write(TLOG_DEBUG, GetLogPrefix()
- << "Reconnecting session to cluster " << ClusterName << " in " << delay);
- } else {
+ if (!nextDelay) {
+ return false;
+ }
+ delay = *nextDelay;
+ delayContext = ClientContext->CreateContext();
+ if (!delayContext) {
return false;
}
- } else {
- RetryState = Settings.RetryPolicy_->CreateRetryState();
}
+
+ Log.Write(TLOG_DEBUG, GetLogPrefix() << "Reconnecting session to cluster " << ClusterName << " in " << delay);
+
++ConnectionAttemptsDone;
// Set new context
@@ -350,7 +355,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnect(
if (st.Ok()) {
Processor = std::move(processor);
- RetryState = nullptr;
ConnectionAttemptsDone = 0;
InitImpl(deferred);
return;
@@ -452,10 +456,12 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ContinueReadingDataImp
if constexpr (UseMigrationProtocol) {
req.mutable_read();
} else {
- if (ReadSizeBudget == 0 || ReadSizeServerDelta <= 0) {
+ if (ReadSizeBudget <= 0 || ReadSizeServerDelta + ReadSizeBudget <= 0) {
return;
}
req.mutable_read_request()->set_bytes_size(ReadSizeBudget);
+ ReadSizeServerDelta += ReadSizeBudget;
+
ReadSizeBudget = 0;
}
@@ -826,8 +832,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnReadDone(NGrpc::TGrp
}
if (!errorStatus.Ok()) {
++*Settings.Counters_->Errors;
- // Explicitly create retry state to determine whether we should connect to server again.
- RetryState = Settings.RetryPolicy_->CreateRetryState();
+
if (!Reconnect(errorStatus)) {
ErrorHandler->AbortSession(std::move(errorStatus));
}
@@ -843,6 +848,8 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
+ RetryState = nullptr;
+
// Successful init. Do nothing.
ContinueReadingDataImpl();
}
@@ -1054,6 +1061,8 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
TDeferredActions<false>& deferred) { // Assumes that we're under lock.
Y_UNUSED(deferred);
+ RetryState = nullptr;
+
Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
// Successful init. Do nothing.
@@ -1327,7 +1336,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed(i64
}
if constexpr (!UseMigrationProtocol) {
ReadSizeBudget += serverBytesSize;
- ReadSizeServerDelta += serverBytesSize;
}
ContinueReadingDataImpl();
StartDecompressionTasksImpl(deferred);