diff options
| author | achains <[email protected]> | 2025-11-13 10:43:43 +0300 |
|---|---|---|
| committer | achains <[email protected]> | 2025-11-13 11:03:19 +0300 |
| commit | 7b74fa8fc7cc82d96a0cba57d21b772fd1400a0a (patch) | |
| tree | f36d72a389c6413e56a065295c05c505f953be8c /yt/cpp | |
| parent | 1c9f50a532cbec8d3be39d0cf6a008edb60929b3 (diff) | |
YT-26425: RPC support distributed API
* Changelog entry
Type: feature
Component: cpp-sdk
Support Distributed write API methods for RPC proxies in C\+\+ client.
commit_hash:1b1247f95c77a2de02d16f56dcc3291e772be2f2
Diffstat (limited to 'yt/cpp')
| -rw-r--r-- | yt/cpp/mapreduce/interface/distributed_session.h | 20 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/rpc_client/raw_client.cpp | 282 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp | 58 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h | 18 |
4 files changed, 343 insertions, 35 deletions
diff --git a/yt/cpp/mapreduce/interface/distributed_session.h b/yt/cpp/mapreduce/interface/distributed_session.h index 70cf4db0b5d..2e6e23ce91a 100644 --- a/yt/cpp/mapreduce/interface/distributed_session.h +++ b/yt/cpp/mapreduce/interface/distributed_session.h @@ -11,6 +11,8 @@ #include <library/cpp/yt/misc/strong_typedef.h> +#include <util/datetime/base.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -64,7 +66,16 @@ struct TDistributedWriteTableSessionWithCookies struct TStartDistributedWriteTableOptions { + /// @cond Doxygen_Suppress using TSelf = TStartDistributedWriteTableOptions; + /// @endcond + + /// + /// @brief How long session lives after last ping. + /// + /// If server doesn't receive any pings for session transaction for this time + /// session will be aborted. By default timeout is 15 seconds. + FLUENT_FIELD_OPTION(TDuration, Timeout); }; struct TPingDistributedWriteTableOptions @@ -90,7 +101,16 @@ struct TDistributedWriteFileSessionWithCookies struct TStartDistributedWriteFileOptions { + /// @cond Doxygen_Suppress using TSelf = TStartDistributedWriteFileOptions; + /// @endcond + + /// + /// @brief How long session lives after last ping. + /// + /// If server doesn't receive any pings for session transaction for this time + /// session will be aborted. By default timeout is 15 seconds. + FLUENT_FIELD_OPTION(TDuration, Timeout); }; struct TPingDistributedWriteFileOptions diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.cpp b/yt/cpp/mapreduce/rpc_client/raw_client.cpp index 64d51f87499..f35fc6ce924 100644 --- a/yt/cpp/mapreduce/rpc_client/raw_client.cpp +++ b/yt/cpp/mapreduce/rpc_client/raw_client.cpp @@ -20,6 +20,8 @@ #include <yt/yt/client/table_client/name_table.h> #include <yt/yt/client/table_client/row_buffer.h> +#include <yt/yt/client/signature/signature.h> + #include <yt/yt/client/transaction_client/timestamp_provider.h> #include <yt/yt/core/misc/protobuf_helpers.h> @@ -177,6 +179,11 @@ EJobState FromApiJobState(NJobTrackerClient::EJobState state) } } +NYTree::INodePtr ToApiNode(const TNode& node) +{ + return NYTree::ConvertToNode(NYson::TYsonString(NodeToYsonString(node, NYson::EYsonFormat::Binary))); +} + //////////////////////////////////////////////////////////////////////////////// class TSyncRpcInputStream @@ -1048,8 +1055,7 @@ void TRpcRawClient::ReshardTableByPivotKeys( } NTableClient::TLegacyOwningKey pivotKey; - Deserialize(pivotKey, NYTree::ConvertToNode(NYson::TYsonString( - NodeToYsonString(keysNodesList, NYson::EYsonFormat::Binary)))); + Deserialize(pivotKey, ToApiNode(keysNodesList)); pivotKeys.emplace_back(std::move(pivotKey)); } @@ -1319,8 +1325,7 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable( lowerLimitKeyNode.Add(lowerKeyNode); NTableClient::TOwningKeyBound lowerKeyBound; - Deserialize(lowerKeyBound, NYTree::ConvertToNode(NYson::TYsonString( - NodeToYsonString(lowerLimitKeyNode, NYson::EYsonFormat::Binary)))); + Deserialize(lowerKeyBound, ToApiNode(lowerLimitKeyNode)); auto upperKeyNode = TNode::CreateList(key.Parts_); upperKeyNode.Add(std::numeric_limits<i64>::max()); @@ -1330,8 +1335,7 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable( upperLimitKeyNode.Add(upperKeyNode); NTableClient::TOwningKeyBound upperKeyBound; - Deserialize(upperKeyBound, NYTree::ConvertToNode(NYson::TYsonString( - NodeToYsonString(upperLimitKeyNode, NYson::EYsonFormat::Binary)))); + Deserialize(upperKeyBound, ToApiNode(upperLimitKeyNode)); auto richPath = ToApiRichPath(path); richPath.SetRanges({ @@ -1427,68 +1431,276 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const NApi::TCheckPermissi } TDistributedWriteTableSessionWithCookies TRpcRawClient::StartDistributedWriteTableSession( - TMutationId& /*mutationId*/, - const TRichYPath& /*richPath*/, - i64 /*cookieCount*/, - const TStartDistributedWriteTableOptions& /*options*/) + TMutationId& mutationId, + const TRichYPath& richPath, + i64 cookieCount, + const TStartDistributedWriteTableOptions& options) { - ythrow TApiUsageError() << "Not implemented"; + auto future = Client_->StartDistributedWriteSession( + ToApiRichPath(richPath), + SerializeOptionsForStartDistributedTableSession(mutationId, cookieCount, options)); + + auto apiSession = WaitAndProcess(future); + + TNode session; + TNodeBuilder builder(&session); + NYTree::Serialize(apiSession.Session, &builder); + + TVector<TDistributedWriteTableCookie> cookies; + cookies.reserve(apiSession.Cookies.size()); + for (const auto& cookie : apiSession.Cookies) { + TNode cookieNode; + TNodeBuilder cookieNodeBuilder(&cookieNode); + NYTree::Serialize(cookie, &cookieNodeBuilder); + cookies.push_back(TDistributedWriteTableCookie(std::move(cookieNode))); + } + + TDistributedWriteTableSessionWithCookies result; + result.Session(TDistributedWriteTableSession(std::move(session))); + result.Cookies(std::move(cookies)); + return result; } void TRpcRawClient::PingDistributedWriteTableSession( - const TDistributedWriteTableSession& /*session*/, + const TDistributedWriteTableSession& session, const TPingDistributedWriteTableOptions& /*options*/) { - ythrow TApiUsageError() << "Not implemented"; + auto apiSession = NYTree::ConvertTo<NApi::TSignedDistributedWriteSessionPtr>(ToApiNode(session.Underlying())); + + auto future = Client_->PingDistributedWriteSession(apiSession); + WaitAndProcess(future); } void TRpcRawClient::FinishDistributedWriteTableSession( - TMutationId& /*mutationId*/, - const TDistributedWriteTableSession& /*session*/, - const TVector<TWriteTableFragmentResult>& /*results*/, - const TFinishDistributedWriteTableOptions& /*options*/) + TMutationId& mutationId, + const TDistributedWriteTableSession& session, + const TVector<TWriteTableFragmentResult>& results, + const TFinishDistributedWriteTableOptions& options) { - ythrow TApiUsageError() << "Not implemented"; + auto apiSession = NYTree::ConvertTo<NApi::TSignedDistributedWriteSessionPtr>(ToApiNode(session.Underlying())); + + std::vector<NApi::TSignedWriteFragmentResultPtr> apiResults; + apiResults.reserve(results.size()); + + for (const auto& writeFragment : results) { + apiResults.push_back(NYTree::ConvertTo<NApi::TSignedWriteFragmentResultPtr>(ToApiNode(writeFragment.Underlying()))); + } + + NApi::TDistributedWriteSessionWithResults sessionWithResults; + sessionWithResults.Session = std::move(apiSession); + sessionWithResults.Results = std::move(apiResults); + + auto future = Client_->FinishDistributedWriteSession( + sessionWithResults, + SerializeOptionsForFinishDistributedTableSession(mutationId, options)); + + WaitAndProcess(future); } +class TTableFragmentStreamWithResponse + : public IOutputStreamWithResponse +{ + using TPromise = TPromise<NApi::TSignedWriteFragmentResultPtr>; + +public: + TTableFragmentStreamWithResponse(std::unique_ptr<IOutputStream> underlying, TPromise promise) + : Underlying_(std::move(underlying)) + , ResponsePromise_(std::move(promise)) + { } + + TString GetResponse() const override + { + if (!ResponsePromise_.IsSet()) { + ythrow TApiUsageError() << "Can't get response before stream is closed."; + } + return Response_; + } + +private: + const std::unique_ptr<IOutputStream> Underlying_; + TPromise ResponsePromise_; + TString Response_; + + void DoWrite(const void* buf, size_t len) override + { + Underlying_->Write(buf, len); + } + + void DoFinish() override + { + Underlying_->Finish(); + Response_ = ParseResponse(); + } + + TString ParseResponse() const + { + auto rsp = ResponsePromise_.Get().ValueOrThrow(); + TNode writeResult; + TNodeBuilder builder(&writeResult); + NYTree::Serialize(rsp, &builder); + return NodeToYsonString(writeResult, NYson::EYsonFormat::Binary); + } +}; + std::unique_ptr<IOutputStreamWithResponse> TRpcRawClient::WriteTableFragment( - const TDistributedWriteTableCookie& /*cookie*/, - const TMaybe<TFormat>& /*format*/, + const TDistributedWriteTableCookie& cookie, + const TMaybe<TFormat>& format, const TTableFragmentWriterOptions& /*options*/) { - ythrow TApiUsageError() << "Not implemented"; + using TRspPtr = TIntrusivePtr<NRpc::TTypedClientResponse<NApi::NRpcProxy::NProto::TRspWriteTableFragment>>; + + auto* clientBase = VerifyDynamicCast<NApi::NRpcProxy::TClientBase*>(Client_.Get()); + + auto proxy = clientBase->CreateApiServiceProxy(); + + auto req = proxy.WriteTableFragment(); + clientBase->InitStreamingRequest(*req); + + req->set_signed_cookie(NYson::TYsonString(NodeToYsonString(cookie.Underlying(), NYson::EYsonFormat::Text)).ToString()); + + if (format) { + req->set_format(NYson::TYsonString(NodeToYsonString(format->Config, NYson::EYsonFormat::Text)).ToString()); + } + + auto promise = NewPromise<NApi::TSignedWriteFragmentResultPtr>(); + + auto future = CreateRpcClientOutputStream( + std::move(req), + BIND ([=] (const TSharedRef& metaRef) { + NApi::NRpcProxy::NProto::TWriteTableMeta meta; + if (!TryDeserializeProto(&meta, metaRef)) { + THROW_ERROR_EXCEPTION("Failed to deserialize schema for table fragment writer"); + } + }), + BIND([=] (TRspPtr&& rsp) { + promise.Set(ConvertTo<NApi::TSignedWriteFragmentResultPtr>(NYson::TYsonString(rsp->signed_write_result()))); + })); + + auto stream = WaitAndProcess(future); + + auto rowStream = New<TSerializingRowStream>(std::move(stream)); + auto syncStream = std::make_unique<TSyncRpcOutputStream>(std::move(rowStream)); + + return std::make_unique<TTableFragmentStreamWithResponse>(std::move(syncStream), std::move(promise)); } TDistributedWriteFileSessionWithCookies TRpcRawClient::StartDistributedWriteFileSession( - TMutationId& /*mutationId*/, - const TRichYPath& /*richPath*/, - i64 /*cookieCount*/, - const TStartDistributedWriteFileOptions& /*options*/) + TMutationId& mutationId, + const TRichYPath& richPath, + i64 cookieCount, + const TStartDistributedWriteFileOptions& options) { - ythrow TApiUsageError() << "Not implemented"; + auto future = Client_->StartDistributedWriteFileSession( + ToApiRichPath(richPath), + SerializeOptionsForStartDistributedFileSession(mutationId, cookieCount, options)); + + auto apiSession = WaitAndProcess(future); + + TNode session; + TNodeBuilder builder(&session); + NYTree::Serialize(apiSession.Session, &builder); + + TVector<TDistributedWriteFileCookie> cookies; + cookies.reserve(apiSession.Cookies.size()); + for (const auto& cookie : apiSession.Cookies) { + TNode cookieNode; + TNodeBuilder cookieNodeBuilder(&cookieNode); + NYTree::Serialize(cookie, &cookieNodeBuilder); + cookies.push_back(TDistributedWriteFileCookie(std::move(cookieNode))); + } + + TDistributedWriteFileSessionWithCookies result; + result.Session(TDistributedWriteFileSession(std::move(session))); + result.Cookies(std::move(cookies)); + return result; } void TRpcRawClient::PingDistributedWriteFileSession( - const TDistributedWriteFileSession& /*session*/, + const TDistributedWriteFileSession& session, const TPingDistributedWriteFileOptions& /*options*/) { - ythrow TApiUsageError() << "Not implemented"; + auto apiSession = NYTree::ConvertTo<NApi::TSignedDistributedWriteFileSessionPtr>(ToApiNode(session.Underlying())); + + auto future = Client_->PingDistributedWriteFileSession(apiSession); + WaitAndProcess(future); } void TRpcRawClient::FinishDistributedWriteFileSession( - TMutationId& /*mutationId*/, - const TDistributedWriteFileSession& /*session*/, - const TVector<TWriteFileFragmentResult>& /*results*/, - const TFinishDistributedWriteFileOptions& /*options*/) + TMutationId& mutationId, + const TDistributedWriteFileSession& session, + const TVector<TWriteFileFragmentResult>& results, + const TFinishDistributedWriteFileOptions& options) { - ythrow TApiUsageError() << "Not implemented"; + auto apiSession = NYTree::ConvertTo<NApi::TSignedDistributedWriteFileSessionPtr>(ToApiNode(session.Underlying())); + + std::vector<NApi::TSignedWriteFileFragmentResultPtr> apiResults; + apiResults.reserve(results.size()); + + for (const auto& writeFragment : results) { + apiResults.push_back(NYTree::ConvertTo<NApi::TSignedWriteFileFragmentResultPtr>(ToApiNode(writeFragment.Underlying()))); + } + + NApi::TDistributedWriteFileSessionWithResults sessionWithResults; + sessionWithResults.Session = std::move(apiSession); + sessionWithResults.Results = std::move(apiResults); + + auto future = Client_->FinishDistributedWriteFileSession( + sessionWithResults, + SerializeOptionsForFinishDistributedFileSession(mutationId, options)); + + WaitAndProcess(future); } +class TFileFragmentStreamWithResponse + : public IOutputStreamWithResponse +{ +public: + explicit TFileFragmentStreamWithResponse(NApi::IFileFragmentWriterPtr fileWriter) + : Underlying_(std::move(fileWriter)) + { + WaitAndProcess(Underlying_->Open()); + } + + TString GetResponse() const override + { + if (!Response_) { + ythrow TApiUsageError() << "Can't get response before stream is closed."; + } + return *Response_; + } + +private: + NApi::IFileFragmentWriterPtr Underlying_; + TMaybe<TString> Response_; + + void DoWrite(const void* buf, size_t len) override + { + WaitAndProcess(Underlying_->Write(TSharedRef::MakeCopy<TDefaultSharedBlobTag>(TRef(buf, len)))); + } + + void DoFinish() override + { + WaitAndProcess(Underlying_->Close()); + Response_ = ParseResponse(); + } + + TString ParseResponse() const + { + TNode writeResult; + TNodeBuilder builder(&writeResult); + NYTree::Serialize(Underlying_->GetWriteFragmentResult(), &builder); + return NodeToYsonString(writeResult, NYson::EYsonFormat::Binary); + } +}; + std::unique_ptr<IOutputStreamWithResponse> TRpcRawClient::WriteFileFragment( - const TDistributedWriteFileCookie& /*cookie*/, + const TDistributedWriteFileCookie& cookie, const TFileFragmentWriterOptions& /*options*/) { - ythrow TApiUsageError() << "Not implemented"; + auto apiCookie = NYTree::ConvertTo<NApi::TSignedWriteFileFragmentCookiePtr>(ToApiNode(cookie.Underlying())); + auto fileWriter = Client_->CreateFileFragmentWriter(apiCookie); + + return std::make_unique<TFileFragmentStreamWithResponse>(std::move(fileWriter)); } TCheckPermissionResponse TRpcRawClient::CheckPermission( diff --git a/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp index d8f36e16b6c..f5b0f2460a7 100644 --- a/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.cpp @@ -1179,6 +1179,64 @@ NApi::TPartitionTablesOptions SerializeOptionsForGetTablePartitions( return result; } +NApi::TDistributedWriteSessionStartOptions SerializeOptionsForStartDistributedTableSession( + TMutationId& /*mutationId*/, + i64 cookieCount, + const TStartDistributedWriteTableOptions& options) +{ + NApi::TDistributedWriteSessionStartOptions result; + + result.CookieCount = cookieCount; + // TODO(achains): Uncomment when TMutatingOptions are supported in native client distributed API. + // SetMutationId(&result, mutationId); + + if (options.Timeout_) { + result.Timeout = *options.Timeout_; + } + + return result; +} + +NApi::TDistributedWriteSessionFinishOptions SerializeOptionsForFinishDistributedTableSession( + TMutationId& /*mutationId*/, + const TFinishDistributedWriteTableOptions& /*options*/) +{ + NApi::TDistributedWriteSessionFinishOptions result; + + // TODO(achains): Uncomment when TMutatingOptions are supported in native client distributed API. + // SetMutationId(&result, mutationId); + return result; +} + +NApi::TDistributedWriteFileSessionStartOptions SerializeOptionsForStartDistributedFileSession( + TMutationId& /*mutationId*/, + i64 cookieCount, + const TStartDistributedWriteFileOptions& options) +{ + NApi::TDistributedWriteFileSessionStartOptions result; + + result.CookieCount = cookieCount; + // TODO(achains): Uncomment when TMutatingOptions are supported in native client distributed API. + // SetMutationId(&result, mutationId); + + if (options.Timeout_) { + result.Timeout = *options.Timeout_; + } + + return result; +} + +NApi::TDistributedWriteFileSessionFinishOptions SerializeOptionsForFinishDistributedFileSession( + TMutationId& /*mutationId*/, + const TFinishDistributedWriteFileOptions& /*options*/) +{ + NApi::TDistributedWriteFileSessionFinishOptions result; + + // TODO(achains): Uncomment when TMutatingOptions are supported in native client distributed API. + // SetMutationId(&result, mutationId); + return result; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h index 3c712efc2b6..a20c910a010 100644 --- a/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/rpc_client/rpc_parameters_serialization.h @@ -215,6 +215,24 @@ NApi::TPartitionTablesOptions SerializeOptionsForGetTablePartitions( const TTransactionId& transactionId, const TGetTablePartitionsOptions& options); +NApi::TDistributedWriteSessionStartOptions SerializeOptionsForStartDistributedTableSession( + TMutationId& mutationId, + i64 cookieCount, + const TStartDistributedWriteTableOptions& options); + +NApi::TDistributedWriteSessionFinishOptions SerializeOptionsForFinishDistributedTableSession( + TMutationId& mutationId, + const TFinishDistributedWriteTableOptions& options); + +NApi::TDistributedWriteFileSessionStartOptions SerializeOptionsForStartDistributedFileSession( + TMutationId& mutationId, + i64 cookieCount, + const TStartDistributedWriteFileOptions& options); + +NApi::TDistributedWriteFileSessionFinishOptions SerializeOptionsForFinishDistributedFileSession( + TMutationId& mutationId, + const TFinishDistributedWriteFileOptions& options); + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail |
