From 72ca53e2f03266ae5ad40244d3a47ff76e9f9aeb Mon Sep 17 00:00:00 2001 From: babenko Date: Wed, 23 Apr 2025 11:07:56 +0300 Subject: YT-22593: Migrate auth to std::string commit_hash:5c78798b304a05a90b7e9a5b2bcdc1d3454d9f77 --- yt/cpp/mapreduce/client/transaction_pinger.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'yt/cpp') diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp index d393e50a5d8..e65c09fa02a 100644 --- a/yt/cpp/mapreduce/client/transaction_pinger.cpp +++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp @@ -69,7 +69,7 @@ void PingTx(NHttp::IClientPtr httpClient, const TPingableTransaction& tx) if (const auto& serviceTicketAuth = tx.GetContext().ServiceTicketAuth) { const auto serviceTicket = serviceTicketAuth->Ptr->IssueServiceTicket(); - headers->Add("X-Ya-Service-Ticket", serviceTicket); + headers->Add("X-Ya-Service-Ticket", TString(serviceTicket)); } else if (const auto& token = tx.GetContext().Token; !token.empty()) { headers->Add("Authorization", "OAuth " + token); } -- cgit v1.3 From 7c0632d935742fed09b7e3c49c5677e9bc3320b3 Mon Sep 17 00:00:00 2001 From: hiddenpath Date: Wed, 23 Apr 2025 15:43:56 +0300 Subject: YT-23616: Split TRetryfulWriter implementation into rpc and http commit_hash:2b01b1b2387f71903fc29f3364d1e5f46f35a703 --- yt/cpp/mapreduce/client/client.cpp | 1 - yt/cpp/mapreduce/client/client_writer.cpp | 1 - yt/cpp/mapreduce/client/file_writer.cpp | 1 - yt/cpp/mapreduce/client/retryful_writer.cpp | 30 ++++++++-------- yt/cpp/mapreduce/client/retryful_writer.h | 25 ++++++------- yt/cpp/mapreduce/http_client/raw_requests.cpp | 52 +++++++++++++++++++++++++-- yt/cpp/mapreduce/http_client/raw_requests.h | 6 ++++ 7 files changed, 82 insertions(+), 34 deletions(-) (limited to 'yt/cpp') diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 48ad4a352ea..8977a29b106 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -440,7 +440,6 @@ TRawTableWriterPtr TClientBase::CreateRawWriter( GetTransactionPinger(), Context_, TransactionId_, - GetWriteTableCommand(Context_.Config->ApiVersion), format, CanonizeYPath(path), options).Get(); diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp index 8a4421c1c77..6622a40a59e 100644 --- a/yt/cpp/mapreduce/client/client_writer.cpp +++ b/yt/cpp/mapreduce/client/client_writer.cpp @@ -57,7 +57,6 @@ TClientWriter::TClientWriter( std::move(transactionPinger), context, transactionId, - GetWriteTableCommand(context.Config->ApiVersion), format, path, options)); diff --git a/yt/cpp/mapreduce/client/file_writer.cpp b/yt/cpp/mapreduce/client/file_writer.cpp index aa0d7f8e146..5a8b3f3409c 100644 --- a/yt/cpp/mapreduce/client/file_writer.cpp +++ b/yt/cpp/mapreduce/client/file_writer.cpp @@ -25,7 +25,6 @@ TFileWriter::TFileWriter( std::move(transactionPinger), context, transactionId, - GetWriteFileCommand(context.Config->ApiVersion), TMaybe(), path, options) diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index f002adaed26..d68f8f4d3d3 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -10,6 +10,7 @@ #include #include +#include #include @@ -99,16 +100,8 @@ void TRetryfulWriter::FlushBuffer(bool lastBlock) void TRetryfulWriter::Send(const TBuffer& buffer) { - THttpHeader header("PUT", Command_); - header.SetInputFormat(Format_); - header.MergeParameters(Parameters_); - header.SetRequestCompression(ToString(Context_.Config->ContentEncoding)); - auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_); - NDetail::TRequestConfig config; - config.IsHeavy = true; - NDetail::RequestWithRetry( CreateDefaultRequestRetryPolicy(Context_.Config), [&](TMutationId&) { @@ -116,17 +109,26 @@ void TRetryfulWriter::Send(const TBuffer& buffer) RawClient_, ClientRetryPolicy_, Context_, transactionId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); - auto input = std::make_unique(buffer); - header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true); + std::unique_ptr stream; + std::visit([this, &attemptTx, &stream] (const auto& options) -> void { + using TType = std::decay_t; + if constexpr (std::is_same_v) { + stream = NDetail::NRawClient::WriteFile(Context_, attemptTx.GetId(), Path_, options); + } else if constexpr (std::is_same_v) { + stream = NDetail::NRawClient::WriteTable(Context_, attemptTx.GetId(), Path_, Format_, options); + } else { + static_assert(TDependentFalse); + } + }, Options_); - auto request = NDetail::StartRequestWithoutRetry(Context_, header, config); - TransferData(input.get(), request->GetStream()); - request->Finish()->GetResponse(); + auto input = std::make_unique(buffer); + TransferData(input.get(), stream.get()); + stream->Finish(); attemptTx.Commit(); }); - Parameters_ = SecondaryParameters_; // all blocks except the first one are appended + Path_ = SecondaryPath_; // all blocks except the first one are appended } void TRetryfulWriter::SendThread() diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h index b0fcb13d3a6..9aa7f1a6040 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.h +++ b/yt/cpp/mapreduce/client/retryful_writer.h @@ -36,7 +36,6 @@ public: ITransactionPingerPtr transactionPinger, const TClientContext& context, const TTransactionId& parentId, - const TString& command, const TMaybe& format, const TRichYPath& path, const TWriterOptions& options) @@ -45,9 +44,10 @@ public: , TransactionPinger_(std::move(transactionPinger)) , Context_(context) , AutoFinish_(options.AutoFinish_) - , Command_(command) + , Options_(options) , Format_(format) , BufferSize_(GetBufferSize(options.WriterOptions_)) + , Path_(path) , ParentTransactionId_(parentId) , WriteTransaction_() , FilledBuffers_(2) @@ -55,15 +55,12 @@ public: , Buffer_(BufferSize_ * 2) , Thread_(TThread::TParams{SendThread, this}.SetName("retryful_writer")) { - Parameters_ = FormIORequestParameters(path, options); - - auto secondaryPath = path; - secondaryPath.Append_ = true; - secondaryPath.Schema_.Clear(); - secondaryPath.CompressionCodec_.Clear(); - secondaryPath.ErasureCodec_.Clear(); - secondaryPath.OptimizeFor_.Clear(); - SecondaryParameters_ = FormIORequestParameters(secondaryPath, options); + SecondaryPath_ = path; + SecondaryPath_.Append_ = true; + SecondaryPath_.Schema_.Clear(); + SecondaryPath_.CompressionCodec_.Clear(); + SecondaryPath_.ErasureCodec_.Clear(); + SecondaryPath_.OptimizeFor_.Clear(); if (options.CreateTransaction_) { WriteTransaction_.ConstructInPlace(rawClient, ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); @@ -103,14 +100,14 @@ private: const ITransactionPingerPtr TransactionPinger_; const TClientContext Context_; const bool AutoFinish_; + std::variant Options_; - TString Command_; TMaybe Format_; const size_t BufferSize_; - TNode Parameters_; - TNode SecondaryParameters_; + TRichYPath Path_; + TRichYPath SecondaryPath_; TTransactionId ParentTransactionId_; TMaybe WriteTransaction_; diff --git a/yt/cpp/mapreduce/http_client/raw_requests.cpp b/yt/cpp/mapreduce/http_client/raw_requests.cpp index c07c5ac1ddd..d4bfaa7944d 100644 --- a/yt/cpp/mapreduce/http_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/http_client/raw_requests.cpp @@ -315,7 +315,33 @@ struct THttpRequestStream : public IOutputStream { public: - THttpRequestStream(NHttpClient::IHttpRequestPtr request, size_t bufferSize) + THttpRequestStream(NHttpClient::IHttpRequestPtr request) + : Request_(std::move(request)) + , Underlying_(Request_->GetStream()) + { } + +private: + void DoWrite(const void* buf, size_t len) override + { + Underlying_->Write(buf, len); + } + + void DoFinish() override + { + Underlying_->Finish(); + Request_->Finish()->GetResponse(); + } + +private: + NHttpClient::IHttpRequestPtr Request_; + IOutputStream* Underlying_; +}; + +struct THttpBufferedRequestStream + : public IOutputStream +{ +public: + THttpBufferedRequestStream(NHttpClient::IHttpRequestPtr request, size_t bufferSize) : Request_(std::move(request)) , Underlying_(std::make_unique(Request_->GetStream(), bufferSize)) { } @@ -334,7 +360,7 @@ private: private: NHttpClient::IHttpRequestPtr Request_; - std::unique_ptr Underlying_; + std::unique_ptr Underlying_; }; std::unique_ptr WriteTable( @@ -353,7 +379,27 @@ std::unique_ptr WriteTable( TRequestConfig config; config.IsHeavy = true; auto request = StartRequestWithoutRetry(context, header, config); - return std::make_unique(std::move(request), options.BufferSize_); + if (options.SingleHttpRequest_) { + return std::make_unique(std::move(request), options.BufferSize_); + } + return std::make_unique(std::move(request)); +} + +std::unique_ptr WriteFile( + const TClientContext& context, + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileWriterOptions& options) +{ + THttpHeader header("PUT", GetWriteFileCommand(context.Config->ApiVersion)); + header.AddTransactionId(transactionId); + header.SetRequestCompression(ToString(context.Config->ContentEncoding)); + header.MergeParameters(FormIORequestParameters(path, options)); + + TRequestConfig config; + config.IsHeavy = true; + auto request = StartRequestWithoutRetry(context, header, config); + return std::make_unique(std::move(request)); } TAuthorizationInfo WhoAmI(const TClientContext& context) diff --git a/yt/cpp/mapreduce/http_client/raw_requests.h b/yt/cpp/mapreduce/http_client/raw_requests.h index 63f39f62bba..d7e1c1eebc2 100644 --- a/yt/cpp/mapreduce/http_client/raw_requests.h +++ b/yt/cpp/mapreduce/http_client/raw_requests.h @@ -55,6 +55,12 @@ std::unique_ptr WriteTable( const TMaybe& format, const TTableWriterOptions& options); +std::unique_ptr WriteFile( + const TClientContext& context, + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileWriterOptions& options); + TAuthorizationInfo WhoAmI(const TClientContext& context); //////////////////////////////////////////////////////////////////////////////// -- cgit v1.3