aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-06-10 19:17:55 +0300
committernadya73 <nadya73@yandex-team.com>2024-06-10 19:29:50 +0300
commitc5fcafe98bbdcbd28171b5c90e42ac521292e032 (patch)
tree4389213159afbb5f493fb53616c615711485ba20
parente2add312a22e219fb94e385160a305343d7b71b4 (diff)
downloadydb-c5fcafe98bbdcbd28171b5c90e42ac521292e032.tar.gz
[queues] YT-21356: Add PushQueueProducer
905fe6c7f646970b7d498748dc01ceefb0fe6704
-rw-r--r--yt/yt/client/api/delegating_transaction.cpp10
-rw-r--r--yt/yt/client/api/delegating_transaction.h9
-rw-r--r--yt/yt/client/api/dynamic_table_transaction.h3
-rw-r--r--yt/yt/client/api/queue_client.h6
-rw-r--r--yt/yt/client/api/queue_transaction.h47
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h1
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp4
-rw-r--r--yt/yt/client/api/rpc_proxy/table_mount_cache.cpp1
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp55
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.h9
-rw-r--r--yt/yt/client/driver/driver.cpp1
-rw-r--r--yt/yt/client/driver/queue_commands.cpp116
-rw-r--r--yt/yt/client/driver/queue_commands.h24
-rw-r--r--yt/yt/client/driver/table_commands.cpp6
-rw-r--r--yt/yt/client/federated/client.cpp10
-rw-r--r--yt/yt/client/hedging/hedging.cpp1
-rw-r--r--yt/yt/client/queue_client/public.h3
-rw-r--r--yt/yt/client/table_client/public.cpp1
-rw-r--r--yt/yt/client/table_client/public.h1
-rw-r--r--yt/yt/client/table_client/schema.cpp25
-rw-r--r--yt/yt/client/table_client/schema.h6
-rw-r--r--yt/yt/client/tablet_client/table_mount_cache.h2
-rw-r--r--yt/yt/client/unittests/mock/transaction.h9
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto27
24 files changed, 365 insertions, 12 deletions
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp
index 1e3c6a4ae7..c80fa09146 100644
--- a/yt/yt/client/api/delegating_transaction.cpp
+++ b/yt/yt/client/api/delegating_transaction.cpp
@@ -299,6 +299,16 @@ DELEGATE_METHOD(TFuture<void>, AdvanceConsumer, (
const NYT::NApi::TAdvanceConsumerOptions& options),
(consumer, queue, partitionIndex, oldOffset, newOffset, options))
+DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ i64 epoch,
+ NTableClient::TNameTablePtr nameTable,
+ TSharedRange<NTableClient::TUnversionedRow> rows,
+ const TPushQueueProducerOptions& options),
+ (producerPath, queuePath, sessionId, epoch, nameTable, rows, options))
+
#undef DELEGATE_METHOD
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h
index 030b0b0c7e..6377bab05c 100644
--- a/yt/yt/client/api/delegating_transaction.h
+++ b/yt/yt/client/api/delegating_transaction.h
@@ -243,6 +243,15 @@ public:
i64 newOffset,
const NYT::NApi::TAdvanceConsumerOptions& options) override;
+ TFuture<TPushQueueProducerResult> PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ i64 epoch,
+ NTableClient::TNameTablePtr nameTable,
+ TSharedRange<NTableClient::TUnversionedRow> rows,
+ const TPushQueueProducerOptions& options) override;
+
protected:
const ITransactionPtr Underlying_;
};
diff --git a/yt/yt/client/api/dynamic_table_transaction.h b/yt/yt/client/api/dynamic_table_transaction.h
index 2164cdeacd..b22e9bcd8f 100644
--- a/yt/yt/client/api/dynamic_table_transaction.h
+++ b/yt/yt/client/api/dynamic_table_transaction.h
@@ -63,6 +63,9 @@ struct TModifyRowsOptions
//! If set treat missing key columns as null.
bool AllowMissingKeyColumns = false;
+
+ //! If set then WriteViaQueueProducer table schema will be used instead of Write table schema.
+ bool WriteViaQueueProducer = false;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h
index 8425ecd816..2c9e970fa6 100644
--- a/yt/yt/client/api/queue_client.h
+++ b/yt/yt/client/api/queue_client.h
@@ -79,8 +79,8 @@ struct TCreateQueueProducerSessionOptions
struct TCreateQueueProducerSessionResult
{
- ui64 SequenceNumber;
- ui64 Epoch;
+ i64 SequenceNumber;
+ i64 Epoch;
std::optional<NYson::TYsonString> UserMeta;
};
@@ -145,7 +145,7 @@ struct IQueueClient
const NYPath::TRichYPath& producerPath,
const NYPath::TRichYPath& queuePath,
const TString& sessionId,
- const std::optional<NYson::TYsonString>& userMeta,
+ const std::optional<NYson::TYsonString>& userMeta = {},
const TCreateQueueProducerSessionOptions& options = {}) = 0;
virtual TFuture<void> RemoveQueueProducerSession(
diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h
index 50ef073360..cfaa07c49d 100644
--- a/yt/yt/client/api/queue_transaction.h
+++ b/yt/yt/client/api/queue_transaction.h
@@ -12,6 +12,38 @@ struct TAdvanceConsumerOptions
: public TTimeoutOptions
{ };
+struct TPushQueueProducerOptions
+ : public TTimeoutOptions
+{
+ //! Sequence number of the first row in the batch.
+ /*!
+ * Rows in the batch will have such sequence numbers: SequenceNumber, SequenceNumber+1, SequenceNumber+2, ...
+ * If the option is not set the $sequence_number column must be present in each row.
+ */
+ std::optional<i64> SequenceNumber;
+
+ //! Any yson data which will be saved for the session
+ /*!
+ * It can be retrieved later and used, for example, to understand
+ * what data should be written after fail of the user process.
+ */
+ NYTree::INodePtr UserMeta;
+};
+
+struct TPushQueueProducerResult
+{
+ //! Sequence number of the last row in the written batch.
+ /*!
+ * All rows with greater sequence number will be ignored in future calls of PushQueueProducer.
+ */
+ i64 LastSequenceNumber = -1;
+ //! Count of rows which were skipped because of their sequence number.
+ /*!
+ * Skipped rows were written before.
+ */
+ i64 SkippedRowCount = 0;
+};
+
////////////////////////////////////////////////////////////////////////////////
struct IQueueTransaction
@@ -45,6 +77,21 @@ struct IQueueTransaction
std::optional<i64> oldOffset,
i64 newOffset,
const TAdvanceConsumerOptions& options) = 0;
+
+ //! Write rows in the queue with checking their sequence number.
+ /*!
+ * If row sequence number is less than sequence number saved in producer table, then this row will not be written.
+ * #sessionId - an identificator of write session, for example, `<host>-<filename>`.
+ * #epoch - a number of producer epoch. All calls with an epoch less than the current epoch will fail.
+ */
+ virtual TFuture<TPushQueueProducerResult> PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ i64 epoch,
+ NTableClient::TNameTablePtr nameTable,
+ TSharedRange<NTableClient::TUnversionedRow> rows,
+ const TPushQueueProducerOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index e73a63a437..dfe5550379 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -94,6 +94,7 @@ public:
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ListQueueConsumerRegistrations);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, CreateQueueProducerSession);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RemoveQueueProducerSession);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PushQueueProducer);
// Scheduler pools
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, TransferPoolResources);
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index a1d8f1f563..16725b33b0 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -900,8 +900,8 @@ TFuture<TCreateQueueProducerSessionResult> TClient::CreateQueueProducerSession(
}
return TCreateQueueProducerSessionResult{
- .SequenceNumber = FromProto<ui64>(rsp->sequence_number()),
- .Epoch = FromProto<ui64>(rsp->epoch()),
+ .SequenceNumber = FromProto<i64>(rsp->sequence_number()),
+ .Epoch = FromProto<i64>(rsp->epoch()),
.UserMeta = std::move(userMeta),
};
}));
diff --git a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp
index f957aa7471..c7440cc29f 100644
--- a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp
+++ b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp
@@ -57,6 +57,7 @@ private:
auto primarySchema = NYT::FromProto<NTableClient::TTableSchemaPtr>(rsp->schema());
tableInfo->Schemas[ETableSchemaKind::Primary] = primarySchema;
tableInfo->Schemas[ETableSchemaKind::Write] = primarySchema->ToWrite();
+ tableInfo->Schemas[ETableSchemaKind::WriteViaQueueProducer] = primarySchema->ToWriteViaQueueProducer();
tableInfo->Schemas[ETableSchemaKind::VersionedWrite] = primarySchema->ToVersionedWrite();
tableInfo->Schemas[ETableSchemaKind::Delete] = primarySchema->ToDelete();
tableInfo->Schemas[ETableSchemaKind::Query] = primarySchema->ToQuery();
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index d451b755b8..7dfb4d20bc 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -22,6 +22,7 @@ using namespace NCypressClient;
using namespace NApi;
using namespace NYTree;
using namespace NYPath;
+using namespace NYson;
////////////////////////////////////////////////////////////////////////////////
@@ -525,6 +526,60 @@ TFuture<void> TTransaction::AdvanceConsumer(
return req->Invoke().As<void>();
}
+TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ i64 epoch,
+ NTableClient::TNameTablePtr nameTable,
+ TSharedRange<NTableClient::TUnversionedRow> rows,
+ const TPushQueueProducerOptions& options)
+{
+ ValidateTabletTransactionId(GetId());
+
+ THROW_ERROR_EXCEPTION_IF(epoch < 0,
+ "Epoch number %v cannot be negative", epoch);
+ THROW_ERROR_EXCEPTION_IF(options.SequenceNumber && *options.SequenceNumber < 0,
+ "Sequence number %v cannot be negative", *options.SequenceNumber);
+
+ auto req = Proxy_.PushQueueProducer();
+ SetTimeoutOptions(*req, options);
+ if (options.SequenceNumber) {
+ req->set_sequence_number(*options.SequenceNumber);
+ }
+
+ if (NTracing::IsCurrentTraceContextRecorded()) {
+ req->TracingTags().emplace_back("yt.producer_path", ToString(producerPath));
+ req->TracingTags().emplace_back("yt.queue_path", ToString(queuePath));
+ req->TracingTags().emplace_back("yt.session_id", ToString(sessionId));
+ req->TracingTags().emplace_back("yt.epoch", ToString(epoch));
+ }
+
+ ToProto(req->mutable_transaction_id(), GetId());
+
+ ToProto(req->mutable_producer_path(), producerPath);
+ ToProto(req->mutable_queue_path(), queuePath);
+
+ ToProto(req->mutable_session_id(), sessionId);
+ req->set_epoch(epoch);
+
+ if (options.UserMeta) {
+ ToProto(req->mutable_user_meta(), ConvertToYsonString(options.UserMeta).ToString());
+ }
+
+ req->Attachments() = SerializeRowset(
+ nameTable,
+ MakeRange(rows),
+ req->mutable_rowset_descriptor());
+
+ return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPushQueueProducerPtr& rsp) {
+ return TPushQueueProducerResult{
+ .LastSequenceNumber = rsp->last_sequence_number(),
+ .SkippedRowCount = rsp->skipped_row_count(),
+ };
+ }));
+}
+
TFuture<ITransactionPtr> TTransaction::StartTransaction(
ETransactionType type,
const TTransactionStartOptions& options)
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h
index 57f7850b3e..34b93ba90d 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.h
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h
@@ -86,6 +86,15 @@ public:
i64 newOffset,
const TAdvanceConsumerOptions& options) override;
+ TFuture<TPushQueueProducerResult> PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ i64 epoch,
+ NTableClient::TNameTablePtr nameTable,
+ TSharedRange<NTableClient::TUnversionedRow> rows,
+ const TPushQueueProducerOptions& options) override;
+
// IClientBase implementation.
TFuture<NApi::ITransactionPtr> StartTransaction(
NTransactionClient::ETransactionType type,
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index f099122416..9dc3c56237 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -354,6 +354,7 @@ public:
REGISTER (TAdvanceConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4);
REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4);
REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TPushQueueProducerCommand, "push_queue_producer", Null, Structured, true, false, ApiVersion4);
REGISTER (TStartQueryCommand, "start_query", Null, Structured, true, false, ApiVersion4);
REGISTER (TAbortQueryCommand, "abort_query", Null, Structured, true, false, ApiVersion4);
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index 0fbe979af3..d1e4bb3d81 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -1,15 +1,40 @@
#include "queue_commands.h"
+
#include "config.h"
+#include "helpers.h"
#include <yt/yt/client/api/config.h>
+#include <yt/yt/client/formats/parser.h>
+
+#include <yt/yt/client/table_client/adapters.h>
+#include <yt/yt/client/table_client/public.h>
+#include <yt/yt/client/table_client/row_buffer.h>
+#include <yt/yt/client/table_client/table_consumer.h>
+#include <yt/yt/client/table_client/table_output.h>
+
+#include <yt/yt/client/tablet_client/table_mount_cache.h>
+
#include <yt/yt/library/formats/format.h>
namespace NYT::NDriver {
-using namespace NConcurrency;
using namespace NApi;
+using namespace NConcurrency;
+using namespace NTableClient;
+using namespace NTabletClient;
using namespace NYTree;
+using namespace NYson;
+
+////////////////////////////////////////////////////////////////////////////////
+
+static NLogging::TLogger WithCommandTag(
+ const NLogging::TLogger& logger,
+ const ICommandContextPtr& context)
+{
+ return logger.WithTag("Command: %v",
+ context->Request().CommandName);
+}
////////////////////////////////////////////////////////////////////////////////
@@ -311,4 +336,93 @@ void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context)
////////////////////////////////////////////////////////////////////////////////
+void TPushQueueProducerCommand::Register(TRegistrar registrar)
+{
+ registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
+ "sequence_number",
+ [] (TThis* command) -> auto& {
+ return command->Options.SequenceNumber;
+ })
+ .Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<INodePtr>(
+ "user_meta",
+ [] (TThis* command) -> auto& {
+ return command->Options.UserMeta;
+ })
+ .Optional(/*init*/ false);
+
+ registrar.Parameter("producer_path", &TThis::ProducerPath);
+ registrar.Parameter("queue_path", &TThis::QueuePath);
+ registrar.Parameter("session_id", &TThis::SessionId);
+ registrar.Parameter("epoch", &TThis::Epoch);
+
+}
+
+void TPushQueueProducerCommand::DoExecute(ICommandContextPtr context)
+{
+ auto tableMountCache = context->GetClient()->GetTableMountCache();
+
+ auto queueTableInfoFuture = tableMountCache->GetTableInfo(QueuePath.GetPath());
+ auto producerTableInfoFuture = tableMountCache->GetTableInfo(ProducerPath.GetPath());
+
+ auto queueTableInfo = WaitFor(queueTableInfoFuture).ValueOrThrow("Path %v is not valid queue", QueuePath);
+ queueTableInfo->ValidateOrdered();
+
+ auto producerTableInfo = WaitFor(producerTableInfoFuture).ValueOrThrow("Path %v is not valid producer", ProducerPath);
+ producerTableInfo->ValidateSorted();
+
+ struct TPushQueueProducerBufferTag
+ { };
+
+ auto insertRowsFormatConfig = ConvertTo<TInsertRowsFormatConfigPtr>(context->GetInputFormat().Attributes());
+ auto typeConversionConfig = ConvertTo<TTypeConversionConfigPtr>(context->GetInputFormat().Attributes());
+ // Parse input data.
+ TBuildingValueConsumer valueConsumer(
+ queueTableInfo->Schemas[ETableSchemaKind::WriteViaQueueProducer],
+ WithCommandTag(Logger, context),
+ insertRowsFormatConfig->EnableNullToYsonEntityConversion,
+ typeConversionConfig);
+ valueConsumer.SetTreatMissingAsNull(true);
+
+ TTableOutput output(CreateParserForFormat(
+ context->GetInputFormat(),
+ &valueConsumer));
+
+ PipeInputToOutput(context->Request().InputStream, &output);
+ auto rows = valueConsumer.GetRows();
+ auto rowBuffer = New<TRowBuffer>(TPushQueueProducerBufferTag());
+ auto capturedRows = rowBuffer->CaptureRows(rows);
+ auto rowRange = MakeSharedRange(
+ std::vector<TUnversionedRow>(capturedRows.begin(), capturedRows.end()),
+ std::move(rowBuffer));
+
+ auto transaction = GetTransaction(context);
+
+ auto result = WaitFor(transaction->PushQueueProducer(
+ ProducerPath,
+ QueuePath,
+ SessionId,
+ Epoch,
+ valueConsumer.GetNameTable(),
+ std::move(rowRange),
+ Options))
+ .ValueOrThrow();
+
+ if (ShouldCommitTransaction()) {
+ WaitFor(transaction->Commit())
+ .ThrowOnError();
+ }
+
+ ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) {
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("last_sequence_number").Value(result.LastSequenceNumber)
+ .Item("skipped_row_count").Value(result.SkippedRowCount)
+ .EndMap();
+ });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NDriver
diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h
index fc5bec8cb7..85ea7637af 100644
--- a/yt/yt/client/driver/queue_commands.h
+++ b/yt/yt/client/driver/queue_commands.h
@@ -162,4 +162,28 @@ private:
////////////////////////////////////////////////////////////////////////////////
+struct TPushQueueProducerOptions
+ : public NApi::TPushQueueProducerOptions
+ , public TTabletWriteOptions
+{ };
+
+class TPushQueueProducerCommand
+ : public TTypedCommand<TPushQueueProducerOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TPushQueueProducerCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TRichYPath ProducerPath;
+ NYPath::TRichYPath QueuePath;
+ TString SessionId;
+ i64 Epoch;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NDriver
diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp
index 362a814c95..3c9d42cfe2 100644
--- a/yt/yt/client/driver/table_commands.cpp
+++ b/yt/yt/client/driver/table_commands.cpp
@@ -7,6 +7,9 @@
#include <yt/yt/client/chaos_client/replication_card_serialization.h>
+#include <yt/yt/client/formats/config.h>
+#include <yt/yt/client/formats/parser.h>
+
#include <yt/yt/client/table_client/adapters.h>
#include <yt/yt/client/table_client/blob_reader.h>
#include <yt/yt/client/table_client/columnar_statistics.h>
@@ -19,9 +22,6 @@
#include <yt/yt/client/tablet_client/table_mount_cache.h>
-#include <yt/yt/client/formats/config.h>
-#include <yt/yt/client/formats/parser.h>
-
#include <yt/yt/client/ypath/public.h>
#include <yt/yt/core/concurrency/periodic_executor.h>
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index 657cb4afb0..c4c71e882e 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -90,6 +90,15 @@ public:
i64 newOffset,
const TAdvanceConsumerOptions& options) override;
+ TFuture<TPushQueueProducerResult> PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ i64 epoch,
+ NTableClient::TNameTablePtr nameTable,
+ TSharedRange<NTableClient::TUnversionedRow> rows,
+ const TPushQueueProducerOptions& options) override;
+
TFuture<TTransactionFlushResult> Flush() override;
TFuture<void> Ping(const NApi::TTransactionPingOptions& options = {}) override;
@@ -507,6 +516,7 @@ TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const
TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
TRANSACTION_METHOD_IMPL(void, AdvanceConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceConsumerOptions&));
+TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, i64, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index 3027d32f8d..cf06d4e150 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -6,6 +6,7 @@
#include "rpc.h"
#include <yt/yt/client/api/client.h>
+#include <yt/yt/client/api/queue_transaction.h>
#include <yt/yt/client/misc/method_helpers.h>
diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h
index eb5cf3f289..72ea49cd2b 100644
--- a/yt/yt/client/queue_client/public.h
+++ b/yt/yt/client/queue_client/public.h
@@ -11,6 +11,9 @@ namespace NYT::NQueueClient {
YT_DEFINE_ERROR_ENUM(
((ConsumerOffsetConflict) (3100))
+ ((InvalidEpoch) (3101))
+ ((ZombieEpoch) (3102))
+ ((InvalidRowSequenceNumbers) (3103))
);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/table_client/public.cpp b/yt/yt/client/table_client/public.cpp
index d0bba15185..0608491cc2 100644
--- a/yt/yt/client/table_client/public.cpp
+++ b/yt/yt/client/table_client/public.cpp
@@ -15,6 +15,7 @@ const TString TimestampColumnName = SystemColumnNamePrefix + "timestamp";
const TString TtlColumnName = SystemColumnNamePrefix + "ttl";
const TString CumulativeDataWeightColumnName = SystemColumnNamePrefix + "cumulative_data_weight";
const TString EmptyValueColumnName = SystemColumnNamePrefix + "empty";
+const TString SequenceNumberColumnName = SystemColumnNamePrefix + "sequence_number";
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h
index 392cc45a18..2f03d1c8b7 100644
--- a/yt/yt/client/table_client/public.h
+++ b/yt/yt/client/table_client/public.h
@@ -126,6 +126,7 @@ extern const TString TtlColumnName;
extern const TString CumulativeDataWeightColumnName;
extern const TString EmptyValueColumnName;
extern const TString PrimaryLockName;
+extern const TString SequenceNumberColumnName;
constexpr int TypicalHunkColumnCount = 8;
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp
index b2c9093b9b..c03818a999 100644
--- a/yt/yt/client/table_client/schema.cpp
+++ b/yt/yt/client/table_client/schema.cpp
@@ -962,6 +962,31 @@ TTableSchemaPtr TTableSchema::ToQuery() const
}
}
+TTableSchemaPtr TTableSchema::ToWriteViaQueueProducer() const
+{
+ std::vector<TColumnSchema> columns;
+ if (IsSorted()) {
+ for (const auto& column : Columns()) {
+ if (!column.Expression()) {
+ columns.push_back(column);
+ }
+ }
+ } else {
+ columns.push_back(TColumnSchema(TabletIndexColumnName, ESimpleLogicalValueType::Int64)
+ .SetSortOrder(ESortOrder::Ascending));
+ columns.push_back(TColumnSchema(SequenceNumberColumnName, ESimpleLogicalValueType::Int64));
+ for (const auto& column : Columns()) {
+ if (column.StableName().Underlying() != TimestampColumnName &&
+ column.StableName().Underlying() != CumulativeDataWeightColumnName)
+ {
+ columns.push_back(column);
+ }
+ }
+ }
+ return New<TTableSchema>(std::move(columns), Strict_, UniqueKeys_,
+ ETableSchemaModification::None, DeletedColumns());
+}
+
TTableSchemaPtr TTableSchema::ToWrite() const
{
std::vector<TColumnSchema> columns;
diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h
index d5bb5a37aa..7813c7b888 100644
--- a/yt/yt/client/table_client/schema.h
+++ b/yt/yt/client/table_client/schema.h
@@ -327,7 +327,11 @@ public:
//! but without |$timestamp| column, if any.
TTableSchemaPtr ToWrite() const;
- //! For sorted tables, return the current schema
+ //! For sorted tables, return the current schema.
+ //! For ordered tables, prepends the current schema with |(tablet_index, sequence_number)| key column.
+ TTableSchemaPtr ToWriteViaQueueProducer() const;
+
+ //! For sorted tables, return the current schema.
//! For ordered tables, prepends the current schema with |(tablet_index)| key column.
TTableSchemaPtr WithTabletIndex() const;
diff --git a/yt/yt/client/tablet_client/table_mount_cache.h b/yt/yt/client/tablet_client/table_mount_cache.h
index 0490d9ea02..8dc6e6ceee 100644
--- a/yt/yt/client/tablet_client/table_mount_cache.h
+++ b/yt/yt/client/tablet_client/table_mount_cache.h
@@ -92,6 +92,8 @@ DEFINE_ENUM(ETableSchemaKind,
(PrimaryWithTabletIndex)
// Schema used for replication log rows.
(ReplicationLog)
+ // Schema used for inserting rows into ordered tables via queue producer.
+ (WriteViaQueueProducer)
);
struct TTableMountInfo
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index 6b3ba70b77..60ad83bef3 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -213,6 +213,15 @@ public:
NTableClient::TNameTablePtr nameTable,
TSharedRange<TRowModification> modifications,
const TModifyRowsOptions& options), (override));
+
+ MOCK_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TString& sessionId,
+ i64 epoch,
+ NTableClient::TNameTablePtr nameTable,
+ TSharedRange<NTableClient::TUnversionedRow> rows,
+ const TPushQueueProducerOptions& options), (override));
};
DEFINE_REFCOUNTED_TYPE(TMockTransaction)
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 5401e33444..5447738bb2 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -658,6 +658,29 @@ message TRspAdvanceConsumer
{
}
+////////////////////////////////////////////////////////////////////////////////
+
+message TReqPushQueueProducer
+{
+ optional NYT.NProto.TGuid transaction_id = 1;
+ optional string producer_path = 2;
+ optional string queue_path = 3;
+ optional string session_id = 4;
+ optional int64 epoch = 5;
+ optional bytes user_meta = 6; // YSON
+ optional int64 sequence_number = 7;
+
+ required TRowsetDescriptor rowset_descriptor = 200;
+}
+
+message TRspPushQueueProducer
+{
+ optional int64 last_sequence_number = 1;
+ optional int64 skipped_row_count = 2;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
message TRowBatchReadOptions
{
optional int64 max_row_count = 1;
@@ -770,8 +793,8 @@ message TReqCreateQueueProducerSession
message TRspCreateQueueProducerSession
{
- optional uint64 sequence_number = 1;
- optional uint64 epoch = 2;
+ optional int64 sequence_number = 1;
+ optional int64 epoch = 2;
optional bytes user_meta = 3; // YSON
}