diff options
| author | Alexander Smirnov <[email protected]> | 2025-04-24 00:52:06 +0000 |
|---|---|---|
| committer | Alexander Smirnov <[email protected]> | 2025-04-24 00:52:06 +0000 |
| commit | 514f9b2af9d5384875cf0dbf50b64cdb84fffc84 (patch) | |
| tree | d939c6b413c0aad4c9ca53f58b93056a60420e5d /yt/cpp/mapreduce/client | |
| parent | 0ab1114c86c011bbb7bda529bdf1885732d839f9 (diff) | |
| parent | cecbcb67c89297b8e37981073b0fe887418b0993 (diff) | |
Merge branch 'rightlib' into merge-libs-250424-0050
Diffstat (limited to 'yt/cpp/mapreduce/client')
| -rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 1 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/client_writer.cpp | 1 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/file_writer.cpp | 1 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.cpp | 30 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.h | 25 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/transaction_pinger.cpp | 2 |
6 files changed, 28 insertions, 32 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 48ad4a352ea..8977a29b106 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -440,7 +440,6 @@ TRawTableWriterPtr TClientBase::CreateRawWriter( GetTransactionPinger(), Context_, TransactionId_, - GetWriteTableCommand(Context_.Config->ApiVersion), format, CanonizeYPath(path), options).Get(); diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp index 8a4421c1c77..6622a40a59e 100644 --- a/yt/cpp/mapreduce/client/client_writer.cpp +++ b/yt/cpp/mapreduce/client/client_writer.cpp @@ -57,7 +57,6 @@ TClientWriter::TClientWriter( std::move(transactionPinger), context, transactionId, - GetWriteTableCommand(context.Config->ApiVersion), format, path, options)); diff --git a/yt/cpp/mapreduce/client/file_writer.cpp b/yt/cpp/mapreduce/client/file_writer.cpp index aa0d7f8e146..5a8b3f3409c 100644 --- a/yt/cpp/mapreduce/client/file_writer.cpp +++ b/yt/cpp/mapreduce/client/file_writer.cpp @@ -25,7 +25,6 @@ TFileWriter::TFileWriter( std::move(transactionPinger), context, transactionId, - GetWriteFileCommand(context.Config->ApiVersion), TMaybe<TFormat>(), path, options) diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index f002adaed26..d68f8f4d3d3 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -10,6 +10,7 @@ #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/cpp/mapreduce/http_client/raw_client.h> +#include <yt/cpp/mapreduce/http_client/raw_requests.h> #include <util/generic/size_literals.h> @@ -99,16 +100,8 @@ void TRetryfulWriter::FlushBuffer(bool lastBlock) void TRetryfulWriter::Send(const TBuffer& buffer) { - THttpHeader header("PUT", Command_); - header.SetInputFormat(Format_); - header.MergeParameters(Parameters_); - header.SetRequestCompression(ToString(Context_.Config->ContentEncoding)); - auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_); - NDetail::TRequestConfig config; - config.IsHeavy = true; - NDetail::RequestWithRetry<void>( CreateDefaultRequestRetryPolicy(Context_.Config), [&](TMutationId&) { @@ -116,17 +109,26 @@ void TRetryfulWriter::Send(const TBuffer& buffer) RawClient_, ClientRetryPolicy_, Context_, transactionId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); - auto input = std::make_unique<TBufferInput>(buffer); - header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true); + std::unique_ptr<IOutputStream> stream; + std::visit([this, &attemptTx, &stream] (const auto& options) -> void { + using TType = std::decay_t<decltype(options)>; + if constexpr (std::is_same_v<TType, TFileWriterOptions>) { + stream = NDetail::NRawClient::WriteFile(Context_, attemptTx.GetId(), Path_, options); + } else if constexpr (std::is_same_v<TType, TTableWriterOptions>) { + stream = NDetail::NRawClient::WriteTable(Context_, attemptTx.GetId(), Path_, Format_, options); + } else { + static_assert(TDependentFalse<TType>); + } + }, Options_); - auto request = NDetail::StartRequestWithoutRetry(Context_, header, config); - TransferData(input.get(), request->GetStream()); - request->Finish()->GetResponse(); + auto input = std::make_unique<TBufferInput>(buffer); + TransferData(input.get(), stream.get()); + stream->Finish(); attemptTx.Commit(); }); - Parameters_ = SecondaryParameters_; // all blocks except the first one are appended + Path_ = SecondaryPath_; // all blocks except the first one are appended } void TRetryfulWriter::SendThread() diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h index b0fcb13d3a6..9aa7f1a6040 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.h +++ b/yt/cpp/mapreduce/client/retryful_writer.h @@ -36,7 +36,6 @@ public: ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& parentId, - const TString& command, const TMaybe<TFormat>& format, const TRichYPath& path, const TWriterOptions& options) @@ -45,9 +44,10 @@ public: , TransactionPinger_(std::move(transactionPinger)) , Context_(context) , AutoFinish_(options.AutoFinish_) - , Command_(command) + , Options_(options) , Format_(format) , BufferSize_(GetBufferSize(options.WriterOptions_)) + , Path_(path) , ParentTransactionId_(parentId) , WriteTransaction_() , FilledBuffers_(2) @@ -55,15 +55,12 @@ public: , Buffer_(BufferSize_ * 2) , Thread_(TThread::TParams{SendThread, this}.SetName("retryful_writer")) { - Parameters_ = FormIORequestParameters(path, options); - - auto secondaryPath = path; - secondaryPath.Append_ = true; - secondaryPath.Schema_.Clear(); - secondaryPath.CompressionCodec_.Clear(); - secondaryPath.ErasureCodec_.Clear(); - secondaryPath.OptimizeFor_.Clear(); - SecondaryParameters_ = FormIORequestParameters(secondaryPath, options); + SecondaryPath_ = path; + SecondaryPath_.Append_ = true; + SecondaryPath_.Schema_.Clear(); + SecondaryPath_.CompressionCodec_.Clear(); + SecondaryPath_.ErasureCodec_.Clear(); + SecondaryPath_.OptimizeFor_.Clear(); if (options.CreateTransaction_) { WriteTransaction_.ConstructInPlace(rawClient, ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); @@ -103,14 +100,14 @@ private: const ITransactionPingerPtr TransactionPinger_; const TClientContext Context_; const bool AutoFinish_; + std::variant<TTableWriterOptions, TFileWriterOptions> Options_; - TString Command_; TMaybe<TFormat> Format_; const size_t BufferSize_; - TNode Parameters_; - TNode SecondaryParameters_; + TRichYPath Path_; + TRichYPath SecondaryPath_; TTransactionId ParentTransactionId_; TMaybe<TPingableTransaction> WriteTransaction_; diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp index d393e50a5d8..e65c09fa02a 100644 --- a/yt/cpp/mapreduce/client/transaction_pinger.cpp +++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp @@ -69,7 +69,7 @@ void PingTx(NHttp::IClientPtr httpClient, const TPingableTransaction& tx) if (const auto& serviceTicketAuth = tx.GetContext().ServiceTicketAuth) { const auto serviceTicket = serviceTicketAuth->Ptr->IssueServiceTicket(); - headers->Add("X-Ya-Service-Ticket", serviceTicket); + headers->Add("X-Ya-Service-Ticket", TString(serviceTicket)); } else if (const auto& token = tx.GetContext().Token; !token.empty()) { headers->Add("Authorization", "OAuth " + token); } |
