summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorachains <[email protected]>2025-08-28 13:27:48 +0300
committerachains <[email protected]>2025-08-28 13:47:27 +0300
commit77ea11423f959e51795cc3ef36a48d808b4ffb98 (patch)
tree779bbe91b4a23bc8d729260203f9bdf7f6e31d29
parent20d1c4dc5dd3136b9b4fc6cd3498c1e43a98c83c (diff)
YT-23616: support rpc write table
* Changelog entry Type: feature Component: cpp-sdk Support write table via RPC Proxy in C\+\+ client commit_hash:fccc00849eafded757e3f1332da1fe209a127ec9
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.cpp7
-rw-r--r--yt/cpp/mapreduce/http_client/raw_client.cpp45
-rw-r--r--yt/cpp/mapreduce/http_client/raw_client.h6
-rw-r--r--yt/cpp/mapreduce/http_client/raw_requests.h6
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h6
5 files changed, 29 insertions, 41 deletions
diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp
index 36c68fb4ce4..01ef0adf027 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.cpp
+++ b/yt/cpp/mapreduce/client/retryful_writer.cpp
@@ -1,7 +1,5 @@
#include "retryful_writer.h"
-#include "retry_heavy_write_request.h"
-
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/interface/errors.h>
@@ -9,9 +7,6 @@
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
-#include <yt/cpp/mapreduce/http_client/raw_client.h>
-#include <yt/cpp/mapreduce/http_client/raw_requests.h>
-
#include <util/generic/size_literals.h>
namespace NYT {
@@ -119,7 +114,7 @@ void TRetryfulWriter::Send(const TBuffer& buffer)
if constexpr (std::is_same_v<TType, TFileWriterOptions>) {
stream = RawClient_->WriteFile(attemptTx.GetId(), Path_, options);
} else if constexpr (std::is_same_v<TType, TTableWriterOptions>) {
- stream = NDetail::NRawClient::WriteTable(Context_, attemptTx.GetId(), Path_, Format_, options);
+ stream = RawClient_->WriteTable(attemptTx.GetId(), Path_, Format_, options);
} else {
static_assert(TDependentFalse<TType>);
}
diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp
index d2c64257222..48b0265f087 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/http_client/raw_client.cpp
@@ -748,6 +748,15 @@ TNode::TListType THttpRawClient::SelectRows(
return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
}
+std::unique_ptr<IOutputStream> THttpRawClient::WriteTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableWriterOptions& options)
+{
+ return NRawClient::WriteTable(Context_, transactionId, path, format, options);
+}
+
std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
@@ -767,46 +776,12 @@ std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
}
-struct THttpRequestStream
- : public IOutputStream
-{
-public:
- THttpRequestStream(NHttpClient::IHttpRequestPtr request)
- : Request_(std::move(request))
- , Underlying_(Request_->GetStream())
- { }
-
-private:
- void DoWrite(const void* buf, size_t len) override
- {
- Underlying_->Write(buf, len);
- }
-
- void DoFinish() override
- {
- Underlying_->Finish();
- Request_->Finish()->GetResponse();
- }
-
-private:
- NHttpClient::IHttpRequestPtr Request_;
- IOutputStream* Underlying_;
-};
-
std::unique_ptr<IOutputStream> THttpRawClient::WriteFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileWriterOptions& options)
{
- THttpHeader header("PUT", GetWriteFileCommand(Context_.Config->ApiVersion));
- header.AddTransactionId(transactionId);
- header.SetRequestCompression(ToString(Context_.Config->ContentEncoding));
- header.MergeParameters(FormIORequestParameters(path, options));
-
- TRequestConfig config;
- config.IsHeavy = true;
- auto request = StartRequestWithoutRetry(Context_, header, config);
- return std::make_unique<THttpRequestStream>(std::move(request));
+ return NRawClient::WriteFile(Context_, transactionId, path, options);
}
std::unique_ptr<IInputStream> THttpRawClient::ReadTablePartition(
diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h
index 0f1b9df9fec..be2f6bfba14 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.h
+++ b/yt/cpp/mapreduce/http_client/raw_client.h
@@ -279,6 +279,12 @@ public:
const TString& query,
const TSelectRowsOptions& options = {}) override;
+ std::unique_ptr<IOutputStream> WriteTable(
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableWriterOptions& options = {}) override;
+
std::unique_ptr<IInputStream> ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
diff --git a/yt/cpp/mapreduce/http_client/raw_requests.h b/yt/cpp/mapreduce/http_client/raw_requests.h
index 63f39f62bba..d7e1c1eebc2 100644
--- a/yt/cpp/mapreduce/http_client/raw_requests.h
+++ b/yt/cpp/mapreduce/http_client/raw_requests.h
@@ -55,6 +55,12 @@ std::unique_ptr<IOutputStream> WriteTable(
const TMaybe<TFormat>& format,
const TTableWriterOptions& options);
+std::unique_ptr<IOutputStream> WriteFile(
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TRichYPath& path,
+ const TFileWriterOptions& options);
+
TAuthorizationInfo WhoAmI(const TClientContext& context);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 684ac6ca707..27a907dd10e 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -281,6 +281,12 @@ public:
const TYPath& path,
const TAlterTableOptions& options = {}) = 0;
+ virtual std::unique_ptr<IOutputStream> WriteTable(
+ const TTransactionId& transcationId,
+ const TRichYPath& path,
+ const TMaybe<TFormat>& format,
+ const TTableWriterOptions& options = {}) = 0;
+
virtual std::unique_ptr<IInputStream> ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,