diff options
| author | achains <[email protected]> | 2025-08-28 13:27:48 +0300 |
|---|---|---|
| committer | achains <[email protected]> | 2025-08-28 13:47:27 +0300 |
| commit | 77ea11423f959e51795cc3ef36a48d808b4ffb98 (patch) | |
| tree | 779bbe91b4a23bc8d729260203f9bdf7f6e31d29 | |
| parent | 20d1c4dc5dd3136b9b4fc6cd3498c1e43a98c83c (diff) | |
YT-23616: support rpc write table
* Changelog entry
Type: feature
Component: cpp-sdk
Support write table via RPC Proxy in C\+\+ client
commit_hash:fccc00849eafded757e3f1332da1fe209a127ec9
| -rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.cpp | 7 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/http_client/raw_client.cpp | 45 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/http_client/raw_client.h | 6 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/http_client/raw_requests.h | 6 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 6 |
5 files changed, 29 insertions, 41 deletions
diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index 36c68fb4ce4..01ef0adf027 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -1,7 +1,5 @@ #include "retryful_writer.h" -#include "retry_heavy_write_request.h" - #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/interface/errors.h> @@ -9,9 +7,6 @@ #include <yt/cpp/mapreduce/interface/logging/yt_log.h> -#include <yt/cpp/mapreduce/http_client/raw_client.h> -#include <yt/cpp/mapreduce/http_client/raw_requests.h> - #include <util/generic/size_literals.h> namespace NYT { @@ -119,7 +114,7 @@ void TRetryfulWriter::Send(const TBuffer& buffer) if constexpr (std::is_same_v<TType, TFileWriterOptions>) { stream = RawClient_->WriteFile(attemptTx.GetId(), Path_, options); } else if constexpr (std::is_same_v<TType, TTableWriterOptions>) { - stream = NDetail::NRawClient::WriteTable(Context_, attemptTx.GetId(), Path_, Format_, options); + stream = RawClient_->WriteTable(attemptTx.GetId(), Path_, Format_, options); } else { static_assert(TDependentFalse<TType>); } diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp index d2c64257222..48b0265f087 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.cpp +++ b/yt/cpp/mapreduce/http_client/raw_client.cpp @@ -748,6 +748,15 @@ TNode::TListType THttpRawClient::SelectRows( return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList(); } +std::unique_ptr<IOutputStream> THttpRawClient::WriteTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TMaybe<TFormat>& format, + const TTableWriterOptions& options) +{ + return NRawClient::WriteTable(Context_, transactionId, path, format, options); +} + std::unique_ptr<IInputStream> THttpRawClient::ReadTable( const TTransactionId& transactionId, const TRichYPath& path, @@ -767,46 +776,12 @@ std::unique_ptr<IInputStream> THttpRawClient::ReadTable( return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo)); } -struct THttpRequestStream - : public IOutputStream -{ -public: - THttpRequestStream(NHttpClient::IHttpRequestPtr request) - : Request_(std::move(request)) - , Underlying_(Request_->GetStream()) - { } - -private: - void DoWrite(const void* buf, size_t len) override - { - Underlying_->Write(buf, len); - } - - void DoFinish() override - { - Underlying_->Finish(); - Request_->Finish()->GetResponse(); - } - -private: - NHttpClient::IHttpRequestPtr Request_; - IOutputStream* Underlying_; -}; - std::unique_ptr<IOutputStream> THttpRawClient::WriteFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileWriterOptions& options) { - THttpHeader header("PUT", GetWriteFileCommand(Context_.Config->ApiVersion)); - header.AddTransactionId(transactionId); - header.SetRequestCompression(ToString(Context_.Config->ContentEncoding)); - header.MergeParameters(FormIORequestParameters(path, options)); - - TRequestConfig config; - config.IsHeavy = true; - auto request = StartRequestWithoutRetry(Context_, header, config); - return std::make_unique<THttpRequestStream>(std::move(request)); + return NRawClient::WriteFile(Context_, transactionId, path, options); } std::unique_ptr<IInputStream> THttpRawClient::ReadTablePartition( diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h index 0f1b9df9fec..be2f6bfba14 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.h +++ b/yt/cpp/mapreduce/http_client/raw_client.h @@ -279,6 +279,12 @@ public: const TString& query, const TSelectRowsOptions& options = {}) override; + std::unique_ptr<IOutputStream> WriteTable( + const TTransactionId& transactionId, + const TRichYPath& path, + const TMaybe<TFormat>& format, + const TTableWriterOptions& options = {}) override; + std::unique_ptr<IInputStream> ReadTable( const TTransactionId& transactionId, const TRichYPath& path, diff --git a/yt/cpp/mapreduce/http_client/raw_requests.h b/yt/cpp/mapreduce/http_client/raw_requests.h index 63f39f62bba..d7e1c1eebc2 100644 --- a/yt/cpp/mapreduce/http_client/raw_requests.h +++ b/yt/cpp/mapreduce/http_client/raw_requests.h @@ -55,6 +55,12 @@ std::unique_ptr<IOutputStream> WriteTable( const TMaybe<TFormat>& format, const TTableWriterOptions& options); +std::unique_ptr<IOutputStream> WriteFile( + const TClientContext& context, + const TTransactionId& transactionId, + const TRichYPath& path, + const TFileWriterOptions& options); + TAuthorizationInfo WhoAmI(const TClientContext& context); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 684ac6ca707..27a907dd10e 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -281,6 +281,12 @@ public: const TYPath& path, const TAlterTableOptions& options = {}) = 0; + virtual std::unique_ptr<IOutputStream> WriteTable( + const TTransactionId& transcationId, + const TRichYPath& path, + const TMaybe<TFormat>& format, + const TTableWriterOptions& options = {}) = 0; + virtual std::unique_ptr<IInputStream> ReadTable( const TTransactionId& transactionId, const TRichYPath& path, |
