aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-10-14 23:33:03 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-10-14 23:43:57 +0300
commit9cd00d5f998588423d9ff891fbc504d9cd09bb31 (patch)
treefac56bcb6929d7b653b1a41800f4943fb37dc264
parent608385daa2cf9d3ccca4e8d2354d44451cb58a85 (diff)
downloadydb-9cd00d5f998588423d9ff891fbc504d9cd09bb31.tar.gz
YT-22307: Implement distributed write protocol (no signatures yet)
MaxChildrenPerAttachRequest default value is the default from the same parameter in operation controller. commit_hash:e977687ef81e62ad75d8eb57b156e4efe4ee132d
-rw-r--r--yt/yt/client/api/delegating_client.h6
-rw-r--r--yt/yt/client/api/distributed_table_client.h23
-rw-r--r--yt/yt/client/api/distributed_table_session.cpp197
-rw-r--r--yt/yt/client/api/distributed_table_session.h158
-rw-r--r--yt/yt/client/api/distributed_table_sessions.cpp17
-rw-r--r--yt/yt/client/api/distributed_table_sessions.h41
-rw-r--r--yt/yt/client/api/public.h3
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h2
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.cpp2
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp18
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h6
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp94
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.h45
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp2
-rw-r--r--yt/yt/client/driver/distributed_table_commands.cpp56
-rw-r--r--yt/yt/client/driver/distributed_table_commands.h6
-rw-r--r--yt/yt/client/driver/driver.cpp6
-rw-r--r--yt/yt/client/federated/client.cpp2
-rw-r--r--yt/yt/client/hedging/hedging.cpp2
-rw-r--r--yt/yt/client/unittests/mock/client.h8
-rw-r--r--yt/yt/client/unittests/mock/transaction.h2
-rw-r--r--yt/yt/client/ya.make2
-rw-r--r--yt/yt/core/rpc/stream-inl.h25
-rw-r--r--yt/yt/core/rpc/stream.h7
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto21
25 files changed, 519 insertions, 232 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index 5b5e9accf3..6f85657139 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -856,9 +856,9 @@ public:
const TDistributedWriteSessionFinishOptions& options),
(std::move(session), options))
- DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options),
+ DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options),
(cookie, options))
// Shuffle Service
diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h
index 9ac983371c..d1bca5bc3a 100644
--- a/yt/yt/client/api/distributed_table_client.h
+++ b/yt/yt/client/api/distributed_table_client.h
@@ -4,6 +4,8 @@
#include <yt/yt/client/table_client/config.h>
+#include <library/cpp/yt/misc/non_null_ptr.h>
+
namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
@@ -13,10 +15,11 @@ struct TDistributedWriteSessionStartOptions
{ };
struct TDistributedWriteSessionFinishOptions
- : public TTransactionalOptions
-{ };
+{
+ int MaxChildrenPerAttachRequest = 10'000;
+};
-struct TParticipantTableWriterOptions
+struct TFragmentTableWriterOptions
: public TTableWriterOptions
{ };
@@ -24,6 +27,7 @@ struct TParticipantTableWriterOptions
struct IDistributedTableClientBase
{
+public:
virtual ~IDistributedTableClientBase() = default;
virtual TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
@@ -33,6 +37,13 @@ struct IDistributedTableClientBase
virtual TFuture<void> FinishDistributedWriteSession(
TDistributedWriteSessionPtr session,
const TDistributedWriteSessionFinishOptions& options = {}) = 0;
+
+ // Helper used to implement FinishDistributedWriteSession efficiently
+ // without compromising privacy of session fields.
+ // defined in yt/yt/client/api/distributed_table_session.cpp.
+ void* GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session);
+ // Used in chunk writer for results recording
+ void RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult);
};
////////////////////////////////////////////////////////////////////////////////
@@ -41,9 +52,9 @@ struct IDistributedTableClient
{
virtual ~IDistributedTableClient() = default;
- virtual TFuture<ITableWriterPtr> CreateParticipantTableWriter(
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options = {}) = 0;
+ virtual TFuture<ITableWriterPtr> CreateFragmentTableWriter(
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/distributed_table_session.cpp b/yt/yt/client/api/distributed_table_session.cpp
new file mode 100644
index 0000000000..d67998b9d2
--- /dev/null
+++ b/yt/yt/client/api/distributed_table_session.cpp
@@ -0,0 +1,197 @@
+#include "distributed_table_session.h"
+
+#include "client.h"
+#include "transaction.h"
+
+namespace NYT::NApi {
+
+using namespace NYPath;
+using namespace NObjectClient;
+using namespace NTableClient;
+using namespace NTransactionClient;
+using namespace NYTree;
+using namespace NCypressClient;
+using namespace NChunkClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+TTableWriterPatchInfo::TTableWriterPatchInfo(
+ TRichYPath richPath,
+ TObjectId objectId,
+ TCellTag externalCellTag,
+ TMasterTableSchemaId chunkSchemaId,
+ TTableSchemaPtr chunkSchema,
+ std::optional<TLegacyOwningKey> writerLastKey,
+ int maxHeavyColumns,
+ TTimestamp timestamp,
+ const IAttributeDictionary& tableAttributes)
+ : TTableWriterPatchInfo()
+{
+ ObjectId = objectId;
+ RichPath = std::move(richPath);
+
+ ChunkSchemaId = chunkSchemaId;
+ ChunkSchema = std::move(chunkSchema);
+
+ WriterLastKey = writerLastKey;
+ MaxHeavyColumns = maxHeavyColumns;
+
+ TableAttributes = tableAttributes.ToMap();
+
+ ExternalCellTag = externalCellTag;
+
+ Timestamp = timestamp;
+}
+
+void TTableWriterPatchInfo::Register(TRegistrar registrar)
+{
+ registrar.Parameter("table_id", &TThis::ObjectId);
+ registrar.Parameter("table_path", &TThis::RichPath);
+
+ registrar.Parameter("chunk_schema_id", &TThis::ChunkSchemaId);
+ registrar.Parameter("chunk_schema", &TThis::ChunkSchema);
+
+ registrar.Parameter("writer_last_key", &TThis::WriterLastKey);
+ registrar.Parameter("max_heavy_columns", &TThis::MaxHeavyColumns);
+
+ registrar.Parameter("table_attributes", &TThis::TableAttributes);
+
+ registrar.Parameter("external_cell_tag", &TThis::ExternalCellTag);
+
+ registrar.Parameter("timestamp", &TThis::Timestamp);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TFragmentWriteResult::Register(TRegistrar registrar)
+{
+ registrar.Parameter("min_boundary_key", &TThis::MinBoundaryKey);
+ registrar.Parameter("max_boundary_key", &TThis::MaxBoundaryKey);
+ registrar.Parameter("chunk_list_id", &TThis::ChunkListId);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+const TTableWriterPatchInfo& TFragmentWriteCookie::GetPatchInfo() const
+{
+ return PatchInfo_;
+}
+
+TTransactionId TFragmentWriteCookie::GetMainTransactionId() const
+{
+ return MainTxId_;
+}
+
+TTransactionId TFragmentWriteCookie::GetUploadTransactionId() const
+{
+ return UploadTxId_;
+}
+
+void TFragmentWriteCookie::Register(TRegistrar registrar)
+{
+ registrar.Parameter("session_id", &TThis::Id_);
+
+ registrar.Parameter("tx_id", &TThis::MainTxId_);
+ registrar.Parameter("upload_tx_id", &TThis::UploadTxId_);
+
+ registrar.Parameter("patch_info", &TThis::PatchInfo_);
+
+ registrar.Parameter("writer_results", &TThis::WriteResults_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TDistributedWriteSession::TDistributedWriteSession(
+ TTransactionId mainTxId,
+ TTransactionId uploadTxId,
+ TChunkListId rootChunkListId,
+ TTableWriterPatchInfo patchInfo)
+ : TDistributedWriteSession()
+{
+ Id_ = TDistributedWriteSessionId(TGuid::Create());
+ MainTxId_ = mainTxId;
+ UploadTxId_ = uploadTxId;
+
+ RootChunkListId_ = rootChunkListId;
+
+ PatchInfo_ = std::move(patchInfo);
+
+ // Signatures?
+}
+
+TTransactionId TDistributedWriteSession::GetMainTransactionId() const
+{
+ return MainTxId_;
+}
+
+TTransactionId TDistributedWriteSession::GetUploadTransactionId() const
+{
+ return UploadTxId_;
+}
+
+const TTableWriterPatchInfo& TDistributedWriteSession::GetPatchInfo() const Y_LIFETIME_BOUND
+{
+ return PatchInfo_;
+}
+
+TChunkListId TDistributedWriteSession::GetRootChunkListId() const
+{
+ return RootChunkListId_;
+}
+
+TFragmentWriteCookiePtr TDistributedWriteSession::GiveCookie()
+{
+ auto cookie = New<TFragmentWriteCookie>();
+ cookie->Id_ = Id_;
+ cookie->MainTxId_ = MainTxId_;
+ cookie->UploadTxId_ = UploadTxId_;
+ cookie->PatchInfo_ = PatchInfo_;
+
+ return cookie;
+}
+
+void TDistributedWriteSession::TakeCookie(TFragmentWriteCookiePtr cookie)
+{
+ // Verify cookie signature?
+ WriteResults_.reserve(std::ssize(WriteResults_) + std::ssize(cookie->WriteResults_));
+ for (const auto& writeResult : cookie->WriteResults_) {
+ WriteResults_.push_back(writeResult);
+ }
+}
+
+TFuture<void> TDistributedWriteSession::Ping(IClientPtr client)
+{
+ // NB(arkady-e1ppa): AutoAbort = false by default.
+ auto mainTx = client->AttachTransaction(MainTxId_);
+
+ return mainTx->Ping();
+}
+
+void TDistributedWriteSession::Register(TRegistrar registrar)
+{
+ registrar.Parameter("session_id", &TThis::Id_);
+ registrar.Parameter("tx_id", &TThis::MainTxId_);
+ registrar.Parameter("upload_tx_id", &TThis::UploadTxId_);
+
+ registrar.Parameter("root_chunk_list_id", &TThis::RootChunkListId_);
+
+ registrar.Parameter("patch_info", &TThis::PatchInfo_);
+
+ registrar.Parameter("write_results", &TThis::WriteResults_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void* IDistributedTableClientBase::GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session)
+{
+ return static_cast<void*>(&session->WriteResults_);
+}
+
+void IDistributedTableClientBase::RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult)
+{
+ auto* concrete = static_cast<TFragmentWriteResult*>(opaqueWriteResult);
+ YT_ASSERT(concrete);
+ cookie->WriteResults_.push_back(*concrete);
+}
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_session.h b/yt/yt/client/api/distributed_table_session.h
new file mode 100644
index 0000000000..1932284206
--- /dev/null
+++ b/yt/yt/client/api/distributed_table_session.h
@@ -0,0 +1,158 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/ytlib/table_client/table_upload_options.h>
+
+#include <yt/yt/client/chunk_client/public.h>
+
+#include <yt/yt/client/table_client/key.h>
+
+#include <yt/yt/client/ypath/rich.h>
+
+#include <yt/yt/core/ytree/yson_struct.h>
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid);
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Used by distributed table writer to patch
+// its config based on table data.
+// NB(arkady-e1ppa): TableUploadOptions are
+// encoded in RichPath + TableAttributes and thus
+// are not required to be stored directly.
+struct TTableWriterPatchInfo
+ : public NYTree::TYsonStructLite
+{
+public:
+ NObjectClient::TObjectId ObjectId;
+
+ NYPath::TRichYPath RichPath;
+
+ // NB(arkady-e1ppa): Empty writer last key Serialization into Deserialization
+ // somehow results in a []; row which is considered non-empty for some reason.
+ // This matters too little for me to bother saving up the std::optional wrapper.
+ std::optional<NTableClient::TLegacyOwningKey> WriterLastKey;
+ int MaxHeavyColumns;
+
+ NTableClient::TMasterTableSchemaId ChunkSchemaId;
+ NTableClient::TTableSchemaPtr ChunkSchema;
+
+ NYTree::INodePtr TableAttributes;
+
+ NObjectClient::TCellTag ExternalCellTag;
+
+ NTransactionClient::TTimestamp Timestamp;
+
+ TTableWriterPatchInfo(
+ NYPath::TRichYPath richPath,
+ NObjectClient::TObjectId objectId,
+ NObjectClient::TCellTag externalCellTag,
+ NTableClient::TMasterTableSchemaId chunkSchemaId,
+ NTableClient::TTableSchemaPtr chunkSchema,
+ std::optional<NTableClient::TLegacyOwningKey> writerLastKey,
+ int maxHeavyColumns,
+ NTransactionClient::TTimestamp timestamp,
+ const NYTree::IAttributeDictionary& tableAttributes);
+
+ REGISTER_YSON_STRUCT_LITE(TTableWriterPatchInfo)
+
+ static void Register(TRegistrar registrar);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TFragmentWriteResult
+ : public NYTree::TYsonStructLite
+{
+ NTableClient::TLegacyOwningKey MinBoundaryKey;
+ NTableClient::TLegacyOwningKey MaxBoundaryKey;
+ NChunkClient::TChunkListId ChunkListId;
+
+ REGISTER_YSON_STRUCT_LITE(TFragmentWriteResult)
+
+ static void Register(TRegistrar registrar);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TFragmentWriteCookie
+ : public NYTree::TYsonStruct
+{
+public:
+ const TTableWriterPatchInfo& GetPatchInfo() const;
+
+ NCypressClient::TTransactionId GetMainTransactionId() const;
+ NCypressClient::TTransactionId GetUploadTransactionId() const;
+
+private:
+ TDistributedWriteSessionId Id_;
+
+ NCypressClient::TTransactionId MainTxId_;
+ NCypressClient::TTransactionId UploadTxId_;
+
+ TTableWriterPatchInfo PatchInfo_;
+
+ std::vector<TFragmentWriteResult> WriteResults_;
+
+ REGISTER_YSON_STRUCT(TFragmentWriteCookie);
+
+ static void Register(TRegistrar registrar);
+
+ friend class TDistributedWriteSession;
+ friend struct IDistributedTableClientBase;
+};
+
+DEFINE_REFCOUNTED_TYPE(TFragmentWriteCookie);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDistributedWriteSession
+ : public NYTree::TYsonStruct
+{
+public:
+ TDistributedWriteSession(
+ NCypressClient::TTransactionId mainTxId,
+ NCypressClient::TTransactionId uploadTxId,
+ NChunkClient::TChunkListId rootChunkListId,
+ TTableWriterPatchInfo patchInfo);
+
+ NCypressClient::TTransactionId GetMainTransactionId() const;
+ NCypressClient::TTransactionId GetUploadTransactionId() const;
+ const TTableWriterPatchInfo& GetPatchInfo() const Y_LIFETIME_BOUND;
+ NChunkClient::TChunkListId GetRootChunkListId() const;
+
+ TFragmentWriteCookiePtr GiveCookie();
+ void TakeCookie(TFragmentWriteCookiePtr cookie);
+
+ TFuture<void> Ping(IClientPtr client);
+
+ REGISTER_YSON_STRUCT(TDistributedWriteSession);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ TDistributedWriteSessionId Id_;
+
+ NCypressClient::TTransactionId MainTxId_;
+ NCypressClient::TTransactionId UploadTxId_;
+
+ NChunkClient::TChunkListId RootChunkListId_;
+
+ TTableWriterPatchInfo PatchInfo_;
+
+ // This is used to commit changes when session is over.
+ std::vector<TFragmentWriteResult> WriteResults_;
+
+ friend struct IDistributedTableClientBase;
+};
+
+DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_sessions.cpp b/yt/yt/client/api/distributed_table_sessions.cpp
deleted file mode 100644
index b454ad476b..0000000000
--- a/yt/yt/client/api/distributed_table_sessions.cpp
+++ /dev/null
@@ -1,17 +0,0 @@
-#include "distributed_table_sessions.h"
-
-namespace NYT::NApi {
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TDistributedWriteCookie::Register(TRegistrar /*registrar*/)
-{ }
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TDistributedWriteSession::Register(TRegistrar /*registrar*/)
-{ }
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_sessions.h b/yt/yt/client/api/distributed_table_sessions.h
deleted file mode 100644
index 19b3c0f5c0..0000000000
--- a/yt/yt/client/api/distributed_table_sessions.h
+++ /dev/null
@@ -1,41 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/ytree/yson_struct.h>
-
-namespace NYT::NApi {
-
-////////////////////////////////////////////////////////////////////////////////
-
-YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid);
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TDistributedWriteCookie
- : public NYTree::TYsonStruct
-{
-public:
- REGISTER_YSON_STRUCT(TDistributedWriteCookie);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TDistributedWriteCookie);
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TDistributedWriteSession
- : public NYTree::TYsonStruct
-{
-public:
- REGISTER_YSON_STRUCT(TDistributedWriteSession);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NApi
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h
index be710a04ce..47eea56fb8 100644
--- a/yt/yt/client/api/public.h
+++ b/yt/yt/client/api/public.h
@@ -188,7 +188,8 @@ DECLARE_REFCOUNTED_STRUCT(TBackupManifest)
DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter)
DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession)
-DECLARE_REFCOUNTED_CLASS(TDistributedWriteCookie)
+DECLARE_REFCOUNTED_CLASS(TFragmentWriteCookie)
+struct IDistributedTableClientBase;
DECLARE_REFCOUNTED_STRUCT(TShuffleHandle)
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index 4dcb04c33f..5f1c945e3f 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -208,7 +208,7 @@ public:
// Distributed table client
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, StartDistributedWriteSession);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, FinishDistributedWriteSession);
- DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ParticipantWriteTable,
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, WriteTableFragment,
.SetStreamingEnabled(true));
// Shuffle service
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index c86a1305af..6ad3c152b4 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -12,7 +12,7 @@
#include "table_writer.h"
#include "transaction.h"
-#include <yt/yt/client/api/distributed_table_sessions.h>
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/file_reader.h>
#include <yt/yt/client/api/file_writer.h>
#include <yt/yt/client/api/journal_reader.h>
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 50bf51f247..430b366714 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -8,6 +8,7 @@
#include "timestamp_provider.h"
#include "transaction.h"
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/helpers.h>
#include <yt/yt/client/api/rowset.h>
#include <yt/yt/client/api/transaction.h>
@@ -752,12 +753,14 @@ TFuture<void> TClient::AlterReplicationCard(
return req->Invoke().As<void>();
}
-TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter(
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options)
+TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter(
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options)
{
+ using TRspPtr = TIntrusivePtr<NRpc::TTypedClientResponse<NProto::TRspWriteTableFragment>>;
+
auto proxy = CreateApiServiceProxy();
- auto req = proxy.ParticipantWriteTable();
+ auto req = proxy.WriteTableFragment();
InitStreamingRequest(*req);
FillRequest(req.Get(), cookie, options);
@@ -768,12 +771,15 @@ TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter(
BIND ([=] (const TSharedRef& metaRef) {
NApi::NRpcProxy::NProto::TWriteTableMeta meta;
if (!TryDeserializeProto(&meta, metaRef)) {
- THROW_ERROR_EXCEPTION("Failed to deserialize schema for participant table writer");
+ THROW_ERROR_EXCEPTION("Failed to deserialize schema for fragment table writer");
}
FromProto(schema.Get(), meta.schema());
+ }),
+ BIND([=] (TRspPtr&& rsp) mutable {
+ Deserialize(*cookie, ConvertToNode(TYsonString(rsp->cookie())));
}))
- .Apply(BIND([=] (IAsyncZeroCopyOutputStreamPtr outputStream) {
+ .ApplyUnique(BIND([=] (IAsyncZeroCopyOutputStreamPtr&& outputStream) {
return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema));
})).As<ITableWriterPtr>();
}
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index 9f8ee540a2..16e9400391 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -130,9 +130,9 @@ public:
const TAlterReplicationCardOptions& options = {}) override;
// Distributed table client
- TFuture<ITableWriterPtr> CreateParticipantTableWriter(
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options) override;
+ TFuture<ITableWriterPtr> CreateFragmentTableWriter(
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options) override;
// Queues.
TFuture<NQueueClient::IQueueRowsetPtr> PullQueue(
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index af3b4b7eb6..3a6a3650be 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -1,5 +1,6 @@
#include "helpers.h"
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/rowset.h>
#include <yt/yt/client/api/table_client.h>
@@ -58,6 +59,17 @@ void ToProto(
proto->set_suppress_upstream_sync(options.SuppressUpstreamSync);
}
+void FromProto(
+ NApi::TTransactionalOptions* options,
+ const NProto::TTransactionalOptions& proto)
+{
+ FromProto(&options->TransactionId, proto.transaction_id());
+ options->Ping = proto.ping();
+ options->PingAncestors = proto.ping_ancestors();
+ options->SuppressTransactionCoordinatorSync = proto.suppress_transaction_coordinator_sync();
+ options->SuppressUpstreamSync = proto.suppress_upstream_sync();
+}
+
void ToProto(
NProto::TPrerequisiteOptions* proto,
const NApi::TPrerequisiteOptions& options)
@@ -1946,28 +1958,22 @@ void FillRequest(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options)
{
- Y_UNUSED(req, path, options);
+ ToProto(req->mutable_path(), path);
+
+ if (options.TransactionId) {
+ ToProto(req->mutable_transactional_options(), options);
+ }
}
-void FromProto(
+void ParseRequest(
+ NYPath::TRichYPath* mutablePath,
TDistributedWriteSessionStartOptions* mutableOptions,
const TReqStartDistributedWriteSession& req)
{
- Y_UNUSED(req, mutableOptions);
-}
-
-void FromProto(
- TDistributedWriteSession* mutableSession,
- TRspStartDistributedWriteSession&& rsp)
-{
- Y_UNUSED(mutableSession, rsp);
-}
-
-void ToProto(
- TRspStartDistributedWriteSession* rsp,
- const TDistributedWriteSessionPtr& session)
-{
- Y_UNUSED(rsp, session);
+ *mutablePath = FromProto<NYPath::TRichYPath>(req.path());
+ if (req.has_transactional_options()) {
+ FromProto(mutableOptions, req.transactional_options());
+ }
}
////////////////////////////////////////////////////////////////////////////////
@@ -1977,52 +1983,44 @@ void FillRequest(
TDistributedWriteSessionPtr session,
const TDistributedWriteSessionFinishOptions& options)
{
- Y_UNUSED(req, session, options);
+ req->set_session(ConvertToYsonString(session).ToString());
+ req->set_max_children_per_attach_request(options.MaxChildrenPerAttachRequest);
}
-void FromProto(
+void ParseRequest(
+ TDistributedWriteSessionPtr* mutableSession,
TDistributedWriteSessionFinishOptions* mutableOptions,
const TReqFinishDistributedWriteSession& req)
{
- Y_UNUSED(req, mutableOptions);
-}
-
-void FromProto(
- TDistributedWriteSession* mutableSession,
- const TReqFinishDistributedWriteSession& req)
-{
- Y_UNUSED(mutableSession, req);
+ *mutableSession = ConvertTo<TDistributedWriteSessionPtr>(TYsonString(req.session()));
+ mutableOptions->MaxChildrenPerAttachRequest = req.max_children_per_attach_request();
}
////////////////////////////////////////////////////////////////////////////////
void FillRequest(
- TReqParticipantWriteTable* req,
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options)
+ TReqWriteTableFragment* req,
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options)
{
- Y_UNUSED(req, cookie, options);
-}
+ req->set_cookie(ConvertToYsonString(cookie).ToString());
-void FromProto(
- TParticipantTableWriterOptions* mutableOptions,
- const TReqParticipantWriteTable& req)
-{
- Y_UNUSED(req, mutableOptions);
-}
-
-void FromProto(
- TDistributedWriteCookie* cookie,
- const TReqParticipantWriteTable& req)
-{
- Y_UNUSED(cookie, req);
+ if (options.Config) {
+ req->set_config(ConvertToYsonString(*options.Config).ToString());
+ }
}
-void ToProto(
- TRspParticipantWriteTable* rsp,
- const TDistributedWriteCookiePtr& cookie)
+void ParseRequest(
+ TFragmentWriteCookiePtr* mutableCookie,
+ TFragmentTableWriterOptions* mutableOptions,
+ const TReqWriteTableFragment& req)
{
- Y_UNUSED(rsp, cookie);
+ *mutableCookie = ConvertTo<TFragmentWriteCookiePtr>(TYsonString(req.cookie()));
+ if (req.has_config()) {
+ mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(req.config()));
+ } else {
+ mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(TStringBuf("{}")));
+ }
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h
index ca99986c72..8849b7be44 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.h
+++ b/yt/yt/client/api/rpc_proxy/helpers.h
@@ -30,6 +30,10 @@ void ToProto(
NProto::TTransactionalOptions* proto,
const NApi::TTransactionalOptions& options);
+void FromProto(
+ NApi::TTransactionalOptions* options,
+ const NProto::TTransactionalOptions& proto);
+
void ToProto(
NProto::TPrerequisiteOptions* proto,
const NApi::TPrerequisiteOptions& options);
@@ -287,18 +291,11 @@ void FillRequest(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options);
-void FromProto(
+void ParseRequest(
+ NYPath::TRichYPath* mutablePath,
TDistributedWriteSessionStartOptions* mutableOptions,
const TReqStartDistributedWriteSession& req);
-void FromProto(
- TDistributedWriteSession* mutableSession,
- TRspStartDistributedWriteSession&& rsp);
-
-void ToProto(
- TRspStartDistributedWriteSession* rsp,
- const TDistributedWriteSessionPtr& session);
-
////////////////////////////////////////////////////////////////////////////////
void FillRequest(
@@ -306,32 +303,22 @@ void FillRequest(
TDistributedWriteSessionPtr session,
const TDistributedWriteSessionFinishOptions& options);
-void FromProto(
+void ParseRequest(
+ TDistributedWriteSessionPtr* mutableSession,
TDistributedWriteSessionFinishOptions* mutableOptions,
const TReqFinishDistributedWriteSession& req);
-void FromProto(
- TDistributedWriteSession* mutableSession,
- const TReqFinishDistributedWriteSession& req);
-
////////////////////////////////////////////////////////////////////////////////
void FillRequest(
- TReqParticipantWriteTable* req,
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options);
-
-void FromProto(
- TParticipantTableWriterOptions* mutableOptions,
- const TReqParticipantWriteTable& req);
-
-void FromProto(
- TDistributedWriteCookie* cookie,
- const TReqParticipantWriteTable& req);
-
-void ToProto(
- TRspParticipantWriteTable* rsp,
- const TDistributedWriteCookiePtr& cookie);
+ TReqWriteTableFragment* req,
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options);
+
+void ParseRequest(
+ TFragmentWriteCookiePtr* mutableCookie,
+ TFragmentTableWriterOptions* mutableOptions,
+ const TReqWriteTableFragment& req);
} // namespace NProto
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index e719d41f8c..e52fdf7acb 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -938,7 +938,7 @@ TFuture<void> TTransaction::FinishDistributedWriteSession(
ValidateActive();
return Client_->FinishDistributedWriteSession(
std::move(session),
- PatchTransactionId(options));
+ options);
}
TFuture<void> TTransaction::DoAbort(
diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp
index 5d41f1abc8..7bd88c5279 100644
--- a/yt/yt/client/driver/distributed_table_commands.cpp
+++ b/yt/yt/client/driver/distributed_table_commands.cpp
@@ -2,34 +2,16 @@
#include "config.h"
#include "helpers.h"
-#include <yt/yt/client/api/distributed_table_sessions.h>
-
-// #include <yt/yt/client/api/rowset.h>
-// #include <yt/yt/client/api/skynet.h>
-
-// #include <yt/yt/client/chaos_client/replication_card_serialization.h>
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/formats/config.h>
#include <yt/yt/client/formats/parser.h>
-// #include <yt/yt/client/table_client/adapters.h>
-// #include <yt/yt/client/table_client/blob_reader.h>
-// #include <yt/yt/client/table_client/columnar_statistics.h>
-// #include <yt/yt/client/table_client/row_buffer.h>
-// #include <yt/yt/client/table_client/table_consumer.h>
-// #include <yt/yt/client/table_client/table_output.h>
-// #include <yt/yt/client/table_client/unversioned_writer.h>
-// #include <yt/yt/client/table_client/versioned_writer.h>
-// #include <yt/yt/client/table_client/wire_protocol.h>
-
-// #include <yt/yt/client/tablet_client/table_mount_cache.h>
-
#include <yt/yt/client/ypath/public.h>
#include <yt/yt/library/formats/format.h>
#include <yt/yt/core/concurrency/scheduler_api.h>
-// #include <yt/yt/core/misc/finally.h>
#include <yt/yt/core/ytree/convert.h>
@@ -45,20 +27,6 @@ using namespace NYPath;
////////////////////////////////////////////////////////////////////////////////
-// namespace {
-
-// NLogging::TLogger WithCommandTag(
-// const NLogging::TLogger& logger,
-// const ICommandContextPtr& context)
-// {
-// return logger.WithTag("Command: %v",
-// context->Request().CommandName);
-// }
-
-// } // namespace
-
-////////////////////////////////////////////////////////////////////////////////
-
void TStartDistributedWriteSessionCommand::Register(TRegistrar registrar)
{
registrar.Parameter("path", &TThis::Path);
@@ -88,8 +56,6 @@ void TFinishDistributedWriteSessionCommand::Register(TRegistrar registrar)
// -> Nothing
void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context)
{
- auto transaction = AttachTransaction(context, /*required*/ false);
-
auto session = ConvertTo<TDistributedWriteSessionPtr>(Session);
WaitFor(context->GetClient()->FinishDistributedWriteSession(
@@ -102,35 +68,35 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context
////////////////////////////////////////////////////////////////////////////////
-void TParticipantWriteTableCommand::Execute(ICommandContextPtr context)
+void TWriteTableFragmentCommand::Execute(ICommandContextPtr context)
{
- TTypedCommand<NApi::TParticipantTableWriterOptions>::Execute(std::move(context));
+ TTypedCommand<NApi::TFragmentTableWriterOptions>::Execute(std::move(context));
}
-void TParticipantWriteTableCommand::Register(TRegistrar /*registrar*/)
+void TWriteTableFragmentCommand::Register(TRegistrar /*registrar*/)
{ }
-TFuture<NApi::ITableWriterPtr> TParticipantWriteTableCommand::CreateTableWriter(
+TFuture<NApi::ITableWriterPtr> TWriteTableFragmentCommand::CreateTableWriter(
const ICommandContextPtr& context) const
{
PutMethodInfoInTraceContext("participant_write_table");
return context
->GetClient()
- ->CreateParticipantTableWriter(
- StaticPointerCast<TDistributedWriteCookie>(ResultingCookie),
- TTypedCommand<TParticipantTableWriterOptions>::Options);
+ ->CreateFragmentTableWriter(
+ StaticPointerCast<TFragmentWriteCookie>(ResultingCookie),
+ TTypedCommand<TFragmentTableWriterOptions>::Options);
}
// -> Cookie
-void TParticipantWriteTableCommand::DoExecute(ICommandContextPtr context)
+void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context)
{
- auto cookie = ConvertTo<TDistributedWriteCookiePtr>(Cookie);
+ auto cookie = ConvertTo<TFragmentWriteCookiePtr>(Cookie);
ResultingCookie = StaticPointerCast<TRefCounted>(std::move(cookie));
DoExecuteImpl(context);
ProduceOutput(context, [cookie = std::move(ResultingCookie)] (IYsonConsumer* consumer) {
- Serialize(StaticPointerCast<TDistributedWriteCookie>(cookie), consumer);
+ Serialize(StaticPointerCast<TFragmentWriteCookie>(cookie), consumer);
});
}
diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h
index ea3f7a9b7b..fa3d6bb4e5 100644
--- a/yt/yt/client/driver/distributed_table_commands.h
+++ b/yt/yt/client/driver/distributed_table_commands.h
@@ -45,8 +45,8 @@ private:
////////////////////////////////////////////////////////////////////////////////
// -> Cookie
-class TParticipantWriteTableCommand
- : public TTypedCommand<NApi::TParticipantTableWriterOptions>
+class TWriteTableFragmentCommand
+ : public TTypedCommand<NApi::TFragmentTableWriterOptions>
, private TWriteTableCommand
{
public:
@@ -54,7 +54,7 @@ public:
// ambiguity in dispatch.
void Execute(ICommandContextPtr context) override;
- REGISTER_YSON_STRUCT_LITE(TParticipantWriteTableCommand);
+ REGISTER_YSON_STRUCT_LITE(TWriteTableFragmentCommand);
static void Register(TRegistrar registrar);
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index 4fb7783697..e13634f539 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -396,9 +396,9 @@ public:
REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false);
REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false);
REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false);
- REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false);
- REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false);
- REGISTER_ALL(TParticipantWriteTableCommand, "participant_write_table", Tabular, Structured, true, true );
+ REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false);
+ REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false);
+ REGISTER_ALL(TWriteTableFragmentCommand, "distributed_write_table_partition", Tabular, Structured, true, true );
}
#undef REGISTER
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index a2099a6d0e..f43f76958d 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -486,7 +486,7 @@ public:
UNIMPLEMENTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&));
UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
- UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&));
UNIMPLEMENTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const TString& , int, const TStartShuffleOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, FinishShuffle, (const TShuffleHandlePtr&, const TFinishShuffleOptions&));
UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index 707a6759b8..3fb36f8560 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -111,7 +111,7 @@ public:
UNSUPPORTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const TYPath&, const TJournalWriterOptions&));
UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
- UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&));
+ UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&));
// IClient methods.
// Unsupported methods.
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index e90e6b8e71..43e8056b1e 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -2,7 +2,7 @@
#include <yt/yt/client/api/connection.h>
#include <yt/yt/client/api/client.h>
-#include <yt/yt/client/api/distributed_table_sessions.h>
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/file_writer.h>
#include <yt/yt/client/api/journal_reader.h>
#include <yt/yt/client/api/journal_writer.h>
@@ -841,9 +841,9 @@ public:
const TDistributedWriteSessionFinishOptions& options),
(override));
- MOCK_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options),
+ MOCK_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options),
(override));
MOCK_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index 9bb296dbe8..943e4fa201 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -1,6 +1,6 @@
#pragma once
-#include <yt/yt/client/api/distributed_table_sessions.h>
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/file_writer.h>
#include <yt/yt/client/api/journal_reader.h>
#include <yt/yt/client/api/journal_writer.h>
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index a0cc4caf88..9e468fbf8d 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -13,7 +13,7 @@ SRCS(
api/client_cache.cpp
api/delegating_client.cpp
api/delegating_transaction.cpp
- api/distributed_table_sessions.cpp
+ api/distributed_table_session.cpp
api/etc_client.cpp
api/journal_client.cpp
api/operation_client.cpp
diff --git a/yt/yt/core/rpc/stream-inl.h b/yt/yt/core/rpc/stream-inl.h
index 4474158d4e..8f7ddd274b 100644
--- a/yt/yt/core/rpc/stream-inl.h
+++ b/yt/yt/core/rpc/stream-inl.h
@@ -43,11 +43,28 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream
{
auto invokeResult = request->Invoke().template As<void>();
auto metaHandlerResult = request->GetResponseAttachmentsStream()->Read()
- .Apply(metaHandler);
- return metaHandlerResult.Apply(BIND ([=] () {
+ .Apply(std::move(metaHandler));
+ return metaHandlerResult.Apply(BIND ([req = std::move(request), res = std::move(invokeResult)] () mutable {
return NDetail::CreateRpcClientOutputStreamFromInvokedRequest(
- std::move(request),
- std::move(invokeResult));
+ std::move(req),
+ std::move(res));
+ }));
+}
+
+template <class TRequestMessage, class TResponse>
+TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream(
+ TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request,
+ TCallback<void(TSharedRef)> metaHandler,
+ TCallback<void(TIntrusivePtr<TResponse>&&)> rspHandler)
+{
+ auto invokeResult = request->Invoke()
+ .ApplyUnique(std::move(rspHandler));
+ auto metaHandlerResult = request->GetResponseAttachmentsStream()->Read()
+ .Apply(std::move(metaHandler));
+ return metaHandlerResult.Apply(BIND ([req = std::move(request), res = std::move(invokeResult)] () mutable {
+ return NDetail::CreateRpcClientOutputStreamFromInvokedRequest(
+ std::move(req),
+ std::move(res));
}));
}
diff --git a/yt/yt/core/rpc/stream.h b/yt/yt/core/rpc/stream.h
index 93e54155a8..f2220e7425 100644
--- a/yt/yt/core/rpc/stream.h
+++ b/yt/yt/core/rpc/stream.h
@@ -265,6 +265,13 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream
TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request,
TCallback<void(TSharedRef)> metaHandler);
+//! This variant additionally allows non-trivial response of streaming request to be handled.
+template <class TRequestMessage, class TResponse>
+TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream(
+ TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request,
+ TCallback<void(TSharedRef)> metaHandler,
+ TCallback<void(TIntrusivePtr<TResponse>&&)> rspHandler);
+
////////////////////////////////////////////////////////////////////////////////
//! Handles an incoming streaming request that uses the #CreateRpcClientInputStream
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 0c7bc9eddc..9c9e018be9 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -3409,15 +3409,14 @@ message TRspGetQueryTrackerInfo
}
////////////////////////////////////////////////////////////////////////////////
-// NB(arkady-e1ppa): Under construction.
// Distributed table client
////////////////////////////////////////////////////////////////////////////////
message TReqStartDistributedWriteSession
{
- optional string path = 1;
-
- // TDistributedWriteSessionStartOptions contents...
+ required string path = 1;
+
+ optional TTransactionalOptions transactional_options = 100;
}
message TRspStartDistributedWriteSession
@@ -3427,29 +3426,27 @@ message TRspStartDistributedWriteSession
message TReqFinishDistributedWriteSession
{
- required bytes session = 2; // YSON-serialized TDistributedWriteSession
+ required bytes session = 1; // YSON-serialized TDistributedWriteSession
- // TDistributedWriteSessionFinishOptions contents...
+ required int32 max_children_per_attach_request = 2;
}
message TRspFinishDistributedWriteSession
{
}
-message TReqParticipantWriteTable
+message TReqWriteTableFragment
{
optional bytes config = 1; // YSON-serialized TTableWriterConfig
optional bytes format = 2; // YSON-serialized TFormat
- required bytes cookie = 3; // YSON-serialized TDistributedWriteCookie
-
- // TParticipantTableWriterOptions contents...
+ required bytes cookie = 3; // YSON-serialized TFragmentWriteCookie
}
-message TRspParticipantWriteTable
+message TRspWriteTableFragment
{
- required bytes cookie = 1; // YSON-serialized TDistributedWriteCookie
+ required bytes cookie = 1; // YSON-serialized TFragmentWriteCookie
}
///////////////////////////////////////////////////////////////////////////////