summaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
authorachains <[email protected]>2025-11-13 10:43:43 +0300
committerachains <[email protected]>2025-11-13 11:03:19 +0300
commit7b74fa8fc7cc82d96a0cba57d21b772fd1400a0a (patch)
treef36d72a389c6413e56a065295c05c505f953be8c /yt/cpp
parent1c9f50a532cbec8d3be39d0cf6a008edb60929b3 (diff)
YT-26425: RPC support distributed API
* Changelog entry Type: feature Component: cpp-sdk Support Distributed write API methods for RPC proxies in C\+\+ client. commit_hash:1b1247f95c77a2de02d16f56dcc3291e772be2f2
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/interface/distributed_session.h20
-rw-r--r--yt/cpp/mapreduce/rpc_client/raw_client.cpp282
-rw-r--r--yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp58
-rw-r--r--yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h18
4 files changed, 343 insertions, 35 deletions
diff --git a/yt/cpp/mapreduce/interface/distributed_session.h b/yt/cpp/mapreduce/interface/distributed_session.h
index 70cf4db0b5d..2e6e23ce91a 100644
--- a/yt/cpp/mapreduce/interface/distributed_session.h
+++ b/yt/cpp/mapreduce/interface/distributed_session.h
@@ -11,6 +11,8 @@
#include <library/cpp/yt/misc/strong_typedef.h>
+#include <util/datetime/base.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -64,7 +66,16 @@ struct TDistributedWriteTableSessionWithCookies
struct TStartDistributedWriteTableOptions
{
+ /// @cond Doxygen_Suppress
using TSelf = TStartDistributedWriteTableOptions;
+ /// @endcond
+
+ ///
+ /// @brief How long session lives after last ping.
+ ///
+ /// If server doesn't receive any pings for session transaction for this time
+ /// session will be aborted. By default timeout is 15 seconds.
+ FLUENT_FIELD_OPTION(TDuration, Timeout);
};
struct TPingDistributedWriteTableOptions
@@ -90,7 +101,16 @@ struct TDistributedWriteFileSessionWithCookies
struct TStartDistributedWriteFileOptions
{
+ /// @cond Doxygen_Suppress
using TSelf = TStartDistributedWriteFileOptions;
+ /// @endcond
+
+ ///
+ /// @brief How long session lives after last ping.
+ ///
+ /// If server doesn't receive any pings for session transaction for this time
+ /// session will be aborted. By default timeout is 15 seconds.
+ FLUENT_FIELD_OPTION(TDuration, Timeout);
};
struct TPingDistributedWriteFileOptions
diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.cpp b/yt/cpp/mapreduce/rpc_client/raw_client.cpp
index 64d51f87499..f35fc6ce924 100644
--- a/yt/cpp/mapreduce/rpc_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/rpc_client/raw_client.cpp
@@ -20,6 +20,8 @@
#include <yt/yt/client/table_client/name_table.h>
#include <yt/yt/client/table_client/row_buffer.h>
+#include <yt/yt/client/signature/signature.h>
+
#include <yt/yt/client/transaction_client/timestamp_provider.h>
#include <yt/yt/core/misc/protobuf_helpers.h>
@@ -177,6 +179,11 @@ EJobState FromApiJobState(NJobTrackerClient::EJobState state)
}
}
+NYTree::INodePtr ToApiNode(const TNode& node)
+{
+ return NYTree::ConvertToNode(NYson::TYsonString(NodeToYsonString(node, NYson::EYsonFormat::Binary)));
+}
+
////////////////////////////////////////////////////////////////////////////////
class TSyncRpcInputStream
@@ -1048,8 +1055,7 @@ void TRpcRawClient::ReshardTableByPivotKeys(
}
NTableClient::TLegacyOwningKey pivotKey;
- Deserialize(pivotKey, NYTree::ConvertToNode(NYson::TYsonString(
- NodeToYsonString(keysNodesList, NYson::EYsonFormat::Binary))));
+ Deserialize(pivotKey, ToApiNode(keysNodesList));
pivotKeys.emplace_back(std::move(pivotKey));
}
@@ -1319,8 +1325,7 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable(
lowerLimitKeyNode.Add(lowerKeyNode);
NTableClient::TOwningKeyBound lowerKeyBound;
- Deserialize(lowerKeyBound, NYTree::ConvertToNode(NYson::TYsonString(
- NodeToYsonString(lowerLimitKeyNode, NYson::EYsonFormat::Binary))));
+ Deserialize(lowerKeyBound, ToApiNode(lowerLimitKeyNode));
auto upperKeyNode = TNode::CreateList(key.Parts_);
upperKeyNode.Add(std::numeric_limits<i64>::max());
@@ -1330,8 +1335,7 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable(
upperLimitKeyNode.Add(upperKeyNode);
NTableClient::TOwningKeyBound upperKeyBound;
- Deserialize(upperKeyBound, NYTree::ConvertToNode(NYson::TYsonString(
- NodeToYsonString(upperLimitKeyNode, NYson::EYsonFormat::Binary))));
+ Deserialize(upperKeyBound, ToApiNode(upperLimitKeyNode));
auto richPath = ToApiRichPath(path);
richPath.SetRanges({
@@ -1427,68 +1431,276 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const NApi::TCheckPermissi
}
TDistributedWriteTableSessionWithCookies TRpcRawClient::StartDistributedWriteTableSession(
- TMutationId& /*mutationId*/,
- const TRichYPath& /*richPath*/,
- i64 /*cookieCount*/,
- const TStartDistributedWriteTableOptions& /*options*/)
+ TMutationId& mutationId,
+ const TRichYPath& richPath,
+ i64 cookieCount,
+ const TStartDistributedWriteTableOptions& options)
{
- ythrow TApiUsageError() << "Not implemented";
+ auto future = Client_->StartDistributedWriteSession(
+ ToApiRichPath(richPath),
+ SerializeOptionsForStartDistributedTableSession(mutationId, cookieCount, options));
+
+ auto apiSession = WaitAndProcess(future);
+
+ TNode session;
+ TNodeBuilder builder(&session);
+ NYTree::Serialize(apiSession.Session, &builder);
+
+ TVector<TDistributedWriteTableCookie> cookies;
+ cookies.reserve(apiSession.Cookies.size());
+ for (const auto& cookie : apiSession.Cookies) {
+ TNode cookieNode;
+ TNodeBuilder cookieNodeBuilder(&cookieNode);
+ NYTree::Serialize(cookie, &cookieNodeBuilder);
+ cookies.push_back(TDistributedWriteTableCookie(std::move(cookieNode)));
+ }
+
+ TDistributedWriteTableSessionWithCookies result;
+ result.Session(TDistributedWriteTableSession(std::move(session)));
+ result.Cookies(std::move(cookies));
+ return result;
}
void TRpcRawClient::PingDistributedWriteTableSession(
- const TDistributedWriteTableSession& /*session*/,
+ const TDistributedWriteTableSession& session,
const TPingDistributedWriteTableOptions& /*options*/)
{
- ythrow TApiUsageError() << "Not implemented";
+ auto apiSession = NYTree::ConvertTo<NApi::TSignedDistributedWriteSessionPtr>(ToApiNode(session.Underlying()));
+
+ auto future = Client_->PingDistributedWriteSession(apiSession);
+ WaitAndProcess(future);
}
void TRpcRawClient::FinishDistributedWriteTableSession(
- TMutationId& /*mutationId*/,
- const TDistributedWriteTableSession& /*session*/,
- const TVector<TWriteTableFragmentResult>& /*results*/,
- const TFinishDistributedWriteTableOptions& /*options*/)
+ TMutationId& mutationId,
+ const TDistributedWriteTableSession& session,
+ const TVector<TWriteTableFragmentResult>& results,
+ const TFinishDistributedWriteTableOptions& options)
{
- ythrow TApiUsageError() << "Not implemented";
+ auto apiSession = NYTree::ConvertTo<NApi::TSignedDistributedWriteSessionPtr>(ToApiNode(session.Underlying()));
+
+ std::vector<NApi::TSignedWriteFragmentResultPtr> apiResults;
+ apiResults.reserve(results.size());
+
+ for (const auto& writeFragment : results) {
+ apiResults.push_back(NYTree::ConvertTo<NApi::TSignedWriteFragmentResultPtr>(ToApiNode(writeFragment.Underlying())));
+ }
+
+ NApi::TDistributedWriteSessionWithResults sessionWithResults;
+ sessionWithResults.Session = std::move(apiSession);
+ sessionWithResults.Results = std::move(apiResults);
+
+ auto future = Client_->FinishDistributedWriteSession(
+ sessionWithResults,
+ SerializeOptionsForFinishDistributedTableSession(mutationId, options));
+
+ WaitAndProcess(future);
}
+class TTableFragmentStreamWithResponse
+ : public IOutputStreamWithResponse
+{
+ using TPromise = TPromise<NApi::TSignedWriteFragmentResultPtr>;
+
+public:
+ TTableFragmentStreamWithResponse(std::unique_ptr<IOutputStream> underlying, TPromise promise)
+ : Underlying_(std::move(underlying))
+ , ResponsePromise_(std::move(promise))
+ { }
+
+ TString GetResponse() const override
+ {
+ if (!ResponsePromise_.IsSet()) {
+ ythrow TApiUsageError() << "Can't get response before stream is closed.";
+ }
+ return Response_;
+ }
+
+private:
+ const std::unique_ptr<IOutputStream> Underlying_;
+ TPromise ResponsePromise_;
+ TString Response_;
+
+ void DoWrite(const void* buf, size_t len) override
+ {
+ Underlying_->Write(buf, len);
+ }
+
+ void DoFinish() override
+ {
+ Underlying_->Finish();
+ Response_ = ParseResponse();
+ }
+
+ TString ParseResponse() const
+ {
+ auto rsp = ResponsePromise_.Get().ValueOrThrow();
+ TNode writeResult;
+ TNodeBuilder builder(&writeResult);
+ NYTree::Serialize(rsp, &builder);
+ return NodeToYsonString(writeResult, NYson::EYsonFormat::Binary);
+ }
+};
+
std::unique_ptr<IOutputStreamWithResponse> TRpcRawClient::WriteTableFragment(
- const TDistributedWriteTableCookie& /*cookie*/,
- const TMaybe<TFormat>& /*format*/,
+ const TDistributedWriteTableCookie& cookie,
+ const TMaybe<TFormat>& format,
const TTableFragmentWriterOptions& /*options*/)
{
- ythrow TApiUsageError() << "Not implemented";
+ using TRspPtr = TIntrusivePtr<NRpc::TTypedClientResponse<NApi::NRpcProxy::NProto::TRspWriteTableFragment>>;
+
+ auto* clientBase = VerifyDynamicCast<NApi::NRpcProxy::TClientBase*>(Client_.Get());
+
+ auto proxy = clientBase->CreateApiServiceProxy();
+
+ auto req = proxy.WriteTableFragment();
+ clientBase->InitStreamingRequest(*req);
+
+ req->set_signed_cookie(NYson::TYsonString(NodeToYsonString(cookie.Underlying(), NYson::EYsonFormat::Text)).ToString());
+
+ if (format) {
+ req->set_format(NYson::TYsonString(NodeToYsonString(format->Config, NYson::EYsonFormat::Text)).ToString());
+ }
+
+ auto promise = NewPromise<NApi::TSignedWriteFragmentResultPtr>();
+
+ auto future = CreateRpcClientOutputStream(
+ std::move(req),
+ BIND ([=] (const TSharedRef& metaRef) {
+ NApi::NRpcProxy::NProto::TWriteTableMeta meta;
+ if (!TryDeserializeProto(&meta, metaRef)) {
+ THROW_ERROR_EXCEPTION("Failed to deserialize schema for table fragment writer");
+ }
+ }),
+ BIND([=] (TRspPtr&& rsp) {
+ promise.Set(ConvertTo<NApi::TSignedWriteFragmentResultPtr>(NYson::TYsonString(rsp->signed_write_result())));
+ }));
+
+ auto stream = WaitAndProcess(future);
+
+ auto rowStream = New<TSerializingRowStream>(std::move(stream));
+ auto syncStream = std::make_unique<TSyncRpcOutputStream>(std::move(rowStream));
+
+ return std::make_unique<TTableFragmentStreamWithResponse>(std::move(syncStream), std::move(promise));
}
TDistributedWriteFileSessionWithCookies TRpcRawClient::StartDistributedWriteFileSession(
- TMutationId& /*mutationId*/,
- const TRichYPath& /*richPath*/,
- i64 /*cookieCount*/,
- const TStartDistributedWriteFileOptions& /*options*/)
+ TMutationId& mutationId,
+ const TRichYPath& richPath,
+ i64 cookieCount,
+ const TStartDistributedWriteFileOptions& options)
{
- ythrow TApiUsageError() << "Not implemented";
+ auto future = Client_->StartDistributedWriteFileSession(
+ ToApiRichPath(richPath),
+ SerializeOptionsForStartDistributedFileSession(mutationId, cookieCount, options));
+
+ auto apiSession = WaitAndProcess(future);
+
+ TNode session;
+ TNodeBuilder builder(&session);
+ NYTree::Serialize(apiSession.Session, &builder);
+
+ TVector<TDistributedWriteFileCookie> cookies;
+ cookies.reserve(apiSession.Cookies.size());
+ for (const auto& cookie : apiSession.Cookies) {
+ TNode cookieNode;
+ TNodeBuilder cookieNodeBuilder(&cookieNode);
+ NYTree::Serialize(cookie, &cookieNodeBuilder);
+ cookies.push_back(TDistributedWriteFileCookie(std::move(cookieNode)));
+ }
+
+ TDistributedWriteFileSessionWithCookies result;
+ result.Session(TDistributedWriteFileSession(std::move(session)));
+ result.Cookies(std::move(cookies));
+ return result;
}
void TRpcRawClient::PingDistributedWriteFileSession(
- const TDistributedWriteFileSession& /*session*/,
+ const TDistributedWriteFileSession& session,
const TPingDistributedWriteFileOptions& /*options*/)
{
- ythrow TApiUsageError() << "Not implemented";
+ auto apiSession = NYTree::ConvertTo<NApi::TSignedDistributedWriteFileSessionPtr>(ToApiNode(session.Underlying()));
+
+ auto future = Client_->PingDistributedWriteFileSession(apiSession);
+ WaitAndProcess(future);
}
void TRpcRawClient::FinishDistributedWriteFileSession(
- TMutationId& /*mutationId*/,
- const TDistributedWriteFileSession& /*session*/,
- const TVector<TWriteFileFragmentResult>& /*results*/,
- const TFinishDistributedWriteFileOptions& /*options*/)
+ TMutationId& mutationId,
+ const TDistributedWriteFileSession& session,
+ const TVector<TWriteFileFragmentResult>& results,
+ const TFinishDistributedWriteFileOptions& options)
{
- ythrow TApiUsageError() << "Not implemented";
+ auto apiSession = NYTree::ConvertTo<NApi::TSignedDistributedWriteFileSessionPtr>(ToApiNode(session.Underlying()));
+
+ std::vector<NApi::TSignedWriteFileFragmentResultPtr> apiResults;
+ apiResults.reserve(results.size());
+
+ for (const auto& writeFragment : results) {
+ apiResults.push_back(NYTree::ConvertTo<NApi::TSignedWriteFileFragmentResultPtr>(ToApiNode(writeFragment.Underlying())));
+ }
+
+ NApi::TDistributedWriteFileSessionWithResults sessionWithResults;
+ sessionWithResults.Session = std::move(apiSession);
+ sessionWithResults.Results = std::move(apiResults);
+
+ auto future = Client_->FinishDistributedWriteFileSession(
+ sessionWithResults,
+ SerializeOptionsForFinishDistributedFileSession(mutationId, options));
+
+ WaitAndProcess(future);
}
+class TFileFragmentStreamWithResponse
+ : public IOutputStreamWithResponse
+{
+public:
+ explicit TFileFragmentStreamWithResponse(NApi::IFileFragmentWriterPtr fileWriter)
+ : Underlying_(std::move(fileWriter))
+ {
+ WaitAndProcess(Underlying_->Open());
+ }
+
+ TString GetResponse() const override
+ {
+ if (!Response_) {
+ ythrow TApiUsageError() << "Can't get response before stream is closed.";
+ }
+ return *Response_;
+ }
+
+private:
+ NApi::IFileFragmentWriterPtr Underlying_;
+ TMaybe<TString> Response_;
+
+ void DoWrite(const void* buf, size_t len) override
+ {
+ WaitAndProcess(Underlying_->Write(TSharedRef::MakeCopy<TDefaultSharedBlobTag>(TRef(buf, len))));
+ }
+
+ void DoFinish() override
+ {
+ WaitAndProcess(Underlying_->Close());
+ Response_ = ParseResponse();
+ }
+
+ TString ParseResponse() const
+ {
+ TNode writeResult;
+ TNodeBuilder builder(&writeResult);
+ NYTree::Serialize(Underlying_->GetWriteFragmentResult(), &builder);
+ return NodeToYsonString(writeResult, NYson::EYsonFormat::Binary);
+ }
+};
+
std::unique_ptr<IOutputStreamWithResponse> TRpcRawClient::WriteFileFragment(
- const TDistributedWriteFileCookie& /*cookie*/,
+ const TDistributedWriteFileCookie& cookie,
const TFileFragmentWriterOptions& /*options*/)
{
- ythrow TApiUsageError() << "Not implemented";
+ auto apiCookie = NYTree::ConvertTo<NApi::TSignedWriteFileFragmentCookiePtr>(ToApiNode(cookie.Underlying()));
+ auto fileWriter = Client_->CreateFileFragmentWriter(apiCookie);
+
+ return std::make_unique<TFileFragmentStreamWithResponse>(std::move(fileWriter));
}
TCheckPermissionResponse TRpcRawClient::CheckPermission(
diff --git a/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp
index d8f36e16b6c..f5b0f2460a7 100644
--- a/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp
@@ -1179,6 +1179,64 @@ NApi::TPartitionTablesOptions SerializeOptionsForGetTablePartitions(
return result;
}
+NApi::TDistributedWriteSessionStartOptions SerializeOptionsForStartDistributedTableSession(
+ TMutationId& /*mutationId*/,
+ i64 cookieCount,
+ const TStartDistributedWriteTableOptions& options)
+{
+ NApi::TDistributedWriteSessionStartOptions result;
+
+ result.CookieCount = cookieCount;
+ // TODO(achains): Uncomment when TMutatingOptions are supported in native client distributed API.
+ // SetMutationId(&result, mutationId);
+
+ if (options.Timeout_) {
+ result.Timeout = *options.Timeout_;
+ }
+
+ return result;
+}
+
+NApi::TDistributedWriteSessionFinishOptions SerializeOptionsForFinishDistributedTableSession(
+ TMutationId& /*mutationId*/,
+ const TFinishDistributedWriteTableOptions& /*options*/)
+{
+ NApi::TDistributedWriteSessionFinishOptions result;
+
+ // TODO(achains): Uncomment when TMutatingOptions are supported in native client distributed API.
+ // SetMutationId(&result, mutationId);
+ return result;
+}
+
+NApi::TDistributedWriteFileSessionStartOptions SerializeOptionsForStartDistributedFileSession(
+ TMutationId& /*mutationId*/,
+ i64 cookieCount,
+ const TStartDistributedWriteFileOptions& options)
+{
+ NApi::TDistributedWriteFileSessionStartOptions result;
+
+ result.CookieCount = cookieCount;
+ // TODO(achains): Uncomment when TMutatingOptions are supported in native client distributed API.
+ // SetMutationId(&result, mutationId);
+
+ if (options.Timeout_) {
+ result.Timeout = *options.Timeout_;
+ }
+
+ return result;
+}
+
+NApi::TDistributedWriteFileSessionFinishOptions SerializeOptionsForFinishDistributedFileSession(
+ TMutationId& /*mutationId*/,
+ const TFinishDistributedWriteFileOptions& /*options*/)
+{
+ NApi::TDistributedWriteFileSessionFinishOptions result;
+
+ // TODO(achains): Uncomment when TMutatingOptions are supported in native client distributed API.
+ // SetMutationId(&result, mutationId);
+ return result;
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h
index 3c712efc2b6..a20c910a010 100644
--- a/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h
+++ b/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h
@@ -215,6 +215,24 @@ NApi::TPartitionTablesOptions SerializeOptionsForGetTablePartitions(
const TTransactionId& transactionId,
const TGetTablePartitionsOptions& options);
+NApi::TDistributedWriteSessionStartOptions SerializeOptionsForStartDistributedTableSession(
+ TMutationId& mutationId,
+ i64 cookieCount,
+ const TStartDistributedWriteTableOptions& options);
+
+NApi::TDistributedWriteSessionFinishOptions SerializeOptionsForFinishDistributedTableSession(
+ TMutationId& mutationId,
+ const TFinishDistributedWriteTableOptions& options);
+
+NApi::TDistributedWriteFileSessionStartOptions SerializeOptionsForStartDistributedFileSession(
+ TMutationId& mutationId,
+ i64 cookieCount,
+ const TStartDistributedWriteFileOptions& options);
+
+NApi::TDistributedWriteFileSessionFinishOptions SerializeOptionsForFinishDistributedFileSession(
+ TMutationId& mutationId,
+ const TFinishDistributedWriteFileOptions& options);
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail