aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
diff options
context:
space:
mode:
authorermolovd <ermolovd@yandex-team.com>2023-11-28 09:30:16 +0300
committerermolovd <ermolovd@yandex-team.com>2023-11-28 09:59:46 +0300
commit5e58015f2e6fddae74e51d9de0870e5f7119cc12 (patch)
tree0f686ad847d54a66716d717404a1d615139ab22d /yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
parent86cac93557a380155b8a87abd5802e5d40c027bf (diff)
downloadydb-5e58015f2e6fddae74e51d9de0870e5f7119cc12.tar.gz
YT-19269: table writer implementation that doesn't wait for complete buffer before sending to network
Diffstat (limited to 'yt/cpp/mapreduce/client/retry_heavy_write_request.cpp')
-rw-r--r--yt/cpp/mapreduce/client/retry_heavy_write_request.cpp107
1 files changed, 107 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
index 54a2be02e3..253d7c0d44 100644
--- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
+++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
@@ -16,6 +16,8 @@
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
+#include <util/stream/null.h>
+
namespace NYT {
using ::ToString;
@@ -93,4 +95,109 @@ void RetryHeavyWriteRequest(
////////////////////////////////////////////////////////////////////////////////
+THeavyRequestRetrier::THeavyRequestRetrier(TParameters parameters)
+ : Parameters_(std::move(parameters))
+ , RequestRetryPolicy_(Parameters_.ClientRetryPolicy->CreatePolicyForGenericRequest())
+ , StreamFactory_([] {
+ return MakeHolder<TNullInput>();
+ })
+{
+ Retry([] { });
+}
+
+THeavyRequestRetrier::~THeavyRequestRetrier() = default;
+
+void THeavyRequestRetrier::Update(THeavyRequestRetrier::TStreamFactory streamFactory)
+{
+ StreamFactory_ = streamFactory;
+ Retry([this] {
+ auto stream = StreamFactory_();
+ stream->Skip(Attempt_->Offset);
+ auto transfered = stream->ReadAll(*Attempt_->Request->GetStream());
+ Attempt_->Offset += transfered;
+ });
+}
+
+void THeavyRequestRetrier::Finish()
+{
+ Retry([this] {
+ Attempt_->Request->Finish()->GetResponse();
+ Attempt_->Transaction->Commit();
+ Attempt_.reset();
+ });
+}
+
+void THeavyRequestRetrier::Retry(const std::function<void()> &function)
+{
+ while (true) {
+ try {
+ if (!Attempt_) {
+ TryStartAttempt();
+ }
+ function();
+ return;
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR("RSP %v - attempt %v failed",
+ Attempt_->RequestId,
+ RequestRetryPolicy_->GetAttemptDescription());
+ Attempt_.reset();
+
+ TMaybe<TDuration> backoffDuration;
+ if (const auto *errorResponse = dynamic_cast<const TErrorResponse *>(&ex)) {
+ if (!IsRetriable(*errorResponse)) {
+ throw;
+ }
+ backoffDuration = RequestRetryPolicy_->OnRetriableError(*errorResponse);
+ } else {
+ if (!IsRetriable(ex)) {
+ throw;
+ }
+ backoffDuration = RequestRetryPolicy_->OnGenericError(ex);
+ }
+
+ if (!backoffDuration) {
+ throw;
+ }
+ NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
+ }
+ }
+}
+
+void THeavyRequestRetrier::TryStartAttempt()
+{
+ Attempt_ = std::make_unique<TAttempt>();
+ Attempt_->Transaction = std::make_unique<TPingableTransaction>(
+ Parameters_.ClientRetryPolicy, Parameters_.Context,
+ Parameters_.TransactionId,
+ Parameters_.TransactionPinger->GetChildTxPinger(),
+ TStartTransactionOptions());
+
+ auto header = Parameters_.Header;
+ if (Parameters_.Context.ServiceTicketAuth) {
+ header.SetServiceTicket(Parameters_.Context.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ } else {
+ header.SetToken(Parameters_.Context.Token);
+ }
+
+ if (Parameters_.Context.ImpersonationUser) {
+ header.SetImpersonationUser(*Parameters_.Context.ImpersonationUser);
+ }
+ auto hostName = GetProxyForHeavyRequest(Parameters_.Context);
+ Attempt_->RequestId = CreateGuidAsString();
+
+ UpdateHeaderForProxyIfNeed(hostName, Parameters_.Context, header);
+
+ header.AddTransactionId(Attempt_->Transaction->GetId(), /* overwrite = */ true);
+ header.SetRequestCompression(ToString(Parameters_.Context.Config->ContentEncoding));
+
+ Attempt_->Request = Parameters_.Context.HttpClient->StartRequest(
+ GetFullUrlForProxy(hostName, Parameters_.Context, header),
+ Attempt_->RequestId, header);
+
+ auto stream = StreamFactory_();
+ stream->ReadAll(*Attempt_->Request->GetStream());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT