diff options
author | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
commit | 73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch) | |
tree | 188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/client/retry_heavy_write_request.cpp | |
parent | 528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff) | |
download | ydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz |
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build.
Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/client/retry_heavy_write_request.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/retry_heavy_write_request.cpp | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp new file mode 100644 index 0000000000..b4e4975d7f --- /dev/null +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp @@ -0,0 +1,87 @@ +#include "retry_heavy_write_request.h" + +#include "transaction.h" +#include "transaction_pinger.h" + +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/tvm.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/cpp/mapreduce/http/helpers.h> +#include <yt/cpp/mapreduce/http/http_client.h> +#include <yt/cpp/mapreduce/http/requests.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +namespace NYT { + +using ::ToString; + +//////////////////////////////////////////////////////////////////////////////// + +void RetryHeavyWriteRequest( + const IClientRetryPolicyPtr& clientRetryPolicy, + const ITransactionPingerPtr& transactionPinger, + const TClientContext& context, + const TTransactionId& parentId, + THttpHeader& header, + std::function<THolder<IInputStream>()> streamMaker) +{ + int retryCount = context.Config->RetryCount; + if (context.ServiceTicketAuth) { + header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(context.Token); + } + + for (int attempt = 0; attempt < retryCount; ++attempt) { + TPingableTransaction attemptTx(clientRetryPolicy, context, parentId, transactionPinger->GetChildTxPinger(), TStartTransactionOptions()); + + auto input = streamMaker(); + TString requestId; + + try { + auto hostName = GetProxyForHeavyRequest(context); + requestId = CreateGuidAsString(); + + header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true); + header.SetRequestCompression(ToString(context.Config->ContentEncoding)); + + auto request = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header); + TransferData(input.Get(), request->GetStream()); + request->Finish()->GetResponse(); + } catch (TErrorResponse& e) { + YT_LOG_ERROR("RSP %v - attempt %v failed", + requestId, + attempt); + + if (!IsRetriable(e) || attempt + 1 == retryCount) { + throw; + } + NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, context.Config)); + continue; + + } catch (std::exception& e) { + YT_LOG_ERROR("RSP %v - %v - attempt %v failed", + requestId, + e.what(), + attempt); + + if (attempt + 1 == retryCount) { + throw; + } + NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, context.Config)); + continue; + } + + attemptTx.Commit(); + return; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |