summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2025-04-23 15:43:56 +0300
committerhiddenpath <[email protected]>2025-04-23 15:59:27 +0300
commit7c0632d935742fed09b7e3c49c5677e9bc3320b3 (patch)
tree1be8b66161d4ff4723749600972ff76250d4c150
parentc222f6f103376934dd705e16bf9a3bad66e44365 (diff)
YT-23616: Split TRetryfulWriter implementation into rpc and http
commit_hash:2b01b1b2387f71903fc29f3364d1e5f46f35a703
-rw-r--r--yt/cpp/mapreduce/client/client.cpp1
-rw-r--r--yt/cpp/mapreduce/client/client_writer.cpp1
-rw-r--r--yt/cpp/mapreduce/client/file_writer.cpp1
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.cpp30
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.h25
-rw-r--r--yt/cpp/mapreduce/http_client/raw_requests.cpp52
-rw-r--r--yt/cpp/mapreduce/http_client/raw_requests.h6
7 files changed, 82 insertions, 34 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 48ad4a352ea..8977a29b106 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -440,7 +440,6 @@ TRawTableWriterPtr TClientBase::CreateRawWriter(
GetTransactionPinger(),
Context_,
TransactionId_,
- GetWriteTableCommand(Context_.Config->ApiVersion),
format,
CanonizeYPath(path),
options).Get();
diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp
index 8a4421c1c77..6622a40a59e 100644
--- a/yt/cpp/mapreduce/client/client_writer.cpp
+++ b/yt/cpp/mapreduce/client/client_writer.cpp
@@ -57,7 +57,6 @@ TClientWriter::TClientWriter(
std::move(transactionPinger),
context,
transactionId,
- GetWriteTableCommand(context.Config->ApiVersion),
format,
path,
options));
diff --git a/yt/cpp/mapreduce/client/file_writer.cpp b/yt/cpp/mapreduce/client/file_writer.cpp
index aa0d7f8e146..5a8b3f3409c 100644
--- a/yt/cpp/mapreduce/client/file_writer.cpp
+++ b/yt/cpp/mapreduce/client/file_writer.cpp
@@ -25,7 +25,6 @@ TFileWriter::TFileWriter(
std::move(transactionPinger),
context,
transactionId,
- GetWriteFileCommand(context.Config->ApiVersion),
TMaybe<TFormat>(),
path,
options)
diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp
index f002adaed26..d68f8f4d3d3 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.cpp
+++ b/yt/cpp/mapreduce/client/retryful_writer.cpp
@@ -10,6 +10,7 @@
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/http_client/raw_client.h>
+#include <yt/cpp/mapreduce/http_client/raw_requests.h>
#include <util/generic/size_literals.h>
@@ -99,16 +100,8 @@ void TRetryfulWriter::FlushBuffer(bool lastBlock)
void TRetryfulWriter::Send(const TBuffer& buffer)
{
- THttpHeader header("PUT", Command_);
- header.SetInputFormat(Format_);
- header.MergeParameters(Parameters_);
- header.SetRequestCompression(ToString(Context_.Config->ContentEncoding));
-
auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_);
- NDetail::TRequestConfig config;
- config.IsHeavy = true;
-
NDetail::RequestWithRetry<void>(
CreateDefaultRequestRetryPolicy(Context_.Config),
[&](TMutationId&) {
@@ -116,17 +109,26 @@ void TRetryfulWriter::Send(const TBuffer& buffer)
RawClient_, ClientRetryPolicy_, Context_,
transactionId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
- auto input = std::make_unique<TBufferInput>(buffer);
- header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true);
+ std::unique_ptr<IOutputStream> stream;
+ std::visit([this, &attemptTx, &stream] (const auto& options) -> void {
+ using TType = std::decay_t<decltype(options)>;
+ if constexpr (std::is_same_v<TType, TFileWriterOptions>) {
+ stream = NDetail::NRawClient::WriteFile(Context_, attemptTx.GetId(), Path_, options);
+ } else if constexpr (std::is_same_v<TType, TTableWriterOptions>) {
+ stream = NDetail::NRawClient::WriteTable(Context_, attemptTx.GetId(), Path_, Format_, options);
+ } else {
+ static_assert(TDependentFalse<TType>);
+ }
+ }, Options_);
- auto request = NDetail::StartRequestWithoutRetry(Context_, header, config);
- TransferData(input.get(), request->GetStream());
- request->Finish()->GetResponse();
+ auto input = std::make_unique<TBufferInput>(buffer);
+ TransferData(input.get(), stream.get());
+ stream->Finish();
attemptTx.Commit();
});
- Parameters_ = SecondaryParameters_; // all blocks except the first one are appended
+ Path_ = SecondaryPath_; // all blocks except the first one are appended
}
void TRetryfulWriter::SendThread()
diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h
index b0fcb13d3a6..9aa7f1a6040 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.h
+++ b/yt/cpp/mapreduce/client/retryful_writer.h
@@ -36,7 +36,6 @@ public:
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
const TTransactionId& parentId,
- const TString& command,
const TMaybe<TFormat>& format,
const TRichYPath& path,
const TWriterOptions& options)
@@ -45,9 +44,10 @@ public:
, TransactionPinger_(std::move(transactionPinger))
, Context_(context)
, AutoFinish_(options.AutoFinish_)
- , Command_(command)
+ , Options_(options)
, Format_(format)
, BufferSize_(GetBufferSize(options.WriterOptions_))
+ , Path_(path)
, ParentTransactionId_(parentId)
, WriteTransaction_()
, FilledBuffers_(2)
@@ -55,15 +55,12 @@ public:
, Buffer_(BufferSize_ * 2)
, Thread_(TThread::TParams{SendThread, this}.SetName("retryful_writer"))
{
- Parameters_ = FormIORequestParameters(path, options);
-
- auto secondaryPath = path;
- secondaryPath.Append_ = true;
- secondaryPath.Schema_.Clear();
- secondaryPath.CompressionCodec_.Clear();
- secondaryPath.ErasureCodec_.Clear();
- secondaryPath.OptimizeFor_.Clear();
- SecondaryParameters_ = FormIORequestParameters(secondaryPath, options);
+ SecondaryPath_ = path;
+ SecondaryPath_.Append_ = true;
+ SecondaryPath_.Schema_.Clear();
+ SecondaryPath_.CompressionCodec_.Clear();
+ SecondaryPath_.ErasureCodec_.Clear();
+ SecondaryPath_.OptimizeFor_.Clear();
if (options.CreateTransaction_) {
WriteTransaction_.ConstructInPlace(rawClient, ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
@@ -103,14 +100,14 @@ private:
const ITransactionPingerPtr TransactionPinger_;
const TClientContext Context_;
const bool AutoFinish_;
+ std::variant<TTableWriterOptions, TFileWriterOptions> Options_;
- TString Command_;
TMaybe<TFormat> Format_;
const size_t BufferSize_;
- TNode Parameters_;
- TNode SecondaryParameters_;
+ TRichYPath Path_;
+ TRichYPath SecondaryPath_;
TTransactionId ParentTransactionId_;
TMaybe<TPingableTransaction> WriteTransaction_;
diff --git a/yt/cpp/mapreduce/http_client/raw_requests.cpp b/yt/cpp/mapreduce/http_client/raw_requests.cpp
index c07c5ac1ddd..d4bfaa7944d 100644
--- a/yt/cpp/mapreduce/http_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/http_client/raw_requests.cpp
@@ -315,7 +315,33 @@ struct THttpRequestStream
: public IOutputStream
{
public:
- THttpRequestStream(NHttpClient::IHttpRequestPtr request, size_t bufferSize)
+ THttpRequestStream(NHttpClient::IHttpRequestPtr request)
+ : Request_(std::move(request))
+ , Underlying_(Request_->GetStream())
+ { }
+
+private:
+ void DoWrite(const void* buf, size_t len) override
+ {
+ Underlying_->Write(buf, len);
+ }
+
+ void DoFinish() override
+ {
+ Underlying_->Finish();
+ Request_->Finish()->GetResponse();
+ }
+
+private:
+ NHttpClient::IHttpRequestPtr Request_;
+ IOutputStream* Underlying_;
+};
+
+struct THttpBufferedRequestStream
+ : public IOutputStream
+{
+public:
+ THttpBufferedRequestStream(NHttpClient::IHttpRequestPtr request, size_t bufferSize)
: Request_(std::move(request))
, Underlying_(std::make_unique<TBufferedOutput>(Request_->GetStream(), bufferSize))
{ }
@@ -334,7 +360,7 @@ private:
private:
NHttpClient::IHttpRequestPtr Request_;
- std::unique_ptr<TBufferedOutput> Underlying_;
+ std::unique_ptr<IOutputStream> Underlying_;
};
std::unique_ptr<IOutputStream> WriteTable(
@@ -353,7 +379,27 @@ std::unique_ptr<IOutputStream> WriteTable(
TRequestConfig config;
config.IsHeavy = true;
auto request = StartRequestWithoutRetry(context, header, config);
- return std::make_unique<THttpRequestStream>(std::move(request), options.BufferSize_);
+ if (options.SingleHttpRequest_) {
+ return std::make_unique<THttpBufferedRequestStream>(std::move(request), options.BufferSize_);
+ }
+ return std::make_unique<THttpRequestStream>(std::move(request));
+}
+
+std::unique_ptr<IOutputStream> WriteFile(
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TFileWriterOptions& options)
+{
+ THttpHeader header("PUT", GetWriteFileCommand(context.Config->ApiVersion));
+ header.AddTransactionId(transactionId);
+ header.SetRequestCompression(ToString(context.Config->ContentEncoding));
+ header.MergeParameters(FormIORequestParameters(path, options));
+
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto request = StartRequestWithoutRetry(context, header, config);
+ return std::make_unique<THttpRequestStream>(std::move(request));
}
TAuthorizationInfo WhoAmI(const TClientContext& context)
diff --git a/yt/cpp/mapreduce/http_client/raw_requests.h b/yt/cpp/mapreduce/http_client/raw_requests.h
index 63f39f62bba..d7e1c1eebc2 100644
--- a/yt/cpp/mapreduce/http_client/raw_requests.h
+++ b/yt/cpp/mapreduce/http_client/raw_requests.h
@@ -55,6 +55,12 @@ std::unique_ptr<IOutputStream> WriteTable(
const TMaybe<TFormat>& format,
const TTableWriterOptions& options);
+std::unique_ptr<IOutputStream> WriteFile(
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TFileWriterOptions& options);
+
TAuthorizationInfo WhoAmI(const TClientContext& context);
////////////////////////////////////////////////////////////////////////////////