diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-06-10 19:17:55 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-06-10 19:29:50 +0300 |
commit | c5fcafe98bbdcbd28171b5c90e42ac521292e032 (patch) | |
tree | 4389213159afbb5f493fb53616c615711485ba20 | |
parent | e2add312a22e219fb94e385160a305343d7b71b4 (diff) | |
download | ydb-c5fcafe98bbdcbd28171b5c90e42ac521292e032.tar.gz |
[queues] YT-21356: Add PushQueueProducer
905fe6c7f646970b7d498748dc01ceefb0fe6704
24 files changed, 365 insertions, 12 deletions
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp index 1e3c6a4ae7..c80fa09146 100644 --- a/yt/yt/client/api/delegating_transaction.cpp +++ b/yt/yt/client/api/delegating_transaction.cpp @@ -299,6 +299,16 @@ DELEGATE_METHOD(TFuture<void>, AdvanceConsumer, ( const NYT::NApi::TAdvanceConsumerOptions& options), (consumer, queue, partitionIndex, oldOffset, newOffset, options)) +DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + i64 epoch, + NTableClient::TNameTablePtr nameTable, + TSharedRange<NTableClient::TUnversionedRow> rows, + const TPushQueueProducerOptions& options), + (producerPath, queuePath, sessionId, epoch, nameTable, rows, options)) + #undef DELEGATE_METHOD //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h index 030b0b0c7e..6377bab05c 100644 --- a/yt/yt/client/api/delegating_transaction.h +++ b/yt/yt/client/api/delegating_transaction.h @@ -243,6 +243,15 @@ public: i64 newOffset, const NYT::NApi::TAdvanceConsumerOptions& options) override; + TFuture<TPushQueueProducerResult> PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + i64 epoch, + NTableClient::TNameTablePtr nameTable, + TSharedRange<NTableClient::TUnversionedRow> rows, + const TPushQueueProducerOptions& options) override; + protected: const ITransactionPtr Underlying_; }; diff --git a/yt/yt/client/api/dynamic_table_transaction.h b/yt/yt/client/api/dynamic_table_transaction.h index 2164cdeacd..b22e9bcd8f 100644 --- a/yt/yt/client/api/dynamic_table_transaction.h +++ b/yt/yt/client/api/dynamic_table_transaction.h @@ -63,6 +63,9 @@ struct TModifyRowsOptions //! If set treat missing key columns as null. bool AllowMissingKeyColumns = false; + + //! If set then WriteViaQueueProducer table schema will be used instead of Write table schema. + bool WriteViaQueueProducer = false; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h index 8425ecd816..2c9e970fa6 100644 --- a/yt/yt/client/api/queue_client.h +++ b/yt/yt/client/api/queue_client.h @@ -79,8 +79,8 @@ struct TCreateQueueProducerSessionOptions struct TCreateQueueProducerSessionResult { - ui64 SequenceNumber; - ui64 Epoch; + i64 SequenceNumber; + i64 Epoch; std::optional<NYson::TYsonString> UserMeta; }; @@ -145,7 +145,7 @@ struct IQueueClient const NYPath::TRichYPath& producerPath, const NYPath::TRichYPath& queuePath, const TString& sessionId, - const std::optional<NYson::TYsonString>& userMeta, + const std::optional<NYson::TYsonString>& userMeta = {}, const TCreateQueueProducerSessionOptions& options = {}) = 0; virtual TFuture<void> RemoveQueueProducerSession( diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h index 50ef073360..cfaa07c49d 100644 --- a/yt/yt/client/api/queue_transaction.h +++ b/yt/yt/client/api/queue_transaction.h @@ -12,6 +12,38 @@ struct TAdvanceConsumerOptions : public TTimeoutOptions { }; +struct TPushQueueProducerOptions + : public TTimeoutOptions +{ + //! Sequence number of the first row in the batch. + /*! + * Rows in the batch will have such sequence numbers: SequenceNumber, SequenceNumber+1, SequenceNumber+2, ... + * If the option is not set the $sequence_number column must be present in each row. + */ + std::optional<i64> SequenceNumber; + + //! Any yson data which will be saved for the session + /*! + * It can be retrieved later and used, for example, to understand + * what data should be written after fail of the user process. + */ + NYTree::INodePtr UserMeta; +}; + +struct TPushQueueProducerResult +{ + //! Sequence number of the last row in the written batch. + /*! + * All rows with greater sequence number will be ignored in future calls of PushQueueProducer. + */ + i64 LastSequenceNumber = -1; + //! Count of rows which were skipped because of their sequence number. + /*! + * Skipped rows were written before. + */ + i64 SkippedRowCount = 0; +}; + //////////////////////////////////////////////////////////////////////////////// struct IQueueTransaction @@ -45,6 +77,21 @@ struct IQueueTransaction std::optional<i64> oldOffset, i64 newOffset, const TAdvanceConsumerOptions& options) = 0; + + //! Write rows in the queue with checking their sequence number. + /*! + * If row sequence number is less than sequence number saved in producer table, then this row will not be written. + * #sessionId - an identificator of write session, for example, `<host>-<filename>`. + * #epoch - a number of producer epoch. All calls with an epoch less than the current epoch will fail. + */ + virtual TFuture<TPushQueueProducerResult> PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + i64 epoch, + NTableClient::TNameTablePtr nameTable, + TSharedRange<NTableClient::TUnversionedRow> rows, + const TPushQueueProducerOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index e73a63a437..dfe5550379 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -94,6 +94,7 @@ public: DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ListQueueConsumerRegistrations); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, CreateQueueProducerSession); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RemoveQueueProducerSession); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PushQueueProducer); // Scheduler pools DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, TransferPoolResources); diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index a1d8f1f563..16725b33b0 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -900,8 +900,8 @@ TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession( } return TCreateQueueProducerSessionResult{ - .SequenceNumber = FromProto<ui64>(rsp->sequence_number()), - .Epoch = FromProto<ui64>(rsp->epoch()), + .SequenceNumber = FromProto<i64>(rsp->sequence_number()), + .Epoch = FromProto<i64>(rsp->epoch()), .UserMeta = std::move(userMeta), }; })); diff --git a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp index f957aa7471..c7440cc29f 100644 --- a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp +++ b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp @@ -57,6 +57,7 @@ private: auto primarySchema = NYT::FromProto<NTableClient::TTableSchemaPtr>(rsp->schema()); tableInfo->Schemas[ETableSchemaKind::Primary] = primarySchema; tableInfo->Schemas[ETableSchemaKind::Write] = primarySchema->ToWrite(); + tableInfo->Schemas[ETableSchemaKind::WriteViaQueueProducer] = primarySchema->ToWriteViaQueueProducer(); tableInfo->Schemas[ETableSchemaKind::VersionedWrite] = primarySchema->ToVersionedWrite(); tableInfo->Schemas[ETableSchemaKind::Delete] = primarySchema->ToDelete(); tableInfo->Schemas[ETableSchemaKind::Query] = primarySchema->ToQuery(); diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index d451b755b8..7dfb4d20bc 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -22,6 +22,7 @@ using namespace NCypressClient; using namespace NApi; using namespace NYTree; using namespace NYPath; +using namespace NYson; //////////////////////////////////////////////////////////////////////////////// @@ -525,6 +526,60 @@ TFuture<void> TTransaction::AdvanceConsumer( return req->Invoke().As<void>(); } +TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + i64 epoch, + NTableClient::TNameTablePtr nameTable, + TSharedRange<NTableClient::TUnversionedRow> rows, + const TPushQueueProducerOptions& options) +{ + ValidateTabletTransactionId(GetId()); + + THROW_ERROR_EXCEPTION_IF(epoch < 0, + "Epoch number %v cannot be negative", epoch); + THROW_ERROR_EXCEPTION_IF(options.SequenceNumber && *options.SequenceNumber < 0, + "Sequence number %v cannot be negative", *options.SequenceNumber); + + auto req = Proxy_.PushQueueProducer(); + SetTimeoutOptions(*req, options); + if (options.SequenceNumber) { + req->set_sequence_number(*options.SequenceNumber); + } + + if (NTracing::IsCurrentTraceContextRecorded()) { + req->TracingTags().emplace_back("yt.producer_path", ToString(producerPath)); + req->TracingTags().emplace_back("yt.queue_path", ToString(queuePath)); + req->TracingTags().emplace_back("yt.session_id", ToString(sessionId)); + req->TracingTags().emplace_back("yt.epoch", ToString(epoch)); + } + + ToProto(req->mutable_transaction_id(), GetId()); + + ToProto(req->mutable_producer_path(), producerPath); + ToProto(req->mutable_queue_path(), queuePath); + + ToProto(req->mutable_session_id(), sessionId); + req->set_epoch(epoch); + + if (options.UserMeta) { + ToProto(req->mutable_user_meta(), ConvertToYsonString(options.UserMeta).ToString()); + } + + req->Attachments() = SerializeRowset( + nameTable, + MakeRange(rows), + req->mutable_rowset_descriptor()); + + return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPushQueueProducerPtr& rsp) { + return TPushQueueProducerResult{ + .LastSequenceNumber = rsp->last_sequence_number(), + .SkippedRowCount = rsp->skipped_row_count(), + }; + })); +} + TFuture<ITransactionPtr> TTransaction::StartTransaction( ETransactionType type, const TTransactionStartOptions& options) diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h index 57f7850b3e..34b93ba90d 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.h +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h @@ -86,6 +86,15 @@ public: i64 newOffset, const TAdvanceConsumerOptions& options) override; + TFuture<TPushQueueProducerResult> PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + i64 epoch, + NTableClient::TNameTablePtr nameTable, + TSharedRange<NTableClient::TUnversionedRow> rows, + const TPushQueueProducerOptions& options) override; + // IClientBase implementation. TFuture<NApi::ITransactionPtr> StartTransaction( NTransactionClient::ETransactionType type, diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index f099122416..9dc3c56237 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -354,6 +354,7 @@ public: REGISTER (TAdvanceConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4); REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4); REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4); + REGISTER (TPushQueueProducerCommand, "push_queue_producer", Null, Structured, true, false, ApiVersion4); REGISTER (TStartQueryCommand, "start_query", Null, Structured, true, false, ApiVersion4); REGISTER (TAbortQueryCommand, "abort_query", Null, Structured, true, false, ApiVersion4); diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index 0fbe979af3..d1e4bb3d81 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -1,15 +1,40 @@ #include "queue_commands.h" + #include "config.h" +#include "helpers.h" #include <yt/yt/client/api/config.h> +#include <yt/yt/client/formats/parser.h> + +#include <yt/yt/client/table_client/adapters.h> +#include <yt/yt/client/table_client/public.h> +#include <yt/yt/client/table_client/row_buffer.h> +#include <yt/yt/client/table_client/table_consumer.h> +#include <yt/yt/client/table_client/table_output.h> + +#include <yt/yt/client/tablet_client/table_mount_cache.h> + #include <yt/yt/library/formats/format.h> namespace NYT::NDriver { -using namespace NConcurrency; using namespace NApi; +using namespace NConcurrency; +using namespace NTableClient; +using namespace NTabletClient; using namespace NYTree; +using namespace NYson; + +//////////////////////////////////////////////////////////////////////////////// + +static NLogging::TLogger WithCommandTag( + const NLogging::TLogger& logger, + const ICommandContextPtr& context) +{ + return logger.WithTag("Command: %v", + context->Request().CommandName); +} //////////////////////////////////////////////////////////////////////////////// @@ -311,4 +336,93 @@ void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context) //////////////////////////////////////////////////////////////////////////////// +void TPushQueueProducerCommand::Register(TRegistrar registrar) +{ + registrar.ParameterWithUniversalAccessor<std::optional<i64>>( + "sequence_number", + [] (TThis* command) -> auto& { + return command->Options.SequenceNumber; + }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<INodePtr>( + "user_meta", + [] (TThis* command) -> auto& { + return command->Options.UserMeta; + }) + .Optional(/*init*/ false); + + registrar.Parameter("producer_path", &TThis::ProducerPath); + registrar.Parameter("queue_path", &TThis::QueuePath); + registrar.Parameter("session_id", &TThis::SessionId); + registrar.Parameter("epoch", &TThis::Epoch); + +} + +void TPushQueueProducerCommand::DoExecute(ICommandContextPtr context) +{ + auto tableMountCache = context->GetClient()->GetTableMountCache(); + + auto queueTableInfoFuture = tableMountCache->GetTableInfo(QueuePath.GetPath()); + auto producerTableInfoFuture = tableMountCache->GetTableInfo(ProducerPath.GetPath()); + + auto queueTableInfo = WaitFor(queueTableInfoFuture).ValueOrThrow("Path %v is not valid queue", QueuePath); + queueTableInfo->ValidateOrdered(); + + auto producerTableInfo = WaitFor(producerTableInfoFuture).ValueOrThrow("Path %v is not valid producer", ProducerPath); + producerTableInfo->ValidateSorted(); + + struct TPushQueueProducerBufferTag + { }; + + auto insertRowsFormatConfig = ConvertTo<TInsertRowsFormatConfigPtr>(context->GetInputFormat().Attributes()); + auto typeConversionConfig = ConvertTo<TTypeConversionConfigPtr>(context->GetInputFormat().Attributes()); + // Parse input data. + TBuildingValueConsumer valueConsumer( + queueTableInfo->Schemas[ETableSchemaKind::WriteViaQueueProducer], + WithCommandTag(Logger, context), + insertRowsFormatConfig->EnableNullToYsonEntityConversion, + typeConversionConfig); + valueConsumer.SetTreatMissingAsNull(true); + + TTableOutput output(CreateParserForFormat( + context->GetInputFormat(), + &valueConsumer)); + + PipeInputToOutput(context->Request().InputStream, &output); + auto rows = valueConsumer.GetRows(); + auto rowBuffer = New<TRowBuffer>(TPushQueueProducerBufferTag()); + auto capturedRows = rowBuffer->CaptureRows(rows); + auto rowRange = MakeSharedRange( + std::vector<TUnversionedRow>(capturedRows.begin(), capturedRows.end()), + std::move(rowBuffer)); + + auto transaction = GetTransaction(context); + + auto result = WaitFor(transaction->PushQueueProducer( + ProducerPath, + QueuePath, + SessionId, + Epoch, + valueConsumer.GetNameTable(), + std::move(rowRange), + Options)) + .ValueOrThrow(); + + if (ShouldCommitTransaction()) { + WaitFor(transaction->Commit()) + .ThrowOnError(); + } + + ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) { + BuildYsonFluently(consumer) + .BeginMap() + .Item("last_sequence_number").Value(result.LastSequenceNumber) + .Item("skipped_row_count").Value(result.SkippedRowCount) + .EndMap(); + }); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NDriver diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h index fc5bec8cb7..85ea7637af 100644 --- a/yt/yt/client/driver/queue_commands.h +++ b/yt/yt/client/driver/queue_commands.h @@ -162,4 +162,28 @@ private: //////////////////////////////////////////////////////////////////////////////// +struct TPushQueueProducerOptions + : public NApi::TPushQueueProducerOptions + , public TTabletWriteOptions +{ }; + +class TPushQueueProducerCommand + : public TTypedCommand<TPushQueueProducerOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TPushQueueProducerCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TRichYPath ProducerPath; + NYPath::TRichYPath QueuePath; + TString SessionId; + i64 Epoch; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NDriver diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp index 362a814c95..3c9d42cfe2 100644 --- a/yt/yt/client/driver/table_commands.cpp +++ b/yt/yt/client/driver/table_commands.cpp @@ -7,6 +7,9 @@ #include <yt/yt/client/chaos_client/replication_card_serialization.h> +#include <yt/yt/client/formats/config.h> +#include <yt/yt/client/formats/parser.h> + #include <yt/yt/client/table_client/adapters.h> #include <yt/yt/client/table_client/blob_reader.h> #include <yt/yt/client/table_client/columnar_statistics.h> @@ -19,9 +22,6 @@ #include <yt/yt/client/tablet_client/table_mount_cache.h> -#include <yt/yt/client/formats/config.h> -#include <yt/yt/client/formats/parser.h> - #include <yt/yt/client/ypath/public.h> #include <yt/yt/core/concurrency/periodic_executor.h> diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index 657cb4afb0..c4c71e882e 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -90,6 +90,15 @@ public: i64 newOffset, const TAdvanceConsumerOptions& options) override; + TFuture<TPushQueueProducerResult> PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + i64 epoch, + NTableClient::TNameTablePtr nameTable, + TSharedRange<NTableClient::TUnversionedRow> rows, + const TPushQueueProducerOptions& options) override; + TFuture<TTransactionFlushResult> Flush() override; TFuture<void> Ping(const NApi::TTransactionPingOptions& options = {}) override; @@ -507,6 +516,7 @@ TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); TRANSACTION_METHOD_IMPL(void, AdvanceConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceConsumerOptions&)); +TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, i64, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index 3027d32f8d..cf06d4e150 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -6,6 +6,7 @@ #include "rpc.h" #include <yt/yt/client/api/client.h> +#include <yt/yt/client/api/queue_transaction.h> #include <yt/yt/client/misc/method_helpers.h> diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h index eb5cf3f289..72ea49cd2b 100644 --- a/yt/yt/client/queue_client/public.h +++ b/yt/yt/client/queue_client/public.h @@ -11,6 +11,9 @@ namespace NYT::NQueueClient { YT_DEFINE_ERROR_ENUM( ((ConsumerOffsetConflict) (3100)) + ((InvalidEpoch) (3101)) + ((ZombieEpoch) (3102)) + ((InvalidRowSequenceNumbers) (3103)) ); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/table_client/public.cpp b/yt/yt/client/table_client/public.cpp index d0bba15185..0608491cc2 100644 --- a/yt/yt/client/table_client/public.cpp +++ b/yt/yt/client/table_client/public.cpp @@ -15,6 +15,7 @@ const TString TimestampColumnName = SystemColumnNamePrefix + "timestamp"; const TString TtlColumnName = SystemColumnNamePrefix + "ttl"; const TString CumulativeDataWeightColumnName = SystemColumnNamePrefix + "cumulative_data_weight"; const TString EmptyValueColumnName = SystemColumnNamePrefix + "empty"; +const TString SequenceNumberColumnName = SystemColumnNamePrefix + "sequence_number"; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h index 392cc45a18..2f03d1c8b7 100644 --- a/yt/yt/client/table_client/public.h +++ b/yt/yt/client/table_client/public.h @@ -126,6 +126,7 @@ extern const TString TtlColumnName; extern const TString CumulativeDataWeightColumnName; extern const TString EmptyValueColumnName; extern const TString PrimaryLockName; +extern const TString SequenceNumberColumnName; constexpr int TypicalHunkColumnCount = 8; diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp index b2c9093b9b..c03818a999 100644 --- a/yt/yt/client/table_client/schema.cpp +++ b/yt/yt/client/table_client/schema.cpp @@ -962,6 +962,31 @@ TTableSchemaPtr TTableSchema::ToQuery() const } } +TTableSchemaPtr TTableSchema::ToWriteViaQueueProducer() const +{ + std::vector<TColumnSchema> columns; + if (IsSorted()) { + for (const auto& column : Columns()) { + if (!column.Expression()) { + columns.push_back(column); + } + } + } else { + columns.push_back(TColumnSchema(TabletIndexColumnName, ESimpleLogicalValueType::Int64) + .SetSortOrder(ESortOrder::Ascending)); + columns.push_back(TColumnSchema(SequenceNumberColumnName, ESimpleLogicalValueType::Int64)); + for (const auto& column : Columns()) { + if (column.StableName().Underlying() != TimestampColumnName && + column.StableName().Underlying() != CumulativeDataWeightColumnName) + { + columns.push_back(column); + } + } + } + return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_, + ETableSchemaModification::None, DeletedColumns()); +} + TTableSchemaPtr TTableSchema::ToWrite() const { std::vector<TColumnSchema> columns; diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h index d5bb5a37aa..7813c7b888 100644 --- a/yt/yt/client/table_client/schema.h +++ b/yt/yt/client/table_client/schema.h @@ -327,7 +327,11 @@ public: //! but without |$timestamp| column, if any. TTableSchemaPtr ToWrite() const; - //! For sorted tables, return the current schema + //! For sorted tables, return the current schema. + //! For ordered tables, prepends the current schema with |(tablet_index, sequence_number)| key column. + TTableSchemaPtr ToWriteViaQueueProducer() const; + + //! For sorted tables, return the current schema. //! For ordered tables, prepends the current schema with |(tablet_index)| key column. TTableSchemaPtr WithTabletIndex() const; diff --git a/yt/yt/client/tablet_client/table_mount_cache.h b/yt/yt/client/tablet_client/table_mount_cache.h index 0490d9ea02..8dc6e6ceee 100644 --- a/yt/yt/client/tablet_client/table_mount_cache.h +++ b/yt/yt/client/tablet_client/table_mount_cache.h @@ -92,6 +92,8 @@ DEFINE_ENUM(ETableSchemaKind, (PrimaryWithTabletIndex) // Schema used for replication log rows. (ReplicationLog) + // Schema used for inserting rows into ordered tables via queue producer. + (WriteViaQueueProducer) ); struct TTableMountInfo diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index 6b3ba70b77..60ad83bef3 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ b/yt/yt/client/unittests/mock/transaction.h @@ -213,6 +213,15 @@ public: NTableClient::TNameTablePtr nameTable, TSharedRange<TRowModification> modifications, const TModifyRowsOptions& options), (override)); + + MOCK_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TString& sessionId, + i64 epoch, + NTableClient::TNameTablePtr nameTable, + TSharedRange<NTableClient::TUnversionedRow> rows, + const TPushQueueProducerOptions& options), (override)); }; DEFINE_REFCOUNTED_TYPE(TMockTransaction) diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 5401e33444..5447738bb2 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -658,6 +658,29 @@ message TRspAdvanceConsumer { } +//////////////////////////////////////////////////////////////////////////////// + +message TReqPushQueueProducer +{ + optional NYT.NProto.TGuid transaction_id = 1; + optional string producer_path = 2; + optional string queue_path = 3; + optional string session_id = 4; + optional int64 epoch = 5; + optional bytes user_meta = 6; // YSON + optional int64 sequence_number = 7; + + required TRowsetDescriptor rowset_descriptor = 200; +} + +message TRspPushQueueProducer +{ + optional int64 last_sequence_number = 1; + optional int64 skipped_row_count = 2; +} + +//////////////////////////////////////////////////////////////////////////////// + message TRowBatchReadOptions { optional int64 max_row_count = 1; @@ -770,8 +793,8 @@ message TReqCreateQueueProducerSession message TRspCreateQueueProducerSession { - optional uint64 sequence_number = 1; - optional uint64 epoch = 2; + optional int64 sequence_number = 1; + optional int64 epoch = 2; optional bytes user_meta = 3; // YSON } |