diff options
author | ermolovd <ermolovd@yandex-team.com> | 2023-11-28 09:30:16 +0300 |
---|---|---|
committer | ermolovd <ermolovd@yandex-team.com> | 2023-11-28 09:59:46 +0300 |
commit | 5e58015f2e6fddae74e51d9de0870e5f7119cc12 (patch) | |
tree | 0f686ad847d54a66716d717404a1d615139ab22d /yt/cpp/mapreduce/client/retry_heavy_write_request.cpp | |
parent | 86cac93557a380155b8a87abd5802e5d40c027bf (diff) | |
download | ydb-5e58015f2e6fddae74e51d9de0870e5f7119cc12.tar.gz |
YT-19269: table writer implementation that doesn't wait for complete buffer before sending to network
Diffstat (limited to 'yt/cpp/mapreduce/client/retry_heavy_write_request.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/retry_heavy_write_request.cpp | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp index 54a2be02e3..253d7c0d44 100644 --- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp @@ -16,6 +16,8 @@ #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/retry_request.h> +#include <util/stream/null.h> + namespace NYT { using ::ToString; @@ -93,4 +95,109 @@ void RetryHeavyWriteRequest( //////////////////////////////////////////////////////////////////////////////// +THeavyRequestRetrier::THeavyRequestRetrier(TParameters parameters) + : Parameters_(std::move(parameters)) + , RequestRetryPolicy_(Parameters_.ClientRetryPolicy->CreatePolicyForGenericRequest()) + , StreamFactory_([] { + return MakeHolder<TNullInput>(); + }) +{ + Retry([] { }); +} + +THeavyRequestRetrier::~THeavyRequestRetrier() = default; + +void THeavyRequestRetrier::Update(THeavyRequestRetrier::TStreamFactory streamFactory) +{ + StreamFactory_ = streamFactory; + Retry([this] { + auto stream = StreamFactory_(); + stream->Skip(Attempt_->Offset); + auto transfered = stream->ReadAll(*Attempt_->Request->GetStream()); + Attempt_->Offset += transfered; + }); +} + +void THeavyRequestRetrier::Finish() +{ + Retry([this] { + Attempt_->Request->Finish()->GetResponse(); + Attempt_->Transaction->Commit(); + Attempt_.reset(); + }); +} + +void THeavyRequestRetrier::Retry(const std::function<void()> &function) +{ + while (true) { + try { + if (!Attempt_) { + TryStartAttempt(); + } + function(); + return; + } catch (const std::exception& ex) { + YT_LOG_ERROR("RSP %v - attempt %v failed", + Attempt_->RequestId, + RequestRetryPolicy_->GetAttemptDescription()); + Attempt_.reset(); + + TMaybe<TDuration> backoffDuration; + if (const auto *errorResponse = dynamic_cast<const TErrorResponse *>(&ex)) { + if (!IsRetriable(*errorResponse)) { + throw; + } + backoffDuration = RequestRetryPolicy_->OnRetriableError(*errorResponse); + } else { + if (!IsRetriable(ex)) { + throw; + } + backoffDuration = RequestRetryPolicy_->OnGenericError(ex); + } + + if (!backoffDuration) { + throw; + } + NDetail::TWaitProxy::Get()->Sleep(*backoffDuration); + } + } +} + +void THeavyRequestRetrier::TryStartAttempt() +{ + Attempt_ = std::make_unique<TAttempt>(); + Attempt_->Transaction = std::make_unique<TPingableTransaction>( + Parameters_.ClientRetryPolicy, Parameters_.Context, + Parameters_.TransactionId, + Parameters_.TransactionPinger->GetChildTxPinger(), + TStartTransactionOptions()); + + auto header = Parameters_.Header; + if (Parameters_.Context.ServiceTicketAuth) { + header.SetServiceTicket(Parameters_.Context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(Parameters_.Context.Token); + } + + if (Parameters_.Context.ImpersonationUser) { + header.SetImpersonationUser(*Parameters_.Context.ImpersonationUser); + } + auto hostName = GetProxyForHeavyRequest(Parameters_.Context); + Attempt_->RequestId = CreateGuidAsString(); + + UpdateHeaderForProxyIfNeed(hostName, Parameters_.Context, header); + + header.AddTransactionId(Attempt_->Transaction->GetId(), /* overwrite = */ true); + header.SetRequestCompression(ToString(Parameters_.Context.Config->ContentEncoding)); + + Attempt_->Request = Parameters_.Context.HttpClient->StartRequest( + GetFullUrlForProxy(hostName, Parameters_.Context, header), + Attempt_->RequestId, header); + + auto stream = StreamFactory_(); + stream->ReadAll(*Attempt_->Request->GetStream()); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT |