aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build. Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/client/retry_heavy_write_request.cpp')
-rw-r--r--yt/cpp/mapreduce/client/retry_heavy_write_request.cpp87
1 files changed, 87 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
new file mode 100644
index 0000000000..b4e4975d7f
--- /dev/null
+++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
@@ -0,0 +1,87 @@
+#include "retry_heavy_write_request.h"
+
+#include "transaction.h"
+#include "transaction_pinger.h"
+
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/tvm.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <yt/cpp/mapreduce/http/helpers.h>
+#include <yt/cpp/mapreduce/http/http_client.h>
+#include <yt/cpp/mapreduce/http/requests.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+namespace NYT {
+
+using ::ToString;
+
+////////////////////////////////////////////////////////////////////////////////
+
+void RetryHeavyWriteRequest(
+ const IClientRetryPolicyPtr& clientRetryPolicy,
+ const ITransactionPingerPtr& transactionPinger,
+ const TClientContext& context,
+ const TTransactionId& parentId,
+ THttpHeader& header,
+ std::function<THolder<IInputStream>()> streamMaker)
+{
+ int retryCount = context.Config->RetryCount;
+ if (context.ServiceTicketAuth) {
+ header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ } else {
+ header.SetToken(context.Token);
+ }
+
+ for (int attempt = 0; attempt < retryCount; ++attempt) {
+ TPingableTransaction attemptTx(clientRetryPolicy, context, parentId, transactionPinger->GetChildTxPinger(), TStartTransactionOptions());
+
+ auto input = streamMaker();
+ TString requestId;
+
+ try {
+ auto hostName = GetProxyForHeavyRequest(context);
+ requestId = CreateGuidAsString();
+
+ header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true);
+ header.SetRequestCompression(ToString(context.Config->ContentEncoding));
+
+ auto request = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
+ TransferData(input.Get(), request->GetStream());
+ request->Finish()->GetResponse();
+ } catch (TErrorResponse& e) {
+ YT_LOG_ERROR("RSP %v - attempt %v failed",
+ requestId,
+ attempt);
+
+ if (!IsRetriable(e) || attempt + 1 == retryCount) {
+ throw;
+ }
+ NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, context.Config));
+ continue;
+
+ } catch (std::exception& e) {
+ YT_LOG_ERROR("RSP %v - %v - attempt %v failed",
+ requestId,
+ e.what(),
+ attempt);
+
+ if (attempt + 1 == retryCount) {
+ throw;
+ }
+ NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, context.Config));
+ continue;
+ }
+
+ attemptTx.Commit();
+ return;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT