aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-08-23 11:29:08 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-08-23 11:40:02 +0300
commitc872ef1cd2cb3bdc9dadb23fe2a72a24de87a32c (patch)
tree5f268e44c021acbb38e91eb2bbd522204db0a241
parent7e4fd8b537c4391893cf62e3feaba46b2e69750e (diff)
downloadydb-c872ef1cd2cb3bdc9dadb23fe2a72a24de87a32c.tar.gz
YT-22307: Empty template for dist write api
e9554dea04d2a2fb0c9d8cbee012afa88382e715
-rw-r--r--yt/yt/client/api/client.h3
-rw-r--r--yt/yt/client/api/delegating_client.h16
-rw-r--r--yt/yt/client/api/delegating_transaction.cpp10
-rw-r--r--yt/yt/client/api/delegating_transaction.h9
-rw-r--r--yt/yt/client/api/distributed_table_client.h51
-rw-r--r--yt/yt/client/api/distributed_table_sessions.cpp17
-rw-r--r--yt/yt/client/api/distributed_table_sessions.h41
-rw-r--r--yt/yt/client/api/public.h3
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h6
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.cpp33
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.h9
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp28
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h5
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp86
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.h54
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp20
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.h8
-rw-r--r--yt/yt/client/driver/distributed_table_commands.cpp139
-rw-r--r--yt/yt/client/driver/distributed_table_commands.h75
-rw-r--r--yt/yt/client/driver/driver.cpp4
-rw-r--r--yt/yt/client/driver/table_commands.cpp28
-rw-r--r--yt/yt/client/driver/table_commands.h6
-rw-r--r--yt/yt/client/driver/ya.make1
-rw-r--r--yt/yt/client/federated/client.cpp5
-rw-r--r--yt/yt/client/hedging/hedging.cpp3
-rw-r--r--yt/yt/client/unittests/mock/client.h16
-rw-r--r--yt/yt/client/unittests/mock/transaction.h11
-rw-r--r--yt/yt/client/ya.make1
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto44
29 files changed, 725 insertions, 7 deletions
diff --git a/yt/yt/client/api/client.h b/yt/yt/client/api/client.h
index b4ff0ffe17..d65619c86e 100644
--- a/yt/yt/client/api/client.h
+++ b/yt/yt/client/api/client.h
@@ -4,6 +4,7 @@
#include "accounting_client.h"
#include "admin_client.h"
#include "cypress_client.h"
+#include "distributed_table_client.h"
#include "etc_client.h"
#include "file_client.h"
#include "journal_client.h"
@@ -37,6 +38,7 @@ struct IClientBase
, public IJournalClientBase
, public IQueueClientBase
, public IEtcClientBase
+ , public IDistributedTableClientBase
{
virtual IConnectionPtr GetConnection() = 0;
};
@@ -70,6 +72,7 @@ struct IClient
, public IEtcClient
, public NBundleControllerClient::IBundleControllerClient
, public IFlowClient
+ , public IDistributedTableClient
{
//! Terminates all channels.
//! Aborts all pending uncommitted transactions.
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index a7982186ac..1ac5ce7597 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -840,6 +840,22 @@ public:
const TGetFlowViewOptions& options),
(pipelinePath, viewPath, options))
+ // Distributed client
+ DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options),
+ (path, options))
+
+ DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, (
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options),
+ (std::move(session), options))
+
+ DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (
+ const TDistributedWriteCookiePtr& cookie,
+ const TParticipantTableWriterOptions& options),
+ (cookie, options))
+
#undef DELEGATE_METHOD
protected:
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp
index 3e539bf9f7..156eb1dde7 100644
--- a/yt/yt/client/api/delegating_transaction.cpp
+++ b/yt/yt/client/api/delegating_transaction.cpp
@@ -312,6 +312,16 @@ DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
const TPushQueueProducerOptions& options),
(producerPath, queuePath, sessionId, epoch, nameTable, serializedRows, options))
+DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options),
+ (path, options))
+
+DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, (
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options),
+ (std::move(session), options))
+
#undef DELEGATE_METHOD
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h
index 533a048204..e0a71dee00 100644
--- a/yt/yt/client/api/delegating_transaction.h
+++ b/yt/yt/client/api/delegating_transaction.h
@@ -256,6 +256,15 @@ public:
const std::vector<TSharedRef>& serializedRows,
const TPushQueueProducerOptions& options) override;
+ // Distributed table client
+ TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options = {}) override;
+
+ TFuture<void> FinishDistributedWriteSession(
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options = {}) override;
+
protected:
const ITransactionPtr Underlying_;
};
diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h
new file mode 100644
index 0000000000..9ac983371c
--- /dev/null
+++ b/yt/yt/client/api/distributed_table_client.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include "table_client.h"
+
+#include <yt/yt/client/table_client/config.h>
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TDistributedWriteSessionStartOptions
+ : public TTransactionalOptions
+{ };
+
+struct TDistributedWriteSessionFinishOptions
+ : public TTransactionalOptions
+{ };
+
+struct TParticipantTableWriterOptions
+ : public TTableWriterOptions
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IDistributedTableClientBase
+{
+ virtual ~IDistributedTableClientBase() = default;
+
+ virtual TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options = {}) = 0;
+
+ virtual TFuture<void> FinishDistributedWriteSession(
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options = {}) = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IDistributedTableClient
+{
+ virtual ~IDistributedTableClient() = default;
+
+ virtual TFuture<ITableWriterPtr> CreateParticipantTableWriter(
+ const TDistributedWriteCookiePtr& cookie,
+ const TParticipantTableWriterOptions& options = {}) = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_sessions.cpp b/yt/yt/client/api/distributed_table_sessions.cpp
new file mode 100644
index 0000000000..b454ad476b
--- /dev/null
+++ b/yt/yt/client/api/distributed_table_sessions.cpp
@@ -0,0 +1,17 @@
+#include "distributed_table_sessions.h"
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TDistributedWriteCookie::Register(TRegistrar /*registrar*/)
+{ }
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TDistributedWriteSession::Register(TRegistrar /*registrar*/)
+{ }
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_sessions.h b/yt/yt/client/api/distributed_table_sessions.h
new file mode 100644
index 0000000000..19b3c0f5c0
--- /dev/null
+++ b/yt/yt/client/api/distributed_table_sessions.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/core/ytree/yson_struct.h>
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDistributedWriteCookie
+ : public NYTree::TYsonStruct
+{
+public:
+ REGISTER_YSON_STRUCT(TDistributedWriteCookie);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TDistributedWriteCookie);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDistributedWriteSession
+ : public NYTree::TYsonStruct
+{
+public:
+ REGISTER_YSON_STRUCT(TDistributedWriteSession);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h
index fe0a173f31..f67dbed45b 100644
--- a/yt/yt/client/api/public.h
+++ b/yt/yt/client/api/public.h
@@ -184,6 +184,9 @@ DECLARE_REFCOUNTED_STRUCT(TBackupManifest)
DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter)
+DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession)
+DECLARE_REFCOUNTED_CLASS(TDistributedWriteCookie)
+
////////////////////////////////////////////////////////////////////////////////
inline const TString ClusterNamePath("//sys/@cluster_name");
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index 91fe45fc3d..14edf0f3c2 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -204,6 +204,12 @@ public:
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AlterQuery);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetQueryTrackerInfo);
+ // Distributed table client
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, StartDistributedWriteSession);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, FinishDistributedWriteSession);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ParticipantWriteTable,
+ .SetStreamingEnabled(true));
+
// Misc
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, CheckClusterLiveness);
};
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index 2711f8a37b..90b88789c3 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -12,6 +12,7 @@
#include "table_writer.h"
#include "transaction.h"
+#include <yt/yt/client/api/distributed_table_sessions.h>
#include <yt/yt/client/api/file_reader.h>
#include <yt/yt/client/api/file_writer.h>
#include <yt/yt/client/api/journal_reader.h>
@@ -788,6 +789,38 @@ TFuture<ITableWriterPtr> TClientBase::CreateTableWriter(
////////////////////////////////////////////////////////////////////////////////
+TFuture<TDistributedWriteSessionPtr> TClientBase::StartDistributedWriteSession(
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options)
+{
+ using TRsp = TIntrusivePtr<NRpc::TTypedClientResponse<NProto::TRspStartDistributedWriteSession>>;
+
+ auto proxy = CreateApiServiceProxy();
+
+ auto req = proxy.StartDistributedWriteSession();
+ FillRequest(req.Get(), path, options);
+
+ return req->Invoke()
+ .ApplyUnique(BIND([] (TRsp&& result) -> TDistributedWriteSessionPtr {
+ return ConvertTo<TDistributedWriteSessionPtr>(TYsonString(result->session()));
+ }));
+}
+
+TFuture<void> TClientBase::FinishDistributedWriteSession(
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options)
+{
+ auto proxy = CreateApiServiceProxy();
+
+ auto req = proxy.FinishDistributedWriteSession();
+
+ FillRequest(req.Get(), std::move(session), options);
+
+ return req->Invoke().AsVoid();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
TFuture<TUnversionedLookupRowsResult> TClientBase::LookupRows(
const TYPath& path,
TNameTablePtr nameTable,
diff --git a/yt/yt/client/api/rpc_proxy/client_base.h b/yt/yt/client/api/rpc_proxy/client_base.h
index 5a85160692..2ccdb1adb4 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.h
+++ b/yt/yt/client/api/rpc_proxy/client_base.h
@@ -183,6 +183,15 @@ public:
TFuture<NApi::ITableWriterPtr> CreateTableWriter(
const NYPath::TRichYPath& path,
const NApi::TTableWriterOptions& options) override;
+
+ // Distributed table client
+ TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options) override;
+
+ TFuture<void> FinishDistributedWriteSession(
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options) override;
};
DEFINE_REFCOUNTED_TYPE(TClientBase)
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index beb3b391ec..567505e05f 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -4,6 +4,7 @@
#include "helpers.h"
#include "private.h"
#include "table_mount_cache.h"
+#include "table_writer.h"
#include "timestamp_provider.h"
#include "transaction.h"
@@ -51,6 +52,7 @@ using NYT::FromProto;
using namespace NAuth;
using namespace NChaosClient;
using namespace NChunkClient;
+using namespace NConcurrency;
using namespace NObjectClient;
using namespace NRpc;
using namespace NScheduler;
@@ -739,6 +741,32 @@ TFuture<void> TClient::AlterReplicationCard(
return req->Invoke().As<void>();
}
+TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter(
+ const TDistributedWriteCookiePtr& cookie,
+ const TParticipantTableWriterOptions& options)
+{
+ auto proxy = CreateApiServiceProxy();
+ auto req = proxy.ParticipantWriteTable();
+ InitStreamingRequest(*req);
+
+ FillRequest(req.Get(), cookie, options);
+
+ auto schema = New<TTableSchema>();
+ return NRpc::CreateRpcClientOutputStream(
+ std::move(req),
+ BIND ([=] (const TSharedRef& metaRef) {
+ NApi::NRpcProxy::NProto::TWriteTableMeta meta;
+ if (!TryDeserializeProto(&meta, metaRef)) {
+ THROW_ERROR_EXCEPTION("Failed to deserialize schema for participant table writer");
+ }
+
+ FromProto(schema.Get(), meta.schema());
+ }))
+ .Apply(BIND([=] (IAsyncZeroCopyOutputStreamPtr outputStream) {
+ return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema));
+ })).As<ITableWriterPtr>();
+}
+
TFuture<IQueueRowsetPtr> TClient::PullQueue(
const TRichYPath& queuePath,
i64 offset,
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index f6020cd0af..ffce8539b5 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -129,6 +129,11 @@ public:
NChaosClient::TReplicationCardId replicationCardId,
const TAlterReplicationCardOptions& options = {}) override;
+ // Distributed table client
+ TFuture<ITableWriterPtr> CreateParticipantTableWriter(
+ const TDistributedWriteCookiePtr& cookie,
+ const TParticipantTableWriterOptions& options) override;
+
// Queues.
TFuture<NQueueClient::IQueueRowsetPtr> PullQueue(
const NYPath::TRichYPath& queuePath,
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index d5a4beb180..9ebde85ea7 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -1911,6 +1911,92 @@ NQueryTrackerClient::EQueryState ConvertQueryStateFromProto(
////////////////////////////////////////////////////////////////////////////////
+void FillRequest(
+ TReqStartDistributedWriteSession* req,
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options)
+{
+ Y_UNUSED(req, path, options);
+}
+
+void FromProto(
+ TDistributedWriteSessionStartOptions* mutableOptions,
+ const TReqStartDistributedWriteSession& req)
+{
+ Y_UNUSED(req, mutableOptions);
+}
+
+void FromProto(
+ TDistributedWriteSession* mutableSession,
+ TRspStartDistributedWriteSession&& rsp)
+{
+ Y_UNUSED(mutableSession, rsp);
+}
+
+void ToProto(
+ TRspStartDistributedWriteSession* rsp,
+ const TDistributedWriteSessionPtr& session)
+{
+ Y_UNUSED(rsp, session);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void FillRequest(
+ TReqFinishDistributedWriteSession* req,
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options)
+{
+ Y_UNUSED(req, session, options);
+}
+
+void FromProto(
+ TDistributedWriteSessionFinishOptions* mutableOptions,
+ const TReqFinishDistributedWriteSession& req)
+{
+ Y_UNUSED(req, mutableOptions);
+}
+
+void FromProto(
+ TDistributedWriteSession* mutableSession,
+ const TReqFinishDistributedWriteSession& req)
+{
+ Y_UNUSED(mutableSession, req);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void FillRequest(
+ TReqParticipantWriteTable* req,
+ const TDistributedWriteCookiePtr& cookie,
+ const TParticipantTableWriterOptions& options)
+{
+ Y_UNUSED(req, cookie, options);
+}
+
+void FromProto(
+ TParticipantTableWriterOptions* mutableOptions,
+ const TReqParticipantWriteTable& req)
+{
+ Y_UNUSED(req, mutableOptions);
+}
+
+void FromProto(
+ TDistributedWriteCookie* cookie,
+ const TReqParticipantWriteTable& req)
+{
+ Y_UNUSED(cookie, req);
+}
+
+void ToProto(
+ TRspParticipantWriteTable* rsp,
+ const TDistributedWriteCookiePtr& cookie)
+{
+ Y_UNUSED(rsp, cookie);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NProto
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h
index 9d398923b0..9027337359 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.h
+++ b/yt/yt/client/api/rpc_proxy/helpers.h
@@ -271,6 +271,60 @@ NProto::EQueryState ConvertQueryStateToProto(
NQueryTrackerClient::EQueryState ConvertQueryStateFromProto(
NProto::EQueryState proto);
+
+////////////////////////////////////////////////////////////////////////////////
+
+void FillRequest(
+ TReqStartDistributedWriteSession* req,
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options);
+
+void FromProto(
+ TDistributedWriteSessionStartOptions* mutableOptions,
+ const TReqStartDistributedWriteSession& req);
+
+void FromProto(
+ TDistributedWriteSession* mutableSession,
+ TRspStartDistributedWriteSession&& rsp);
+
+void ToProto(
+ TRspStartDistributedWriteSession* rsp,
+ const TDistributedWriteSessionPtr& session);
+
+////////////////////////////////////////////////////////////////////////////////
+
+void FillRequest(
+ TReqFinishDistributedWriteSession* req,
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options);
+
+void FromProto(
+ TDistributedWriteSessionFinishOptions* mutableOptions,
+ const TReqFinishDistributedWriteSession& req);
+
+void FromProto(
+ TDistributedWriteSession* mutableSession,
+ const TReqFinishDistributedWriteSession& req);
+
+////////////////////////////////////////////////////////////////////////////////
+
+void FillRequest(
+ TReqParticipantWriteTable* req,
+ const TDistributedWriteCookiePtr& cookie,
+ const TParticipantTableWriterOptions& options);
+
+void FromProto(
+ TParticipantTableWriterOptions* mutableOptions,
+ const TReqParticipantWriteTable& req);
+
+void FromProto(
+ TDistributedWriteCookie* cookie,
+ const TReqParticipantWriteTable& req);
+
+void ToProto(
+ TRspParticipantWriteTable* rsp,
+ const TDistributedWriteCookiePtr& cookie);
+
} // namespace NProto
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index 6d32ba3995..e719d41f8c 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -921,6 +921,26 @@ IJournalWriterPtr TTransaction::CreateJournalWriter(
PatchTransactionId(options));
}
+TFuture<TDistributedWriteSessionPtr> TTransaction::StartDistributedWriteSession(
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options)
+{
+ ValidateActive();
+ return Client_->StartDistributedWriteSession(
+ path,
+ PatchTransactionId(options));
+}
+
+TFuture<void> TTransaction::FinishDistributedWriteSession(
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options)
+{
+ ValidateActive();
+ return Client_->FinishDistributedWriteSession(
+ std::move(session),
+ PatchTransactionId(options));
+}
+
TFuture<void> TTransaction::DoAbort(
TGuard<NThreading::TSpinLock>* guard,
const TTransactionAbortOptions& /*options*/)
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h
index 636b8fe2e2..3faa58d708 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.h
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h
@@ -235,6 +235,14 @@ public:
const NYPath::TYPath& path,
const NApi::TJournalWriterOptions& options) override;
+ TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options = {}) override;
+
+ TFuture<void> FinishDistributedWriteSession(
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options = {}) override;
+
// Custom methods.
//! Returns proxy address this transaction is sticking to.
diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp
new file mode 100644
index 0000000000..5d41f1abc8
--- /dev/null
+++ b/yt/yt/client/driver/distributed_table_commands.cpp
@@ -0,0 +1,139 @@
+#include "distributed_table_commands.h"
+#include "config.h"
+#include "helpers.h"
+
+#include <yt/yt/client/api/distributed_table_sessions.h>
+
+// #include <yt/yt/client/api/rowset.h>
+// #include <yt/yt/client/api/skynet.h>
+
+// #include <yt/yt/client/chaos_client/replication_card_serialization.h>
+
+#include <yt/yt/client/formats/config.h>
+#include <yt/yt/client/formats/parser.h>
+
+// #include <yt/yt/client/table_client/adapters.h>
+// #include <yt/yt/client/table_client/blob_reader.h>
+// #include <yt/yt/client/table_client/columnar_statistics.h>
+// #include <yt/yt/client/table_client/row_buffer.h>
+// #include <yt/yt/client/table_client/table_consumer.h>
+// #include <yt/yt/client/table_client/table_output.h>
+// #include <yt/yt/client/table_client/unversioned_writer.h>
+// #include <yt/yt/client/table_client/versioned_writer.h>
+// #include <yt/yt/client/table_client/wire_protocol.h>
+
+// #include <yt/yt/client/tablet_client/table_mount_cache.h>
+
+#include <yt/yt/client/ypath/public.h>
+
+#include <yt/yt/library/formats/format.h>
+
+#include <yt/yt/core/concurrency/scheduler_api.h>
+// #include <yt/yt/core/misc/finally.h>
+
+#include <yt/yt/core/ytree/convert.h>
+
+namespace NYT::NDriver {
+
+using namespace NApi;
+using namespace NConcurrency;
+using namespace NFormats;
+using namespace NTracing;
+using namespace NYTree;
+using namespace NYson;
+using namespace NYPath;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// namespace {
+
+// NLogging::TLogger WithCommandTag(
+// const NLogging::TLogger& logger,
+// const ICommandContextPtr& context)
+// {
+// return logger.WithTag("Command: %v",
+// context->Request().CommandName);
+// }
+
+// } // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TStartDistributedWriteSessionCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("path", &TThis::Path);
+}
+
+// -> DistributedWriteSession
+void TStartDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context)
+{
+ auto transaction = AttachTransaction(context, /*required*/ false);
+
+ auto session = WaitFor(context->GetClient()->StartDistributedWriteSession(
+ Path,
+ Options));
+
+ ProduceOutput(context, [session = std::move(session)] (IYsonConsumer* consumer) {
+ Serialize(session, consumer);
+ });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TFinishDistributedWriteSessionCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("session", &TThis::Session);
+}
+
+// -> Nothing
+void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context)
+{
+ auto transaction = AttachTransaction(context, /*required*/ false);
+
+ auto session = ConvertTo<TDistributedWriteSessionPtr>(Session);
+
+ WaitFor(context->GetClient()->FinishDistributedWriteSession(
+ std::move(session),
+ Options))
+ .ThrowOnError();
+
+ ProduceEmptyOutput(context);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TParticipantWriteTableCommand::Execute(ICommandContextPtr context)
+{
+ TTypedCommand<NApi::TParticipantTableWriterOptions>::Execute(std::move(context));
+}
+
+void TParticipantWriteTableCommand::Register(TRegistrar /*registrar*/)
+{ }
+
+TFuture<NApi::ITableWriterPtr> TParticipantWriteTableCommand::CreateTableWriter(
+ const ICommandContextPtr& context) const
+{
+ PutMethodInfoInTraceContext("participant_write_table");
+
+ return context
+ ->GetClient()
+ ->CreateParticipantTableWriter(
+ StaticPointerCast<TDistributedWriteCookie>(ResultingCookie),
+ TTypedCommand<TParticipantTableWriterOptions>::Options);
+}
+
+// -> Cookie
+void TParticipantWriteTableCommand::DoExecute(ICommandContextPtr context)
+{
+ auto cookie = ConvertTo<TDistributedWriteCookiePtr>(Cookie);
+ ResultingCookie = StaticPointerCast<TRefCounted>(std::move(cookie));
+
+ DoExecuteImpl(context);
+ ProduceOutput(context, [cookie = std::move(ResultingCookie)] (IYsonConsumer* consumer) {
+ Serialize(StaticPointerCast<TDistributedWriteCookie>(cookie), consumer);
+ });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDriver
diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h
new file mode 100644
index 0000000000..ea3f7a9b7b
--- /dev/null
+++ b/yt/yt/client/driver/distributed_table_commands.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include "table_commands.h"
+
+#include <yt/yt/client/formats/format.h>
+
+#include <yt/yt/client/ypath/rich.h>
+
+namespace NYT::NDriver {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// -> DistributedWriteSession
+class TStartDistributedWriteSessionCommand
+ : public TTypedCommand<NApi::TDistributedWriteSessionStartOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TStartDistributedWriteSessionCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TRichYPath Path;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// -> Nothing
+class TFinishDistributedWriteSessionCommand
+ : public TTypedCommand<NApi::TDistributedWriteSessionFinishOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TFinishDistributedWriteSessionCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYTree::INodePtr Session;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// -> Cookie
+class TParticipantWriteTableCommand
+ : public TTypedCommand<NApi::TParticipantTableWriterOptions>
+ , private TWriteTableCommand
+{
+public:
+ // Shadow normal execute in order to fix
+ // ambiguity in dispatch.
+ void Execute(ICommandContextPtr context) override;
+
+ REGISTER_YSON_STRUCT_LITE(TParticipantWriteTableCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ using TBase = TWriteTableCommand;
+
+ NYTree::INodePtr Cookie;
+ TRefCountedPtr ResultingCookie;
+
+ TFuture<NApi::ITableWriterPtr> CreateTableWriter(
+ const ICommandContextPtr& context) const override;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDriver
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index bba7ffbabd..561eb836fb 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -7,6 +7,7 @@
#include "command.h"
#include "config.h"
#include "cypress_commands.h"
+#include "distributed_table_commands.h"
#include "etc_commands.h"
#include "file_commands.h"
#include "journal_commands.h"
@@ -394,6 +395,9 @@ public:
REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false);
REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false);
REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false);
+ REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false);
+ REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false);
+ REGISTER_ALL(TParticipantWriteTableCommand, "participant_write_table", Tabular, Structured, true, true );
}
#undef REGISTER
diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp
index a45768580b..5cbfa54a46 100644
--- a/yt/yt/client/driver/table_commands.cpp
+++ b/yt/yt/client/driver/table_commands.cpp
@@ -47,7 +47,9 @@ using namespace NTracing;
////////////////////////////////////////////////////////////////////////////////
-static NLogging::TLogger WithCommandTag(
+namespace {
+
+NLogging::TLogger WithCommandTag(
const NLogging::TLogger& logger,
const ICommandContextPtr& context)
{
@@ -55,6 +57,8 @@ static NLogging::TLogger WithCommandTag(
context->Request().CommandName);
}
+} // namespace
+
////////////////////////////////////////////////////////////////////////////////
void TReadTableCommand::Register(TRegistrar registrar)
@@ -261,7 +265,17 @@ void TWriteTableCommand::Register(TRegistrar registrar)
.Default(1_MB);
}
-void TWriteTableCommand::DoExecute(ICommandContextPtr context)
+TFuture<ITableWriterPtr> TWriteTableCommand::CreateTableWriter(
+ const ICommandContextPtr& context) const
+{
+ PutMethodInfoInTraceContext("write_table");
+
+ return context->GetClient()->CreateTableWriter(
+ Path,
+ Options);
+}
+
+void TWriteTableCommand::DoExecuteImpl(const ICommandContextPtr& context)
{
auto transaction = AttachTransaction(context, false);
@@ -273,11 +287,7 @@ void TWriteTableCommand::DoExecute(ICommandContextPtr context)
Options.PingAncestors = true;
Options.Config = config;
- PutMethodInfoInTraceContext("write_table");
-
- auto apiWriter = WaitFor(context->GetClient()->CreateTableWriter(
- Path,
- Options))
+ auto apiWriter = WaitFor(CreateTableWriter(context))
.ValueOrThrow();
auto schemalessWriter = CreateSchemalessFromApiWriterAdapter(std::move(apiWriter));
@@ -298,7 +308,11 @@ void TWriteTableCommand::DoExecute(ICommandContextPtr context)
WaitFor(schemalessWriter->Close())
.ThrowOnError();
+}
+void TWriteTableCommand::DoExecute(ICommandContextPtr context)
+{
+ DoExecuteImpl(context);
ProduceEmptyOutput(context);
}
diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h
index 744d0220b2..2e1e8e22d5 100644
--- a/yt/yt/client/driver/table_commands.h
+++ b/yt/yt/client/driver/table_commands.h
@@ -82,6 +82,12 @@ public:
static void Register(TRegistrar registrar);
+protected:
+ virtual TFuture<NApi::ITableWriterPtr> CreateTableWriter(
+ const ICommandContextPtr& context) const;
+
+ void DoExecuteImpl(const ICommandContextPtr& context);
+
private:
NYPath::TRichYPath Path;
NYTree::INodePtr TableWriter;
diff --git a/yt/yt/client/driver/ya.make b/yt/yt/client/driver/ya.make
index 9604eb5848..f33485ce8f 100644
--- a/yt/yt/client/driver/ya.make
+++ b/yt/yt/client/driver/ya.make
@@ -10,6 +10,7 @@ SRCS(
command.cpp
config.cpp
cypress_commands.cpp
+ distributed_table_commands.cpp
driver.cpp
etc_commands.cpp
file_commands.cpp
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index 771ca3e3ac..14375369fc 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -233,6 +233,8 @@ public:
UNIMPLEMENTED_METHOD(IFileWriterPtr, CreateFileWriter, (const NYPath::TRichYPath&, const TFileWriterOptions&));
UNIMPLEMENTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const NYPath::TYPath&, const TJournalReaderOptions&));
UNIMPLEMENTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const NYPath::TYPath&, const TJournalWriterOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
private:
const TClientPtr Client_;
@@ -471,6 +473,9 @@ public:
UNIMPLEMENTED_METHOD(TFuture<void>, PausePipeline, (const NYPath::TYPath&, const TPausePipelineOptions&));
UNIMPLEMENTED_METHOD(TFuture<TPipelineState>, GetPipelineState, (const NYPath::TYPath&, const TGetPipelineStateOptions&));
UNIMPLEMENTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&));
private:
friend class TTransaction;
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index ab41ce4c2f..d78aa78231 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -113,6 +113,9 @@ public:
UNSUPPORTED_METHOD(IFileWriterPtr, CreateFileWriter, (const TRichYPath&, const TFileWriterOptions&));
UNSUPPORTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const TYPath&, const TJournalReaderOptions&));
UNSUPPORTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const TYPath&, const TJournalWriterOptions&));
+ UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
+ UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
+ UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&));
// IClient methods.
// Unsupported methods.
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index 39d7aa777b..906e4427b3 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -2,6 +2,7 @@
#include <yt/yt/client/api/connection.h>
#include <yt/yt/client/api/client.h>
+#include <yt/yt/client/api/distributed_table_sessions.h>
#include <yt/yt/client/api/file_writer.h>
#include <yt/yt/client/api/journal_reader.h>
#include <yt/yt/client/api/journal_writer.h>
@@ -825,6 +826,21 @@ public:
const TGetFlowViewOptions& options),
(override));
+ MOCK_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options),
+ (override));
+
+ MOCK_METHOD(TFuture<void>, FinishDistributedWriteSession, (
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options),
+ (override));
+
+ MOCK_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (
+ const TDistributedWriteCookiePtr& cookie,
+ const TParticipantTableWriterOptions& options),
+ (override));
+
private:
NTabletClient::ITableMountCachePtr TableMountCache_;
NTransactionClient::ITimestampProviderPtr TimestampProvider_;
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index 5d4ada47f9..9bb296dbe8 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -1,5 +1,6 @@
#pragma once
+#include <yt/yt/client/api/distributed_table_sessions.h>
#include <yt/yt/client/api/file_writer.h>
#include <yt/yt/client/api/journal_reader.h>
#include <yt/yt/client/api/journal_writer.h>
@@ -150,6 +151,16 @@ public:
const NYPath::TYPath& path,
const TJournalWriterOptions& options), (override));
+ MOCK_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
+ const NYPath::TRichYPath& path,
+ const TDistributedWriteSessionStartOptions& options),
+ (override));
+
+ MOCK_METHOD(TFuture<void>, FinishDistributedWriteSession, (
+ TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionFinishOptions& options),
+ (override));
+
// ITransaction
IClientPtr Client;
NTransactionClient::ETransactionType Type;
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index 8dcc45c9e1..4eda047917 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -13,6 +13,7 @@ SRCS(
api/client_cache.cpp
api/delegating_client.cpp
api/delegating_transaction.cpp
+ api/distributed_table_sessions.cpp
api/etc_client.cpp
api/journal_client.cpp
api/operation_client.cpp
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 81dcfaeba2..d43f4a22ca 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -3364,3 +3364,47 @@ message TRspGetQueryTrackerInfo
required bytes supported_features = 2; // YSON
repeated string access_control_objects = 3;
}
+
+////////////////////////////////////////////////////////////////////////////////
+// NB(arkady-e1ppa): Under construction.
+// Distributed table client
+////////////////////////////////////////////////////////////////////////////////
+
+message TReqStartDistributedWriteSession
+{
+ optional string path = 1;
+
+ // TDistributedWriteSessionStartOptions contents...
+}
+
+message TRspStartDistributedWriteSession
+{
+ required bytes session = 1; // YSON-serialized TDistributedWriteSession
+}
+
+message TReqFinishDistributedWriteSession
+{
+ required bytes session = 2; // YSON-serialized TDistributedWriteSession
+
+ // TDistributedWriteSessionFinishOptions contents...
+}
+
+message TRspFinishDistributedWriteSession
+{
+}
+
+message TReqParticipantWriteTable
+{
+ optional bytes config = 1; // YSON-serialized TTableWriterConfig
+
+ optional bytes format = 2; // YSON-serialized TFormat
+
+ required bytes cookie = 3; // YSON-serialized TDistributedWriteCookie
+
+ // TParticipantTableWriterOptions contents...
+}
+
+message TRspParticipantWriteTable
+{
+ required bytes cookie = 1; // YSON-serialized TDistributedWriteCookie
+}