diff options
| author | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
| commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
| tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/http/retry_request.cpp | |
| parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/http/retry_request.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/http/retry_request.cpp | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp new file mode 100644 index 00000000000..ba116edcf7a --- /dev/null +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -0,0 +1,149 @@ +#include "retry_request.h" + +#include "context.h" +#include "helpers.h" +#include "http_client.h" +#include "requests.h" + +#include <yt/cpp/mapreduce/common/wait_proxy.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/tvm.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/node/node_io.h> + +namespace NYT { +namespace NDetail { + +/////////////////////////////////////////////////////////////////////////////// + +static TResponseInfo Request( + const TClientContext& context, + THttpHeader& header, + TMaybe<TStringBuf> body, + const TString& requestId, + const TRequestConfig& config) +{ + TString hostName; + if (config.IsHeavy) { + hostName = GetProxyForHeavyRequest(context); + } else { + hostName = context.ServerName; + } + + auto url = GetFullUrl(hostName, context, header); + + auto response = context.HttpClient->Request(url, requestId, config.HttpConfig, header, body); + + TResponseInfo result; + result.RequestId = requestId; + result.Response = response->GetResponse(); + result.HttpCode = response->GetStatusCode(); + return result; +} + +TResponseInfo RequestWithoutRetry( + const TClientContext& context, + THttpHeader& header, + TMaybe<TStringBuf> body, + const TRequestConfig& config) +{ + if (context.ServiceTicketAuth) { + header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(context.Token); + } + + if (context.ImpersonationUser) { + header.SetImpersonationUser(*context.ImpersonationUser); + } + + if (header.HasMutationId()) { + header.RemoveParameter("retry"); + header.AddMutationId(); + } + auto requestId = CreateGuidAsString(); + return Request(context, header, body, requestId, config); +} + + +TResponseInfo RetryRequestWithPolicy( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + THttpHeader& header, + TMaybe<TStringBuf> body, + const TRequestConfig& config) +{ + if (context.ServiceTicketAuth) { + header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(context.Token); + } + + if (context.ImpersonationUser) { + header.SetImpersonationUser(*context.ImpersonationUser); + } + + bool useMutationId = header.HasMutationId(); + bool retryWithSameMutationId = false; + + if (!retryPolicy) { + retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); + } + + while (true) { + auto requestId = CreateGuidAsString(); + try { + retryPolicy->NotifyNewAttempt(); + + if (useMutationId) { + if (retryWithSameMutationId) { + header.AddParameter("retry", true, /* overwrite = */ true); + } else { + header.RemoveParameter("retry"); + header.AddMutationId(); + } + } + + return Request(context, header, body, requestId, config); + } catch (const TErrorResponse& e) { + LogRequestError(requestId, header, e.GetError().GetMessage(), retryPolicy->GetAttemptDescription()); + retryWithSameMutationId = e.IsTransportError(); + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnRetriableError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } catch (const std::exception& e) { + LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription()); + retryWithSameMutationId = true; + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnGenericError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } + } + + Y_FAIL("Retries must have either succeeded or thrown an exception"); +} + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT |
