diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-08-23 11:29:08 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-08-23 11:40:02 +0300 |
commit | c872ef1cd2cb3bdc9dadb23fe2a72a24de87a32c (patch) | |
tree | 5f268e44c021acbb38e91eb2bbd522204db0a241 | |
parent | 7e4fd8b537c4391893cf62e3feaba46b2e69750e (diff) | |
download | ydb-c872ef1cd2cb3bdc9dadb23fe2a72a24de87a32c.tar.gz |
YT-22307: Empty template for dist write api
e9554dea04d2a2fb0c9d8cbee012afa88382e715
29 files changed, 725 insertions, 7 deletions
diff --git a/yt/yt/client/api/client.h b/yt/yt/client/api/client.h index b4ff0ffe17..d65619c86e 100644 --- a/yt/yt/client/api/client.h +++ b/yt/yt/client/api/client.h @@ -4,6 +4,7 @@ #include "accounting_client.h" #include "admin_client.h" #include "cypress_client.h" +#include "distributed_table_client.h" #include "etc_client.h" #include "file_client.h" #include "journal_client.h" @@ -37,6 +38,7 @@ struct IClientBase , public IJournalClientBase , public IQueueClientBase , public IEtcClientBase + , public IDistributedTableClientBase { virtual IConnectionPtr GetConnection() = 0; }; @@ -70,6 +72,7 @@ struct IClient , public IEtcClient , public NBundleControllerClient::IBundleControllerClient , public IFlowClient + , public IDistributedTableClient { //! Terminates all channels. //! Aborts all pending uncommitted transactions. diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index a7982186ac..1ac5ce7597 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -840,6 +840,22 @@ public: const TGetFlowViewOptions& options), (pipelinePath, viewPath, options)) + // Distributed client + DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, ( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options), + (path, options)) + + DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, ( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options), + (std::move(session), options)) + + DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, ( + const TDistributedWriteCookiePtr& cookie, + const TParticipantTableWriterOptions& options), + (cookie, options)) + #undef DELEGATE_METHOD protected: diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp index 3e539bf9f7..156eb1dde7 100644 --- a/yt/yt/client/api/delegating_transaction.cpp +++ b/yt/yt/client/api/delegating_transaction.cpp @@ -312,6 +312,16 @@ DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( const TPushQueueProducerOptions& options), (producerPath, queuePath, sessionId, epoch, nameTable, serializedRows, options)) +DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, ( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options), + (path, options)) + +DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, ( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options), + (std::move(session), options)) + #undef DELEGATE_METHOD //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h index 533a048204..e0a71dee00 100644 --- a/yt/yt/client/api/delegating_transaction.h +++ b/yt/yt/client/api/delegating_transaction.h @@ -256,6 +256,15 @@ public: const std::vector<TSharedRef>& serializedRows, const TPushQueueProducerOptions& options) override; + // Distributed table client + TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options = {}) override; + + TFuture<void> FinishDistributedWriteSession( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options = {}) override; + protected: const ITransactionPtr Underlying_; }; diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h new file mode 100644 index 0000000000..9ac983371c --- /dev/null +++ b/yt/yt/client/api/distributed_table_client.h @@ -0,0 +1,51 @@ +#pragma once + +#include "table_client.h" + +#include <yt/yt/client/table_client/config.h> + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +struct TDistributedWriteSessionStartOptions + : public TTransactionalOptions +{ }; + +struct TDistributedWriteSessionFinishOptions + : public TTransactionalOptions +{ }; + +struct TParticipantTableWriterOptions + : public TTableWriterOptions +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +struct IDistributedTableClientBase +{ + virtual ~IDistributedTableClientBase() = default; + + virtual TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options = {}) = 0; + + virtual TFuture<void> FinishDistributedWriteSession( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options = {}) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct IDistributedTableClient +{ + virtual ~IDistributedTableClient() = default; + + virtual TFuture<ITableWriterPtr> CreateParticipantTableWriter( + const TDistributedWriteCookiePtr& cookie, + const TParticipantTableWriterOptions& options = {}) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_sessions.cpp b/yt/yt/client/api/distributed_table_sessions.cpp new file mode 100644 index 0000000000..b454ad476b --- /dev/null +++ b/yt/yt/client/api/distributed_table_sessions.cpp @@ -0,0 +1,17 @@ +#include "distributed_table_sessions.h" + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +void TDistributedWriteCookie::Register(TRegistrar /*registrar*/) +{ } + +//////////////////////////////////////////////////////////////////////////////// + +void TDistributedWriteSession::Register(TRegistrar /*registrar*/) +{ } + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_sessions.h b/yt/yt/client/api/distributed_table_sessions.h new file mode 100644 index 0000000000..19b3c0f5c0 --- /dev/null +++ b/yt/yt/client/api/distributed_table_sessions.h @@ -0,0 +1,41 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/core/ytree/yson_struct.h> + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid); + +//////////////////////////////////////////////////////////////////////////////// + +class TDistributedWriteCookie + : public NYTree::TYsonStruct +{ +public: + REGISTER_YSON_STRUCT(TDistributedWriteCookie); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TDistributedWriteCookie); + +//////////////////////////////////////////////////////////////////////////////// + +class TDistributedWriteSession + : public NYTree::TYsonStruct +{ +public: + REGISTER_YSON_STRUCT(TDistributedWriteSession); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h index fe0a173f31..f67dbed45b 100644 --- a/yt/yt/client/api/public.h +++ b/yt/yt/client/api/public.h @@ -184,6 +184,9 @@ DECLARE_REFCOUNTED_STRUCT(TBackupManifest) DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter) +DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession) +DECLARE_REFCOUNTED_CLASS(TDistributedWriteCookie) + //////////////////////////////////////////////////////////////////////////////// inline const TString ClusterNamePath("//sys/@cluster_name"); diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index 91fe45fc3d..14edf0f3c2 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -204,6 +204,12 @@ public: DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AlterQuery); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetQueryTrackerInfo); + // Distributed table client + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, StartDistributedWriteSession); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, FinishDistributedWriteSession); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ParticipantWriteTable, + .SetStreamingEnabled(true)); + // Misc DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, CheckClusterLiveness); }; diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index 2711f8a37b..90b88789c3 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -12,6 +12,7 @@ #include "table_writer.h" #include "transaction.h" +#include <yt/yt/client/api/distributed_table_sessions.h> #include <yt/yt/client/api/file_reader.h> #include <yt/yt/client/api/file_writer.h> #include <yt/yt/client/api/journal_reader.h> @@ -788,6 +789,38 @@ TFuture<ITableWriterPtr> TClientBase::CreateTableWriter( //////////////////////////////////////////////////////////////////////////////// +TFuture<TDistributedWriteSessionPtr> TClientBase::StartDistributedWriteSession( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options) +{ + using TRsp = TIntrusivePtr<NRpc::TTypedClientResponse<NProto::TRspStartDistributedWriteSession>>; + + auto proxy = CreateApiServiceProxy(); + + auto req = proxy.StartDistributedWriteSession(); + FillRequest(req.Get(), path, options); + + return req->Invoke() + .ApplyUnique(BIND([] (TRsp&& result) -> TDistributedWriteSessionPtr { + return ConvertTo<TDistributedWriteSessionPtr>(TYsonString(result->session())); + })); +} + +TFuture<void> TClientBase::FinishDistributedWriteSession( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options) +{ + auto proxy = CreateApiServiceProxy(); + + auto req = proxy.FinishDistributedWriteSession(); + + FillRequest(req.Get(), std::move(session), options); + + return req->Invoke().AsVoid(); +} + +//////////////////////////////////////////////////////////////////////////////// + TFuture<TUnversionedLookupRowsResult> TClientBase::LookupRows( const TYPath& path, TNameTablePtr nameTable, diff --git a/yt/yt/client/api/rpc_proxy/client_base.h b/yt/yt/client/api/rpc_proxy/client_base.h index 5a85160692..2ccdb1adb4 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.h +++ b/yt/yt/client/api/rpc_proxy/client_base.h @@ -183,6 +183,15 @@ public: TFuture<NApi::ITableWriterPtr> CreateTableWriter( const NYPath::TRichYPath& path, const NApi::TTableWriterOptions& options) override; + + // Distributed table client + TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options) override; + + TFuture<void> FinishDistributedWriteSession( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options) override; }; DEFINE_REFCOUNTED_TYPE(TClientBase) diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index beb3b391ec..567505e05f 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -4,6 +4,7 @@ #include "helpers.h" #include "private.h" #include "table_mount_cache.h" +#include "table_writer.h" #include "timestamp_provider.h" #include "transaction.h" @@ -51,6 +52,7 @@ using NYT::FromProto; using namespace NAuth; using namespace NChaosClient; using namespace NChunkClient; +using namespace NConcurrency; using namespace NObjectClient; using namespace NRpc; using namespace NScheduler; @@ -739,6 +741,32 @@ TFuture<void> TClient::AlterReplicationCard( return req->Invoke().As<void>(); } +TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter( + const TDistributedWriteCookiePtr& cookie, + const TParticipantTableWriterOptions& options) +{ + auto proxy = CreateApiServiceProxy(); + auto req = proxy.ParticipantWriteTable(); + InitStreamingRequest(*req); + + FillRequest(req.Get(), cookie, options); + + auto schema = New<TTableSchema>(); + return NRpc::CreateRpcClientOutputStream( + std::move(req), + BIND ([=] (const TSharedRef& metaRef) { + NApi::NRpcProxy::NProto::TWriteTableMeta meta; + if (!TryDeserializeProto(&meta, metaRef)) { + THROW_ERROR_EXCEPTION("Failed to deserialize schema for participant table writer"); + } + + FromProto(schema.Get(), meta.schema()); + })) + .Apply(BIND([=] (IAsyncZeroCopyOutputStreamPtr outputStream) { + return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema)); + })).As<ITableWriterPtr>(); +} + TFuture<IQueueRowsetPtr> TClient::PullQueue( const TRichYPath& queuePath, i64 offset, diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index f6020cd0af..ffce8539b5 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -129,6 +129,11 @@ public: NChaosClient::TReplicationCardId replicationCardId, const TAlterReplicationCardOptions& options = {}) override; + // Distributed table client + TFuture<ITableWriterPtr> CreateParticipantTableWriter( + const TDistributedWriteCookiePtr& cookie, + const TParticipantTableWriterOptions& options) override; + // Queues. TFuture<NQueueClient::IQueueRowsetPtr> PullQueue( const NYPath::TRichYPath& queuePath, diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index d5a4beb180..9ebde85ea7 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -1911,6 +1911,92 @@ NQueryTrackerClient::EQueryState ConvertQueryStateFromProto( //////////////////////////////////////////////////////////////////////////////// +void FillRequest( + TReqStartDistributedWriteSession* req, + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options) +{ + Y_UNUSED(req, path, options); +} + +void FromProto( + TDistributedWriteSessionStartOptions* mutableOptions, + const TReqStartDistributedWriteSession& req) +{ + Y_UNUSED(req, mutableOptions); +} + +void FromProto( + TDistributedWriteSession* mutableSession, + TRspStartDistributedWriteSession&& rsp) +{ + Y_UNUSED(mutableSession, rsp); +} + +void ToProto( + TRspStartDistributedWriteSession* rsp, + const TDistributedWriteSessionPtr& session) +{ + Y_UNUSED(rsp, session); +} + +//////////////////////////////////////////////////////////////////////////////// + +void FillRequest( + TReqFinishDistributedWriteSession* req, + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options) +{ + Y_UNUSED(req, session, options); +} + +void FromProto( + TDistributedWriteSessionFinishOptions* mutableOptions, + const TReqFinishDistributedWriteSession& req) +{ + Y_UNUSED(req, mutableOptions); +} + +void FromProto( + TDistributedWriteSession* mutableSession, + const TReqFinishDistributedWriteSession& req) +{ + Y_UNUSED(mutableSession, req); +} + +//////////////////////////////////////////////////////////////////////////////// + +void FillRequest( + TReqParticipantWriteTable* req, + const TDistributedWriteCookiePtr& cookie, + const TParticipantTableWriterOptions& options) +{ + Y_UNUSED(req, cookie, options); +} + +void FromProto( + TParticipantTableWriterOptions* mutableOptions, + const TReqParticipantWriteTable& req) +{ + Y_UNUSED(req, mutableOptions); +} + +void FromProto( + TDistributedWriteCookie* cookie, + const TReqParticipantWriteTable& req) +{ + Y_UNUSED(cookie, req); +} + +void ToProto( + TRspParticipantWriteTable* rsp, + const TDistributedWriteCookiePtr& cookie) +{ + Y_UNUSED(rsp, cookie); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NProto //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h index 9d398923b0..9027337359 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.h +++ b/yt/yt/client/api/rpc_proxy/helpers.h @@ -271,6 +271,60 @@ NProto::EQueryState ConvertQueryStateToProto( NQueryTrackerClient::EQueryState ConvertQueryStateFromProto( NProto::EQueryState proto); + +//////////////////////////////////////////////////////////////////////////////// + +void FillRequest( + TReqStartDistributedWriteSession* req, + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options); + +void FromProto( + TDistributedWriteSessionStartOptions* mutableOptions, + const TReqStartDistributedWriteSession& req); + +void FromProto( + TDistributedWriteSession* mutableSession, + TRspStartDistributedWriteSession&& rsp); + +void ToProto( + TRspStartDistributedWriteSession* rsp, + const TDistributedWriteSessionPtr& session); + +//////////////////////////////////////////////////////////////////////////////// + +void FillRequest( + TReqFinishDistributedWriteSession* req, + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options); + +void FromProto( + TDistributedWriteSessionFinishOptions* mutableOptions, + const TReqFinishDistributedWriteSession& req); + +void FromProto( + TDistributedWriteSession* mutableSession, + const TReqFinishDistributedWriteSession& req); + +//////////////////////////////////////////////////////////////////////////////// + +void FillRequest( + TReqParticipantWriteTable* req, + const TDistributedWriteCookiePtr& cookie, + const TParticipantTableWriterOptions& options); + +void FromProto( + TParticipantTableWriterOptions* mutableOptions, + const TReqParticipantWriteTable& req); + +void FromProto( + TDistributedWriteCookie* cookie, + const TReqParticipantWriteTable& req); + +void ToProto( + TRspParticipantWriteTable* rsp, + const TDistributedWriteCookiePtr& cookie); + } // namespace NProto //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index 6d32ba3995..e719d41f8c 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -921,6 +921,26 @@ IJournalWriterPtr TTransaction::CreateJournalWriter( PatchTransactionId(options)); } +TFuture<TDistributedWriteSessionPtr> TTransaction::StartDistributedWriteSession( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options) +{ + ValidateActive(); + return Client_->StartDistributedWriteSession( + path, + PatchTransactionId(options)); +} + +TFuture<void> TTransaction::FinishDistributedWriteSession( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options) +{ + ValidateActive(); + return Client_->FinishDistributedWriteSession( + std::move(session), + PatchTransactionId(options)); +} + TFuture<void> TTransaction::DoAbort( TGuard<NThreading::TSpinLock>* guard, const TTransactionAbortOptions& /*options*/) diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h index 636b8fe2e2..3faa58d708 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.h +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h @@ -235,6 +235,14 @@ public: const NYPath::TYPath& path, const NApi::TJournalWriterOptions& options) override; + TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options = {}) override; + + TFuture<void> FinishDistributedWriteSession( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options = {}) override; + // Custom methods. //! Returns proxy address this transaction is sticking to. diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp new file mode 100644 index 0000000000..5d41f1abc8 --- /dev/null +++ b/yt/yt/client/driver/distributed_table_commands.cpp @@ -0,0 +1,139 @@ +#include "distributed_table_commands.h" +#include "config.h" +#include "helpers.h" + +#include <yt/yt/client/api/distributed_table_sessions.h> + +// #include <yt/yt/client/api/rowset.h> +// #include <yt/yt/client/api/skynet.h> + +// #include <yt/yt/client/chaos_client/replication_card_serialization.h> + +#include <yt/yt/client/formats/config.h> +#include <yt/yt/client/formats/parser.h> + +// #include <yt/yt/client/table_client/adapters.h> +// #include <yt/yt/client/table_client/blob_reader.h> +// #include <yt/yt/client/table_client/columnar_statistics.h> +// #include <yt/yt/client/table_client/row_buffer.h> +// #include <yt/yt/client/table_client/table_consumer.h> +// #include <yt/yt/client/table_client/table_output.h> +// #include <yt/yt/client/table_client/unversioned_writer.h> +// #include <yt/yt/client/table_client/versioned_writer.h> +// #include <yt/yt/client/table_client/wire_protocol.h> + +// #include <yt/yt/client/tablet_client/table_mount_cache.h> + +#include <yt/yt/client/ypath/public.h> + +#include <yt/yt/library/formats/format.h> + +#include <yt/yt/core/concurrency/scheduler_api.h> +// #include <yt/yt/core/misc/finally.h> + +#include <yt/yt/core/ytree/convert.h> + +namespace NYT::NDriver { + +using namespace NApi; +using namespace NConcurrency; +using namespace NFormats; +using namespace NTracing; +using namespace NYTree; +using namespace NYson; +using namespace NYPath; + +//////////////////////////////////////////////////////////////////////////////// + +// namespace { + +// NLogging::TLogger WithCommandTag( +// const NLogging::TLogger& logger, +// const ICommandContextPtr& context) +// { +// return logger.WithTag("Command: %v", +// context->Request().CommandName); +// } + +// } // namespace + +//////////////////////////////////////////////////////////////////////////////// + +void TStartDistributedWriteSessionCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("path", &TThis::Path); +} + +// -> DistributedWriteSession +void TStartDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context) +{ + auto transaction = AttachTransaction(context, /*required*/ false); + + auto session = WaitFor(context->GetClient()->StartDistributedWriteSession( + Path, + Options)); + + ProduceOutput(context, [session = std::move(session)] (IYsonConsumer* consumer) { + Serialize(session, consumer); + }); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TFinishDistributedWriteSessionCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("session", &TThis::Session); +} + +// -> Nothing +void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context) +{ + auto transaction = AttachTransaction(context, /*required*/ false); + + auto session = ConvertTo<TDistributedWriteSessionPtr>(Session); + + WaitFor(context->GetClient()->FinishDistributedWriteSession( + std::move(session), + Options)) + .ThrowOnError(); + + ProduceEmptyOutput(context); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TParticipantWriteTableCommand::Execute(ICommandContextPtr context) +{ + TTypedCommand<NApi::TParticipantTableWriterOptions>::Execute(std::move(context)); +} + +void TParticipantWriteTableCommand::Register(TRegistrar /*registrar*/) +{ } + +TFuture<NApi::ITableWriterPtr> TParticipantWriteTableCommand::CreateTableWriter( + const ICommandContextPtr& context) const +{ + PutMethodInfoInTraceContext("participant_write_table"); + + return context + ->GetClient() + ->CreateParticipantTableWriter( + StaticPointerCast<TDistributedWriteCookie>(ResultingCookie), + TTypedCommand<TParticipantTableWriterOptions>::Options); +} + +// -> Cookie +void TParticipantWriteTableCommand::DoExecute(ICommandContextPtr context) +{ + auto cookie = ConvertTo<TDistributedWriteCookiePtr>(Cookie); + ResultingCookie = StaticPointerCast<TRefCounted>(std::move(cookie)); + + DoExecuteImpl(context); + ProduceOutput(context, [cookie = std::move(ResultingCookie)] (IYsonConsumer* consumer) { + Serialize(StaticPointerCast<TDistributedWriteCookie>(cookie), consumer); + }); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDriver diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h new file mode 100644 index 0000000000..ea3f7a9b7b --- /dev/null +++ b/yt/yt/client/driver/distributed_table_commands.h @@ -0,0 +1,75 @@ +#pragma once + +#include "table_commands.h" + +#include <yt/yt/client/formats/format.h> + +#include <yt/yt/client/ypath/rich.h> + +namespace NYT::NDriver { + +//////////////////////////////////////////////////////////////////////////////// + +// -> DistributedWriteSession +class TStartDistributedWriteSessionCommand + : public TTypedCommand<NApi::TDistributedWriteSessionStartOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TStartDistributedWriteSessionCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TRichYPath Path; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +// -> Nothing +class TFinishDistributedWriteSessionCommand + : public TTypedCommand<NApi::TDistributedWriteSessionFinishOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TFinishDistributedWriteSessionCommand); + + static void Register(TRegistrar registrar); + +private: + NYTree::INodePtr Session; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +// -> Cookie +class TParticipantWriteTableCommand + : public TTypedCommand<NApi::TParticipantTableWriterOptions> + , private TWriteTableCommand +{ +public: + // Shadow normal execute in order to fix + // ambiguity in dispatch. + void Execute(ICommandContextPtr context) override; + + REGISTER_YSON_STRUCT_LITE(TParticipantWriteTableCommand); + + static void Register(TRegistrar registrar); + +private: + using TBase = TWriteTableCommand; + + NYTree::INodePtr Cookie; + TRefCountedPtr ResultingCookie; + + TFuture<NApi::ITableWriterPtr> CreateTableWriter( + const ICommandContextPtr& context) const override; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDriver diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index bba7ffbabd..561eb836fb 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -7,6 +7,7 @@ #include "command.h" #include "config.h" #include "cypress_commands.h" +#include "distributed_table_commands.h" #include "etc_commands.h" #include "file_commands.h" #include "journal_commands.h" @@ -394,6 +395,9 @@ public: REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false); REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false); REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false); + REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false); + REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false); + REGISTER_ALL(TParticipantWriteTableCommand, "participant_write_table", Tabular, Structured, true, true ); } #undef REGISTER diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp index a45768580b..5cbfa54a46 100644 --- a/yt/yt/client/driver/table_commands.cpp +++ b/yt/yt/client/driver/table_commands.cpp @@ -47,7 +47,9 @@ using namespace NTracing; //////////////////////////////////////////////////////////////////////////////// -static NLogging::TLogger WithCommandTag( +namespace { + +NLogging::TLogger WithCommandTag( const NLogging::TLogger& logger, const ICommandContextPtr& context) { @@ -55,6 +57,8 @@ static NLogging::TLogger WithCommandTag( context->Request().CommandName); } +} // namespace + //////////////////////////////////////////////////////////////////////////////// void TReadTableCommand::Register(TRegistrar registrar) @@ -261,7 +265,17 @@ void TWriteTableCommand::Register(TRegistrar registrar) .Default(1_MB); } -void TWriteTableCommand::DoExecute(ICommandContextPtr context) +TFuture<ITableWriterPtr> TWriteTableCommand::CreateTableWriter( + const ICommandContextPtr& context) const +{ + PutMethodInfoInTraceContext("write_table"); + + return context->GetClient()->CreateTableWriter( + Path, + Options); +} + +void TWriteTableCommand::DoExecuteImpl(const ICommandContextPtr& context) { auto transaction = AttachTransaction(context, false); @@ -273,11 +287,7 @@ void TWriteTableCommand::DoExecute(ICommandContextPtr context) Options.PingAncestors = true; Options.Config = config; - PutMethodInfoInTraceContext("write_table"); - - auto apiWriter = WaitFor(context->GetClient()->CreateTableWriter( - Path, - Options)) + auto apiWriter = WaitFor(CreateTableWriter(context)) .ValueOrThrow(); auto schemalessWriter = CreateSchemalessFromApiWriterAdapter(std::move(apiWriter)); @@ -298,7 +308,11 @@ void TWriteTableCommand::DoExecute(ICommandContextPtr context) WaitFor(schemalessWriter->Close()) .ThrowOnError(); +} +void TWriteTableCommand::DoExecute(ICommandContextPtr context) +{ + DoExecuteImpl(context); ProduceEmptyOutput(context); } diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h index 744d0220b2..2e1e8e22d5 100644 --- a/yt/yt/client/driver/table_commands.h +++ b/yt/yt/client/driver/table_commands.h @@ -82,6 +82,12 @@ public: static void Register(TRegistrar registrar); +protected: + virtual TFuture<NApi::ITableWriterPtr> CreateTableWriter( + const ICommandContextPtr& context) const; + + void DoExecuteImpl(const ICommandContextPtr& context); + private: NYPath::TRichYPath Path; NYTree::INodePtr TableWriter; diff --git a/yt/yt/client/driver/ya.make b/yt/yt/client/driver/ya.make index 9604eb5848..f33485ce8f 100644 --- a/yt/yt/client/driver/ya.make +++ b/yt/yt/client/driver/ya.make @@ -10,6 +10,7 @@ SRCS( command.cpp config.cpp cypress_commands.cpp + distributed_table_commands.cpp driver.cpp etc_commands.cpp file_commands.cpp diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index 771ca3e3ac..14375369fc 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -233,6 +233,8 @@ public: UNIMPLEMENTED_METHOD(IFileWriterPtr, CreateFileWriter, (const NYPath::TRichYPath&, const TFileWriterOptions&)); UNIMPLEMENTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const NYPath::TYPath&, const TJournalReaderOptions&)); UNIMPLEMENTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const NYPath::TYPath&, const TJournalWriterOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); + UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); private: const TClientPtr Client_; @@ -471,6 +473,9 @@ public: UNIMPLEMENTED_METHOD(TFuture<void>, PausePipeline, (const NYPath::TYPath&, const TPausePipelineOptions&)); UNIMPLEMENTED_METHOD(TFuture<TPipelineState>, GetPipelineState, (const NYPath::TYPath&, const TGetPipelineStateOptions&)); UNIMPLEMENTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); + UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); + UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&)); private: friend class TTransaction; diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index ab41ce4c2f..d78aa78231 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -113,6 +113,9 @@ public: UNSUPPORTED_METHOD(IFileWriterPtr, CreateFileWriter, (const TRichYPath&, const TFileWriterOptions&)); UNSUPPORTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const TYPath&, const TJournalReaderOptions&)); UNSUPPORTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const TYPath&, const TJournalWriterOptions&)); + UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); + UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); + UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&)); // IClient methods. // Unsupported methods. diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index 39d7aa777b..906e4427b3 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -2,6 +2,7 @@ #include <yt/yt/client/api/connection.h> #include <yt/yt/client/api/client.h> +#include <yt/yt/client/api/distributed_table_sessions.h> #include <yt/yt/client/api/file_writer.h> #include <yt/yt/client/api/journal_reader.h> #include <yt/yt/client/api/journal_writer.h> @@ -825,6 +826,21 @@ public: const TGetFlowViewOptions& options), (override)); + MOCK_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, ( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options), + (override)); + + MOCK_METHOD(TFuture<void>, FinishDistributedWriteSession, ( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options), + (override)); + + MOCK_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, ( + const TDistributedWriteCookiePtr& cookie, + const TParticipantTableWriterOptions& options), + (override)); + private: NTabletClient::ITableMountCachePtr TableMountCache_; NTransactionClient::ITimestampProviderPtr TimestampProvider_; diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index 5d4ada47f9..9bb296dbe8 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ b/yt/yt/client/unittests/mock/transaction.h @@ -1,5 +1,6 @@ #pragma once +#include <yt/yt/client/api/distributed_table_sessions.h> #include <yt/yt/client/api/file_writer.h> #include <yt/yt/client/api/journal_reader.h> #include <yt/yt/client/api/journal_writer.h> @@ -150,6 +151,16 @@ public: const NYPath::TYPath& path, const TJournalWriterOptions& options), (override)); + MOCK_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, ( + const NYPath::TRichYPath& path, + const TDistributedWriteSessionStartOptions& options), + (override)); + + MOCK_METHOD(TFuture<void>, FinishDistributedWriteSession, ( + TDistributedWriteSessionPtr session, + const TDistributedWriteSessionFinishOptions& options), + (override)); + // ITransaction IClientPtr Client; NTransactionClient::ETransactionType Type; diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index 8dcc45c9e1..4eda047917 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -13,6 +13,7 @@ SRCS( api/client_cache.cpp api/delegating_client.cpp api/delegating_transaction.cpp + api/distributed_table_sessions.cpp api/etc_client.cpp api/journal_client.cpp api/operation_client.cpp diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 81dcfaeba2..d43f4a22ca 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -3364,3 +3364,47 @@ message TRspGetQueryTrackerInfo required bytes supported_features = 2; // YSON repeated string access_control_objects = 3; } + +//////////////////////////////////////////////////////////////////////////////// +// NB(arkady-e1ppa): Under construction. +// Distributed table client +//////////////////////////////////////////////////////////////////////////////// + +message TReqStartDistributedWriteSession +{ + optional string path = 1; + + // TDistributedWriteSessionStartOptions contents... +} + +message TRspStartDistributedWriteSession +{ + required bytes session = 1; // YSON-serialized TDistributedWriteSession +} + +message TReqFinishDistributedWriteSession +{ + required bytes session = 2; // YSON-serialized TDistributedWriteSession + + // TDistributedWriteSessionFinishOptions contents... +} + +message TRspFinishDistributedWriteSession +{ +} + +message TReqParticipantWriteTable +{ + optional bytes config = 1; // YSON-serialized TTableWriterConfig + + optional bytes format = 2; // YSON-serialized TFormat + + required bytes cookie = 3; // YSON-serialized TDistributedWriteCookie + + // TParticipantTableWriterOptions contents... +} + +message TRspParticipantWriteTable +{ + required bytes cookie = 1; // YSON-serialized TDistributedWriteCookie +} |