summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/client.cpp
diff options
context:
space:
mode:
authorachains <[email protected]>2025-11-10 16:32:52 +0300
committerachains <[email protected]>2025-11-10 17:26:03 +0300
commit28244e705d32f688896c3f2986d012d74fc1e487 (patch)
treec107ae43ed3c34840ba830965fb45e987f647fec /yt/cpp/mapreduce/client/client.cpp
parent4a9fcc0815e9f0896644f22f0de847abffa8ebcc (diff)
YT-26425: Distributed API http proxy light requests
* Changelog entry Type: feature Component: cpp-sdk Support distributed API in C\+\+ SDK <Message for release notes> commit_hash:689a3c978864fa4623f3b38ce031faa96532b3fe
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp101
1 files changed, 101 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index c060634ec4e..ce7ceaba581 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -5,6 +5,7 @@
#include "client_writer.h"
#include "file_reader.h"
#include "file_writer.h"
+#include "file_fragment_writer.h"
#include "format_hints.h"
#include "init.h"
#include "lock.h"
@@ -401,6 +402,13 @@ IFileWriterPtr TClientBase::CreateFileWriter(
return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
}
+IFileFragmentWriterPtr TClientBase::CreateFileFragmentWriter(
+ const TDistributedWriteFileCookie& cookie,
+ const TFileFragmentWriterOptions& options)
+{
+ return NDetail::CreateFileFragmentWriter(RawClient_, ClientRetryPolicy_->CreatePolicyForGenericRequest(), cookie, options);
+}
+
TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options)
{
@@ -978,6 +986,23 @@ THolder<TClientWriter> TClientBase::CreateClientWriter(
}
}
+::TIntrusivePtr<ITableFragmentWriter<TNode>> TClientBase::CreateNodeFragmentWriter(
+ const TDistributedWriteTableCookie& cookie,
+ const TTableFragmentWriterOptions& options)
+{
+ auto format = TFormat::YsonBinary();
+
+ // TODO(achains): Make proper wrapper class with retries and auto ping.
+ auto stream = NDetail::RequestWithRetry<std::unique_ptr<IOutputStreamWithResponse>>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [&] (TMutationId /*mutationId*/) {
+ return RawClient_->WriteTableFragment(cookie, format, options);
+ }
+ );
+
+ return ::MakeIntrusive<TNodeTableFragmentWriter>(std::move(stream));
+}
+
TBatchRequestPtr TClientBase::CreateBatchRequest()
{
return MakeIntrusive<TBatchRequest>(TransactionId_, GetParentClientImpl());
@@ -1497,6 +1522,82 @@ void TClient::ResumeOperation(
});
}
+TDistributedWriteTableSessionWithCookies TClient::StartDistributedWriteTableSession(
+ const TRichYPath& richPath,
+ i64 cookieCount,
+ const TStartDistributedWriteTableOptions& options)
+{
+ CheckShutdown();
+ return RequestWithRetry<TDistributedWriteTableSessionWithCookies>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, cookieCount, &richPath, &options] (TMutationId& mutationId) {
+ return RawClient_->StartDistributedWriteTableSession(mutationId, richPath, cookieCount, options);
+ });
+}
+
+void TClient::PingDistributedWriteTableSession(
+ const TDistributedWriteTableSession& session,
+ const TPingDistributedWriteTableOptions& options)
+{
+ CheckShutdown();
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &session, &options] (TMutationId& /*mutationId*/) {
+ RawClient_->PingDistributedWriteTableSession(session, options);
+ });
+}
+
+void TClient::FinishDistributedWriteTableSession(
+ const TDistributedWriteTableSession& session,
+ const TVector<TWriteTableFragmentResult>& results,
+ const TFinishDistributedWriteTableOptions& options)
+{
+ CheckShutdown();
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &session, &results, &options] (TMutationId& mutationId) {
+ RawClient_->FinishDistributedWriteTableSession(mutationId, session, results, options);
+ });
+}
+
+TDistributedWriteFileSessionWithCookies TClient::StartDistributedWriteFileSession(
+ const TRichYPath& richPath,
+ i64 cookieCount,
+ const TStartDistributedWriteFileOptions& options)
+{
+ CheckShutdown();
+ return RequestWithRetry<TDistributedWriteFileSessionWithCookies>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, cookieCount, &richPath, &options] (TMutationId& mutationId) {
+ return RawClient_->StartDistributedWriteFileSession(mutationId, richPath, cookieCount, options);
+ });
+}
+
+void TClient::PingDistributedWriteFileSession(
+ const TDistributedWriteFileSession& session,
+ const TPingDistributedWriteFileOptions& options)
+{
+ CheckShutdown();
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &session, &options] (TMutationId& /*mutationId*/) {
+ RawClient_->PingDistributedWriteFileSession(session, options);
+ });
+}
+
+void TClient::FinishDistributedWriteFileSession(
+ const TDistributedWriteFileSession& session,
+ const TVector<TWriteFileFragmentResult>& results,
+ const TFinishDistributedWriteFileOptions& options)
+{
+ CheckShutdown();
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &session, &results, &options] (TMutationId& mutationId) {
+ RawClient_->FinishDistributedWriteFileSession(mutationId, session, results, options);
+ });
+}
+
TYtPoller& TClient::GetYtPoller()
{
auto g = Guard(Lock_);