aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-12-27 21:15:39 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-12-27 21:32:56 +0300
commit53beb43a8ca2431f7a260c3b26555f2272eb75db (patch)
tree0d3f8f4de24352258ca3071aeb78af0b227bbaf2 /yt
parentb0be24b640ceac2a1d571fcc5a84d043b05ef87a (diff)
downloadydb-53beb43a8ca2431f7a260c3b26555f2272eb75db.tar.gz
YT-22307: Rework distributed write to support signatures
commit_hash:85ce68163df979408ed6b9047a9b2346628ba98d
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/client/api/delegating_client.h12
-rw-r--r--yt/yt/client/api/delegating_transaction.cpp6
-rw-r--r--yt/yt/client/api/delegating_transaction.h4
-rw-r--r--yt/yt/client/api/distributed_table_client.h53
-rw-r--r--yt/yt/client/api/distributed_table_session.cpp168
-rw-r--r--yt/yt/client/api/distributed_table_session.h106
-rw-r--r--yt/yt/client/api/public.h13
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.cpp21
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.h4
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp28
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h6
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp36
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.h12
-rw-r--r--yt/yt/client/api/rpc_proxy/table_writer.cpp48
-rw-r--r--yt/yt/client/api/rpc_proxy/table_writer.h9
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp6
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.h4
-rw-r--r--yt/yt/client/api/table_writer.h11
-rw-r--r--yt/yt/client/driver/distributed_table_commands.cpp56
-rw-r--r--yt/yt/client/driver/distributed_table_commands.h9
-rw-r--r--yt/yt/client/driver/driver.cpp2
-rw-r--r--yt/yt/client/driver/table_commands.cpp12
-rw-r--r--yt/yt/client/driver/table_commands.h4
-rw-r--r--yt/yt/client/federated/client.cpp10
-rw-r--r--yt/yt/client/hedging/hedging.cpp6
-rw-r--r--yt/yt/client/signature/signature.h4
-rw-r--r--yt/yt/client/table_client/adapters.cpp42
-rw-r--r--yt/yt/client/table_client/adapters.h3
-rw-r--r--yt/yt/client/table_client/public.h9
-rw-r--r--yt/yt/client/table_client/unversioned_writer.h11
-rw-r--r--yt/yt/client/unittests/mock/client.h10
-rw-r--r--yt/yt/client/unittests/mock/transaction.h4
-rw-r--r--yt/yt/core/rpc/stream.h3
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto13
34 files changed, 425 insertions, 320 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index 62048663c3..ec546404b5 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -846,19 +846,19 @@ public:
(pipelinePath, viewPath, options))
// Distributed client
- DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
+ DELEGATE_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options),
(path, options))
DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, (
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options),
- (std::move(session), options))
+ (sessionWithResults, options))
- DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (
- const TFragmentWriteCookiePtr& cookie,
- const TFragmentTableWriterOptions& options),
+ DELEGATE_METHOD(TFuture<ITableFragmentWriterPtr>, CreateTableFragmentWriter, (
+ const TSignedWriteFragmentCookiePtr& cookie,
+ const TTableFragmentWriterOptions& options),
(cookie, options))
// Shuffle Service
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp
index 8a8ddf96dd..2e12a18cfb 100644
--- a/yt/yt/client/api/delegating_transaction.cpp
+++ b/yt/yt/client/api/delegating_transaction.cpp
@@ -312,15 +312,15 @@ DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
const TPushQueueProducerOptions& options),
(producerPath, queuePath, sessionId, epoch, nameTable, serializedRows, options))
-DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
+DELEGATE_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options),
(path, options))
DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, (
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options),
- (std::move(session), options))
+ (sessionWithResults, options))
#undef DELEGATE_METHOD
diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h
index e0263fc799..5823e143fc 100644
--- a/yt/yt/client/api/delegating_transaction.h
+++ b/yt/yt/client/api/delegating_transaction.h
@@ -257,12 +257,12 @@ public:
const TPushQueueProducerOptions& options) override;
// Distributed table client
- TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
+ TFuture<TDistributedWriteSessionWithCookies> StartDistributedWriteSession(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options = {}) override;
TFuture<void> FinishDistributedWriteSession(
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options = {}) override;
protected:
diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h
index 7417c5b620..3cb3640cb0 100644
--- a/yt/yt/client/api/distributed_table_client.h
+++ b/yt/yt/client/api/distributed_table_client.h
@@ -2,24 +2,44 @@
#include "table_client.h"
-#include <yt/yt/client/table_client/config.h>
+#include <yt/yt/client/signature/public.h>
-#include <library/cpp/yt/memory/non_null_ptr.h>
+#include <yt/yt/client/table_client/config.h>
namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
+struct TDistributedWriteSessionWithCookies
+{
+ // TDistributedWriteSession.
+ TSignedDistributedWriteSessionPtr Session;
+ // std::vector<TWriteFragmentCookie>.
+ std::vector<TSignedWriteFragmentCookiePtr> Cookies;
+};
+
+struct TDistributedWriteSessionWithResults
+{
+ // TDistributedWriteSession.
+ TSignedDistributedWriteSessionPtr Session;
+ // std::vector<TWriteFragmentResult>.
+ std::vector<TSignedWriteFragmentResultPtr> Results;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
struct TDistributedWriteSessionStartOptions
: public TTransactionalOptions
-{ };
+{
+ int CookieCount = 0;
+};
struct TDistributedWriteSessionFinishOptions
{
int MaxChildrenPerAttachRequest = 10'000;
};
-struct TFragmentTableWriterOptions
+struct TTableFragmentWriterOptions
: public TTableWriterOptions
{ };
@@ -27,23 +47,15 @@ struct TFragmentTableWriterOptions
struct IDistributedTableClientBase
{
-public:
virtual ~IDistributedTableClientBase() = default;
- virtual TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
+ virtual TFuture<TDistributedWriteSessionWithCookies> StartDistributedWriteSession(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options = {}) = 0;
virtual TFuture<void> FinishDistributedWriteSession(
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options = {}) = 0;
-
- // Helper used to implement FinishDistributedWriteSession efficiently
- // without compromising privacy of session fields.
- // defined in yt/yt/client/api/distributed_table_session.cpp.
- void* GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session);
- // Used in chunk writer for results recording
- void RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult);
};
////////////////////////////////////////////////////////////////////////////////
@@ -52,11 +64,18 @@ struct IDistributedTableClient
{
virtual ~IDistributedTableClient() = default;
- virtual TFuture<ITableWriterPtr> CreateFragmentTableWriter(
- const TFragmentWriteCookiePtr& cookie,
- const TFragmentTableWriterOptions& options = {}) = 0;
+ virtual TFuture<ITableFragmentWriterPtr> CreateTableFragmentWriter(
+ const TSignedWriteFragmentCookiePtr& cookie,
+ const TTableFragmentWriterOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
+// Defined in distributed_table_session.cpp.
+TFuture<void> PingDistributedWriteSession(
+ const TSignedDistributedWriteSessionPtr& session,
+ const IClientPtr& client);
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_session.cpp b/yt/yt/client/api/distributed_table_session.cpp
index d67998b9d2..555c5f6812 100644
--- a/yt/yt/client/api/distributed_table_session.cpp
+++ b/yt/yt/client/api/distributed_table_session.cpp
@@ -1,8 +1,11 @@
#include "distributed_table_session.h"
#include "client.h"
+#include "distributed_table_client.h"
#include "transaction.h"
+#include <yt/yt/client/signature/signature.h>
+
namespace NYT::NApi {
using namespace NYPath;
@@ -15,34 +18,6 @@ using namespace NChunkClient;
////////////////////////////////////////////////////////////////////////////////
-TTableWriterPatchInfo::TTableWriterPatchInfo(
- TRichYPath richPath,
- TObjectId objectId,
- TCellTag externalCellTag,
- TMasterTableSchemaId chunkSchemaId,
- TTableSchemaPtr chunkSchema,
- std::optional<TLegacyOwningKey> writerLastKey,
- int maxHeavyColumns,
- TTimestamp timestamp,
- const IAttributeDictionary& tableAttributes)
- : TTableWriterPatchInfo()
-{
- ObjectId = objectId;
- RichPath = std::move(richPath);
-
- ChunkSchemaId = chunkSchemaId;
- ChunkSchema = std::move(chunkSchema);
-
- WriterLastKey = writerLastKey;
- MaxHeavyColumns = maxHeavyColumns;
-
- TableAttributes = tableAttributes.ToMap();
-
- ExternalCellTag = externalCellTag;
-
- Timestamp = timestamp;
-}
-
void TTableWriterPatchInfo::Register(TRegistrar registrar)
{
registrar.Parameter("table_id", &TThis::ObjectId);
@@ -63,8 +38,10 @@ void TTableWriterPatchInfo::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
-void TFragmentWriteResult::Register(TRegistrar registrar)
+void TWriteFragmentResult::Register(TRegistrar registrar)
{
+ registrar.Parameter("session_id", &TThis::SessionId);
+ registrar.Parameter("cookie_id", &TThis::CookieId);
registrar.Parameter("min_boundary_key", &TThis::MinBoundaryKey);
registrar.Parameter("max_boundary_key", &TThis::MaxBoundaryKey);
registrar.Parameter("chunk_list_id", &TThis::ChunkListId);
@@ -72,126 +49,89 @@ void TFragmentWriteResult::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
-const TTableWriterPatchInfo& TFragmentWriteCookie::GetPatchInfo() const
+void TWriteFragmentCookie::Register(TRegistrar registrar)
{
- return PatchInfo_;
-}
+ registrar.Parameter("session_id", &TThis::SessionId);
+ registrar.Parameter("cookie_id", &TThis::CookieId);
-TTransactionId TFragmentWriteCookie::GetMainTransactionId() const
-{
- return MainTxId_;
-}
+ registrar.Parameter("transaction_id", &TThis::MainTransactionId);
-TTransactionId TFragmentWriteCookie::GetUploadTransactionId() const
-{
- return UploadTxId_;
-}
-
-void TFragmentWriteCookie::Register(TRegistrar registrar)
-{
- registrar.Parameter("session_id", &TThis::Id_);
-
- registrar.Parameter("tx_id", &TThis::MainTxId_);
- registrar.Parameter("upload_tx_id", &TThis::UploadTxId_);
-
- registrar.Parameter("patch_info", &TThis::PatchInfo_);
-
- registrar.Parameter("writer_results", &TThis::WriteResults_);
+ registrar.Parameter("patch_info", &TThis::PatchInfo);
}
////////////////////////////////////////////////////////////////////////////////
TDistributedWriteSession::TDistributedWriteSession(
- TTransactionId mainTxId,
- TTransactionId uploadTxId,
+ TTransactionId mainTransactionId,
+ TTransactionId uploadTransactionId,
TChunkListId rootChunkListId,
- TTableWriterPatchInfo patchInfo)
+ NYPath::TRichYPath richPath,
+ NObjectClient::TObjectId objectId,
+ NObjectClient::TCellTag externalCellTag,
+ NTableClient::TMasterTableSchemaId chunkSchemaId,
+ NTableClient::TTableSchemaPtr chunkSchema,
+ std::optional<NTableClient::TLegacyOwningKey> writerLastKey,
+ int maxHeavyColumns,
+ NTransactionClient::TTimestamp timestamp,
+ const NYTree::IAttributeDictionary& tableAttributes)
: TDistributedWriteSession()
{
- Id_ = TDistributedWriteSessionId(TGuid::Create());
- MainTxId_ = mainTxId;
- UploadTxId_ = uploadTxId;
+ MainTransactionId = mainTransactionId;
+ UploadTransactionId = uploadTransactionId;
- RootChunkListId_ = rootChunkListId;
+ RootChunkListId = rootChunkListId;
- PatchInfo_ = std::move(patchInfo);
+ PatchInfo.ObjectId = objectId;
+ PatchInfo.RichPath = std::move(richPath);
- // Signatures?
-}
+ PatchInfo.ChunkSchemaId = chunkSchemaId;
+ PatchInfo.ChunkSchema = std::move(chunkSchema);
-TTransactionId TDistributedWriteSession::GetMainTransactionId() const
-{
- return MainTxId_;
-}
+ PatchInfo.WriterLastKey = std::move(writerLastKey);
+ PatchInfo.MaxHeavyColumns = maxHeavyColumns;
-TTransactionId TDistributedWriteSession::GetUploadTransactionId() const
-{
- return UploadTxId_;
-}
+ PatchInfo.TableAttributes = tableAttributes.ToMap();
-const TTableWriterPatchInfo& TDistributedWriteSession::GetPatchInfo() const Y_LIFETIME_BOUND
-{
- return PatchInfo_;
-}
+ PatchInfo.ExternalCellTag = externalCellTag;
-TChunkListId TDistributedWriteSession::GetRootChunkListId() const
-{
- return RootChunkListId_;
+ PatchInfo.Timestamp = timestamp;
}
-TFragmentWriteCookiePtr TDistributedWriteSession::GiveCookie()
+TWriteFragmentCookie TDistributedWriteSession::CookieFromThis() const
{
- auto cookie = New<TFragmentWriteCookie>();
- cookie->Id_ = Id_;
- cookie->MainTxId_ = MainTxId_;
- cookie->UploadTxId_ = UploadTxId_;
- cookie->PatchInfo_ = PatchInfo_;
+ auto cookie = TWriteFragmentCookie{};
+ cookie.CookieId = TGuid::Create();
+ cookie.SessionId = RootChunkListId;
+ cookie.MainTransactionId = MainTransactionId;
+ cookie.PatchInfo = PatchInfo;
return cookie;
}
-void TDistributedWriteSession::TakeCookie(TFragmentWriteCookiePtr cookie)
-{
- // Verify cookie signature?
- WriteResults_.reserve(std::ssize(WriteResults_) + std::ssize(cookie->WriteResults_));
- for (const auto& writeResult : cookie->WriteResults_) {
- WriteResults_.push_back(writeResult);
- }
-}
-
-TFuture<void> TDistributedWriteSession::Ping(IClientPtr client)
-{
- // NB(arkady-e1ppa): AutoAbort = false by default.
- auto mainTx = client->AttachTransaction(MainTxId_);
-
- return mainTx->Ping();
-}
-
void TDistributedWriteSession::Register(TRegistrar registrar)
{
- registrar.Parameter("session_id", &TThis::Id_);
- registrar.Parameter("tx_id", &TThis::MainTxId_);
- registrar.Parameter("upload_tx_id", &TThis::UploadTxId_);
+ registrar.Parameter("main_transaction_id", &TThis::MainTransactionId);
+ registrar.Parameter("upload_transaction_id", &TThis::UploadTransactionId);
- registrar.Parameter("root_chunk_list_id", &TThis::RootChunkListId_);
+ registrar.Parameter("root_chunk_list_id", &TThis::RootChunkListId);
- registrar.Parameter("patch_info", &TThis::PatchInfo_);
-
- registrar.Parameter("write_results", &TThis::WriteResults_);
+ registrar.Parameter("patch_info", &TThis::PatchInfo);
}
////////////////////////////////////////////////////////////////////////////////
-void* IDistributedTableClientBase::GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session)
+TFuture<void> PingDistributedWriteSession(
+ const TSignedDistributedWriteSessionPtr& session,
+ const IClientPtr& client)
{
- return static_cast<void*>(&session->WriteResults_);
-}
+ auto concreteSession = ConvertTo<TDistributedWriteSession>(session.Underlying()->Payload());
-void IDistributedTableClientBase::RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult)
-{
- auto* concrete = static_cast<TFragmentWriteResult*>(opaqueWriteResult);
- YT_ASSERT(concrete);
- cookie->WriteResults_.push_back(*concrete);
+ // NB(arkady-e1ppa): AutoAbort = false by default.
+ auto mainTx = client->AttachTransaction(concreteSession.MainTransactionId);
+
+ return mainTx->Ping();
}
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_session.h b/yt/yt/client/api/distributed_table_session.h
index c8c074c84c..136ffcfcd4 100644
--- a/yt/yt/client/api/distributed_table_session.h
+++ b/yt/yt/client/api/distributed_table_session.h
@@ -16,10 +16,6 @@ namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
-YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid);
-
-////////////////////////////////////////////////////////////////////////////////
-
// Used by distributed table writer to patch
// its config based on table data.
// NB(arkady-e1ppa): TableUploadOptions are
@@ -28,7 +24,6 @@ YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid);
struct TTableWriterPatchInfo
: public NYTree::TYsonStructLite
{
-public:
NObjectClient::TObjectId ObjectId;
NYPath::TRichYPath RichPath;
@@ -48,17 +43,6 @@ public:
NTransactionClient::TTimestamp Timestamp;
- TTableWriterPatchInfo(
- NYPath::TRichYPath richPath,
- NObjectClient::TObjectId objectId,
- NObjectClient::TCellTag externalCellTag,
- NTableClient::TMasterTableSchemaId chunkSchemaId,
- NTableClient::TTableSchemaPtr chunkSchema,
- std::optional<NTableClient::TLegacyOwningKey> writerLastKey,
- int maxHeavyColumns,
- NTransactionClient::TTimestamp timestamp,
- const NYTree::IAttributeDictionary& tableAttributes);
-
REGISTER_YSON_STRUCT_LITE(TTableWriterPatchInfo)
static void Register(TRegistrar registrar);
@@ -66,93 +50,73 @@ public:
////////////////////////////////////////////////////////////////////////////////
-struct TFragmentWriteResult
+struct TWriteFragmentResult
: public NYTree::TYsonStructLite
{
+ TGuid SessionId;
+ TGuid CookieId;
+
NTableClient::TLegacyOwningKey MinBoundaryKey;
NTableClient::TLegacyOwningKey MaxBoundaryKey;
NChunkClient::TChunkListId ChunkListId;
- REGISTER_YSON_STRUCT_LITE(TFragmentWriteResult)
+ REGISTER_YSON_STRUCT_LITE(TWriteFragmentResult)
static void Register(TRegistrar registrar);
};
////////////////////////////////////////////////////////////////////////////////
-class TFragmentWriteCookie
- : public NYTree::TYsonStruct
+struct TWriteFragmentCookie
+ : public NYTree::TYsonStructLite
{
-public:
- const TTableWriterPatchInfo& GetPatchInfo() const;
-
- NCypressClient::TTransactionId GetMainTransactionId() const;
- NCypressClient::TTransactionId GetUploadTransactionId() const;
+ TGuid SessionId;
+ TGuid CookieId;
-private:
- TDistributedWriteSessionId Id_;
+ NCypressClient::TTransactionId MainTransactionId;
- NCypressClient::TTransactionId MainTxId_;
- NCypressClient::TTransactionId UploadTxId_;
+ TTableWriterPatchInfo PatchInfo;
- TTableWriterPatchInfo PatchInfo_;
-
- std::vector<TFragmentWriteResult> WriteResults_;
-
- REGISTER_YSON_STRUCT(TFragmentWriteCookie);
+ REGISTER_YSON_STRUCT_LITE(TWriteFragmentCookie);
static void Register(TRegistrar registrar);
-
- friend class TDistributedWriteSession;
- friend struct IDistributedTableClientBase;
};
-DEFINE_REFCOUNTED_TYPE(TFragmentWriteCookie);
-
////////////////////////////////////////////////////////////////////////////////
-class TDistributedWriteSession
- : public NYTree::TYsonStruct
+struct TDistributedWriteSession
+ : public NYTree::TYsonStructLite
{
-public:
- TDistributedWriteSession(
- NCypressClient::TTransactionId mainTxId,
- NCypressClient::TTransactionId uploadTxId,
- NChunkClient::TChunkListId rootChunkListId,
- TTableWriterPatchInfo patchInfo);
-
- NCypressClient::TTransactionId GetMainTransactionId() const;
- NCypressClient::TTransactionId GetUploadTransactionId() const;
- const TTableWriterPatchInfo& GetPatchInfo() const Y_LIFETIME_BOUND;
- NChunkClient::TChunkListId GetRootChunkListId() const;
-
- TFragmentWriteCookiePtr GiveCookie();
- void TakeCookie(TFragmentWriteCookiePtr cookie);
-
- TFuture<void> Ping(IClientPtr client);
- REGISTER_YSON_STRUCT(TDistributedWriteSession);
+ // RootChunkListId is used for indentification.
+ NChunkClient::TChunkListId RootChunkListId;
- static void Register(TRegistrar registrar);
-
-private:
- TDistributedWriteSessionId Id_;
+ NCypressClient::TTransactionId MainTransactionId;
+ NCypressClient::TTransactionId UploadTransactionId;
- NCypressClient::TTransactionId MainTxId_;
- NCypressClient::TTransactionId UploadTxId_;
+ TTableWriterPatchInfo PatchInfo;
- NChunkClient::TChunkListId RootChunkListId_;
+ TDistributedWriteSession(
+ NCypressClient::TTransactionId mainTransactionId,
+ NCypressClient::TTransactionId uploadTransactionId,
+ NChunkClient::TChunkListId rootChunkListId,
+ NYPath::TRichYPath richPath,
+ NObjectClient::TObjectId objectId,
+ NObjectClient::TCellTag externalCellTag,
+ NTableClient::TMasterTableSchemaId chunkSchemaId,
+ NTableClient::TTableSchemaPtr chunkSchema,
+ std::optional<NTableClient::TLegacyOwningKey> writerLastKey,
+ int maxHeavyColumns,
+ NTransactionClient::TTimestamp timestamp,
+ const NYTree::IAttributeDictionary& tableAttributes);
- TTableWriterPatchInfo PatchInfo_;
+ TWriteFragmentCookie CookieFromThis() const;
- // This is used to commit changes when session is over.
- std::vector<TFragmentWriteResult> WriteResults_;
+ REGISTER_YSON_STRUCT_LITE(TDistributedWriteSession);
- friend struct IDistributedTableClientBase;
+ static void Register(TRegistrar registrar);
};
-DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession);
-
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NApi
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h
index 0e92ead601..a8293e640e 100644
--- a/yt/yt/client/api/public.h
+++ b/yt/yt/client/api/public.h
@@ -146,6 +146,8 @@ DECLARE_REFCOUNTED_STRUCT(IRowBatchWriter)
DECLARE_REFCOUNTED_STRUCT(ITableReader)
DECLARE_REFCOUNTED_STRUCT(ITableWriter)
+DECLARE_REFCOUNTED_STRUCT(ITableFragmentWriter);
+
DECLARE_REFCOUNTED_STRUCT(IFileReader)
DECLARE_REFCOUNTED_STRUCT(IFileWriter)
@@ -187,10 +189,6 @@ DECLARE_REFCOUNTED_STRUCT(TBackupManifest)
DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter)
-DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession)
-DECLARE_REFCOUNTED_CLASS(TFragmentWriteCookie)
-struct IDistributedTableClientBase;
-
DECLARE_REFCOUNTED_STRUCT(TShuffleHandle)
////////////////////////////////////////////////////////////////////////////////
@@ -240,4 +238,11 @@ using TMaintenanceCountsPerTarget = TCompactFlatMap<std::string, TMaintenanceCou
////////////////////////////////////////////////////////////////////////////////
+using NTableClient::TSignedDistributedWriteSessionPtr;
+using NTableClient::TSignedWriteFragmentCookiePtr;
+using NTableClient::TSignedWriteFragmentResultPtr;
+struct TWriteFragmentCookie;
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NApi
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index 58b2538df6..26062e93ee 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -21,6 +21,8 @@
#include <yt/yt/client/chaos_client/replication_card_serialization.h>
+#include <yt/yt/client/signature/signature.h>
+
#include <yt/yt/client/table_client/name_table.h>
#include <yt/yt/client/table_client/row_base.h>
#include <yt/yt/client/table_client/row_buffer.h>
@@ -792,7 +794,7 @@ TFuture<ITableWriterPtr> TClientBase::CreateTableWriter(
////////////////////////////////////////////////////////////////////////////////
-TFuture<TDistributedWriteSessionPtr> TClientBase::StartDistributedWriteSession(
+TFuture<TDistributedWriteSessionWithCookies> TClientBase::StartDistributedWriteSession(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options)
{
@@ -804,21 +806,28 @@ TFuture<TDistributedWriteSessionPtr> TClientBase::StartDistributedWriteSession(
FillRequest(req.Get(), path, options);
return req->Invoke()
- .ApplyUnique(BIND([] (TRsp&& result) -> TDistributedWriteSessionPtr {
- return ConvertTo<TDistributedWriteSessionPtr>(TYsonString(result->session()));
+ .ApplyUnique(BIND([] (TRsp&& result) -> TDistributedWriteSessionWithCookies {
+ std::vector<TSignedWriteFragmentCookiePtr> cookies;
+ cookies.reserve(result->signed_cookies().size());
+ for (const auto& cookie : result->signed_cookies()) {
+ cookies.push_back(ConvertTo<TSignedWriteFragmentCookiePtr>(TYsonString(cookie)));
+ }
+ return TDistributedWriteSessionWithCookies{
+ .Session = ConvertTo<TSignedDistributedWriteSessionPtr>(TYsonString(result->signed_session())),
+ .Cookies = std::move(cookies),
+ };
}));
}
TFuture<void> TClientBase::FinishDistributedWriteSession(
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options)
{
auto proxy = CreateApiServiceProxy();
auto req = proxy.FinishDistributedWriteSession();
- FillRequest(req.Get(), std::move(session), options);
-
+ FillRequest(req.Get(), sessionWithResults, options);
return req->Invoke().AsVoid();
}
diff --git a/yt/yt/client/api/rpc_proxy/client_base.h b/yt/yt/client/api/rpc_proxy/client_base.h
index 2ccdb1adb4..424752d95e 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.h
+++ b/yt/yt/client/api/rpc_proxy/client_base.h
@@ -185,12 +185,12 @@ public:
const NApi::TTableWriterOptions& options) override;
// Distributed table client
- TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
+ TFuture<TDistributedWriteSessionWithCookies> StartDistributedWriteSession(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options) override;
TFuture<void> FinishDistributedWriteSession(
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options) override;
};
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 3ab8c239d5..5c90b89d18 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -16,6 +16,8 @@
#include <yt/yt/client/scheduler/operation_id_or_alias.h>
+#include <yt/yt/client/signature/signature.h>
+
#include <yt/yt/client/table_client/columnar_statistics.h>
#include <yt/yt/client/table_client/schema.h>
#include <yt/yt/client/table_client/unversioned_row.h>
@@ -742,11 +744,12 @@ TFuture<void> TClient::AlterReplicationCard(
return req->Invoke().As<void>();
}
-TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter(
- const TFragmentWriteCookiePtr& cookie,
- const TFragmentTableWriterOptions& options)
+TFuture<ITableFragmentWriterPtr> TClient::CreateTableFragmentWriter(
+ const TSignedWriteFragmentCookiePtr& cookie,
+ const TTableFragmentWriterOptions& options)
{
using TRspPtr = TIntrusivePtr<NRpc::TTypedClientResponse<NProto::TRspWriteTableFragment>>;
+ YT_VERIFY(cookie);
auto proxy = CreateApiServiceProxy();
auto req = proxy.WriteTableFragment();
@@ -755,22 +758,29 @@ TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter(
FillRequest(req.Get(), cookie, options);
auto schema = New<TTableSchema>();
+ auto promise = NewPromise<TSignedWriteFragmentResultPtr>();
+
+ // NB(arkady-e1ppa): Whenever stream is over, rsp future is set
+ // with the value of write result. We create a channel via promise-future
+ // to transfer this write result to the TableWriter adapter. In order to avoid races
+ // when TableWriter is already closed (and so is stream) but the promise with
+ // write result is not yet set, consider writer closed only after said promise is set.
return CreateRpcClientOutputStream(
std::move(req),
BIND ([=] (const TSharedRef& metaRef) {
NApi::NRpcProxy::NProto::TWriteTableMeta meta;
if (!TryDeserializeProto(&meta, metaRef)) {
- THROW_ERROR_EXCEPTION("Failed to deserialize schema for fragment table writer");
+ THROW_ERROR_EXCEPTION("Failed to deserialize schema for table fragment writer");
}
FromProto(schema.Get(), meta.schema());
}),
- BIND([=] (TRspPtr&& rsp) mutable {
- Deserialize(*cookie, ConvertToNode(TYsonString(rsp->cookie())));
+ BIND([=] (TRspPtr&& rsp) {
+ promise.Set(ConvertTo<TSignedWriteFragmentResultPtr>(TYsonString(rsp->signed_write_result())));
}))
- .ApplyUnique(BIND([=] (IAsyncZeroCopyOutputStreamPtr&& outputStream) {
- return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema));
- })).As<ITableWriterPtr>();
+ .ApplyUnique(BIND([=, future = promise.ToFuture()] (IAsyncZeroCopyOutputStreamPtr&& outputStream) {
+ return NRpcProxy::CreateTableFragmentWriter(std::move(outputStream), std::move(schema), std::move(future));
+ }));
}
TFuture<IQueueRowsetPtr> TClient::PullQueue(
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index 4a5cc37d79..a161cfe88d 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -130,9 +130,9 @@ public:
const TAlterReplicationCardOptions& options = {}) override;
// Distributed table client
- TFuture<ITableWriterPtr> CreateFragmentTableWriter(
- const TFragmentWriteCookiePtr& cookie,
- const TFragmentTableWriterOptions& options) override;
+ TFuture<ITableFragmentWriterPtr> CreateTableFragmentWriter(
+ const TSignedWriteFragmentCookiePtr& cookie,
+ const TTableFragmentWriterOptions& options) override;
// Queues.
TFuture<NQueueClient::IQueueRowsetPtr> PullQueue(
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index 9c558cfdf2..5baa8460ee 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -6,6 +6,8 @@
#include <yt/yt/client/sequoia_client/public.h>
+#include <yt/yt/client/signature/signature.h>
+
#include <yt/yt/client/table_client/columnar_statistics.h>
#include <yt/yt/client/table_client/column_sort_schema.h>
#include <yt/yt/client/table_client/logical_type.h>
@@ -1964,6 +1966,7 @@ void FillRequest(
const TDistributedWriteSessionStartOptions& options)
{
ToProto(req->mutable_path(), path);
+ req->set_cookie_count(options.CookieCount);
if (options.TransactionId) {
ToProto(req->mutable_transactional_options(), options);
@@ -1976,6 +1979,7 @@ void ParseRequest(
const TReqStartDistributedWriteSession& req)
{
*mutablePath = FromProto<NYPath::TRichYPath>(req.path());
+ mutableOptions->CookieCount = req.cookie_count();
if (req.has_transactional_options()) {
FromProto(mutableOptions, req.transactional_options());
}
@@ -1985,19 +1989,31 @@ void ParseRequest(
void FillRequest(
TReqFinishDistributedWriteSession* req,
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options)
{
- req->set_session(ConvertToYsonString(session).ToString());
+ YT_VERIFY(sessionWithResults.Session);
+
+ req->set_signed_session(ConvertToYsonString(sessionWithResults.Session).ToString());
+ for (const auto& writeResult : sessionWithResults.Results) {
+ YT_VERIFY(writeResult);
+ req->add_signed_write_results(ConvertToYsonString(writeResult).ToString());
+ }
req->set_max_children_per_attach_request(options.MaxChildrenPerAttachRequest);
}
void ParseRequest(
- TDistributedWriteSessionPtr* mutableSession,
+ TDistributedWriteSessionWithResults* mutableSessionWithResults,
TDistributedWriteSessionFinishOptions* mutableOptions,
const TReqFinishDistributedWriteSession& req)
{
- *mutableSession = ConvertTo<TDistributedWriteSessionPtr>(TYsonString(req.session()));
+ mutableSessionWithResults->Results.reserve(req.signed_write_results().size());
+ for (const auto& writeResult : req.signed_write_results()) {
+ mutableSessionWithResults->Results.push_back(ConvertTo<TSignedWriteFragmentResultPtr>(TYsonString(writeResult)));
+ }
+
+ mutableSessionWithResults->Session = ConvertTo<TSignedDistributedWriteSessionPtr>(TYsonString(req.signed_session()));
+
mutableOptions->MaxChildrenPerAttachRequest = req.max_children_per_attach_request();
}
@@ -2005,10 +2021,10 @@ void ParseRequest(
void FillRequest(
TReqWriteTableFragment* req,
- const TFragmentWriteCookiePtr& cookie,
- const TFragmentTableWriterOptions& options)
+ const TSignedWriteFragmentCookiePtr& cookie,
+ const TTableFragmentWriterOptions& options)
{
- req->set_cookie(ConvertToYsonString(cookie).ToString());
+ req->set_signed_cookie(ConvertToYsonString(cookie).ToString());
if (options.Config) {
req->set_config(ConvertToYsonString(*options.Config).ToString());
@@ -2016,11 +2032,11 @@ void FillRequest(
}
void ParseRequest(
- TFragmentWriteCookiePtr* mutableCookie,
- TFragmentTableWriterOptions* mutableOptions,
+ TSignedWriteFragmentCookiePtr* mutableCookie,
+ TTableFragmentWriterOptions* mutableOptions,
const TReqWriteTableFragment& req)
{
- *mutableCookie = ConvertTo<TFragmentWriteCookiePtr>(TYsonString(req.cookie()));
+ *mutableCookie = ConvertTo<TSignedWriteFragmentCookiePtr>(TYsonString(req.signed_cookie()));
if (req.has_config()) {
mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(req.config()));
} else {
diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h
index dfe89178cb..9e7f432e62 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.h
+++ b/yt/yt/client/api/rpc_proxy/helpers.h
@@ -296,11 +296,11 @@ void ParseRequest(
void FillRequest(
TReqFinishDistributedWriteSession* req,
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options);
void ParseRequest(
- TDistributedWriteSessionPtr* mutableSession,
+ TDistributedWriteSessionWithResults* mutableSessionWithResults,
TDistributedWriteSessionFinishOptions* mutableOptions,
const TReqFinishDistributedWriteSession& req);
@@ -308,12 +308,12 @@ void ParseRequest(
void FillRequest(
TReqWriteTableFragment* req,
- const TFragmentWriteCookiePtr& cookie,
- const TFragmentTableWriterOptions& options);
+ const TSignedWriteFragmentCookiePtr& cookie,
+ const TTableFragmentWriterOptions& options);
void ParseRequest(
- TFragmentWriteCookiePtr* mutableCookie,
- TFragmentTableWriterOptions* mutableOptions,
+ TSignedWriteFragmentCookiePtr* mutableCookie,
+ TTableFragmentWriterOptions* mutableOptions,
const TReqWriteTableFragment& req);
} // namespace NProto
diff --git a/yt/yt/client/api/rpc_proxy/table_writer.cpp b/yt/yt/client/api/rpc_proxy/table_writer.cpp
index d957c67023..7c258019d5 100644
--- a/yt/yt/client/api/rpc_proxy/table_writer.cpp
+++ b/yt/yt/client/api/rpc_proxy/table_writer.cpp
@@ -42,4 +42,52 @@ ITableWriterPtr CreateTableWriter(
////////////////////////////////////////////////////////////////////////////////
+class TTableFragmentWriter
+ : public TRowBatchWriter
+ , public ITableFragmentWriter
+{
+public:
+ TTableFragmentWriter(
+ IAsyncZeroCopyOutputStreamPtr underlying,
+ TTableSchemaPtr schema,
+ TFuture<TSignedWriteFragmentResultPtr> asyncResult)
+ : TRowBatchWriter(std::move(underlying))
+ , Schema_(std::move(schema))
+ , AsyncResult_(std::move(asyncResult))
+ {
+ }
+
+ const TTableSchemaPtr& GetSchema() const override
+ {
+ return Schema_;
+ }
+
+ TFuture<void> Close() override
+ {
+ return AllSucceeded<void>({TRowBatchWriter::Close(), AsyncResult_.AsVoid()});
+ }
+
+ TSignedWriteFragmentResultPtr GetWriteFragmentResult() const override
+ {
+ YT_VERIFY(AsyncResult_.IsSet());
+ auto resultOrError = AsyncResult_.Get();
+ YT_VERIFY(resultOrError.IsOK());
+ return resultOrError.Value();
+ }
+
+private:
+ const TTableSchemaPtr Schema_;
+ const TFuture<TSignedWriteFragmentResultPtr> AsyncResult_;
+};
+
+ITableFragmentWriterPtr CreateTableFragmentWriter(
+ NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream,
+ TTableSchemaPtr tableSchema,
+ TFuture<TSignedWriteFragmentResultPtr> asyncResult)
+{
+ return New<TTableFragmentWriter>(std::move(outputStream), std::move(tableSchema), std::move(asyncResult));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NApi::NRpcProxy
diff --git a/yt/yt/client/api/rpc_proxy/table_writer.h b/yt/yt/client/api/rpc_proxy/table_writer.h
index 21e20c4ba4..4d8348f989 100644
--- a/yt/yt/client/api/rpc_proxy/table_writer.h
+++ b/yt/yt/client/api/rpc_proxy/table_writer.h
@@ -12,6 +12,15 @@ ITableWriterPtr CreateTableWriter(
NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream,
NTableClient::TTableSchemaPtr tableSchema);
+// NB(arkady-e1ppa): Promise is expected to be set
+// whenever outputStream is closed.
+// TODO(arkady-e1ppa): Introduce a separate output stream
+// which has this logic built in to make this code less fragile.
+ITableFragmentWriterPtr CreateTableFragmentWriter(
+ NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream,
+ NTableClient::TTableSchemaPtr tableSchema,
+ TFuture<NTableClient::TSignedWriteFragmentResultPtr> asyncResult);
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NApi::NRpcProxy
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index e52fdf7acb..89da586fdf 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -921,7 +921,7 @@ IJournalWriterPtr TTransaction::CreateJournalWriter(
PatchTransactionId(options));
}
-TFuture<TDistributedWriteSessionPtr> TTransaction::StartDistributedWriteSession(
+TFuture<TDistributedWriteSessionWithCookies> TTransaction::StartDistributedWriteSession(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options)
{
@@ -932,12 +932,12 @@ TFuture<TDistributedWriteSessionPtr> TTransaction::StartDistributedWriteSession(
}
TFuture<void> TTransaction::FinishDistributedWriteSession(
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options)
{
ValidateActive();
return Client_->FinishDistributedWriteSession(
- std::move(session),
+ sessionWithResults,
options);
}
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h
index 3faa58d708..20ac689bcb 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.h
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h
@@ -235,12 +235,12 @@ public:
const NYPath::TYPath& path,
const NApi::TJournalWriterOptions& options) override;
- TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
+ TFuture<TDistributedWriteSessionWithCookies> StartDistributedWriteSession(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options = {}) override;
TFuture<void> FinishDistributedWriteSession(
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options = {}) override;
// Custom methods.
diff --git a/yt/yt/client/api/table_writer.h b/yt/yt/client/api/table_writer.h
index f4d42c3c78..4ac0fa0fd7 100644
--- a/yt/yt/client/api/table_writer.h
+++ b/yt/yt/client/api/table_writer.h
@@ -18,4 +18,15 @@ DEFINE_REFCOUNTED_TYPE(ITableWriter)
////////////////////////////////////////////////////////////////////////////////
+struct ITableFragmentWriter
+ : public ITableWriter
+{
+ //! Returns signed write result. Only safe to use after |Close|.
+ virtual TSignedWriteFragmentResultPtr GetWriteFragmentResult() const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(ITableFragmentWriter)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NApi
diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp
index 7bd88c5279..bfb81c20c3 100644
--- a/yt/yt/client/driver/distributed_table_commands.cpp
+++ b/yt/yt/client/driver/distributed_table_commands.cpp
@@ -3,10 +3,13 @@
#include "helpers.h"
#include <yt/yt/client/api/distributed_table_session.h>
+#include <yt/yt/client/api/table_writer.h>
#include <yt/yt/client/formats/config.h>
#include <yt/yt/client/formats/parser.h>
+#include <yt/yt/client/signature/signature.h>
+
#include <yt/yt/client/ypath/public.h>
#include <yt/yt/library/formats/format.h>
@@ -37,12 +40,12 @@ void TStartDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context)
{
auto transaction = AttachTransaction(context, /*required*/ false);
- auto session = WaitFor(context->GetClient()->StartDistributedWriteSession(
+ auto sessionAndCookies = WaitFor(context->GetClient()->StartDistributedWriteSession(
Path,
Options));
- ProduceOutput(context, [session = std::move(session)] (IYsonConsumer* consumer) {
- Serialize(session, consumer);
+ ProduceOutput(context, [sessionAndCookies = std::move(sessionAndCookies)] (IYsonConsumer* consumer) {
+ Serialize(sessionAndCookies, consumer);
});
}
@@ -51,15 +54,20 @@ void TStartDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context)
void TFinishDistributedWriteSessionCommand::Register(TRegistrar registrar)
{
registrar.Parameter("session", &TThis::Session);
+ registrar.Parameter("results", &TThis::Results);
}
// -> Nothing
void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context)
{
- auto session = ConvertTo<TDistributedWriteSessionPtr>(Session);
+ auto session = ConvertTo<TSignedDistributedWriteSessionPtr>(Session);
+ auto results = ConvertTo<std::vector<NTableClient::TSignedWriteFragmentResultPtr>>(Results);
WaitFor(context->GetClient()->FinishDistributedWriteSession(
- std::move(session),
+ TDistributedWriteSessionWithResults{
+ .Session = std::move(session),
+ .Results = std::move(results),
+ },
Options))
.ThrowOnError();
@@ -70,33 +78,43 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context
void TWriteTableFragmentCommand::Execute(ICommandContextPtr context)
{
- TTypedCommand<NApi::TFragmentTableWriterOptions>::Execute(std::move(context));
+ TTypedCommand<NApi::TTableFragmentWriterOptions>::Execute(std::move(context));
}
-void TWriteTableFragmentCommand::Register(TRegistrar /*registrar*/)
-{ }
+void TWriteTableFragmentCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("cookie", &TThis::Cookie);
+}
-TFuture<NApi::ITableWriterPtr> TWriteTableFragmentCommand::CreateTableWriter(
- const ICommandContextPtr& context) const
+NApi::ITableWriterPtr TWriteTableFragmentCommand::CreateTableWriter(
+ const ICommandContextPtr& context)
{
- PutMethodInfoInTraceContext("participant_write_table");
+ PutMethodInfoInTraceContext("write_table_fragment");
- return context
+ auto tableWriter = WaitFor(context
->GetClient()
- ->CreateFragmentTableWriter(
- StaticPointerCast<TFragmentWriteCookie>(ResultingCookie),
- TTypedCommand<TFragmentTableWriterOptions>::Options);
+ ->CreateTableFragmentWriter(
+ ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie),
+ TTypedCommand<TTableFragmentWriterOptions>::Options))
+ .ValueOrThrow();
+
+ TableWriter = tableWriter;
+ return tableWriter;
}
// -> Cookie
void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context)
{
- auto cookie = ConvertTo<TFragmentWriteCookiePtr>(Cookie);
- ResultingCookie = StaticPointerCast<TRefCounted>(std::move(cookie));
+ auto cookie = ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie);
DoExecuteImpl(context);
- ProduceOutput(context, [cookie = std::move(ResultingCookie)] (IYsonConsumer* consumer) {
- Serialize(StaticPointerCast<TFragmentWriteCookie>(cookie), consumer);
+
+ // Sadly, we are plagued by virtual bases :/.
+ auto writer = DynamicPointerCast<NApi::ITableFragmentWriter>(TableWriter);
+ ProduceOutput(context, [result = writer->GetWriteFragmentResult()] (IYsonConsumer* consumer) {
+ Serialize(
+ *result.Underlying(),
+ consumer);
});
}
diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h
index fa3d6bb4e5..57fcfd7c6d 100644
--- a/yt/yt/client/driver/distributed_table_commands.h
+++ b/yt/yt/client/driver/distributed_table_commands.h
@@ -38,6 +38,7 @@ public:
private:
NYTree::INodePtr Session;
+ std::vector<NYTree::INodePtr> Results;
void DoExecute(ICommandContextPtr context) override;
};
@@ -46,7 +47,7 @@ private:
// -> Cookie
class TWriteTableFragmentCommand
- : public TTypedCommand<NApi::TFragmentTableWriterOptions>
+ : public TTypedCommand<NApi::TTableFragmentWriterOptions>
, private TWriteTableCommand
{
public:
@@ -62,10 +63,10 @@ private:
using TBase = TWriteTableCommand;
NYTree::INodePtr Cookie;
- TRefCountedPtr ResultingCookie;
+ TRefCountedPtr TableWriter;
- TFuture<NApi::ITableWriterPtr> CreateTableWriter(
- const ICommandContextPtr& context) const override;
+ NApi::ITableWriterPtr CreateTableWriter(
+ const ICommandContextPtr& context) override;
void DoExecute(ICommandContextPtr context) override;
};
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index 32568fdc3c..9be3e37f3e 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -397,6 +397,8 @@ public:
REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false);
REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false);
REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false);
+
+ // TODO(arkady-e1ppa): flags past command name might be complete rubbish -- think them through later.
REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false);
REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false);
REGISTER_ALL(TWriteTableFragmentCommand, "distributed_write_table_partition", Tabular, Structured, true, true );
diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp
index 3ae3e78a23..e358d2f8ee 100644
--- a/yt/yt/client/driver/table_commands.cpp
+++ b/yt/yt/client/driver/table_commands.cpp
@@ -267,14 +267,15 @@ void TWriteTableCommand::Register(TRegistrar registrar)
.Default(1_MB);
}
-TFuture<ITableWriterPtr> TWriteTableCommand::CreateTableWriter(
- const ICommandContextPtr& context) const
+NApi::ITableWriterPtr TWriteTableCommand::CreateTableWriter(
+ const ICommandContextPtr& context)
{
PutMethodInfoInTraceContext("write_table");
- return context->GetClient()->CreateTableWriter(
+ return WaitFor(context->GetClient()->CreateTableWriter(
Path,
- Options);
+ Options))
+ .ValueOrThrow();
}
void TWriteTableCommand::DoExecuteImpl(const ICommandContextPtr& context)
@@ -289,8 +290,7 @@ void TWriteTableCommand::DoExecuteImpl(const ICommandContextPtr& context)
Options.PingAncestors = true;
Options.Config = config;
- auto apiWriter = WaitFor(CreateTableWriter(context))
- .ValueOrThrow();
+ auto apiWriter = CreateTableWriter(context);
auto schemalessWriter = CreateSchemalessFromApiWriterAdapter(std::move(apiWriter));
diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h
index d511e76622..97f52e44fd 100644
--- a/yt/yt/client/driver/table_commands.h
+++ b/yt/yt/client/driver/table_commands.h
@@ -83,8 +83,8 @@ public:
static void Register(TRegistrar registrar);
protected:
- virtual TFuture<NApi::ITableWriterPtr> CreateTableWriter(
- const ICommandContextPtr& context) const;
+ virtual NApi::ITableWriterPtr CreateTableWriter(
+ const ICommandContextPtr& context);
void DoExecuteImpl(const ICommandContextPtr& context);
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index 170dedf910..86555b39d6 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -236,8 +236,8 @@ public:
UNIMPLEMENTED_METHOD(IFileWriterPtr, CreateFileWriter, (const NYPath::TRichYPath&, const TFileWriterOptions&));
UNIMPLEMENTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const NYPath::TYPath&, const TJournalReaderOptions&));
UNIMPLEMENTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const NYPath::TYPath&, const TJournalWriterOptions&));
- UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
- UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (const TDistributedWriteSessionWithResults&, const TDistributedWriteSessionFinishOptions&));
private:
const TClientPtr Client_;
@@ -484,9 +484,9 @@ public:
UNIMPLEMENTED_METHOD(TFuture<void>, PausePipeline, (const NYPath::TYPath&, const TPausePipelineOptions&));
UNIMPLEMENTED_METHOD(TFuture<TPipelineState>, GetPipelineState, (const NYPath::TYPath&, const TGetPipelineStateOptions&));
UNIMPLEMENTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&));
- UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
- UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
- UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (const TDistributedWriteSessionWithResults&, const TDistributedWriteSessionFinishOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<ITableFragmentWriterPtr>, CreateTableFragmentWriter, (const TSignedWriteFragmentCookiePtr&, const TTableFragmentWriterOptions&));
UNIMPLEMENTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const std::string& , int, NObjectClient::TTransactionId, const TStartShuffleOptions&));
UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&));
UNIMPLEMENTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TShuffleHandlePtr&, const std::string&, const NTableClient::TTableWriterConfigPtr&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index ac96c64e39..e5bdd74b22 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -109,9 +109,9 @@ public:
UNSUPPORTED_METHOD(IFileWriterPtr, CreateFileWriter, (const TRichYPath&, const TFileWriterOptions&));
UNSUPPORTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const TYPath&, const TJournalReaderOptions&));
UNSUPPORTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const TYPath&, const TJournalWriterOptions&));
- UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
- UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
- UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&));
+ UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
+ UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (const TDistributedWriteSessionWithResults&, const TDistributedWriteSessionFinishOptions&));
+ UNSUPPORTED_METHOD(TFuture<ITableFragmentWriterPtr>, CreateTableFragmentWriter, (const TSignedWriteFragmentCookiePtr&, const TTableFragmentWriterOptions&));
// IClient methods.
// Unsupported methods.
diff --git a/yt/yt/client/signature/signature.h b/yt/yt/client/signature/signature.h
index b163ecc0e9..6cb739bd3c 100644
--- a/yt/yt/client/signature/signature.h
+++ b/yt/yt/client/signature/signature.h
@@ -26,7 +26,9 @@ public:
[[nodiscard]] const NYson::TYsonString& Payload() const;
private:
- NYson::TYsonString Header_;
+ // TODO(arkady-e1ppa): Whenever trivial generator/validators are added
+ // remove initialization.
+ NYson::TYsonString Header_ = NYson::TYsonString(TStringBuf(""));
NYson::TYsonString Payload_;
std::vector<std::byte> Signature_;
diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp
index 648a208ea4..f17c1dcdac 100644
--- a/yt/yt/client/table_client/adapters.cpp
+++ b/yt/yt/client/table_client/adapters.cpp
@@ -25,47 +25,69 @@ static constexpr auto& Logger = TableClientLogger;
////////////////////////////////////////////////////////////////////////////////
+template <std::derived_from<ITableWriter> TInterface, std::derived_from<IUnversionedWriter> TUnderlyingInterface>
class TApiFromSchemalessWriterAdapter
- : public ITableWriter
+ : public TInterface
{
public:
- explicit TApiFromSchemalessWriterAdapter(IUnversionedWriterPtr underlyingWriter)
+ explicit TApiFromSchemalessWriterAdapter(TIntrusivePtr<TUnderlyingInterface> underlyingWriter)
: UnderlyingWriter_(std::move(underlyingWriter))
{ }
- bool Write(TRange<TUnversionedRow> rows) override
+ bool Write(TRange<TUnversionedRow> rows) /*override*/
{
return UnderlyingWriter_->Write(rows);
}
- TFuture<void> GetReadyEvent() override
+ TFuture<void> GetReadyEvent() /*override*/
{
return UnderlyingWriter_->GetReadyEvent();
}
- TFuture<void> Close() override
+ TFuture<void> Close() /*override*/
{
return UnderlyingWriter_->Close();
}
- const TNameTablePtr& GetNameTable() const override
+ const TNameTablePtr& GetNameTable() const /*override*/
{
return UnderlyingWriter_->GetNameTable();
}
- const TTableSchemaPtr& GetSchema() const override
+ const TTableSchemaPtr& GetSchema() const /*override*/
{
return UnderlyingWriter_->GetSchema();
}
-private:
- const IUnversionedWriterPtr UnderlyingWriter_;
+protected:
+ const TIntrusivePtr<TUnderlyingInterface> UnderlyingWriter_;
};
ITableWriterPtr CreateApiFromSchemalessWriterAdapter(
IUnversionedWriterPtr underlyingWriter)
{
- return New<TApiFromSchemalessWriterAdapter>(std::move(underlyingWriter));
+ return New<TApiFromSchemalessWriterAdapter<ITableWriter, IUnversionedWriter>>(std::move(underlyingWriter));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TApiFromSchemalessTableFragmentWriterAdapter
+ : public TApiFromSchemalessWriterAdapter<ITableFragmentWriter, IUnversionedTableFragmentWriter>
+{
+public:
+ using TBase = TApiFromSchemalessWriterAdapter<ITableFragmentWriter, IUnversionedTableFragmentWriter>;
+ using TBase::TBase;
+
+ TSignedWriteFragmentResultPtr GetWriteFragmentResult() const /*override*/
+ {
+ return TBase::UnderlyingWriter_->GetWriteFragmentResult();
+ }
+};
+
+ITableFragmentWriterPtr CreateApiFromSchemalessWriterAdapter(
+ IUnversionedTableFragmentWriterPtr underlyingWriter)
+{
+ return New<TApiFromSchemalessTableFragmentWriterAdapter>(std::move(underlyingWriter));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h
index f90cbe101d..f4b058bf23 100644
--- a/yt/yt/client/table_client/adapters.h
+++ b/yt/yt/client/table_client/adapters.h
@@ -21,6 +21,9 @@ IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter(
NApi::ITableWriterPtr CreateApiFromSchemalessWriterAdapter(
IUnversionedWriterPtr underlyingWriter);
+NApi::ITableFragmentWriterPtr CreateApiFromSchemalessWriterAdapter(
+ IUnversionedTableFragmentWriterPtr underlyingWriter);
+
////////////////////////////////////////////////////////////////////////////////
struct TPipeReaderToWriterOptions
diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h
index b577fb929d..c397838e60 100644
--- a/yt/yt/client/table_client/public.h
+++ b/yt/yt/client/table_client/public.h
@@ -4,6 +4,8 @@
#include <yt/yt/client/cypress_client/public.h>
+#include <yt/yt/client/signature/public.h>
+
#include <yt/yt/client/tablet_client/public.h>
#include <yt/yt/client/transaction_client/public.h>
@@ -343,6 +345,7 @@ DECLARE_REFCOUNTED_CLASS(TRowBuffer)
DECLARE_REFCOUNTED_STRUCT(ISchemalessUnversionedReader)
DECLARE_REFCOUNTED_STRUCT(ISchemafulUnversionedReader)
DECLARE_REFCOUNTED_STRUCT(IUnversionedWriter)
+DECLARE_REFCOUNTED_STRUCT(IUnversionedTableFragmentWriter)
DECLARE_REFCOUNTED_STRUCT(IUnversionedRowsetWriter)
using TSchemalessWriterFactory = std::function<IUnversionedRowsetWriterPtr(
@@ -454,4 +457,10 @@ struct TVersionedWriteOptions;
////////////////////////////////////////////////////////////////////////////////
+YT_DEFINE_STRONG_TYPEDEF(TSignedDistributedWriteSessionPtr, NSignature::TSignaturePtr);
+YT_DEFINE_STRONG_TYPEDEF(TSignedWriteFragmentCookiePtr, NSignature::TSignaturePtr);
+YT_DEFINE_STRONG_TYPEDEF(TSignedWriteFragmentResultPtr, NSignature::TSignaturePtr);
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/unversioned_writer.h b/yt/yt/client/table_client/unversioned_writer.h
index b8ba78d621..22814d3b0b 100644
--- a/yt/yt/client/table_client/unversioned_writer.h
+++ b/yt/yt/client/table_client/unversioned_writer.h
@@ -62,4 +62,15 @@ DEFINE_REFCOUNTED_TYPE(IUnversionedWriter)
////////////////////////////////////////////////////////////////////////////////
+struct IUnversionedTableFragmentWriter
+ : public IUnversionedWriter
+{
+ //! Returns signed write result. Only safe to use after |Stop|.
+ virtual TSignedWriteFragmentResultPtr GetWriteFragmentResult() const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IUnversionedTableFragmentWriter)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NTableClient
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index 31c76a9071..e29d989ceb 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -831,19 +831,19 @@ public:
const TGetFlowViewOptions& options),
(override));
- MOCK_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
+ MOCK_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options),
(override));
MOCK_METHOD(TFuture<void>, FinishDistributedWriteSession, (
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options),
(override));
- MOCK_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (
- const TFragmentWriteCookiePtr& cookie,
- const TFragmentTableWriterOptions& options),
+ MOCK_METHOD(TFuture<ITableFragmentWriterPtr>, CreateTableFragmentWriter, (
+ const TSignedWriteFragmentCookiePtr& cookie,
+ const TTableFragmentWriterOptions& options),
(override));
MOCK_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index 943e4fa201..713bf0b443 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -151,13 +151,13 @@ public:
const NYPath::TYPath& path,
const TJournalWriterOptions& options), (override));
- MOCK_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
+ MOCK_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options),
(override));
MOCK_METHOD(TFuture<void>, FinishDistributedWriteSession, (
- TDistributedWriteSessionPtr session,
+ const TDistributedWriteSessionWithResults& sessionWithResults,
const TDistributedWriteSessionFinishOptions& options),
(override));
diff --git a/yt/yt/core/rpc/stream.h b/yt/yt/core/rpc/stream.h
index f2220e7425..c7d765c88e 100644
--- a/yt/yt/core/rpc/stream.h
+++ b/yt/yt/core/rpc/stream.h
@@ -266,6 +266,9 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream
TCallback<void(TSharedRef)> metaHandler);
//! This variant additionally allows non-trivial response of streaming request to be handled.
+//! TODO(arkady-e1ppa): Introduce IAsyncZeroCopyOutputStream<TRet> which |Close| returns
+//! TFuture<TRet> instead of TFuture<void> as a way to transfer data via rsp
+//! use it here.
template <class TRequestMessage, class TResponse>
TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream(
TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request,
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 723a63b639..457a28b0f5 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -3426,20 +3426,23 @@ message TRspGetQueryTrackerInfo
message TReqStartDistributedWriteSession
{
required string path = 1;
+ required int32 cookie_count = 2;
optional TTransactionalOptions transactional_options = 100;
}
message TRspStartDistributedWriteSession
{
- required bytes session = 1; // YSON-serialized TDistributedWriteSession
+ required bytes signed_session = 1; // YSON-serialized TSignedDistributedWriteSessionPtr
+ repeated bytes signed_cookies = 2; // vector of YSON-serialized TSignedDistributedWriteCookiePtr
}
message TReqFinishDistributedWriteSession
{
- required bytes session = 1; // YSON-serialized TDistributedWriteSession
+ required bytes signed_session = 1; // YSON-serialized TSignedDistributedWriteSessionPtr
+ repeated bytes signed_write_results = 2; // vector of YSON-serialized TSignedWriteFragmentResultPtr
- required int32 max_children_per_attach_request = 2;
+ required int32 max_children_per_attach_request = 3;
}
message TRspFinishDistributedWriteSession
@@ -3452,12 +3455,12 @@ message TReqWriteTableFragment
optional bytes format = 2; // YSON-serialized TFormat
- required bytes cookie = 3; // YSON-serialized TFragmentWriteCookie
+ required bytes signed_cookie = 3; // YSON-serialized TSignedWriteFragmentCookiePtr
}
message TRspWriteTableFragment
{
- required bytes cookie = 1; // YSON-serialized TFragmentWriteCookie
+ required bytes signed_write_result = 1; // YSON-serialized TSignedWriteFragmentResultPtr
}
///////////////////////////////////////////////////////////////////////////////