diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-12-27 21:15:39 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-12-27 21:32:56 +0300 |
commit | 53beb43a8ca2431f7a260c3b26555f2272eb75db (patch) | |
tree | 0d3f8f4de24352258ca3071aeb78af0b227bbaf2 /yt | |
parent | b0be24b640ceac2a1d571fcc5a84d043b05ef87a (diff) | |
download | ydb-53beb43a8ca2431f7a260c3b26555f2272eb75db.tar.gz |
YT-22307: Rework distributed write to support signatures
commit_hash:85ce68163df979408ed6b9047a9b2346628ba98d
Diffstat (limited to 'yt')
34 files changed, 425 insertions, 320 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index 62048663c3..ec546404b5 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -846,19 +846,19 @@ public: (pipelinePath, viewPath, options)) // Distributed client - DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, ( + DELEGATE_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, ( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options), (path, options)) DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, ( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options), - (std::move(session), options)) + (sessionWithResults, options)) - DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, ( - const TFragmentWriteCookiePtr& cookie, - const TFragmentTableWriterOptions& options), + DELEGATE_METHOD(TFuture<ITableFragmentWriterPtr>, CreateTableFragmentWriter, ( + const TSignedWriteFragmentCookiePtr& cookie, + const TTableFragmentWriterOptions& options), (cookie, options)) // Shuffle Service diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp index 8a8ddf96dd..2e12a18cfb 100644 --- a/yt/yt/client/api/delegating_transaction.cpp +++ b/yt/yt/client/api/delegating_transaction.cpp @@ -312,15 +312,15 @@ DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( const TPushQueueProducerOptions& options), (producerPath, queuePath, sessionId, epoch, nameTable, serializedRows, options)) -DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, ( +DELEGATE_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, ( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options), (path, options)) DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, ( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options), - (std::move(session), options)) + (sessionWithResults, options)) #undef DELEGATE_METHOD diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h index e0263fc799..5823e143fc 100644 --- a/yt/yt/client/api/delegating_transaction.h +++ b/yt/yt/client/api/delegating_transaction.h @@ -257,12 +257,12 @@ public: const TPushQueueProducerOptions& options) override; // Distributed table client - TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( + TFuture<TDistributedWriteSessionWithCookies> StartDistributedWriteSession( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options = {}) override; TFuture<void> FinishDistributedWriteSession( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options = {}) override; protected: diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h index 7417c5b620..3cb3640cb0 100644 --- a/yt/yt/client/api/distributed_table_client.h +++ b/yt/yt/client/api/distributed_table_client.h @@ -2,24 +2,44 @@ #include "table_client.h" -#include <yt/yt/client/table_client/config.h> +#include <yt/yt/client/signature/public.h> -#include <library/cpp/yt/memory/non_null_ptr.h> +#include <yt/yt/client/table_client/config.h> namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// +struct TDistributedWriteSessionWithCookies +{ + // TDistributedWriteSession. + TSignedDistributedWriteSessionPtr Session; + // std::vector<TWriteFragmentCookie>. + std::vector<TSignedWriteFragmentCookiePtr> Cookies; +}; + +struct TDistributedWriteSessionWithResults +{ + // TDistributedWriteSession. + TSignedDistributedWriteSessionPtr Session; + // std::vector<TWriteFragmentResult>. + std::vector<TSignedWriteFragmentResultPtr> Results; +}; + +//////////////////////////////////////////////////////////////////////////////// + struct TDistributedWriteSessionStartOptions : public TTransactionalOptions -{ }; +{ + int CookieCount = 0; +}; struct TDistributedWriteSessionFinishOptions { int MaxChildrenPerAttachRequest = 10'000; }; -struct TFragmentTableWriterOptions +struct TTableFragmentWriterOptions : public TTableWriterOptions { }; @@ -27,23 +47,15 @@ struct TFragmentTableWriterOptions struct IDistributedTableClientBase { -public: virtual ~IDistributedTableClientBase() = default; - virtual TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( + virtual TFuture<TDistributedWriteSessionWithCookies> StartDistributedWriteSession( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options = {}) = 0; virtual TFuture<void> FinishDistributedWriteSession( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options = {}) = 0; - - // Helper used to implement FinishDistributedWriteSession efficiently - // without compromising privacy of session fields. - // defined in yt/yt/client/api/distributed_table_session.cpp. - void* GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session); - // Used in chunk writer for results recording - void RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult); }; //////////////////////////////////////////////////////////////////////////////// @@ -52,11 +64,18 @@ struct IDistributedTableClient { virtual ~IDistributedTableClient() = default; - virtual TFuture<ITableWriterPtr> CreateFragmentTableWriter( - const TFragmentWriteCookiePtr& cookie, - const TFragmentTableWriterOptions& options = {}) = 0; + virtual TFuture<ITableFragmentWriterPtr> CreateTableFragmentWriter( + const TSignedWriteFragmentCookiePtr& cookie, + const TTableFragmentWriterOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// +// Defined in distributed_table_session.cpp. +TFuture<void> PingDistributedWriteSession( + const TSignedDistributedWriteSessionPtr& session, + const IClientPtr& client); + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_session.cpp b/yt/yt/client/api/distributed_table_session.cpp index d67998b9d2..555c5f6812 100644 --- a/yt/yt/client/api/distributed_table_session.cpp +++ b/yt/yt/client/api/distributed_table_session.cpp @@ -1,8 +1,11 @@ #include "distributed_table_session.h" #include "client.h" +#include "distributed_table_client.h" #include "transaction.h" +#include <yt/yt/client/signature/signature.h> + namespace NYT::NApi { using namespace NYPath; @@ -15,34 +18,6 @@ using namespace NChunkClient; //////////////////////////////////////////////////////////////////////////////// -TTableWriterPatchInfo::TTableWriterPatchInfo( - TRichYPath richPath, - TObjectId objectId, - TCellTag externalCellTag, - TMasterTableSchemaId chunkSchemaId, - TTableSchemaPtr chunkSchema, - std::optional<TLegacyOwningKey> writerLastKey, - int maxHeavyColumns, - TTimestamp timestamp, - const IAttributeDictionary& tableAttributes) - : TTableWriterPatchInfo() -{ - ObjectId = objectId; - RichPath = std::move(richPath); - - ChunkSchemaId = chunkSchemaId; - ChunkSchema = std::move(chunkSchema); - - WriterLastKey = writerLastKey; - MaxHeavyColumns = maxHeavyColumns; - - TableAttributes = tableAttributes.ToMap(); - - ExternalCellTag = externalCellTag; - - Timestamp = timestamp; -} - void TTableWriterPatchInfo::Register(TRegistrar registrar) { registrar.Parameter("table_id", &TThis::ObjectId); @@ -63,8 +38,10 @@ void TTableWriterPatchInfo::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// -void TFragmentWriteResult::Register(TRegistrar registrar) +void TWriteFragmentResult::Register(TRegistrar registrar) { + registrar.Parameter("session_id", &TThis::SessionId); + registrar.Parameter("cookie_id", &TThis::CookieId); registrar.Parameter("min_boundary_key", &TThis::MinBoundaryKey); registrar.Parameter("max_boundary_key", &TThis::MaxBoundaryKey); registrar.Parameter("chunk_list_id", &TThis::ChunkListId); @@ -72,126 +49,89 @@ void TFragmentWriteResult::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// -const TTableWriterPatchInfo& TFragmentWriteCookie::GetPatchInfo() const +void TWriteFragmentCookie::Register(TRegistrar registrar) { - return PatchInfo_; -} + registrar.Parameter("session_id", &TThis::SessionId); + registrar.Parameter("cookie_id", &TThis::CookieId); -TTransactionId TFragmentWriteCookie::GetMainTransactionId() const -{ - return MainTxId_; -} + registrar.Parameter("transaction_id", &TThis::MainTransactionId); -TTransactionId TFragmentWriteCookie::GetUploadTransactionId() const -{ - return UploadTxId_; -} - -void TFragmentWriteCookie::Register(TRegistrar registrar) -{ - registrar.Parameter("session_id", &TThis::Id_); - - registrar.Parameter("tx_id", &TThis::MainTxId_); - registrar.Parameter("upload_tx_id", &TThis::UploadTxId_); - - registrar.Parameter("patch_info", &TThis::PatchInfo_); - - registrar.Parameter("writer_results", &TThis::WriteResults_); + registrar.Parameter("patch_info", &TThis::PatchInfo); } //////////////////////////////////////////////////////////////////////////////// TDistributedWriteSession::TDistributedWriteSession( - TTransactionId mainTxId, - TTransactionId uploadTxId, + TTransactionId mainTransactionId, + TTransactionId uploadTransactionId, TChunkListId rootChunkListId, - TTableWriterPatchInfo patchInfo) + NYPath::TRichYPath richPath, + NObjectClient::TObjectId objectId, + NObjectClient::TCellTag externalCellTag, + NTableClient::TMasterTableSchemaId chunkSchemaId, + NTableClient::TTableSchemaPtr chunkSchema, + std::optional<NTableClient::TLegacyOwningKey> writerLastKey, + int maxHeavyColumns, + NTransactionClient::TTimestamp timestamp, + const NYTree::IAttributeDictionary& tableAttributes) : TDistributedWriteSession() { - Id_ = TDistributedWriteSessionId(TGuid::Create()); - MainTxId_ = mainTxId; - UploadTxId_ = uploadTxId; + MainTransactionId = mainTransactionId; + UploadTransactionId = uploadTransactionId; - RootChunkListId_ = rootChunkListId; + RootChunkListId = rootChunkListId; - PatchInfo_ = std::move(patchInfo); + PatchInfo.ObjectId = objectId; + PatchInfo.RichPath = std::move(richPath); - // Signatures? -} + PatchInfo.ChunkSchemaId = chunkSchemaId; + PatchInfo.ChunkSchema = std::move(chunkSchema); -TTransactionId TDistributedWriteSession::GetMainTransactionId() const -{ - return MainTxId_; -} + PatchInfo.WriterLastKey = std::move(writerLastKey); + PatchInfo.MaxHeavyColumns = maxHeavyColumns; -TTransactionId TDistributedWriteSession::GetUploadTransactionId() const -{ - return UploadTxId_; -} + PatchInfo.TableAttributes = tableAttributes.ToMap(); -const TTableWriterPatchInfo& TDistributedWriteSession::GetPatchInfo() const Y_LIFETIME_BOUND -{ - return PatchInfo_; -} + PatchInfo.ExternalCellTag = externalCellTag; -TChunkListId TDistributedWriteSession::GetRootChunkListId() const -{ - return RootChunkListId_; + PatchInfo.Timestamp = timestamp; } -TFragmentWriteCookiePtr TDistributedWriteSession::GiveCookie() +TWriteFragmentCookie TDistributedWriteSession::CookieFromThis() const { - auto cookie = New<TFragmentWriteCookie>(); - cookie->Id_ = Id_; - cookie->MainTxId_ = MainTxId_; - cookie->UploadTxId_ = UploadTxId_; - cookie->PatchInfo_ = PatchInfo_; + auto cookie = TWriteFragmentCookie{}; + cookie.CookieId = TGuid::Create(); + cookie.SessionId = RootChunkListId; + cookie.MainTransactionId = MainTransactionId; + cookie.PatchInfo = PatchInfo; return cookie; } -void TDistributedWriteSession::TakeCookie(TFragmentWriteCookiePtr cookie) -{ - // Verify cookie signature? - WriteResults_.reserve(std::ssize(WriteResults_) + std::ssize(cookie->WriteResults_)); - for (const auto& writeResult : cookie->WriteResults_) { - WriteResults_.push_back(writeResult); - } -} - -TFuture<void> TDistributedWriteSession::Ping(IClientPtr client) -{ - // NB(arkady-e1ppa): AutoAbort = false by default. - auto mainTx = client->AttachTransaction(MainTxId_); - - return mainTx->Ping(); -} - void TDistributedWriteSession::Register(TRegistrar registrar) { - registrar.Parameter("session_id", &TThis::Id_); - registrar.Parameter("tx_id", &TThis::MainTxId_); - registrar.Parameter("upload_tx_id", &TThis::UploadTxId_); + registrar.Parameter("main_transaction_id", &TThis::MainTransactionId); + registrar.Parameter("upload_transaction_id", &TThis::UploadTransactionId); - registrar.Parameter("root_chunk_list_id", &TThis::RootChunkListId_); + registrar.Parameter("root_chunk_list_id", &TThis::RootChunkListId); - registrar.Parameter("patch_info", &TThis::PatchInfo_); - - registrar.Parameter("write_results", &TThis::WriteResults_); + registrar.Parameter("patch_info", &TThis::PatchInfo); } //////////////////////////////////////////////////////////////////////////////// -void* IDistributedTableClientBase::GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session) +TFuture<void> PingDistributedWriteSession( + const TSignedDistributedWriteSessionPtr& session, + const IClientPtr& client) { - return static_cast<void*>(&session->WriteResults_); -} + auto concreteSession = ConvertTo<TDistributedWriteSession>(session.Underlying()->Payload()); -void IDistributedTableClientBase::RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult) -{ - auto* concrete = static_cast<TFragmentWriteResult*>(opaqueWriteResult); - YT_ASSERT(concrete); - cookie->WriteResults_.push_back(*concrete); + // NB(arkady-e1ppa): AutoAbort = false by default. + auto mainTx = client->AttachTransaction(concreteSession.MainTransactionId); + + return mainTx->Ping(); } +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_session.h b/yt/yt/client/api/distributed_table_session.h index c8c074c84c..136ffcfcd4 100644 --- a/yt/yt/client/api/distributed_table_session.h +++ b/yt/yt/client/api/distributed_table_session.h @@ -16,10 +16,6 @@ namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// -YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid); - -//////////////////////////////////////////////////////////////////////////////// - // Used by distributed table writer to patch // its config based on table data. // NB(arkady-e1ppa): TableUploadOptions are @@ -28,7 +24,6 @@ YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid); struct TTableWriterPatchInfo : public NYTree::TYsonStructLite { -public: NObjectClient::TObjectId ObjectId; NYPath::TRichYPath RichPath; @@ -48,17 +43,6 @@ public: NTransactionClient::TTimestamp Timestamp; - TTableWriterPatchInfo( - NYPath::TRichYPath richPath, - NObjectClient::TObjectId objectId, - NObjectClient::TCellTag externalCellTag, - NTableClient::TMasterTableSchemaId chunkSchemaId, - NTableClient::TTableSchemaPtr chunkSchema, - std::optional<NTableClient::TLegacyOwningKey> writerLastKey, - int maxHeavyColumns, - NTransactionClient::TTimestamp timestamp, - const NYTree::IAttributeDictionary& tableAttributes); - REGISTER_YSON_STRUCT_LITE(TTableWriterPatchInfo) static void Register(TRegistrar registrar); @@ -66,93 +50,73 @@ public: //////////////////////////////////////////////////////////////////////////////// -struct TFragmentWriteResult +struct TWriteFragmentResult : public NYTree::TYsonStructLite { + TGuid SessionId; + TGuid CookieId; + NTableClient::TLegacyOwningKey MinBoundaryKey; NTableClient::TLegacyOwningKey MaxBoundaryKey; NChunkClient::TChunkListId ChunkListId; - REGISTER_YSON_STRUCT_LITE(TFragmentWriteResult) + REGISTER_YSON_STRUCT_LITE(TWriteFragmentResult) static void Register(TRegistrar registrar); }; //////////////////////////////////////////////////////////////////////////////// -class TFragmentWriteCookie - : public NYTree::TYsonStruct +struct TWriteFragmentCookie + : public NYTree::TYsonStructLite { -public: - const TTableWriterPatchInfo& GetPatchInfo() const; - - NCypressClient::TTransactionId GetMainTransactionId() const; - NCypressClient::TTransactionId GetUploadTransactionId() const; + TGuid SessionId; + TGuid CookieId; -private: - TDistributedWriteSessionId Id_; + NCypressClient::TTransactionId MainTransactionId; - NCypressClient::TTransactionId MainTxId_; - NCypressClient::TTransactionId UploadTxId_; + TTableWriterPatchInfo PatchInfo; - TTableWriterPatchInfo PatchInfo_; - - std::vector<TFragmentWriteResult> WriteResults_; - - REGISTER_YSON_STRUCT(TFragmentWriteCookie); + REGISTER_YSON_STRUCT_LITE(TWriteFragmentCookie); static void Register(TRegistrar registrar); - - friend class TDistributedWriteSession; - friend struct IDistributedTableClientBase; }; -DEFINE_REFCOUNTED_TYPE(TFragmentWriteCookie); - //////////////////////////////////////////////////////////////////////////////// -class TDistributedWriteSession - : public NYTree::TYsonStruct +struct TDistributedWriteSession + : public NYTree::TYsonStructLite { -public: - TDistributedWriteSession( - NCypressClient::TTransactionId mainTxId, - NCypressClient::TTransactionId uploadTxId, - NChunkClient::TChunkListId rootChunkListId, - TTableWriterPatchInfo patchInfo); - - NCypressClient::TTransactionId GetMainTransactionId() const; - NCypressClient::TTransactionId GetUploadTransactionId() const; - const TTableWriterPatchInfo& GetPatchInfo() const Y_LIFETIME_BOUND; - NChunkClient::TChunkListId GetRootChunkListId() const; - - TFragmentWriteCookiePtr GiveCookie(); - void TakeCookie(TFragmentWriteCookiePtr cookie); - - TFuture<void> Ping(IClientPtr client); - REGISTER_YSON_STRUCT(TDistributedWriteSession); + // RootChunkListId is used for indentification. + NChunkClient::TChunkListId RootChunkListId; - static void Register(TRegistrar registrar); - -private: - TDistributedWriteSessionId Id_; + NCypressClient::TTransactionId MainTransactionId; + NCypressClient::TTransactionId UploadTransactionId; - NCypressClient::TTransactionId MainTxId_; - NCypressClient::TTransactionId UploadTxId_; + TTableWriterPatchInfo PatchInfo; - NChunkClient::TChunkListId RootChunkListId_; + TDistributedWriteSession( + NCypressClient::TTransactionId mainTransactionId, + NCypressClient::TTransactionId uploadTransactionId, + NChunkClient::TChunkListId rootChunkListId, + NYPath::TRichYPath richPath, + NObjectClient::TObjectId objectId, + NObjectClient::TCellTag externalCellTag, + NTableClient::TMasterTableSchemaId chunkSchemaId, + NTableClient::TTableSchemaPtr chunkSchema, + std::optional<NTableClient::TLegacyOwningKey> writerLastKey, + int maxHeavyColumns, + NTransactionClient::TTimestamp timestamp, + const NYTree::IAttributeDictionary& tableAttributes); - TTableWriterPatchInfo PatchInfo_; + TWriteFragmentCookie CookieFromThis() const; - // This is used to commit changes when session is over. - std::vector<TFragmentWriteResult> WriteResults_; + REGISTER_YSON_STRUCT_LITE(TDistributedWriteSession); - friend struct IDistributedTableClientBase; + static void Register(TRegistrar registrar); }; -DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession); - //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NApi diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h index 0e92ead601..a8293e640e 100644 --- a/yt/yt/client/api/public.h +++ b/yt/yt/client/api/public.h @@ -146,6 +146,8 @@ DECLARE_REFCOUNTED_STRUCT(IRowBatchWriter) DECLARE_REFCOUNTED_STRUCT(ITableReader) DECLARE_REFCOUNTED_STRUCT(ITableWriter) +DECLARE_REFCOUNTED_STRUCT(ITableFragmentWriter); + DECLARE_REFCOUNTED_STRUCT(IFileReader) DECLARE_REFCOUNTED_STRUCT(IFileWriter) @@ -187,10 +189,6 @@ DECLARE_REFCOUNTED_STRUCT(TBackupManifest) DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter) -DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession) -DECLARE_REFCOUNTED_CLASS(TFragmentWriteCookie) -struct IDistributedTableClientBase; - DECLARE_REFCOUNTED_STRUCT(TShuffleHandle) //////////////////////////////////////////////////////////////////////////////// @@ -240,4 +238,11 @@ using TMaintenanceCountsPerTarget = TCompactFlatMap<std::string, TMaintenanceCou //////////////////////////////////////////////////////////////////////////////// +using NTableClient::TSignedDistributedWriteSessionPtr; +using NTableClient::TSignedWriteFragmentCookiePtr; +using NTableClient::TSignedWriteFragmentResultPtr; +struct TWriteFragmentCookie; + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NApi diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index 58b2538df6..26062e93ee 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -21,6 +21,8 @@ #include <yt/yt/client/chaos_client/replication_card_serialization.h> +#include <yt/yt/client/signature/signature.h> + #include <yt/yt/client/table_client/name_table.h> #include <yt/yt/client/table_client/row_base.h> #include <yt/yt/client/table_client/row_buffer.h> @@ -792,7 +794,7 @@ TFuture<ITableWriterPtr> TClientBase::CreateTableWriter( //////////////////////////////////////////////////////////////////////////////// -TFuture<TDistributedWriteSessionPtr> TClientBase::StartDistributedWriteSession( +TFuture<TDistributedWriteSessionWithCookies> TClientBase::StartDistributedWriteSession( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options) { @@ -804,21 +806,28 @@ TFuture<TDistributedWriteSessionPtr> TClientBase::StartDistributedWriteSession( FillRequest(req.Get(), path, options); return req->Invoke() - .ApplyUnique(BIND([] (TRsp&& result) -> TDistributedWriteSessionPtr { - return ConvertTo<TDistributedWriteSessionPtr>(TYsonString(result->session())); + .ApplyUnique(BIND([] (TRsp&& result) -> TDistributedWriteSessionWithCookies { + std::vector<TSignedWriteFragmentCookiePtr> cookies; + cookies.reserve(result->signed_cookies().size()); + for (const auto& cookie : result->signed_cookies()) { + cookies.push_back(ConvertTo<TSignedWriteFragmentCookiePtr>(TYsonString(cookie))); + } + return TDistributedWriteSessionWithCookies{ + .Session = ConvertTo<TSignedDistributedWriteSessionPtr>(TYsonString(result->signed_session())), + .Cookies = std::move(cookies), + }; })); } TFuture<void> TClientBase::FinishDistributedWriteSession( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options) { auto proxy = CreateApiServiceProxy(); auto req = proxy.FinishDistributedWriteSession(); - FillRequest(req.Get(), std::move(session), options); - + FillRequest(req.Get(), sessionWithResults, options); return req->Invoke().AsVoid(); } diff --git a/yt/yt/client/api/rpc_proxy/client_base.h b/yt/yt/client/api/rpc_proxy/client_base.h index 2ccdb1adb4..424752d95e 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.h +++ b/yt/yt/client/api/rpc_proxy/client_base.h @@ -185,12 +185,12 @@ public: const NApi::TTableWriterOptions& options) override; // Distributed table client - TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( + TFuture<TDistributedWriteSessionWithCookies> StartDistributedWriteSession( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options) override; TFuture<void> FinishDistributedWriteSession( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options) override; }; diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index 3ab8c239d5..5c90b89d18 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -16,6 +16,8 @@ #include <yt/yt/client/scheduler/operation_id_or_alias.h> +#include <yt/yt/client/signature/signature.h> + #include <yt/yt/client/table_client/columnar_statistics.h> #include <yt/yt/client/table_client/schema.h> #include <yt/yt/client/table_client/unversioned_row.h> @@ -742,11 +744,12 @@ TFuture<void> TClient::AlterReplicationCard( return req->Invoke().As<void>(); } -TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter( - const TFragmentWriteCookiePtr& cookie, - const TFragmentTableWriterOptions& options) +TFuture<ITableFragmentWriterPtr> TClient::CreateTableFragmentWriter( + const TSignedWriteFragmentCookiePtr& cookie, + const TTableFragmentWriterOptions& options) { using TRspPtr = TIntrusivePtr<NRpc::TTypedClientResponse<NProto::TRspWriteTableFragment>>; + YT_VERIFY(cookie); auto proxy = CreateApiServiceProxy(); auto req = proxy.WriteTableFragment(); @@ -755,22 +758,29 @@ TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter( FillRequest(req.Get(), cookie, options); auto schema = New<TTableSchema>(); + auto promise = NewPromise<TSignedWriteFragmentResultPtr>(); + + // NB(arkady-e1ppa): Whenever stream is over, rsp future is set + // with the value of write result. We create a channel via promise-future + // to transfer this write result to the TableWriter adapter. In order to avoid races + // when TableWriter is already closed (and so is stream) but the promise with + // write result is not yet set, consider writer closed only after said promise is set. return CreateRpcClientOutputStream( std::move(req), BIND ([=] (const TSharedRef& metaRef) { NApi::NRpcProxy::NProto::TWriteTableMeta meta; if (!TryDeserializeProto(&meta, metaRef)) { - THROW_ERROR_EXCEPTION("Failed to deserialize schema for fragment table writer"); + THROW_ERROR_EXCEPTION("Failed to deserialize schema for table fragment writer"); } FromProto(schema.Get(), meta.schema()); }), - BIND([=] (TRspPtr&& rsp) mutable { - Deserialize(*cookie, ConvertToNode(TYsonString(rsp->cookie()))); + BIND([=] (TRspPtr&& rsp) { + promise.Set(ConvertTo<TSignedWriteFragmentResultPtr>(TYsonString(rsp->signed_write_result()))); })) - .ApplyUnique(BIND([=] (IAsyncZeroCopyOutputStreamPtr&& outputStream) { - return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema)); - })).As<ITableWriterPtr>(); + .ApplyUnique(BIND([=, future = promise.ToFuture()] (IAsyncZeroCopyOutputStreamPtr&& outputStream) { + return NRpcProxy::CreateTableFragmentWriter(std::move(outputStream), std::move(schema), std::move(future)); + })); } TFuture<IQueueRowsetPtr> TClient::PullQueue( diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index 4a5cc37d79..a161cfe88d 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -130,9 +130,9 @@ public: const TAlterReplicationCardOptions& options = {}) override; // Distributed table client - TFuture<ITableWriterPtr> CreateFragmentTableWriter( - const TFragmentWriteCookiePtr& cookie, - const TFragmentTableWriterOptions& options) override; + TFuture<ITableFragmentWriterPtr> CreateTableFragmentWriter( + const TSignedWriteFragmentCookiePtr& cookie, + const TTableFragmentWriterOptions& options) override; // Queues. TFuture<NQueueClient::IQueueRowsetPtr> PullQueue( diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index 9c558cfdf2..5baa8460ee 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -6,6 +6,8 @@ #include <yt/yt/client/sequoia_client/public.h> +#include <yt/yt/client/signature/signature.h> + #include <yt/yt/client/table_client/columnar_statistics.h> #include <yt/yt/client/table_client/column_sort_schema.h> #include <yt/yt/client/table_client/logical_type.h> @@ -1964,6 +1966,7 @@ void FillRequest( const TDistributedWriteSessionStartOptions& options) { ToProto(req->mutable_path(), path); + req->set_cookie_count(options.CookieCount); if (options.TransactionId) { ToProto(req->mutable_transactional_options(), options); @@ -1976,6 +1979,7 @@ void ParseRequest( const TReqStartDistributedWriteSession& req) { *mutablePath = FromProto<NYPath::TRichYPath>(req.path()); + mutableOptions->CookieCount = req.cookie_count(); if (req.has_transactional_options()) { FromProto(mutableOptions, req.transactional_options()); } @@ -1985,19 +1989,31 @@ void ParseRequest( void FillRequest( TReqFinishDistributedWriteSession* req, - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options) { - req->set_session(ConvertToYsonString(session).ToString()); + YT_VERIFY(sessionWithResults.Session); + + req->set_signed_session(ConvertToYsonString(sessionWithResults.Session).ToString()); + for (const auto& writeResult : sessionWithResults.Results) { + YT_VERIFY(writeResult); + req->add_signed_write_results(ConvertToYsonString(writeResult).ToString()); + } req->set_max_children_per_attach_request(options.MaxChildrenPerAttachRequest); } void ParseRequest( - TDistributedWriteSessionPtr* mutableSession, + TDistributedWriteSessionWithResults* mutableSessionWithResults, TDistributedWriteSessionFinishOptions* mutableOptions, const TReqFinishDistributedWriteSession& req) { - *mutableSession = ConvertTo<TDistributedWriteSessionPtr>(TYsonString(req.session())); + mutableSessionWithResults->Results.reserve(req.signed_write_results().size()); + for (const auto& writeResult : req.signed_write_results()) { + mutableSessionWithResults->Results.push_back(ConvertTo<TSignedWriteFragmentResultPtr>(TYsonString(writeResult))); + } + + mutableSessionWithResults->Session = ConvertTo<TSignedDistributedWriteSessionPtr>(TYsonString(req.signed_session())); + mutableOptions->MaxChildrenPerAttachRequest = req.max_children_per_attach_request(); } @@ -2005,10 +2021,10 @@ void ParseRequest( void FillRequest( TReqWriteTableFragment* req, - const TFragmentWriteCookiePtr& cookie, - const TFragmentTableWriterOptions& options) + const TSignedWriteFragmentCookiePtr& cookie, + const TTableFragmentWriterOptions& options) { - req->set_cookie(ConvertToYsonString(cookie).ToString()); + req->set_signed_cookie(ConvertToYsonString(cookie).ToString()); if (options.Config) { req->set_config(ConvertToYsonString(*options.Config).ToString()); @@ -2016,11 +2032,11 @@ void FillRequest( } void ParseRequest( - TFragmentWriteCookiePtr* mutableCookie, - TFragmentTableWriterOptions* mutableOptions, + TSignedWriteFragmentCookiePtr* mutableCookie, + TTableFragmentWriterOptions* mutableOptions, const TReqWriteTableFragment& req) { - *mutableCookie = ConvertTo<TFragmentWriteCookiePtr>(TYsonString(req.cookie())); + *mutableCookie = ConvertTo<TSignedWriteFragmentCookiePtr>(TYsonString(req.signed_cookie())); if (req.has_config()) { mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(req.config())); } else { diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h index dfe89178cb..9e7f432e62 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.h +++ b/yt/yt/client/api/rpc_proxy/helpers.h @@ -296,11 +296,11 @@ void ParseRequest( void FillRequest( TReqFinishDistributedWriteSession* req, - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options); void ParseRequest( - TDistributedWriteSessionPtr* mutableSession, + TDistributedWriteSessionWithResults* mutableSessionWithResults, TDistributedWriteSessionFinishOptions* mutableOptions, const TReqFinishDistributedWriteSession& req); @@ -308,12 +308,12 @@ void ParseRequest( void FillRequest( TReqWriteTableFragment* req, - const TFragmentWriteCookiePtr& cookie, - const TFragmentTableWriterOptions& options); + const TSignedWriteFragmentCookiePtr& cookie, + const TTableFragmentWriterOptions& options); void ParseRequest( - TFragmentWriteCookiePtr* mutableCookie, - TFragmentTableWriterOptions* mutableOptions, + TSignedWriteFragmentCookiePtr* mutableCookie, + TTableFragmentWriterOptions* mutableOptions, const TReqWriteTableFragment& req); } // namespace NProto diff --git a/yt/yt/client/api/rpc_proxy/table_writer.cpp b/yt/yt/client/api/rpc_proxy/table_writer.cpp index d957c67023..7c258019d5 100644 --- a/yt/yt/client/api/rpc_proxy/table_writer.cpp +++ b/yt/yt/client/api/rpc_proxy/table_writer.cpp @@ -42,4 +42,52 @@ ITableWriterPtr CreateTableWriter( //////////////////////////////////////////////////////////////////////////////// +class TTableFragmentWriter + : public TRowBatchWriter + , public ITableFragmentWriter +{ +public: + TTableFragmentWriter( + IAsyncZeroCopyOutputStreamPtr underlying, + TTableSchemaPtr schema, + TFuture<TSignedWriteFragmentResultPtr> asyncResult) + : TRowBatchWriter(std::move(underlying)) + , Schema_(std::move(schema)) + , AsyncResult_(std::move(asyncResult)) + { + } + + const TTableSchemaPtr& GetSchema() const override + { + return Schema_; + } + + TFuture<void> Close() override + { + return AllSucceeded<void>({TRowBatchWriter::Close(), AsyncResult_.AsVoid()}); + } + + TSignedWriteFragmentResultPtr GetWriteFragmentResult() const override + { + YT_VERIFY(AsyncResult_.IsSet()); + auto resultOrError = AsyncResult_.Get(); + YT_VERIFY(resultOrError.IsOK()); + return resultOrError.Value(); + } + +private: + const TTableSchemaPtr Schema_; + const TFuture<TSignedWriteFragmentResultPtr> AsyncResult_; +}; + +ITableFragmentWriterPtr CreateTableFragmentWriter( + NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream, + TTableSchemaPtr tableSchema, + TFuture<TSignedWriteFragmentResultPtr> asyncResult) +{ + return New<TTableFragmentWriter>(std::move(outputStream), std::move(tableSchema), std::move(asyncResult)); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NApi::NRpcProxy diff --git a/yt/yt/client/api/rpc_proxy/table_writer.h b/yt/yt/client/api/rpc_proxy/table_writer.h index 21e20c4ba4..4d8348f989 100644 --- a/yt/yt/client/api/rpc_proxy/table_writer.h +++ b/yt/yt/client/api/rpc_proxy/table_writer.h @@ -12,6 +12,15 @@ ITableWriterPtr CreateTableWriter( NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream, NTableClient::TTableSchemaPtr tableSchema); +// NB(arkady-e1ppa): Promise is expected to be set +// whenever outputStream is closed. +// TODO(arkady-e1ppa): Introduce a separate output stream +// which has this logic built in to make this code less fragile. +ITableFragmentWriterPtr CreateTableFragmentWriter( + NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream, + NTableClient::TTableSchemaPtr tableSchema, + TFuture<NTableClient::TSignedWriteFragmentResultPtr> asyncResult); + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NApi::NRpcProxy diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index e52fdf7acb..89da586fdf 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -921,7 +921,7 @@ IJournalWriterPtr TTransaction::CreateJournalWriter( PatchTransactionId(options)); } -TFuture<TDistributedWriteSessionPtr> TTransaction::StartDistributedWriteSession( +TFuture<TDistributedWriteSessionWithCookies> TTransaction::StartDistributedWriteSession( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options) { @@ -932,12 +932,12 @@ TFuture<TDistributedWriteSessionPtr> TTransaction::StartDistributedWriteSession( } TFuture<void> TTransaction::FinishDistributedWriteSession( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options) { ValidateActive(); return Client_->FinishDistributedWriteSession( - std::move(session), + sessionWithResults, options); } diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h index 3faa58d708..20ac689bcb 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.h +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h @@ -235,12 +235,12 @@ public: const NYPath::TYPath& path, const NApi::TJournalWriterOptions& options) override; - TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( + TFuture<TDistributedWriteSessionWithCookies> StartDistributedWriteSession( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options = {}) override; TFuture<void> FinishDistributedWriteSession( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options = {}) override; // Custom methods. diff --git a/yt/yt/client/api/table_writer.h b/yt/yt/client/api/table_writer.h index f4d42c3c78..4ac0fa0fd7 100644 --- a/yt/yt/client/api/table_writer.h +++ b/yt/yt/client/api/table_writer.h @@ -18,4 +18,15 @@ DEFINE_REFCOUNTED_TYPE(ITableWriter) //////////////////////////////////////////////////////////////////////////////// +struct ITableFragmentWriter + : public ITableWriter +{ + //! Returns signed write result. Only safe to use after |Close|. + virtual TSignedWriteFragmentResultPtr GetWriteFragmentResult() const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(ITableFragmentWriter) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NApi diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp index 7bd88c5279..bfb81c20c3 100644 --- a/yt/yt/client/driver/distributed_table_commands.cpp +++ b/yt/yt/client/driver/distributed_table_commands.cpp @@ -3,10 +3,13 @@ #include "helpers.h" #include <yt/yt/client/api/distributed_table_session.h> +#include <yt/yt/client/api/table_writer.h> #include <yt/yt/client/formats/config.h> #include <yt/yt/client/formats/parser.h> +#include <yt/yt/client/signature/signature.h> + #include <yt/yt/client/ypath/public.h> #include <yt/yt/library/formats/format.h> @@ -37,12 +40,12 @@ void TStartDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context) { auto transaction = AttachTransaction(context, /*required*/ false); - auto session = WaitFor(context->GetClient()->StartDistributedWriteSession( + auto sessionAndCookies = WaitFor(context->GetClient()->StartDistributedWriteSession( Path, Options)); - ProduceOutput(context, [session = std::move(session)] (IYsonConsumer* consumer) { - Serialize(session, consumer); + ProduceOutput(context, [sessionAndCookies = std::move(sessionAndCookies)] (IYsonConsumer* consumer) { + Serialize(sessionAndCookies, consumer); }); } @@ -51,15 +54,20 @@ void TStartDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context) void TFinishDistributedWriteSessionCommand::Register(TRegistrar registrar) { registrar.Parameter("session", &TThis::Session); + registrar.Parameter("results", &TThis::Results); } // -> Nothing void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context) { - auto session = ConvertTo<TDistributedWriteSessionPtr>(Session); + auto session = ConvertTo<TSignedDistributedWriteSessionPtr>(Session); + auto results = ConvertTo<std::vector<NTableClient::TSignedWriteFragmentResultPtr>>(Results); WaitFor(context->GetClient()->FinishDistributedWriteSession( - std::move(session), + TDistributedWriteSessionWithResults{ + .Session = std::move(session), + .Results = std::move(results), + }, Options)) .ThrowOnError(); @@ -70,33 +78,43 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context void TWriteTableFragmentCommand::Execute(ICommandContextPtr context) { - TTypedCommand<NApi::TFragmentTableWriterOptions>::Execute(std::move(context)); + TTypedCommand<NApi::TTableFragmentWriterOptions>::Execute(std::move(context)); } -void TWriteTableFragmentCommand::Register(TRegistrar /*registrar*/) -{ } +void TWriteTableFragmentCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("cookie", &TThis::Cookie); +} -TFuture<NApi::ITableWriterPtr> TWriteTableFragmentCommand::CreateTableWriter( - const ICommandContextPtr& context) const +NApi::ITableWriterPtr TWriteTableFragmentCommand::CreateTableWriter( + const ICommandContextPtr& context) { - PutMethodInfoInTraceContext("participant_write_table"); + PutMethodInfoInTraceContext("write_table_fragment"); - return context + auto tableWriter = WaitFor(context ->GetClient() - ->CreateFragmentTableWriter( - StaticPointerCast<TFragmentWriteCookie>(ResultingCookie), - TTypedCommand<TFragmentTableWriterOptions>::Options); + ->CreateTableFragmentWriter( + ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie), + TTypedCommand<TTableFragmentWriterOptions>::Options)) + .ValueOrThrow(); + + TableWriter = tableWriter; + return tableWriter; } // -> Cookie void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context) { - auto cookie = ConvertTo<TFragmentWriteCookiePtr>(Cookie); - ResultingCookie = StaticPointerCast<TRefCounted>(std::move(cookie)); + auto cookie = ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie); DoExecuteImpl(context); - ProduceOutput(context, [cookie = std::move(ResultingCookie)] (IYsonConsumer* consumer) { - Serialize(StaticPointerCast<TFragmentWriteCookie>(cookie), consumer); + + // Sadly, we are plagued by virtual bases :/. + auto writer = DynamicPointerCast<NApi::ITableFragmentWriter>(TableWriter); + ProduceOutput(context, [result = writer->GetWriteFragmentResult()] (IYsonConsumer* consumer) { + Serialize( + *result.Underlying(), + consumer); }); } diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h index fa3d6bb4e5..57fcfd7c6d 100644 --- a/yt/yt/client/driver/distributed_table_commands.h +++ b/yt/yt/client/driver/distributed_table_commands.h @@ -38,6 +38,7 @@ public: private: NYTree::INodePtr Session; + std::vector<NYTree::INodePtr> Results; void DoExecute(ICommandContextPtr context) override; }; @@ -46,7 +47,7 @@ private: // -> Cookie class TWriteTableFragmentCommand - : public TTypedCommand<NApi::TFragmentTableWriterOptions> + : public TTypedCommand<NApi::TTableFragmentWriterOptions> , private TWriteTableCommand { public: @@ -62,10 +63,10 @@ private: using TBase = TWriteTableCommand; NYTree::INodePtr Cookie; - TRefCountedPtr ResultingCookie; + TRefCountedPtr TableWriter; - TFuture<NApi::ITableWriterPtr> CreateTableWriter( - const ICommandContextPtr& context) const override; + NApi::ITableWriterPtr CreateTableWriter( + const ICommandContextPtr& context) override; void DoExecute(ICommandContextPtr context) override; }; diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index 32568fdc3c..9be3e37f3e 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -397,6 +397,8 @@ public: REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false); REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false); REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false); + + // TODO(arkady-e1ppa): flags past command name might be complete rubbish -- think them through later. REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false); REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false); REGISTER_ALL(TWriteTableFragmentCommand, "distributed_write_table_partition", Tabular, Structured, true, true ); diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp index 3ae3e78a23..e358d2f8ee 100644 --- a/yt/yt/client/driver/table_commands.cpp +++ b/yt/yt/client/driver/table_commands.cpp @@ -267,14 +267,15 @@ void TWriteTableCommand::Register(TRegistrar registrar) .Default(1_MB); } -TFuture<ITableWriterPtr> TWriteTableCommand::CreateTableWriter( - const ICommandContextPtr& context) const +NApi::ITableWriterPtr TWriteTableCommand::CreateTableWriter( + const ICommandContextPtr& context) { PutMethodInfoInTraceContext("write_table"); - return context->GetClient()->CreateTableWriter( + return WaitFor(context->GetClient()->CreateTableWriter( Path, - Options); + Options)) + .ValueOrThrow(); } void TWriteTableCommand::DoExecuteImpl(const ICommandContextPtr& context) @@ -289,8 +290,7 @@ void TWriteTableCommand::DoExecuteImpl(const ICommandContextPtr& context) Options.PingAncestors = true; Options.Config = config; - auto apiWriter = WaitFor(CreateTableWriter(context)) - .ValueOrThrow(); + auto apiWriter = CreateTableWriter(context); auto schemalessWriter = CreateSchemalessFromApiWriterAdapter(std::move(apiWriter)); diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h index d511e76622..97f52e44fd 100644 --- a/yt/yt/client/driver/table_commands.h +++ b/yt/yt/client/driver/table_commands.h @@ -83,8 +83,8 @@ public: static void Register(TRegistrar registrar); protected: - virtual TFuture<NApi::ITableWriterPtr> CreateTableWriter( - const ICommandContextPtr& context) const; + virtual NApi::ITableWriterPtr CreateTableWriter( + const ICommandContextPtr& context); void DoExecuteImpl(const ICommandContextPtr& context); diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index 170dedf910..86555b39d6 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -236,8 +236,8 @@ public: UNIMPLEMENTED_METHOD(IFileWriterPtr, CreateFileWriter, (const NYPath::TRichYPath&, const TFileWriterOptions&)); UNIMPLEMENTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const NYPath::TYPath&, const TJournalReaderOptions&)); UNIMPLEMENTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const NYPath::TYPath&, const TJournalWriterOptions&)); - UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); - UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); + UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (const TDistributedWriteSessionWithResults&, const TDistributedWriteSessionFinishOptions&)); private: const TClientPtr Client_; @@ -484,9 +484,9 @@ public: UNIMPLEMENTED_METHOD(TFuture<void>, PausePipeline, (const NYPath::TYPath&, const TPausePipelineOptions&)); UNIMPLEMENTED_METHOD(TFuture<TPipelineState>, GetPipelineState, (const NYPath::TYPath&, const TGetPipelineStateOptions&)); UNIMPLEMENTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&)); - UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); - UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); - UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); + UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (const TDistributedWriteSessionWithResults&, const TDistributedWriteSessionFinishOptions&)); + UNIMPLEMENTED_METHOD(TFuture<ITableFragmentWriterPtr>, CreateTableFragmentWriter, (const TSignedWriteFragmentCookiePtr&, const TTableFragmentWriterOptions&)); UNIMPLEMENTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const std::string& , int, NObjectClient::TTransactionId, const TStartShuffleOptions&)); UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&)); UNIMPLEMENTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TShuffleHandlePtr&, const std::string&, const NTableClient::TTableWriterConfigPtr&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index ac96c64e39..e5bdd74b22 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -109,9 +109,9 @@ public: UNSUPPORTED_METHOD(IFileWriterPtr, CreateFileWriter, (const TRichYPath&, const TFileWriterOptions&)); UNSUPPORTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const TYPath&, const TJournalReaderOptions&)); UNSUPPORTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const TYPath&, const TJournalWriterOptions&)); - UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); - UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); - UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&)); + UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); + UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (const TDistributedWriteSessionWithResults&, const TDistributedWriteSessionFinishOptions&)); + UNSUPPORTED_METHOD(TFuture<ITableFragmentWriterPtr>, CreateTableFragmentWriter, (const TSignedWriteFragmentCookiePtr&, const TTableFragmentWriterOptions&)); // IClient methods. // Unsupported methods. diff --git a/yt/yt/client/signature/signature.h b/yt/yt/client/signature/signature.h index b163ecc0e9..6cb739bd3c 100644 --- a/yt/yt/client/signature/signature.h +++ b/yt/yt/client/signature/signature.h @@ -26,7 +26,9 @@ public: [[nodiscard]] const NYson::TYsonString& Payload() const; private: - NYson::TYsonString Header_; + // TODO(arkady-e1ppa): Whenever trivial generator/validators are added + // remove initialization. + NYson::TYsonString Header_ = NYson::TYsonString(TStringBuf("")); NYson::TYsonString Payload_; std::vector<std::byte> Signature_; diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp index 648a208ea4..f17c1dcdac 100644 --- a/yt/yt/client/table_client/adapters.cpp +++ b/yt/yt/client/table_client/adapters.cpp @@ -25,47 +25,69 @@ static constexpr auto& Logger = TableClientLogger; //////////////////////////////////////////////////////////////////////////////// +template <std::derived_from<ITableWriter> TInterface, std::derived_from<IUnversionedWriter> TUnderlyingInterface> class TApiFromSchemalessWriterAdapter - : public ITableWriter + : public TInterface { public: - explicit TApiFromSchemalessWriterAdapter(IUnversionedWriterPtr underlyingWriter) + explicit TApiFromSchemalessWriterAdapter(TIntrusivePtr<TUnderlyingInterface> underlyingWriter) : UnderlyingWriter_(std::move(underlyingWriter)) { } - bool Write(TRange<TUnversionedRow> rows) override + bool Write(TRange<TUnversionedRow> rows) /*override*/ { return UnderlyingWriter_->Write(rows); } - TFuture<void> GetReadyEvent() override + TFuture<void> GetReadyEvent() /*override*/ { return UnderlyingWriter_->GetReadyEvent(); } - TFuture<void> Close() override + TFuture<void> Close() /*override*/ { return UnderlyingWriter_->Close(); } - const TNameTablePtr& GetNameTable() const override + const TNameTablePtr& GetNameTable() const /*override*/ { return UnderlyingWriter_->GetNameTable(); } - const TTableSchemaPtr& GetSchema() const override + const TTableSchemaPtr& GetSchema() const /*override*/ { return UnderlyingWriter_->GetSchema(); } -private: - const IUnversionedWriterPtr UnderlyingWriter_; +protected: + const TIntrusivePtr<TUnderlyingInterface> UnderlyingWriter_; }; ITableWriterPtr CreateApiFromSchemalessWriterAdapter( IUnversionedWriterPtr underlyingWriter) { - return New<TApiFromSchemalessWriterAdapter>(std::move(underlyingWriter)); + return New<TApiFromSchemalessWriterAdapter<ITableWriter, IUnversionedWriter>>(std::move(underlyingWriter)); +} + +//////////////////////////////////////////////////////////////////////////////// + +class TApiFromSchemalessTableFragmentWriterAdapter + : public TApiFromSchemalessWriterAdapter<ITableFragmentWriter, IUnversionedTableFragmentWriter> +{ +public: + using TBase = TApiFromSchemalessWriterAdapter<ITableFragmentWriter, IUnversionedTableFragmentWriter>; + using TBase::TBase; + + TSignedWriteFragmentResultPtr GetWriteFragmentResult() const /*override*/ + { + return TBase::UnderlyingWriter_->GetWriteFragmentResult(); + } +}; + +ITableFragmentWriterPtr CreateApiFromSchemalessWriterAdapter( + IUnversionedTableFragmentWriterPtr underlyingWriter) +{ + return New<TApiFromSchemalessTableFragmentWriterAdapter>(std::move(underlyingWriter)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h index f90cbe101d..f4b058bf23 100644 --- a/yt/yt/client/table_client/adapters.h +++ b/yt/yt/client/table_client/adapters.h @@ -21,6 +21,9 @@ IUnversionedWriterPtr CreateSchemalessFromApiWriterAdapter( NApi::ITableWriterPtr CreateApiFromSchemalessWriterAdapter( IUnversionedWriterPtr underlyingWriter); +NApi::ITableFragmentWriterPtr CreateApiFromSchemalessWriterAdapter( + IUnversionedTableFragmentWriterPtr underlyingWriter); + //////////////////////////////////////////////////////////////////////////////// struct TPipeReaderToWriterOptions diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h index b577fb929d..c397838e60 100644 --- a/yt/yt/client/table_client/public.h +++ b/yt/yt/client/table_client/public.h @@ -4,6 +4,8 @@ #include <yt/yt/client/cypress_client/public.h> +#include <yt/yt/client/signature/public.h> + #include <yt/yt/client/tablet_client/public.h> #include <yt/yt/client/transaction_client/public.h> @@ -343,6 +345,7 @@ DECLARE_REFCOUNTED_CLASS(TRowBuffer) DECLARE_REFCOUNTED_STRUCT(ISchemalessUnversionedReader) DECLARE_REFCOUNTED_STRUCT(ISchemafulUnversionedReader) DECLARE_REFCOUNTED_STRUCT(IUnversionedWriter) +DECLARE_REFCOUNTED_STRUCT(IUnversionedTableFragmentWriter) DECLARE_REFCOUNTED_STRUCT(IUnversionedRowsetWriter) using TSchemalessWriterFactory = std::function<IUnversionedRowsetWriterPtr( @@ -454,4 +457,10 @@ struct TVersionedWriteOptions; //////////////////////////////////////////////////////////////////////////////// +YT_DEFINE_STRONG_TYPEDEF(TSignedDistributedWriteSessionPtr, NSignature::TSignaturePtr); +YT_DEFINE_STRONG_TYPEDEF(TSignedWriteFragmentCookiePtr, NSignature::TSignaturePtr); +YT_DEFINE_STRONG_TYPEDEF(TSignedWriteFragmentResultPtr, NSignature::TSignaturePtr); + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/unversioned_writer.h b/yt/yt/client/table_client/unversioned_writer.h index b8ba78d621..22814d3b0b 100644 --- a/yt/yt/client/table_client/unversioned_writer.h +++ b/yt/yt/client/table_client/unversioned_writer.h @@ -62,4 +62,15 @@ DEFINE_REFCOUNTED_TYPE(IUnversionedWriter) //////////////////////////////////////////////////////////////////////////////// +struct IUnversionedTableFragmentWriter + : public IUnversionedWriter +{ + //! Returns signed write result. Only safe to use after |Stop|. + virtual TSignedWriteFragmentResultPtr GetWriteFragmentResult() const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IUnversionedTableFragmentWriter) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NTableClient diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index 31c76a9071..e29d989ceb 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -831,19 +831,19 @@ public: const TGetFlowViewOptions& options), (override)); - MOCK_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, ( + MOCK_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, ( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options), (override)); MOCK_METHOD(TFuture<void>, FinishDistributedWriteSession, ( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options), (override)); - MOCK_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, ( - const TFragmentWriteCookiePtr& cookie, - const TFragmentTableWriterOptions& options), + MOCK_METHOD(TFuture<ITableFragmentWriterPtr>, CreateTableFragmentWriter, ( + const TSignedWriteFragmentCookiePtr& cookie, + const TTableFragmentWriterOptions& options), (override)); MOCK_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, ( diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index 943e4fa201..713bf0b443 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ b/yt/yt/client/unittests/mock/transaction.h @@ -151,13 +151,13 @@ public: const NYPath::TYPath& path, const TJournalWriterOptions& options), (override)); - MOCK_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, ( + MOCK_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, ( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options), (override)); MOCK_METHOD(TFuture<void>, FinishDistributedWriteSession, ( - TDistributedWriteSessionPtr session, + const TDistributedWriteSessionWithResults& sessionWithResults, const TDistributedWriteSessionFinishOptions& options), (override)); diff --git a/yt/yt/core/rpc/stream.h b/yt/yt/core/rpc/stream.h index f2220e7425..c7d765c88e 100644 --- a/yt/yt/core/rpc/stream.h +++ b/yt/yt/core/rpc/stream.h @@ -266,6 +266,9 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream TCallback<void(TSharedRef)> metaHandler); //! This variant additionally allows non-trivial response of streaming request to be handled. +//! TODO(arkady-e1ppa): Introduce IAsyncZeroCopyOutputStream<TRet> which |Close| returns +//! TFuture<TRet> instead of TFuture<void> as a way to transfer data via rsp +//! use it here. template <class TRequestMessage, class TResponse> TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream( TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request, diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 723a63b639..457a28b0f5 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -3426,20 +3426,23 @@ message TRspGetQueryTrackerInfo message TReqStartDistributedWriteSession { required string path = 1; + required int32 cookie_count = 2; optional TTransactionalOptions transactional_options = 100; } message TRspStartDistributedWriteSession { - required bytes session = 1; // YSON-serialized TDistributedWriteSession + required bytes signed_session = 1; // YSON-serialized TSignedDistributedWriteSessionPtr + repeated bytes signed_cookies = 2; // vector of YSON-serialized TSignedDistributedWriteCookiePtr } message TReqFinishDistributedWriteSession { - required bytes session = 1; // YSON-serialized TDistributedWriteSession + required bytes signed_session = 1; // YSON-serialized TSignedDistributedWriteSessionPtr + repeated bytes signed_write_results = 2; // vector of YSON-serialized TSignedWriteFragmentResultPtr - required int32 max_children_per_attach_request = 2; + required int32 max_children_per_attach_request = 3; } message TRspFinishDistributedWriteSession @@ -3452,12 +3455,12 @@ message TReqWriteTableFragment optional bytes format = 2; // YSON-serialized TFormat - required bytes cookie = 3; // YSON-serialized TFragmentWriteCookie + required bytes signed_cookie = 3; // YSON-serialized TSignedWriteFragmentCookiePtr } message TRspWriteTableFragment { - required bytes cookie = 1; // YSON-serialized TFragmentWriteCookie + required bytes signed_write_result = 1; // YSON-serialized TSignedWriteFragmentResultPtr } /////////////////////////////////////////////////////////////////////////////// |