aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http
diff options
context:
space:
mode:
authorVitaly Isaev <vitalyisaev@ydb.tech>2024-12-12 15:39:00 +0000
committerVitaly Isaev <vitalyisaev@ydb.tech>2024-12-12 15:39:00 +0000
commit827b115675004838023427572a7c69f40a86a80a (patch)
treee99c953fe494b9de8d8597a15859d77c81f118c7 /yt/cpp/mapreduce/http
parent42701242eaf5be980cb935631586d0e90b82641c (diff)
parentfab222fd8176d00eee5ddafc6bce8cb95a6e3ab0 (diff)
downloadydb-827b115675004838023427572a7c69f40a86a80a.tar.gz
Merge branch 'rightlib' into rightlib_20241212
Diffstat (limited to 'yt/cpp/mapreduce/http')
-rw-r--r--yt/cpp/mapreduce/http/http.cpp8
-rw-r--r--yt/cpp/mapreduce/http/http.h3
-rw-r--r--yt/cpp/mapreduce/http/retry_request.cpp10
-rw-r--r--yt/cpp/mapreduce/http/retry_request.h64
4 files changed, 81 insertions, 4 deletions
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index ca243a929a..41ee56f672 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -207,7 +207,7 @@ void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite
AddParameter("operation_id", GetGuidAsString(operationId), overwrite);
}
-void THttpHeader::AddMutationId()
+TMutationId THttpHeader::AddMutationId()
{
TGUID guid;
@@ -220,6 +220,8 @@ void THttpHeader::AddMutationId()
guid.dw[2] = GetPID() ^ MicroSeconds();
AddParameter("mutation_id", GetGuidAsString(guid), true);
+
+ return guid;
}
bool THttpHeader::HasMutationId() const
@@ -227,6 +229,10 @@ bool THttpHeader::HasMutationId() const
return Parameters_.contains("mutation_id");
}
+void THttpHeader::SetMutationId(TMutationId mutationId) {
+ AddParameter("mutation_id", GetGuidAsString(mutationId), /* overwrite */ true);
+}
+
void THttpHeader::SetToken(const TString& token)
{
Token_ = token;
diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h
index 618b1e2c22..0f5e9034ee 100644
--- a/yt/cpp/mapreduce/http/http.h
+++ b/yt/cpp/mapreduce/http/http.h
@@ -59,8 +59,9 @@ public:
void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false);
void AddPath(const TString& path, bool overwrite = false);
void AddOperationId(const TOperationId& operationId, bool overwrite = false);
- void AddMutationId();
+ TMutationId AddMutationId();
bool HasMutationId() const;
+ void SetMutationId(TMutationId mutationId);
void SetToken(const TString& token);
void SetProxyAddress(const TString& proxyAddress);
diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp
index 0c719eeda3..3330cf696c 100644
--- a/yt/cpp/mapreduce/http/retry_request.cpp
+++ b/yt/cpp/mapreduce/http/retry_request.cpp
@@ -49,6 +49,7 @@ static TResponseInfo Request(
TResponseInfo RequestWithoutRetry(
const TClientContext& context,
+ TMutationId& mutationId,
THttpHeader& header,
TMaybe<TStringBuf> body,
const TRequestConfig& config)
@@ -64,8 +65,13 @@ TResponseInfo RequestWithoutRetry(
}
if (header.HasMutationId()) {
- header.RemoveParameter("retry");
- header.AddMutationId();
+ if (mutationId.IsEmpty()) {
+ header.RemoveParameter("retry");
+ mutationId = header.AddMutationId();
+ } else {
+ header.AddParameter("retry", true, /* overwrite */ true);
+ header.SetMutationId(mutationId);
+ }
}
auto requestId = CreateGuidAsString();
return Request(context, header, body, requestId, config);
diff --git a/yt/cpp/mapreduce/http/retry_request.h b/yt/cpp/mapreduce/http/retry_request.h
index a6007d6eae..19bf0a7c14 100644
--- a/yt/cpp/mapreduce/http/retry_request.h
+++ b/yt/cpp/mapreduce/http/retry_request.h
@@ -2,8 +2,13 @@
#include "fwd.h"
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
#include <yt/cpp/mapreduce/http/http_client.h>
@@ -32,6 +37,64 @@ struct TRequestConfig
////////////////////////////////////////////////////////////////////////////////
+template <typename TResult>
+TResult RequestWithRetry(
+ IRequestRetryPolicyPtr retryPolicy,
+ std::function<TResult(TMutationId&)> func)
+{
+ bool useSameMutationId = false;
+ TMutationId mutationId;
+
+ while (true) {
+ try {
+ retryPolicy->NotifyNewAttempt();
+ if constexpr (std::is_same_v<TResult, void>) {
+ func(mutationId);
+ return;
+ } else {
+ return func(mutationId);
+ }
+ } catch (const TErrorResponse& e) {
+ YT_LOG_ERROR("Retry failed %v - %v",
+ e.GetError().GetMessage(),
+ retryPolicy->GetAttemptDescription());
+
+ useSameMutationId = e.IsTransportError();
+
+ if (!IsRetriable(e)) {
+ throw;
+ }
+
+ auto maybeRetryTimeout = retryPolicy->OnRetriableError(e);
+ if (maybeRetryTimeout) {
+ TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
+ } else {
+ throw;
+ }
+ } catch (const std::exception& e) {
+ YT_LOG_ERROR("Retry failed %v - %v",
+ e.what(),
+ retryPolicy->GetAttemptDescription());
+
+ useSameMutationId = true;
+
+ if (!IsRetriable(e)) {
+ throw;
+ }
+
+ auto maybeRetryTimeout = retryPolicy->OnGenericError(e);
+ if (maybeRetryTimeout) {
+ TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
+ } else {
+ throw;
+ }
+ }
+ if (!useSameMutationId) {
+ mutationId = {};
+ }
+ }
+}
+
// Retry request with given `header' and `body' using `retryPolicy'.
// If `retryPolicy == nullptr' use default, currently `TAttemptLimitedRetryPolicy(TConfig::Get()->RetryCount)`.
TResponseInfo RetryRequestWithPolicy(
@@ -43,6 +106,7 @@ TResponseInfo RetryRequestWithPolicy(
TResponseInfo RequestWithoutRetry(
const TClientContext& context,
+ TMutationId& mutationId,
THttpHeader& header,
TMaybe<TStringBuf> body = {},
const TRequestConfig& config = TRequestConfig());