summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/retryful_writer.cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2025-04-23 15:43:56 +0300
committerhiddenpath <[email protected]>2025-04-23 15:59:27 +0300
commit7c0632d935742fed09b7e3c49c5677e9bc3320b3 (patch)
tree1be8b66161d4ff4723749600972ff76250d4c150 /yt/cpp/mapreduce/client/retryful_writer.cpp
parentc222f6f103376934dd705e16bf9a3bad66e44365 (diff)
YT-23616: Split TRetryfulWriter implementation into rpc and http
commit_hash:2b01b1b2387f71903fc29f3364d1e5f46f35a703
Diffstat (limited to 'yt/cpp/mapreduce/client/retryful_writer.cpp')
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.cpp30
1 files changed, 16 insertions, 14 deletions
diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp
index f002adaed26..d68f8f4d3d3 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.cpp
+++ b/yt/cpp/mapreduce/client/retryful_writer.cpp
@@ -10,6 +10,7 @@
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/http_client/raw_client.h>
+#include <yt/cpp/mapreduce/http_client/raw_requests.h>
#include <util/generic/size_literals.h>
@@ -99,16 +100,8 @@ void TRetryfulWriter::FlushBuffer(bool lastBlock)
void TRetryfulWriter::Send(const TBuffer& buffer)
{
- THttpHeader header("PUT", Command_);
- header.SetInputFormat(Format_);
- header.MergeParameters(Parameters_);
- header.SetRequestCompression(ToString(Context_.Config->ContentEncoding));
-
auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_);
- NDetail::TRequestConfig config;
- config.IsHeavy = true;
-
NDetail::RequestWithRetry<void>(
CreateDefaultRequestRetryPolicy(Context_.Config),
[&](TMutationId&) {
@@ -116,17 +109,26 @@ void TRetryfulWriter::Send(const TBuffer& buffer)
RawClient_, ClientRetryPolicy_, Context_,
transactionId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
- auto input = std::make_unique<TBufferInput>(buffer);
- header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true);
+ std::unique_ptr<IOutputStream> stream;
+ std::visit([this, &attemptTx, &stream] (const auto& options) -> void {
+ using TType = std::decay_t<decltype(options)>;
+ if constexpr (std::is_same_v<TType, TFileWriterOptions>) {
+ stream = NDetail::NRawClient::WriteFile(Context_, attemptTx.GetId(), Path_, options);
+ } else if constexpr (std::is_same_v<TType, TTableWriterOptions>) {
+ stream = NDetail::NRawClient::WriteTable(Context_, attemptTx.GetId(), Path_, Format_, options);
+ } else {
+ static_assert(TDependentFalse<TType>);
+ }
+ }, Options_);
- auto request = NDetail::StartRequestWithoutRetry(Context_, header, config);
- TransferData(input.get(), request->GetStream());
- request->Finish()->GetResponse();
+ auto input = std::make_unique<TBufferInput>(buffer);
+ TransferData(input.get(), stream.get());
+ stream->Finish();
attemptTx.Commit();
});
- Parameters_ = SecondaryParameters_; // all blocks except the first one are appended
+ Path_ = SecondaryPath_; // all blocks except the first one are appended
}
void TRetryfulWriter::SendThread()