diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-10-14 23:33:03 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-10-14 23:43:57 +0300 |
commit | 9cd00d5f998588423d9ff891fbc504d9cd09bb31 (patch) | |
tree | fac56bcb6929d7b653b1a41800f4943fb37dc264 | |
parent | 608385daa2cf9d3ccca4e8d2354d44451cb58a85 (diff) | |
download | ydb-9cd00d5f998588423d9ff891fbc504d9cd09bb31.tar.gz |
YT-22307: Implement distributed write protocol (no signatures yet)
The default value of MaxChildrenPerAttachRequest is taken from the default of the same parameter in the operation controller.
commit_hash:e977687ef81e62ad75d8eb57b156e4efe4ee132d
25 files changed, 519 insertions, 232 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index 5b5e9accf3..6f85657139 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -856,9 +856,9 @@ public: const TDistributedWriteSessionFinishOptions& options), (std::move(session), options)) - DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, ( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options), + DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, ( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options), (cookie, options)) // Shuffle Service diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h index 9ac983371c..d1bca5bc3a 100644 --- a/yt/yt/client/api/distributed_table_client.h +++ b/yt/yt/client/api/distributed_table_client.h @@ -4,6 +4,8 @@ #include <yt/yt/client/table_client/config.h> +#include <library/cpp/yt/misc/non_null_ptr.h> + namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// @@ -13,10 +15,11 @@ struct TDistributedWriteSessionStartOptions { }; struct TDistributedWriteSessionFinishOptions - : public TTransactionalOptions -{ }; +{ + int MaxChildrenPerAttachRequest = 10'000; +}; -struct TParticipantTableWriterOptions +struct TFragmentTableWriterOptions : public TTableWriterOptions { }; @@ -24,6 +27,7 @@ struct TParticipantTableWriterOptions struct IDistributedTableClientBase { +public: virtual ~IDistributedTableClientBase() = default; virtual TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( @@ -33,6 +37,13 @@ struct IDistributedTableClientBase virtual TFuture<void> FinishDistributedWriteSession( TDistributedWriteSessionPtr session, const TDistributedWriteSessionFinishOptions& options = {}) = 0; + + // Helper used to implement FinishDistributedWriteSession efficiently + // without compromising 
privacy of session fields. + // defined in yt/yt/client/api/distributed_table_session.cpp. + void* GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session); + // Used in chunk writer for results recording + void RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult); }; //////////////////////////////////////////////////////////////////////////////// @@ -41,9 +52,9 @@ struct IDistributedTableClient { virtual ~IDistributedTableClient() = default; - virtual TFuture<ITableWriterPtr> CreateParticipantTableWriter( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options = {}) = 0; + virtual TFuture<ITableWriterPtr> CreateFragmentTableWriter( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/distributed_table_session.cpp b/yt/yt/client/api/distributed_table_session.cpp new file mode 100644 index 0000000000..d67998b9d2 --- /dev/null +++ b/yt/yt/client/api/distributed_table_session.cpp @@ -0,0 +1,197 @@ +#include "distributed_table_session.h" + +#include "client.h" +#include "transaction.h" + +namespace NYT::NApi { + +using namespace NYPath; +using namespace NObjectClient; +using namespace NTableClient; +using namespace NTransactionClient; +using namespace NYTree; +using namespace NCypressClient; +using namespace NChunkClient; + +//////////////////////////////////////////////////////////////////////////////// + +TTableWriterPatchInfo::TTableWriterPatchInfo( + TRichYPath richPath, + TObjectId objectId, + TCellTag externalCellTag, + TMasterTableSchemaId chunkSchemaId, + TTableSchemaPtr chunkSchema, + std::optional<TLegacyOwningKey> writerLastKey, + int maxHeavyColumns, + TTimestamp timestamp, + const IAttributeDictionary& tableAttributes) + : TTableWriterPatchInfo() +{ + ObjectId = objectId; + RichPath = 
std::move(richPath); + + ChunkSchemaId = chunkSchemaId; + ChunkSchema = std::move(chunkSchema); + + WriterLastKey = writerLastKey; + MaxHeavyColumns = maxHeavyColumns; + + TableAttributes = tableAttributes.ToMap(); + + ExternalCellTag = externalCellTag; + + Timestamp = timestamp; +} + +void TTableWriterPatchInfo::Register(TRegistrar registrar) +{ + registrar.Parameter("table_id", &TThis::ObjectId); + registrar.Parameter("table_path", &TThis::RichPath); + + registrar.Parameter("chunk_schema_id", &TThis::ChunkSchemaId); + registrar.Parameter("chunk_schema", &TThis::ChunkSchema); + + registrar.Parameter("writer_last_key", &TThis::WriterLastKey); + registrar.Parameter("max_heavy_columns", &TThis::MaxHeavyColumns); + + registrar.Parameter("table_attributes", &TThis::TableAttributes); + + registrar.Parameter("external_cell_tag", &TThis::ExternalCellTag); + + registrar.Parameter("timestamp", &TThis::Timestamp); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TFragmentWriteResult::Register(TRegistrar registrar) +{ + registrar.Parameter("min_boundary_key", &TThis::MinBoundaryKey); + registrar.Parameter("max_boundary_key", &TThis::MaxBoundaryKey); + registrar.Parameter("chunk_list_id", &TThis::ChunkListId); +} + +//////////////////////////////////////////////////////////////////////////////// + +const TTableWriterPatchInfo& TFragmentWriteCookie::GetPatchInfo() const +{ + return PatchInfo_; +} + +TTransactionId TFragmentWriteCookie::GetMainTransactionId() const +{ + return MainTxId_; +} + +TTransactionId TFragmentWriteCookie::GetUploadTransactionId() const +{ + return UploadTxId_; +} + +void TFragmentWriteCookie::Register(TRegistrar registrar) +{ + registrar.Parameter("session_id", &TThis::Id_); + + registrar.Parameter("tx_id", &TThis::MainTxId_); + registrar.Parameter("upload_tx_id", &TThis::UploadTxId_); + + registrar.Parameter("patch_info", &TThis::PatchInfo_); + + registrar.Parameter("writer_results", &TThis::WriteResults_); 
+} + +//////////////////////////////////////////////////////////////////////////////// + +TDistributedWriteSession::TDistributedWriteSession( + TTransactionId mainTxId, + TTransactionId uploadTxId, + TChunkListId rootChunkListId, + TTableWriterPatchInfo patchInfo) + : TDistributedWriteSession() +{ + Id_ = TDistributedWriteSessionId(TGuid::Create()); + MainTxId_ = mainTxId; + UploadTxId_ = uploadTxId; + + RootChunkListId_ = rootChunkListId; + + PatchInfo_ = std::move(patchInfo); + + // Signatures? +} + +TTransactionId TDistributedWriteSession::GetMainTransactionId() const +{ + return MainTxId_; +} + +TTransactionId TDistributedWriteSession::GetUploadTransactionId() const +{ + return UploadTxId_; +} + +const TTableWriterPatchInfo& TDistributedWriteSession::GetPatchInfo() const Y_LIFETIME_BOUND +{ + return PatchInfo_; +} + +TChunkListId TDistributedWriteSession::GetRootChunkListId() const +{ + return RootChunkListId_; +} + +TFragmentWriteCookiePtr TDistributedWriteSession::GiveCookie() +{ + auto cookie = New<TFragmentWriteCookie>(); + cookie->Id_ = Id_; + cookie->MainTxId_ = MainTxId_; + cookie->UploadTxId_ = UploadTxId_; + cookie->PatchInfo_ = PatchInfo_; + + return cookie; +} + +void TDistributedWriteSession::TakeCookie(TFragmentWriteCookiePtr cookie) +{ + // Verify cookie signature? + WriteResults_.reserve(std::ssize(WriteResults_) + std::ssize(cookie->WriteResults_)); + for (const auto& writeResult : cookie->WriteResults_) { + WriteResults_.push_back(writeResult); + } +} + +TFuture<void> TDistributedWriteSession::Ping(IClientPtr client) +{ + // NB(arkady-e1ppa): AutoAbort = false by default. 
+ auto mainTx = client->AttachTransaction(MainTxId_); + + return mainTx->Ping(); +} + +void TDistributedWriteSession::Register(TRegistrar registrar) +{ + registrar.Parameter("session_id", &TThis::Id_); + registrar.Parameter("tx_id", &TThis::MainTxId_); + registrar.Parameter("upload_tx_id", &TThis::UploadTxId_); + + registrar.Parameter("root_chunk_list_id", &TThis::RootChunkListId_); + + registrar.Parameter("patch_info", &TThis::PatchInfo_); + + registrar.Parameter("write_results", &TThis::WriteResults_); +} + +//////////////////////////////////////////////////////////////////////////////// + +void* IDistributedTableClientBase::GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session) +{ + return static_cast<void*>(&session->WriteResults_); +} + +void IDistributedTableClientBase::RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult) +{ + auto* concrete = static_cast<TFragmentWriteResult*>(opaqueWriteResult); + YT_ASSERT(concrete); + cookie->WriteResults_.push_back(*concrete); +} + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_session.h b/yt/yt/client/api/distributed_table_session.h new file mode 100644 index 0000000000..1932284206 --- /dev/null +++ b/yt/yt/client/api/distributed_table_session.h @@ -0,0 +1,158 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/ytlib/table_client/table_upload_options.h> + +#include <yt/yt/client/chunk_client/public.h> + +#include <yt/yt/client/table_client/key.h> + +#include <yt/yt/client/ypath/rich.h> + +#include <yt/yt/core/ytree/yson_struct.h> + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid); + +//////////////////////////////////////////////////////////////////////////////// + +// Used by distributed table writer to patch +// its config based on table data. 
+// NB(arkady-e1ppa): TableUploadOptions are +// encoded in RichPath + TableAttributes and thus +// are not required to be stored directly. +struct TTableWriterPatchInfo + : public NYTree::TYsonStructLite +{ +public: + NObjectClient::TObjectId ObjectId; + + NYPath::TRichYPath RichPath; + + // NB(arkady-e1ppa): Empty writer last key Serialization into Deserialization + // somehow results in a []; row which is considered non-empty for some reason. + // This matters too little for me to bother saving up the std::optional wrapper. + std::optional<NTableClient::TLegacyOwningKey> WriterLastKey; + int MaxHeavyColumns; + + NTableClient::TMasterTableSchemaId ChunkSchemaId; + NTableClient::TTableSchemaPtr ChunkSchema; + + NYTree::INodePtr TableAttributes; + + NObjectClient::TCellTag ExternalCellTag; + + NTransactionClient::TTimestamp Timestamp; + + TTableWriterPatchInfo( + NYPath::TRichYPath richPath, + NObjectClient::TObjectId objectId, + NObjectClient::TCellTag externalCellTag, + NTableClient::TMasterTableSchemaId chunkSchemaId, + NTableClient::TTableSchemaPtr chunkSchema, + std::optional<NTableClient::TLegacyOwningKey> writerLastKey, + int maxHeavyColumns, + NTransactionClient::TTimestamp timestamp, + const NYTree::IAttributeDictionary& tableAttributes); + + REGISTER_YSON_STRUCT_LITE(TTableWriterPatchInfo) + + static void Register(TRegistrar registrar); +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TFragmentWriteResult + : public NYTree::TYsonStructLite +{ + NTableClient::TLegacyOwningKey MinBoundaryKey; + NTableClient::TLegacyOwningKey MaxBoundaryKey; + NChunkClient::TChunkListId ChunkListId; + + REGISTER_YSON_STRUCT_LITE(TFragmentWriteResult) + + static void Register(TRegistrar registrar); +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TFragmentWriteCookie + : public NYTree::TYsonStruct +{ +public: + const TTableWriterPatchInfo& GetPatchInfo() const; + + 
NCypressClient::TTransactionId GetMainTransactionId() const; + NCypressClient::TTransactionId GetUploadTransactionId() const; + +private: + TDistributedWriteSessionId Id_; + + NCypressClient::TTransactionId MainTxId_; + NCypressClient::TTransactionId UploadTxId_; + + TTableWriterPatchInfo PatchInfo_; + + std::vector<TFragmentWriteResult> WriteResults_; + + REGISTER_YSON_STRUCT(TFragmentWriteCookie); + + static void Register(TRegistrar registrar); + + friend class TDistributedWriteSession; + friend struct IDistributedTableClientBase; +}; + +DEFINE_REFCOUNTED_TYPE(TFragmentWriteCookie); + +//////////////////////////////////////////////////////////////////////////////// + +class TDistributedWriteSession + : public NYTree::TYsonStruct +{ +public: + TDistributedWriteSession( + NCypressClient::TTransactionId mainTxId, + NCypressClient::TTransactionId uploadTxId, + NChunkClient::TChunkListId rootChunkListId, + TTableWriterPatchInfo patchInfo); + + NCypressClient::TTransactionId GetMainTransactionId() const; + NCypressClient::TTransactionId GetUploadTransactionId() const; + const TTableWriterPatchInfo& GetPatchInfo() const Y_LIFETIME_BOUND; + NChunkClient::TChunkListId GetRootChunkListId() const; + + TFragmentWriteCookiePtr GiveCookie(); + void TakeCookie(TFragmentWriteCookiePtr cookie); + + TFuture<void> Ping(IClientPtr client); + + REGISTER_YSON_STRUCT(TDistributedWriteSession); + + static void Register(TRegistrar registrar); + +private: + TDistributedWriteSessionId Id_; + + NCypressClient::TTransactionId MainTxId_; + NCypressClient::TTransactionId UploadTxId_; + + NChunkClient::TChunkListId RootChunkListId_; + + TTableWriterPatchInfo PatchInfo_; + + // This is used to commit changes when session is over. 
+ std::vector<TFragmentWriteResult> WriteResults_; + + friend struct IDistributedTableClientBase; +}; + +DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_sessions.cpp b/yt/yt/client/api/distributed_table_sessions.cpp deleted file mode 100644 index b454ad476b..0000000000 --- a/yt/yt/client/api/distributed_table_sessions.cpp +++ /dev/null @@ -1,17 +0,0 @@ -#include "distributed_table_sessions.h" - -namespace NYT::NApi { - -//////////////////////////////////////////////////////////////////////////////// - -void TDistributedWriteCookie::Register(TRegistrar /*registrar*/) -{ } - -//////////////////////////////////////////////////////////////////////////////// - -void TDistributedWriteSession::Register(TRegistrar /*registrar*/) -{ } - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_sessions.h b/yt/yt/client/api/distributed_table_sessions.h deleted file mode 100644 index 19b3c0f5c0..0000000000 --- a/yt/yt/client/api/distributed_table_sessions.h +++ /dev/null @@ -1,41 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/ytree/yson_struct.h> - -namespace NYT::NApi { - -//////////////////////////////////////////////////////////////////////////////// - -YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid); - -//////////////////////////////////////////////////////////////////////////////// - -class TDistributedWriteCookie - : public NYTree::TYsonStruct -{ -public: - REGISTER_YSON_STRUCT(TDistributedWriteCookie); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TDistributedWriteCookie); - -//////////////////////////////////////////////////////////////////////////////// - -class TDistributedWriteSession - : public NYTree::TYsonStruct -{ -public: - 
REGISTER_YSON_STRUCT(TDistributedWriteSession); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NApi diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h index be710a04ce..47eea56fb8 100644 --- a/yt/yt/client/api/public.h +++ b/yt/yt/client/api/public.h @@ -188,7 +188,8 @@ DECLARE_REFCOUNTED_STRUCT(TBackupManifest) DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter) DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession) -DECLARE_REFCOUNTED_CLASS(TDistributedWriteCookie) +DECLARE_REFCOUNTED_CLASS(TFragmentWriteCookie) +struct IDistributedTableClientBase; DECLARE_REFCOUNTED_STRUCT(TShuffleHandle) diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index 4dcb04c33f..5f1c945e3f 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -208,7 +208,7 @@ public: // Distributed table client DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, StartDistributedWriteSession); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, FinishDistributedWriteSession); - DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ParticipantWriteTable, + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, WriteTableFragment, .SetStreamingEnabled(true)); // Shuffle service diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index c86a1305af..6ad3c152b4 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -12,7 +12,7 @@ #include "table_writer.h" #include "transaction.h" -#include <yt/yt/client/api/distributed_table_sessions.h> +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/file_reader.h> #include <yt/yt/client/api/file_writer.h> #include <yt/yt/client/api/journal_reader.h> diff --git 
a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index 50bf51f247..430b366714 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -8,6 +8,7 @@ #include "timestamp_provider.h" #include "transaction.h" +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/helpers.h> #include <yt/yt/client/api/rowset.h> #include <yt/yt/client/api/transaction.h> @@ -752,12 +753,14 @@ TFuture<void> TClient::AlterReplicationCard( return req->Invoke().As<void>(); } -TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options) +TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options) { + using TRspPtr = TIntrusivePtr<NRpc::TTypedClientResponse<NProto::TRspWriteTableFragment>>; + auto proxy = CreateApiServiceProxy(); - auto req = proxy.ParticipantWriteTable(); + auto req = proxy.WriteTableFragment(); InitStreamingRequest(*req); FillRequest(req.Get(), cookie, options); @@ -768,12 +771,15 @@ TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter( BIND ([=] (const TSharedRef& metaRef) { NApi::NRpcProxy::NProto::TWriteTableMeta meta; if (!TryDeserializeProto(&meta, metaRef)) { - THROW_ERROR_EXCEPTION("Failed to deserialize schema for participant table writer"); + THROW_ERROR_EXCEPTION("Failed to deserialize schema for fragment table writer"); } FromProto(schema.Get(), meta.schema()); + }), + BIND([=] (TRspPtr&& rsp) mutable { + Deserialize(*cookie, ConvertToNode(TYsonString(rsp->cookie()))); })) - .Apply(BIND([=] (IAsyncZeroCopyOutputStreamPtr outputStream) { + .ApplyUnique(BIND([=] (IAsyncZeroCopyOutputStreamPtr&& outputStream) { return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema)); })).As<ITableWriterPtr>(); } diff --git 
a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index 9f8ee540a2..16e9400391 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -130,9 +130,9 @@ public: const TAlterReplicationCardOptions& options = {}) override; // Distributed table client - TFuture<ITableWriterPtr> CreateParticipantTableWriter( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options) override; + TFuture<ITableWriterPtr> CreateFragmentTableWriter( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options) override; // Queues. TFuture<NQueueClient::IQueueRowsetPtr> PullQueue( diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index af3b4b7eb6..3a6a3650be 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -1,5 +1,6 @@ #include "helpers.h" +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/rowset.h> #include <yt/yt/client/api/table_client.h> @@ -58,6 +59,17 @@ void ToProto( proto->set_suppress_upstream_sync(options.SuppressUpstreamSync); } +void FromProto( + NApi::TTransactionalOptions* options, + const NProto::TTransactionalOptions& proto) +{ + FromProto(&options->TransactionId, proto.transaction_id()); + options->Ping = proto.ping(); + options->PingAncestors = proto.ping_ancestors(); + options->SuppressTransactionCoordinatorSync = proto.suppress_transaction_coordinator_sync(); + options->SuppressUpstreamSync = proto.suppress_upstream_sync(); +} + void ToProto( NProto::TPrerequisiteOptions* proto, const NApi::TPrerequisiteOptions& options) @@ -1946,28 +1958,22 @@ void FillRequest( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options) { - Y_UNUSED(req, path, options); + ToProto(req->mutable_path(), path); + + if (options.TransactionId) { + ToProto(req->mutable_transactional_options(), options); + 
} } -void FromProto( +void ParseRequest( + NYPath::TRichYPath* mutablePath, TDistributedWriteSessionStartOptions* mutableOptions, const TReqStartDistributedWriteSession& req) { - Y_UNUSED(req, mutableOptions); -} - -void FromProto( - TDistributedWriteSession* mutableSession, - TRspStartDistributedWriteSession&& rsp) -{ - Y_UNUSED(mutableSession, rsp); -} - -void ToProto( - TRspStartDistributedWriteSession* rsp, - const TDistributedWriteSessionPtr& session) -{ - Y_UNUSED(rsp, session); + *mutablePath = FromProto<NYPath::TRichYPath>(req.path()); + if (req.has_transactional_options()) { + FromProto(mutableOptions, req.transactional_options()); + } } //////////////////////////////////////////////////////////////////////////////// @@ -1977,52 +1983,44 @@ void FillRequest( TDistributedWriteSessionPtr session, const TDistributedWriteSessionFinishOptions& options) { - Y_UNUSED(req, session, options); + req->set_session(ConvertToYsonString(session).ToString()); + req->set_max_children_per_attach_request(options.MaxChildrenPerAttachRequest); } -void FromProto( +void ParseRequest( + TDistributedWriteSessionPtr* mutableSession, TDistributedWriteSessionFinishOptions* mutableOptions, const TReqFinishDistributedWriteSession& req) { - Y_UNUSED(req, mutableOptions); -} - -void FromProto( - TDistributedWriteSession* mutableSession, - const TReqFinishDistributedWriteSession& req) -{ - Y_UNUSED(mutableSession, req); + *mutableSession = ConvertTo<TDistributedWriteSessionPtr>(TYsonString(req.session())); + mutableOptions->MaxChildrenPerAttachRequest = req.max_children_per_attach_request(); } //////////////////////////////////////////////////////////////////////////////// void FillRequest( - TReqParticipantWriteTable* req, - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options) + TReqWriteTableFragment* req, + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options) { - Y_UNUSED(req, cookie, options); -} + 
req->set_cookie(ConvertToYsonString(cookie).ToString()); -void FromProto( - TParticipantTableWriterOptions* mutableOptions, - const TReqParticipantWriteTable& req) -{ - Y_UNUSED(req, mutableOptions); -} - -void FromProto( - TDistributedWriteCookie* cookie, - const TReqParticipantWriteTable& req) -{ - Y_UNUSED(cookie, req); + if (options.Config) { + req->set_config(ConvertToYsonString(*options.Config).ToString()); + } } -void ToProto( - TRspParticipantWriteTable* rsp, - const TDistributedWriteCookiePtr& cookie) +void ParseRequest( + TFragmentWriteCookiePtr* mutableCookie, + TFragmentTableWriterOptions* mutableOptions, + const TReqWriteTableFragment& req) { - Y_UNUSED(rsp, cookie); + *mutableCookie = ConvertTo<TFragmentWriteCookiePtr>(TYsonString(req.cookie())); + if (req.has_config()) { + mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(req.config())); + } else { + mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(TStringBuf("{}"))); + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h index ca99986c72..8849b7be44 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.h +++ b/yt/yt/client/api/rpc_proxy/helpers.h @@ -30,6 +30,10 @@ void ToProto( NProto::TTransactionalOptions* proto, const NApi::TTransactionalOptions& options); +void FromProto( + NApi::TTransactionalOptions* options, + const NProto::TTransactionalOptions& proto); + void ToProto( NProto::TPrerequisiteOptions* proto, const NApi::TPrerequisiteOptions& options); @@ -287,18 +291,11 @@ void FillRequest( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options); -void FromProto( +void ParseRequest( + NYPath::TRichYPath* mutablePath, TDistributedWriteSessionStartOptions* mutableOptions, const TReqStartDistributedWriteSession& req); -void FromProto( - TDistributedWriteSession* mutableSession, - TRspStartDistributedWriteSession&& 
rsp); - -void ToProto( - TRspStartDistributedWriteSession* rsp, - const TDistributedWriteSessionPtr& session); - //////////////////////////////////////////////////////////////////////////////// void FillRequest( @@ -306,32 +303,22 @@ void FillRequest( TDistributedWriteSessionPtr session, const TDistributedWriteSessionFinishOptions& options); -void FromProto( +void ParseRequest( + TDistributedWriteSessionPtr* mutableSession, TDistributedWriteSessionFinishOptions* mutableOptions, const TReqFinishDistributedWriteSession& req); -void FromProto( - TDistributedWriteSession* mutableSession, - const TReqFinishDistributedWriteSession& req); - //////////////////////////////////////////////////////////////////////////////// void FillRequest( - TReqParticipantWriteTable* req, - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options); - -void FromProto( - TParticipantTableWriterOptions* mutableOptions, - const TReqParticipantWriteTable& req); - -void FromProto( - TDistributedWriteCookie* cookie, - const TReqParticipantWriteTable& req); - -void ToProto( - TRspParticipantWriteTable* rsp, - const TDistributedWriteCookiePtr& cookie); + TReqWriteTableFragment* req, + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options); + +void ParseRequest( + TFragmentWriteCookiePtr* mutableCookie, + TFragmentTableWriterOptions* mutableOptions, + const TReqWriteTableFragment& req); } // namespace NProto diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index e719d41f8c..e52fdf7acb 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -938,7 +938,7 @@ TFuture<void> TTransaction::FinishDistributedWriteSession( ValidateActive(); return Client_->FinishDistributedWriteSession( std::move(session), - PatchTransactionId(options)); + options); } TFuture<void> TTransaction::DoAbort( diff --git 
a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp index 5d41f1abc8..7bd88c5279 100644 --- a/yt/yt/client/driver/distributed_table_commands.cpp +++ b/yt/yt/client/driver/distributed_table_commands.cpp @@ -2,34 +2,16 @@ #include "config.h" #include "helpers.h" -#include <yt/yt/client/api/distributed_table_sessions.h> - -// #include <yt/yt/client/api/rowset.h> -// #include <yt/yt/client/api/skynet.h> - -// #include <yt/yt/client/chaos_client/replication_card_serialization.h> +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/formats/config.h> #include <yt/yt/client/formats/parser.h> -// #include <yt/yt/client/table_client/adapters.h> -// #include <yt/yt/client/table_client/blob_reader.h> -// #include <yt/yt/client/table_client/columnar_statistics.h> -// #include <yt/yt/client/table_client/row_buffer.h> -// #include <yt/yt/client/table_client/table_consumer.h> -// #include <yt/yt/client/table_client/table_output.h> -// #include <yt/yt/client/table_client/unversioned_writer.h> -// #include <yt/yt/client/table_client/versioned_writer.h> -// #include <yt/yt/client/table_client/wire_protocol.h> - -// #include <yt/yt/client/tablet_client/table_mount_cache.h> - #include <yt/yt/client/ypath/public.h> #include <yt/yt/library/formats/format.h> #include <yt/yt/core/concurrency/scheduler_api.h> -// #include <yt/yt/core/misc/finally.h> #include <yt/yt/core/ytree/convert.h> @@ -45,20 +27,6 @@ using namespace NYPath; //////////////////////////////////////////////////////////////////////////////// -// namespace { - -// NLogging::TLogger WithCommandTag( -// const NLogging::TLogger& logger, -// const ICommandContextPtr& context) -// { -// return logger.WithTag("Command: %v", -// context->Request().CommandName); -// } - -// } // namespace - -//////////////////////////////////////////////////////////////////////////////// - void TStartDistributedWriteSessionCommand::Register(TRegistrar registrar) { 
registrar.Parameter("path", &TThis::Path); @@ -88,8 +56,6 @@ void TFinishDistributedWriteSessionCommand::Register(TRegistrar registrar) // -> Nothing void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context) { - auto transaction = AttachTransaction(context, /*required*/ false); - auto session = ConvertTo<TDistributedWriteSessionPtr>(Session); WaitFor(context->GetClient()->FinishDistributedWriteSession( @@ -102,35 +68,35 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context //////////////////////////////////////////////////////////////////////////////// -void TParticipantWriteTableCommand::Execute(ICommandContextPtr context) +void TWriteTableFragmentCommand::Execute(ICommandContextPtr context) { - TTypedCommand<NApi::TParticipantTableWriterOptions>::Execute(std::move(context)); + TTypedCommand<NApi::TFragmentTableWriterOptions>::Execute(std::move(context)); } -void TParticipantWriteTableCommand::Register(TRegistrar /*registrar*/) +void TWriteTableFragmentCommand::Register(TRegistrar /*registrar*/) { } -TFuture<NApi::ITableWriterPtr> TParticipantWriteTableCommand::CreateTableWriter( +TFuture<NApi::ITableWriterPtr> TWriteTableFragmentCommand::CreateTableWriter( const ICommandContextPtr& context) const { PutMethodInfoInTraceContext("participant_write_table"); return context ->GetClient() - ->CreateParticipantTableWriter( - StaticPointerCast<TDistributedWriteCookie>(ResultingCookie), - TTypedCommand<TParticipantTableWriterOptions>::Options); + ->CreateFragmentTableWriter( + StaticPointerCast<TFragmentWriteCookie>(ResultingCookie), + TTypedCommand<TFragmentTableWriterOptions>::Options); } // -> Cookie -void TParticipantWriteTableCommand::DoExecute(ICommandContextPtr context) +void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context) { - auto cookie = ConvertTo<TDistributedWriteCookiePtr>(Cookie); + auto cookie = ConvertTo<TFragmentWriteCookiePtr>(Cookie); ResultingCookie = 
StaticPointerCast<TRefCounted>(std::move(cookie)); DoExecuteImpl(context); ProduceOutput(context, [cookie = std::move(ResultingCookie)] (IYsonConsumer* consumer) { - Serialize(StaticPointerCast<TDistributedWriteCookie>(cookie), consumer); + Serialize(StaticPointerCast<TFragmentWriteCookie>(cookie), consumer); }); } diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h index ea3f7a9b7b..fa3d6bb4e5 100644 --- a/yt/yt/client/driver/distributed_table_commands.h +++ b/yt/yt/client/driver/distributed_table_commands.h @@ -45,8 +45,8 @@ private: //////////////////////////////////////////////////////////////////////////////// // -> Cookie -class TParticipantWriteTableCommand - : public TTypedCommand<NApi::TParticipantTableWriterOptions> +class TWriteTableFragmentCommand + : public TTypedCommand<NApi::TFragmentTableWriterOptions> , private TWriteTableCommand { public: @@ -54,7 +54,7 @@ public: // ambiguity in dispatch. void Execute(ICommandContextPtr context) override; - REGISTER_YSON_STRUCT_LITE(TParticipantWriteTableCommand); + REGISTER_YSON_STRUCT_LITE(TWriteTableFragmentCommand); static void Register(TRegistrar registrar); diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index 4fb7783697..e13634f539 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -396,9 +396,9 @@ public: REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false); REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false); REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false); - REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false); - REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false); - REGISTER_ALL(TParticipantWriteTableCommand, "participant_write_table", Tabular, 
Structured, true, true ); + REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false); + REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false); + REGISTER_ALL(TWriteTableFragmentCommand, "distributed_write_table_partition", Tabular, Structured, true, true ); } #undef REGISTER diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index a2099a6d0e..f43f76958d 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -486,7 +486,7 @@ public: UNIMPLEMENTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&)); UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); - UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&)); + UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&)); UNIMPLEMENTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const TString& , int, const TStartShuffleOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, FinishShuffle, (const TShuffleHandlePtr&, const TFinishShuffleOptions&)); UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index 707a6759b8..3fb36f8560 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -111,7 +111,7 @@ public: 
UNSUPPORTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const TYPath&, const TJournalWriterOptions&)); UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); - UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&)); + UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&)); // IClient methods. // Unsupported methods. diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index e90e6b8e71..43e8056b1e 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -2,7 +2,7 @@ #include <yt/yt/client/api/connection.h> #include <yt/yt/client/api/client.h> -#include <yt/yt/client/api/distributed_table_sessions.h> +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/file_writer.h> #include <yt/yt/client/api/journal_reader.h> #include <yt/yt/client/api/journal_writer.h> @@ -841,9 +841,9 @@ public: const TDistributedWriteSessionFinishOptions& options), (override)); - MOCK_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, ( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options), + MOCK_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, ( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options), (override)); MOCK_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, ( diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index 9bb296dbe8..943e4fa201 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ 
b/yt/yt/client/unittests/mock/transaction.h @@ -1,6 +1,6 @@ #pragma once -#include <yt/yt/client/api/distributed_table_sessions.h> +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/file_writer.h> #include <yt/yt/client/api/journal_reader.h> #include <yt/yt/client/api/journal_writer.h> diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index a0cc4caf88..9e468fbf8d 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -13,7 +13,7 @@ SRCS( api/client_cache.cpp api/delegating_client.cpp api/delegating_transaction.cpp - api/distributed_table_sessions.cpp + api/distributed_table_session.cpp api/etc_client.cpp api/journal_client.cpp api/operation_client.cpp diff --git a/yt/yt/core/rpc/stream-inl.h b/yt/yt/core/rpc/stream-inl.h index 4474158d4e..8f7ddd274b 100644 --- a/yt/yt/core/rpc/stream-inl.h +++ b/yt/yt/core/rpc/stream-inl.h @@ -43,11 +43,28 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream { auto invokeResult = request->Invoke().template As<void>(); auto metaHandlerResult = request->GetResponseAttachmentsStream()->Read() - .Apply(metaHandler); - return metaHandlerResult.Apply(BIND ([=] () { + .Apply(std::move(metaHandler)); + return metaHandlerResult.Apply(BIND ([req = std::move(request), res = std::move(invokeResult)] () mutable { return NDetail::CreateRpcClientOutputStreamFromInvokedRequest( - std::move(request), - std::move(invokeResult)); + std::move(req), + std::move(res)); + })); +} + +template <class TRequestMessage, class TResponse> +TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream( + TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request, + TCallback<void(TSharedRef)> metaHandler, + TCallback<void(TIntrusivePtr<TResponse>&&)> rspHandler) +{ + auto invokeResult = request->Invoke() + .ApplyUnique(std::move(rspHandler)); + auto metaHandlerResult = request->GetResponseAttachmentsStream()->Read() + .Apply(std::move(metaHandler)); + 
return metaHandlerResult.Apply(BIND ([req = std::move(request), res = std::move(invokeResult)] () mutable { + return NDetail::CreateRpcClientOutputStreamFromInvokedRequest( + std::move(req), + std::move(res)); })); } diff --git a/yt/yt/core/rpc/stream.h b/yt/yt/core/rpc/stream.h index 93e54155a8..f2220e7425 100644 --- a/yt/yt/core/rpc/stream.h +++ b/yt/yt/core/rpc/stream.h @@ -265,6 +265,13 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request, TCallback<void(TSharedRef)> metaHandler); +//! This variant additionally allows non-trivial response of streaming request to be handled. +template <class TRequestMessage, class TResponse> +TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream( + TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request, + TCallback<void(TSharedRef)> metaHandler, + TCallback<void(TIntrusivePtr<TResponse>&&)> rspHandler); + //////////////////////////////////////////////////////////////////////////////// //! Handles an incoming streaming request that uses the #CreateRpcClientInputStream diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 0c7bc9eddc..9c9e018be9 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -3409,15 +3409,14 @@ message TRspGetQueryTrackerInfo } //////////////////////////////////////////////////////////////////////////////// -// NB(arkady-e1ppa): Under construction. // Distributed table client //////////////////////////////////////////////////////////////////////////////// message TReqStartDistributedWriteSession { - optional string path = 1; - - // TDistributedWriteSessionStartOptions contents... 
+ required string path = 1; + + optional TTransactionalOptions transactional_options = 100; } message TRspStartDistributedWriteSession @@ -3427,29 +3426,27 @@ message TRspStartDistributedWriteSession message TReqFinishDistributedWriteSession { - required bytes session = 2; // YSON-serialized TDistributedWriteSession + required bytes session = 1; // YSON-serialized TDistributedWriteSession - // TDistributedWriteSessionFinishOptions contents... + required int32 max_children_per_attach_request = 2; } message TRspFinishDistributedWriteSession { } -message TReqParticipantWriteTable +message TReqWriteTableFragment { optional bytes config = 1; // YSON-serialized TTableWriterConfig optional bytes format = 2; // YSON-serialized TFormat - required bytes cookie = 3; // YSON-serialized TDistributedWriteCookie - - // TParticipantTableWriterOptions contents... + required bytes cookie = 3; // YSON-serialized TFragmentWriteCookie } -message TRspParticipantWriteTable +message TRspWriteTableFragment { - required bytes cookie = 1; // YSON-serialized TDistributedWriteCookie + required bytes cookie = 1; // YSON-serialized TFragmentWriteCookie } /////////////////////////////////////////////////////////////////////////////// |