diff options
author | hiddenpath <[email protected]> | 2025-04-23 15:43:56 +0300 |
---|---|---|
committer | hiddenpath <[email protected]> | 2025-04-23 15:59:27 +0300 |
commit | 7c0632d935742fed09b7e3c49c5677e9bc3320b3 (patch) | |
tree | 1be8b66161d4ff4723749600972ff76250d4c150 /yt/cpp/mapreduce/client/retryful_writer.cpp | |
parent | c222f6f103376934dd705e16bf9a3bad66e44365 (diff) |
YT-23616: Split TRetryfulWriter implementation into rpc and http
commit_hash:2b01b1b2387f71903fc29f3364d1e5f46f35a703
Diffstat (limited to 'yt/cpp/mapreduce/client/retryful_writer.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.cpp | 30 |
1 files changed, 16 insertions, 14 deletions
diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index f002adaed26..d68f8f4d3d3 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -10,6 +10,7 @@ #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/cpp/mapreduce/http_client/raw_client.h> +#include <yt/cpp/mapreduce/http_client/raw_requests.h> #include <util/generic/size_literals.h> @@ -99,16 +100,8 @@ void TRetryfulWriter::FlushBuffer(bool lastBlock) void TRetryfulWriter::Send(const TBuffer& buffer) { - THttpHeader header("PUT", Command_); - header.SetInputFormat(Format_); - header.MergeParameters(Parameters_); - header.SetRequestCompression(ToString(Context_.Config->ContentEncoding)); - auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_); - NDetail::TRequestConfig config; - config.IsHeavy = true; - NDetail::RequestWithRetry<void>( CreateDefaultRequestRetryPolicy(Context_.Config), [&](TMutationId&) { @@ -116,17 +109,26 @@ void TRetryfulWriter::Send(const TBuffer& buffer) RawClient_, ClientRetryPolicy_, Context_, transactionId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); - auto input = std::make_unique<TBufferInput>(buffer); - header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true); + std::unique_ptr<IOutputStream> stream; + std::visit([this, &attemptTx, &stream] (const auto& options) -> void { + using TType = std::decay_t<decltype(options)>; + if constexpr (std::is_same_v<TType, TFileWriterOptions>) { + stream = NDetail::NRawClient::WriteFile(Context_, attemptTx.GetId(), Path_, options); + } else if constexpr (std::is_same_v<TType, TTableWriterOptions>) { + stream = NDetail::NRawClient::WriteTable(Context_, attemptTx.GetId(), Path_, Format_, options); + } else { + static_assert(TDependentFalse<TType>); + } + }, Options_); - auto request = NDetail::StartRequestWithoutRetry(Context_, header, config); - TransferData(input.get(), request->GetStream()); - request->Finish()->GetResponse(); + auto input = std::make_unique<TBufferInput>(buffer); + TransferData(input.get(), stream.get()); + stream->Finish(); attemptTx.Commit(); }); - Parameters_ = SecondaryParameters_; // all blocks except the first one are appended + Path_ = SecondaryPath_; // all blocks except the first one are appended } void TRetryfulWriter::SendThread() |