diff options
| author | achains <[email protected]> | 2025-11-10 16:32:52 +0300 |
|---|---|---|
| committer | achains <[email protected]> | 2025-11-10 17:26:03 +0300 |
| commit | 28244e705d32f688896c3f2986d012d74fc1e487 (patch) | |
| tree | c107ae43ed3c34840ba830965fb45e987f647fec /yt/cpp/mapreduce/client/client.cpp | |
| parent | 4a9fcc0815e9f0896644f22f0de847abffa8ebcc (diff) | |
YT-26425: Distributed API http proxy light requests
* Changelog entry
Type: feature
Component: cpp-sdk
Support distributed API in C\+\+ SDK
<Message for release notes>
commit_hash:689a3c978864fa4623f3b38ce031faa96532b3fe
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index c060634ec4e..ce7ceaba581 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -5,6 +5,7 @@ #include "client_writer.h" #include "file_reader.h" #include "file_writer.h" +#include "file_fragment_writer.h" #include "format_hints.h" #include "init.h" #include "lock.h" @@ -401,6 +402,13 @@ IFileWriterPtr TClientBase::CreateFileWriter( return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options); } +IFileFragmentWriterPtr TClientBase::CreateFileFragmentWriter( + const TDistributedWriteFileCookie& cookie, + const TFileFragmentWriterOptions& options) +{ + return NDetail::CreateFileFragmentWriter(RawClient_, ClientRetryPolicy_->CreatePolicyForGenericRequest(), cookie, options); +} + TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter( const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options) { @@ -978,6 +986,23 @@ THolder<TClientWriter> TClientBase::CreateClientWriter( } } +::TIntrusivePtr<ITableFragmentWriter<TNode>> TClientBase::CreateNodeFragmentWriter( + const TDistributedWriteTableCookie& cookie, + const TTableFragmentWriterOptions& options) +{ + auto format = TFormat::YsonBinary(); + + // TODO(achains): Make proper wrapper class with retries and auto ping. + auto stream = NDetail::RequestWithRetry<std::unique_ptr<IOutputStreamWithResponse>>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [&] (TMutationId /*mutationId*/) { + return RawClient_->WriteTableFragment(cookie, format, options); + } + ); + + return ::MakeIntrusive<TNodeTableFragmentWriter>(std::move(stream)); +} + TBatchRequestPtr TClientBase::CreateBatchRequest() { return MakeIntrusive<TBatchRequest>(TransactionId_, GetParentClientImpl()); @@ -1497,6 +1522,82 @@ void TClient::ResumeOperation( }); } +TDistributedWriteTableSessionWithCookies TClient::StartDistributedWriteTableSession( + const TRichYPath& richPath, + i64 cookieCount, + const TStartDistributedWriteTableOptions& options) +{ + CheckShutdown(); + return RequestWithRetry<TDistributedWriteTableSessionWithCookies>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, cookieCount, &richPath, &options] (TMutationId& mutationId) { + return RawClient_->StartDistributedWriteTableSession(mutationId, richPath, cookieCount, options); + }); +} + +void TClient::PingDistributedWriteTableSession( + const TDistributedWriteTableSession& session, + const TPingDistributedWriteTableOptions& options) +{ + CheckShutdown(); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &session, &options] (TMutationId& /*mutationId*/) { + RawClient_->PingDistributedWriteTableSession(session, options); + }); +} + +void TClient::FinishDistributedWriteTableSession( + const TDistributedWriteTableSession& session, + const TVector<TWriteTableFragmentResult>& results, + const TFinishDistributedWriteTableOptions& options) +{ + CheckShutdown(); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &session, &results, &options] (TMutationId& mutationId) { + RawClient_->FinishDistributedWriteTableSession(mutationId, session, results, options); + }); +} + +TDistributedWriteFileSessionWithCookies TClient::StartDistributedWriteFileSession( + const TRichYPath& richPath, + i64 cookieCount, + const TStartDistributedWriteFileOptions& options) +{ + CheckShutdown(); + return RequestWithRetry<TDistributedWriteFileSessionWithCookies>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, cookieCount, &richPath, &options] (TMutationId& mutationId) { + return RawClient_->StartDistributedWriteFileSession(mutationId, richPath, cookieCount, options); + }); +} + +void TClient::PingDistributedWriteFileSession( + const TDistributedWriteFileSession& session, + const TPingDistributedWriteFileOptions& options) +{ + CheckShutdown(); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &session, &options] (TMutationId& /*mutationId*/) { + RawClient_->PingDistributedWriteFileSession(session, options); + }); +} + +void TClient::FinishDistributedWriteFileSession( + const TDistributedWriteFileSession& session, + const TVector<TWriteFileFragmentResult>& results, + const TFinishDistributedWriteFileOptions& options) +{ + CheckShutdown(); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &session, &results, &options] (TMutationId& mutationId) { + RawClient_->FinishDistributedWriteFileSession(mutationId, session, results, options); + }); +} + TYtPoller& TClient::GetYtPoller() { auto g = Guard(Lock_); |
