diff options
author | hiddenpath <[email protected]> | 2025-04-23 15:43:56 +0300 |
---|---|---|
committer | hiddenpath <[email protected]> | 2025-04-23 15:59:27 +0300 |
commit | 7c0632d935742fed09b7e3c49c5677e9bc3320b3 (patch) | |
tree | 1be8b66161d4ff4723749600972ff76250d4c150 | |
parent | c222f6f103376934dd705e16bf9a3bad66e44365 (diff) |
YT-23616: Split TRetryfulWriter implementation into rpc and http
commit_hash:2b01b1b2387f71903fc29f3364d1e5f46f35a703
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client_writer.cpp | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/file_writer.cpp | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.cpp | 30 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.h | 25 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http_client/raw_requests.cpp | 52 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http_client/raw_requests.h | 6 |
7 files changed, 82 insertions, 34 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 48ad4a352ea..8977a29b106 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -440,7 +440,6 @@ TRawTableWriterPtr TClientBase::CreateRawWriter( GetTransactionPinger(), Context_, TransactionId_, - GetWriteTableCommand(Context_.Config->ApiVersion), format, CanonizeYPath(path), options).Get(); diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp index 8a4421c1c77..6622a40a59e 100644 --- a/yt/cpp/mapreduce/client/client_writer.cpp +++ b/yt/cpp/mapreduce/client/client_writer.cpp @@ -57,7 +57,6 @@ TClientWriter::TClientWriter( std::move(transactionPinger), context, transactionId, - GetWriteTableCommand(context.Config->ApiVersion), format, path, options)); diff --git a/yt/cpp/mapreduce/client/file_writer.cpp b/yt/cpp/mapreduce/client/file_writer.cpp index aa0d7f8e146..5a8b3f3409c 100644 --- a/yt/cpp/mapreduce/client/file_writer.cpp +++ b/yt/cpp/mapreduce/client/file_writer.cpp @@ -25,7 +25,6 @@ TFileWriter::TFileWriter( std::move(transactionPinger), context, transactionId, - GetWriteFileCommand(context.Config->ApiVersion), TMaybe<TFormat>(), path, options) diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index f002adaed26..d68f8f4d3d3 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -10,6 +10,7 @@ #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/cpp/mapreduce/http_client/raw_client.h> +#include <yt/cpp/mapreduce/http_client/raw_requests.h> #include <util/generic/size_literals.h> @@ -99,16 +100,8 @@ void TRetryfulWriter::FlushBuffer(bool lastBlock) void TRetryfulWriter::Send(const TBuffer& buffer) { - THttpHeader header("PUT", Command_); - header.SetInputFormat(Format_); - header.MergeParameters(Parameters_); - header.SetRequestCompression(ToString(Context_.Config->ContentEncoding)); - auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_); - NDetail::TRequestConfig config; - config.IsHeavy = true; - NDetail::RequestWithRetry<void>( CreateDefaultRequestRetryPolicy(Context_.Config), [&](TMutationId&) { @@ -116,17 +109,26 @@ void TRetryfulWriter::Send(const TBuffer& buffer) RawClient_, ClientRetryPolicy_, Context_, transactionId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); - auto input = std::make_unique<TBufferInput>(buffer); - header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true); + std::unique_ptr<IOutputStream> stream; + std::visit([this, &attemptTx, &stream] (const auto& options) -> void { + using TType = std::decay_t<decltype(options)>; + if constexpr (std::is_same_v<TType, TFileWriterOptions>) { + stream = NDetail::NRawClient::WriteFile(Context_, attemptTx.GetId(), Path_, options); + } else if constexpr (std::is_same_v<TType, TTableWriterOptions>) { + stream = NDetail::NRawClient::WriteTable(Context_, attemptTx.GetId(), Path_, Format_, options); + } else { + static_assert(TDependentFalse<TType>); + } + }, Options_); - auto request = NDetail::StartRequestWithoutRetry(Context_, header, config); - TransferData(input.get(), request->GetStream()); - request->Finish()->GetResponse(); + auto input = std::make_unique<TBufferInput>(buffer); + TransferData(input.get(), stream.get()); + stream->Finish(); attemptTx.Commit(); }); - Parameters_ = SecondaryParameters_; // all blocks except the first one are appended + Path_ = SecondaryPath_; // all blocks except the first one are appended } void TRetryfulWriter::SendThread() diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h index b0fcb13d3a6..9aa7f1a6040 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.h +++ b/yt/cpp/mapreduce/client/retryful_writer.h @@ -36,7 +36,6 @@ public: ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& parentId, - const TString& command, const TMaybe<TFormat>& format, const TRichYPath& path, const TWriterOptions& options) @@ -45,9 +44,10 @@ public: , TransactionPinger_(std::move(transactionPinger)) , Context_(context) , AutoFinish_(options.AutoFinish_) - , Command_(command) + , Options_(options) , Format_(format) , BufferSize_(GetBufferSize(options.WriterOptions_)) + , Path_(path) , ParentTransactionId_(parentId) , WriteTransaction_() , FilledBuffers_(2) @@ -55,15 +55,12 @@ public: , Buffer_(BufferSize_ * 2) , Thread_(TThread::TParams{SendThread, this}.SetName("retryful_writer")) { - Parameters_ = FormIORequestParameters(path, options); - - auto secondaryPath = path; - secondaryPath.Append_ = true; - secondaryPath.Schema_.Clear(); - secondaryPath.CompressionCodec_.Clear(); - secondaryPath.ErasureCodec_.Clear(); - secondaryPath.OptimizeFor_.Clear(); - SecondaryParameters_ = FormIORequestParameters(secondaryPath, options); + SecondaryPath_ = path; + SecondaryPath_.Append_ = true; + SecondaryPath_.Schema_.Clear(); + SecondaryPath_.CompressionCodec_.Clear(); + SecondaryPath_.ErasureCodec_.Clear(); + SecondaryPath_.OptimizeFor_.Clear(); if (options.CreateTransaction_) { WriteTransaction_.ConstructInPlace(rawClient, ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); @@ -103,14 +100,14 @@ private: const ITransactionPingerPtr TransactionPinger_; const TClientContext Context_; const bool AutoFinish_; + std::variant<TTableWriterOptions, TFileWriterOptions> Options_; - TString Command_; TMaybe<TFormat> Format_; const size_t BufferSize_; - TNode Parameters_; - TNode SecondaryParameters_; + TRichYPath Path_; + TRichYPath SecondaryPath_; TTransactionId ParentTransactionId_; TMaybe<TPingableTransaction> WriteTransaction_; diff --git a/yt/cpp/mapreduce/http_client/raw_requests.cpp b/yt/cpp/mapreduce/http_client/raw_requests.cpp index c07c5ac1ddd..d4bfaa7944d 100644 --- a/yt/cpp/mapreduce/http_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/http_client/raw_requests.cpp @@ -315,7 +315,33 @@ struct THttpRequestStream : public IOutputStream { public: - THttpRequestStream(NHttpClient::IHttpRequestPtr request, size_t bufferSize) + THttpRequestStream(NHttpClient::IHttpRequestPtr request) + : Request_(std::move(request)) + , Underlying_(Request_->GetStream()) + { } + +private: + void DoWrite(const void* buf, size_t len) override + { + Underlying_->Write(buf, len); + } + + void DoFinish() override + { + Underlying_->Finish(); + Request_->Finish()->GetResponse(); + } + +private: + NHttpClient::IHttpRequestPtr Request_; + IOutputStream* Underlying_; +}; + +struct THttpBufferedRequestStream + : public IOutputStream +{ +public: + THttpBufferedRequestStream(NHttpClient::IHttpRequestPtr request, size_t bufferSize) : Request_(std::move(request)) , Underlying_(std::make_unique<TBufferedOutput>(Request_->GetStream(), bufferSize)) { } @@ -334,7 +360,7 @@ private: private: NHttpClient::IHttpRequestPtr Request_; - std::unique_ptr<TBufferedOutput> Underlying_; + std::unique_ptr<IOutputStream> Underlying_; }; std::unique_ptr<IOutputStream> WriteTable( @@ -353,7 +379,27 @@ std::unique_ptr<IOutputStream> WriteTable( TRequestConfig config; config.IsHeavy = true; auto request = StartRequestWithoutRetry(context, header, config); - return std::make_unique<THttpRequestStream>(std::move(request), options.BufferSize_); + if (options.SingleHttpRequest_) { + return std::make_unique<THttpBufferedRequestStream>(std::move(request), options.BufferSize_); + } + return std::make_unique<THttpRequestStream>(std::move(request)); +} + +std::unique_ptr<IOutputStream> WriteFile( + const TClientContext& context, + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileWriterOptions& options) +{ + THttpHeader header("PUT", GetWriteFileCommand(context.Config->ApiVersion)); + header.AddTransactionId(transactionId); + header.SetRequestCompression(ToString(context.Config->ContentEncoding)); + header.MergeParameters(FormIORequestParameters(path, options)); + + TRequestConfig config; + config.IsHeavy = true; + auto request = StartRequestWithoutRetry(context, header, config); + return std::make_unique<THttpRequestStream>(std::move(request)); } TAuthorizationInfo WhoAmI(const TClientContext& context) diff --git a/yt/cpp/mapreduce/http_client/raw_requests.h b/yt/cpp/mapreduce/http_client/raw_requests.h index 63f39f62bba..d7e1c1eebc2 100644 --- a/yt/cpp/mapreduce/http_client/raw_requests.h +++ b/yt/cpp/mapreduce/http_client/raw_requests.h @@ -55,6 +55,12 @@ std::unique_ptr<IOutputStream> WriteTable( const TMaybe<TFormat>& format, const TTableWriterOptions& options); +std::unique_ptr<IOutputStream> WriteFile( + const TClientContext& context, + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileWriterOptions& options); + TAuthorizationInfo WhoAmI(const TClientContext& context); //////////////////////////////////////////////////////////////////////////////// |