diff options
author | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-12-13 10:22:13 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-13 10:22:13 +0000 |
commit | e73e490feb4e1f63d097697324aa48b643a62317 (patch) | |
tree | f63fe3d15819a5148ade51609c5211251d93e425 /yt/cpp/mapreduce/http | |
parent | 19346460a8060a0ed4731edb192745642ff34b3d (diff) | |
parent | 4dde77404d1eae4a633d1cc3807142409a9938eb (diff) | |
download | ydb-e73e490feb4e1f63d097697324aa48b643a62317.tar.gz |
Merge pull request #12582 from vitalyisaev2/rightlib_20241212
Merge rightlib 20241212
Diffstat (limited to 'yt/cpp/mapreduce/http')
-rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 8 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/retry_request.cpp | 10 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/retry_request.h | 64 |
4 files changed, 81 insertions, 4 deletions
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index ca243a929a..41ee56f672 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -207,7 +207,7 @@ void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite AddParameter("operation_id", GetGuidAsString(operationId), overwrite); } -void THttpHeader::AddMutationId() +TMutationId THttpHeader::AddMutationId() { TGUID guid; @@ -220,6 +220,8 @@ void THttpHeader::AddMutationId() guid.dw[2] = GetPID() ^ MicroSeconds(); AddParameter("mutation_id", GetGuidAsString(guid), true); + + return guid; } bool THttpHeader::HasMutationId() const @@ -227,6 +229,10 @@ bool THttpHeader::HasMutationId() const return Parameters_.contains("mutation_id"); } +void THttpHeader::SetMutationId(TMutationId mutationId) { + AddParameter("mutation_id", GetGuidAsString(mutationId), /* overwrite */ true); +} + void THttpHeader::SetToken(const TString& token) { Token_ = token; diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index 618b1e2c22..0f5e9034ee 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -59,8 +59,9 @@ public: void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false); void AddPath(const TString& path, bool overwrite = false); void AddOperationId(const TOperationId& operationId, bool overwrite = false); - void AddMutationId(); + TMutationId AddMutationId(); bool HasMutationId() const; + void SetMutationId(TMutationId mutationId); void SetToken(const TString& token); void SetProxyAddress(const TString& proxyAddress); diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp index 0c719eeda3..3330cf696c 100644 --- a/yt/cpp/mapreduce/http/retry_request.cpp +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -49,6 +49,7 @@ static TResponseInfo Request( TResponseInfo RequestWithoutRetry( const TClientContext& context, + TMutationId& mutationId, THttpHeader& header, TMaybe<TStringBuf> body, const TRequestConfig& config) @@ -64,8 +65,13 @@ TResponseInfo RequestWithoutRetry( } if (header.HasMutationId()) { - header.RemoveParameter("retry"); - header.AddMutationId(); + if (mutationId.IsEmpty()) { + header.RemoveParameter("retry"); + mutationId = header.AddMutationId(); + } else { + header.AddParameter("retry", true, /* overwrite */ true); + header.SetMutationId(mutationId); + } } auto requestId = CreateGuidAsString(); return Request(context, header, body, requestId, config); diff --git a/yt/cpp/mapreduce/http/retry_request.h b/yt/cpp/mapreduce/http/retry_request.h index a6007d6eae..19bf0a7c14 100644 --- a/yt/cpp/mapreduce/http/retry_request.h +++ b/yt/cpp/mapreduce/http/retry_request.h @@ -2,8 +2,13 @@ #include "fwd.h" +#include <yt/cpp/mapreduce/interface/errors.h> #include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + #include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> #include <yt/cpp/mapreduce/http/http_client.h> @@ -32,6 +37,64 @@ struct TRequestConfig //////////////////////////////////////////////////////////////////////////////// +template <typename TResult> +TResult RequestWithRetry( + IRequestRetryPolicyPtr retryPolicy, + std::function<TResult(TMutationId&)> func) +{ + bool useSameMutationId = false; + TMutationId mutationId; + + while (true) { + try { + retryPolicy->NotifyNewAttempt(); + if constexpr (std::is_same_v<TResult, void>) { + func(mutationId); + return; + } else { + return func(mutationId); + } + } catch (const TErrorResponse& e) { + YT_LOG_ERROR("Retry failed %v - %v", + e.GetError().GetMessage(), + retryPolicy->GetAttemptDescription()); + + useSameMutationId = e.IsTransportError(); + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnRetriableError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } catch (const std::exception& e) { + YT_LOG_ERROR("Retry failed %v - %v", + e.what(), + retryPolicy->GetAttemptDescription()); + + useSameMutationId = true; + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnGenericError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } + if (!useSameMutationId) { + mutationId = {}; + } + } +} + // Retry request with given `header' and `body' using `retryPolicy'. // If `retryPolicy == nullptr' use default, currently `TAttemptLimitedRetryPolicy(TConfig::Get()->RetryCount)`. TResponseInfo RetryRequestWithPolicy( @@ -43,6 +106,7 @@ TResponseInfo RetryRequestWithPolicy( TResponseInfo RequestWithoutRetry( const TClientContext& context, + TMutationId& mutationId, THttpHeader& header, TMaybe<TStringBuf> body = {}, const TRequestConfig& config = TRequestConfig()); |