aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-08-06 16:10:12 +0300
committernadya73 <nadya73@yandex-team.com>2024-08-06 16:32:14 +0300
commit0322f8aa6f5794b8dca79988173ee93dd29c49f6 (patch)
tree7829a81314be7e35a77ff95cc2cee28feafdcd86
parent62f2557217f2b1f9e00011d5b328d7ac85fa1d9e (diff)
downloadydb-0322f8aa6f5794b8dca79988173ee93dd29c49f6.tar.gz
[queues] YT-21996: Producer client with batching
f005b79bfccc891d618b2f69e38fa2e322f352dc
-rw-r--r--yt/yt/client/api/delegating_transaction.cpp10
-rw-r--r--yt/yt/client/api/delegating_transaction.h9
-rw-r--r--yt/yt/client/api/queue_transaction.h15
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp39
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.h9
-rw-r--r--yt/yt/client/federated/client.cpp10
-rw-r--r--yt/yt/client/queue_client/producer_client.cpp177
-rw-r--r--yt/yt/client/queue_client/producer_client.h67
-rw-r--r--yt/yt/client/queue_client/public.h3
-rw-r--r--yt/yt/client/table_client/wire_protocol.cpp24
-rw-r--r--yt/yt/client/table_client/wire_protocol.h4
-rw-r--r--yt/yt/client/unittests/mock/transaction.h9
-rw-r--r--yt/yt/client/ya.make1
-rw-r--r--yt/yt/core/test_framework/framework.cpp13
-rw-r--r--yt/yt/core/test_framework/framework.h5
15 files changed, 380 insertions, 15 deletions
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp
index 2b1cb55d32..3e539bf9f7 100644
--- a/yt/yt/client/api/delegating_transaction.cpp
+++ b/yt/yt/client/api/delegating_transaction.cpp
@@ -302,6 +302,16 @@ DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
const TPushQueueProducerOptions& options),
(producerPath, queuePath, sessionId, epoch, nameTable, rows, options))
+DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
+ NTableClient::TNameTablePtr nameTable,
+ const std::vector<TSharedRef>& serializedRows,
+ const TPushQueueProducerOptions& options),
+ (producerPath, queuePath, sessionId, epoch, nameTable, serializedRows, options))
+
#undef DELEGATE_METHOD
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h
index 37aeb46f4a..533a048204 100644
--- a/yt/yt/client/api/delegating_transaction.h
+++ b/yt/yt/client/api/delegating_transaction.h
@@ -247,6 +247,15 @@ public:
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options) override;
+ TFuture<TPushQueueProducerResult> PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
+ NTableClient::TNameTablePtr nameTable,
+ const std::vector<TSharedRef>& serializedRows,
+ const TPushQueueProducerOptions& options) override;
+
protected:
const ITransactionPtr Underlying_;
};
diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h
index 4c47445619..f01beae541 100644
--- a/yt/yt/client/api/queue_transaction.h
+++ b/yt/yt/client/api/queue_transaction.h
@@ -87,6 +87,21 @@ struct IQueueTransaction
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options = {}) = 0;
+
+ //! Write rows in the queue with checking their sequence number.
+ /*!
+ * If row sequence number is less than sequence number saved in producer table, then this row will not be written.
+ * #sessionId - an identificator of write session, for example, `<host>-<filename>`.
+ * #epoch - a number of producer epoch. All calls with an epoch less than the current epoch will fail.
+ */
+ virtual TFuture<TPushQueueProducerResult> PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
+ NTableClient::TNameTablePtr nameTable,
+ const std::vector<TSharedRef>& serializedRows,
+ const TPushQueueProducerOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index a5ed64fa5c..6d32ba3995 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -4,11 +4,14 @@
#include "config.h"
#include "private.h"
-#include <yt/yt/client/transaction_client/helpers.h>
+#include <yt/yt/client/api/transaction.h>
+
+#include <yt/yt/client/table_client/name_table.h>
+#include <yt/yt/client/table_client/wire_protocol.h>
#include <yt/yt/client/tablet_client/table_mount_cache.h>
-#include <yt/yt/client/api/transaction.h>
+#include <yt/yt/client/transaction_client/helpers.h>
namespace NYT::NApi::NRpcProxy {
@@ -534,7 +537,7 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer(
const TQueueProducerSessionId& sessionId,
TQueueProducerEpoch epoch,
NTableClient::TNameTablePtr nameTable,
- TSharedRange<NTableClient::TUnversionedRow> rows,
+ const std::vector<TSharedRef>& serializedRows,
const TPushQueueProducerOptions& options)
{
ValidateTabletTransactionId(GetId());
@@ -569,10 +572,16 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer(
ToProto(req->mutable_user_meta(), ConvertToYsonString(options.UserMeta).ToString());
}
- req->Attachments() = SerializeRowset(
- nameTable,
- TRange(rows),
- req->mutable_rowset_descriptor());
+ auto* descriptor = req->mutable_rowset_descriptor();
+ descriptor->Clear();
+ descriptor->set_wire_format_version(NApi::NRpcProxy::CurrentWireFormatVersion);
+ descriptor->set_rowset_kind(NProto::RK_UNVERSIONED);
+ for (int id = 0; id < nameTable->GetSize(); ++id) {
+ auto* entry = descriptor->add_name_table_entries();
+ entry->set_name(TString(nameTable->GetName(id)));
+ }
+
+ req->Attachments() = serializedRows;
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPushQueueProducerPtr& rsp) {
return TPushQueueProducerResult{
@@ -582,6 +591,22 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer(
}));
}
+TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TQueueProducerSessionId& sessionId,
+ TQueueProducerEpoch epoch,
+ NTableClient::TNameTablePtr nameTable,
+ TSharedRange<NTableClient::TUnversionedRow> rows,
+ const TPushQueueProducerOptions& options)
+{
+ auto writer = CreateWireProtocolWriter();
+ writer->WriteUnversionedRowset(rows);
+ auto serializedRows = writer->Finish();
+
+ return PushQueueProducer(producerPath, queuePath, sessionId, epoch, nameTable, serializedRows, options);
+}
+
TFuture<ITransactionPtr> TTransaction::StartTransaction(
ETransactionType type,
const TTransactionStartOptions& options)
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h
index 588469d752..636b8fe2e2 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.h
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h
@@ -93,6 +93,15 @@ public:
const NQueueClient::TQueueProducerSessionId& sessionId,
NQueueClient::TQueueProducerEpoch epoch,
NTableClient::TNameTablePtr nameTable,
+ const std::vector<TSharedRef>& serializedRows,
+ const TPushQueueProducerOptions& options) override;
+
+ TFuture<TPushQueueProducerResult> PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
+ NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options) override;
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index c5b200d409..3302fcd923 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -102,6 +102,15 @@ public:
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options) override;
+ TFuture<TPushQueueProducerResult> PushQueueProducer(
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const TQueueProducerSessionId& sessionId,
+ TQueueProducerEpoch epoch,
+ NTableClient::TNameTablePtr nameTable,
+ const std::vector<TSharedRef>& serializedRows,
+ const TPushQueueProducerOptions& options) override;
+
TFuture<TTransactionFlushResult> Flush() override;
TFuture<void> Ping(const NApi::TTransactionPingOptions& options = {}) override;
@@ -527,6 +536,7 @@ TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRo
TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
TRANSACTION_METHOD_IMPL(void, AdvanceQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceQueueConsumerOptions&));
TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, TQueueProducerEpoch, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&));
+TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, TQueueProducerEpoch, NTableClient::TNameTablePtr, const std::vector<TSharedRef>&, const TPushQueueProducerOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&));
diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp
new file mode 100644
index 0000000000..2115805de0
--- /dev/null
+++ b/yt/yt/client/queue_client/producer_client.cpp
@@ -0,0 +1,177 @@
+#include "producer_client.h"
+
+#include <yt/yt/client/api/client.h>
+#include <yt/yt/client/api/transaction.h>
+
+#include <yt/yt/client/table_client/name_table.h>
+#include <yt/yt/client/table_client/wire_protocol.h>
+
+#include <yt/yt/client/ypath/rich.h>
+
+#include <yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h>
+
+namespace NYT::NQueueClient {
+
+using namespace NApi;
+using namespace NConcurrency;
+using namespace NCrypto;
+using namespace NTableClient;
+using namespace NThreading;
+using namespace NTransactionClient;
+using namespace NYPath;
+using namespace NYTree;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TProducerSession
+ : public IProducerSession
+{
+public:
+ TProducerSession(
+ IClientPtr client,
+ TRichYPath producerPath,
+ TRichYPath queuePath,
+ TNameTablePtr nameTable,
+ TQueueProducerSessionId sessionId,
+ TCreateQueueProducerSessionResult createSessionResult,
+ TProducerSessionOptions options)
+ : Client_(std::move(client))
+ , ProducerPath_(std::move(producerPath))
+ , QueuePath_(std::move(queuePath))
+ , NameTable_(std::move(nameTable))
+ , SessionId_(std::move(sessionId))
+ , Options_(std::move(options))
+ , Epoch_(createSessionResult.Epoch)
+ , LastSequenceNumber_(createSessionResult.SequenceNumber)
+ , UserMeta_(std::move(createSessionResult.UserMeta))
+ , BufferedRowWriter_(CreateWireProtocolWriter())
+ { }
+
+ // TODO(nadya73): add possibility to pass user meta.
+
+ TQueueProducerSequenceNumber GetLastSequenceNumber() const override
+ {
+ return LastSequenceNumber_;
+ }
+
+ const INodePtr& GetUserMeta() const override
+ {
+ return UserMeta_;
+ }
+
+ bool Write(TRange<TUnversionedRow> rows) override
+ {
+ for (const auto& row : rows) {
+ BufferedRowWriter_->WriteUnversionedRow(row);
+ ++BufferedRowCount_;
+ }
+ return BufferedRowWriter_->GetByteSize() < Options_.MaxBufferSize;
+ }
+
+ TFuture<void> GetReadyEvent() override
+ {
+ return TryToFlush();
+ }
+
+ TFuture<void> Close() override
+ {
+ return Flush();
+ }
+
+
+ std::optional<TMD5Hash> GetDigest() const override
+ {
+ return std::nullopt;
+ }
+
+private:
+ const IClientPtr Client_;
+ const TRichYPath ProducerPath_;
+ const TRichYPath QueuePath_;
+ const TNameTablePtr NameTable_;
+ const TQueueProducerSessionId SessionId_;
+ const TProducerSessionOptions Options_;
+
+ TQueueProducerEpoch Epoch_ = TQueueProducerEpoch{0};
+ TQueueProducerSequenceNumber LastSequenceNumber_ = TQueueProducerSequenceNumber{0};
+ INodePtr UserMeta_;
+
+ std::unique_ptr<IWireProtocolWriter> BufferedRowWriter_;
+ i64 BufferedRowCount_ = 0;
+
+ YT_DECLARE_SPIN_LOCK(TSpinLock, SpinLock_);
+
+ TFuture<void> TryToFlush()
+ {
+ if (BufferedRowWriter_->GetByteSize() >= Options_.MaxBufferSize) {
+ return Flush();
+ }
+ return VoidFuture;
+ }
+
+ TFuture<void> Flush()
+ {
+ auto guard = Guard(SpinLock_);
+
+ auto writer = CreateWireProtocolWriter();
+ writer->WriteSerializedRowset(BufferedRowCount_, BufferedRowWriter_->Finish());
+ BufferedRowWriter_ = CreateWireProtocolWriter();
+ BufferedRowCount_ = 0;
+
+ return Client_->StartTransaction(ETransactionType::Tablet)
+ .Apply(BIND([writer = std::move(writer), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) {
+ TPushQueueProducerOptions pushQueueProducerOptions;
+ if (Options_.AutoSequenceNumber) {
+ pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.Underlying() + 1};
+ }
+
+ return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, writer->Finish(), pushQueueProducerOptions)
+ .Apply(BIND([=, this, this_ = MakeStrong(this)] (const TPushQueueProducerResult& pushQueueProducerResult) {
+ LastSequenceNumber_ = pushQueueProducerResult.LastSequenceNumber;
+ return transaction->Commit();
+ }));
+ })).AsVoid();
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TProducerClient
+ : public IProducerClient
+{
+public:
+ TProducerClient(IClientPtr client, TRichYPath producerPath)
+ : Client_(std::move(client))
+ , ProducerPath_(std::move(producerPath))
+ { }
+
+ TFuture<IProducerSessionPtr> CreateSession(
+ const TRichYPath& queuePath,
+ const TNameTablePtr& nameTable,
+ const TQueueProducerSessionId& sessionId,
+ const TProducerSessionOptions& options) override
+ {
+ return Client_->CreateQueueProducerSession(ProducerPath_, queuePath, sessionId)
+ .Apply(BIND([=, this, this_ = MakeStrong(this)]
+ (const TCreateQueueProducerSessionResult& createSessionResult) -> IProducerSessionPtr {
+ return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options);
+ }));
+ }
+
+private:
+ const IClientPtr Client_;
+ const TRichYPath ProducerPath_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IProducerClientPtr CreateProducerClient(
+ const IClientPtr& client,
+ const TRichYPath& producerPath)
+{
+ return New<TProducerClient>(client, producerPath);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h
new file mode 100644
index 0000000000..4e1b46756a
--- /dev/null
+++ b/yt/yt/client/queue_client/producer_client.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/client/api/public.h>
+
+#include <yt/yt/client/table_client/public.h>
+#include <yt/yt/client/table_client/unversioned_writer.h>
+
+#include <yt/yt/client/ypath/public.h>
+
+#include <yt/yt/core/actions/future.h>
+
+#include <yt/yt/core/ytree/yson_struct.h>
+
+namespace NYT::NQueueClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TProducerSessionOptions
+{
+ //! If true, sequence numbers will be incremented automatically,
+ //! and rows should not contain values of $sequence_number column.
+ //! If false, each row should contain value of $sequnce_number column.
+ bool AutoSequenceNumber = false;
+
+ //! Size of buffer when rows will be flushed to server.
+ size_t MaxBufferSize = 1_MB;
+};
+
+struct IProducerSession
+ : public NTableClient::IUnversionedRowsetWriter
+{
+ //! Get sequence number of last pushed row.
+ //! For example, it can be gotten right after creating session
+ //! to understand what rows should be written now.
+ virtual TQueueProducerSequenceNumber GetLastSequenceNumber() const = 0;
+
+ //! Get user meta saved in the producer session.
+ virtual const NYTree::INodePtr& GetUserMeta() const = 0;
+};
+DEFINE_REFCOUNTED_TYPE(IProducerSession)
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IProducerClient
+ : public virtual TRefCounted
+{
+ //! Create a session (or increase its epoch) and return session writer.
+ //! NB: Session writer return by this method is NOT thread-safe.
+ virtual TFuture<IProducerSessionPtr> CreateSession(
+ const NYPath::TRichYPath& queuePath,
+ const NTableClient::TNameTablePtr& nameTable,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ const TProducerSessionOptions& options = {}) = 0;
+};
+DEFINE_REFCOUNTED_TYPE(IProducerClient)
+
+////////////////////////////////////////////////////////////////////////////////
+
+IProducerClientPtr CreateProducerClient(
+ const NApi::IClientPtr& client,
+ const NYPath::TRichYPath& producerPath);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h
index 12f0378140..f9a0e2e132 100644
--- a/yt/yt/client/queue_client/public.h
+++ b/yt/yt/client/queue_client/public.h
@@ -25,6 +25,9 @@ DECLARE_REFCOUNTED_STRUCT(IPersistentQueueRowset)
DECLARE_REFCOUNTED_STRUCT(IConsumerClient)
DECLARE_REFCOUNTED_STRUCT(ISubConsumerClient)
+DECLARE_REFCOUNTED_STRUCT(IProducerClient)
+DECLARE_REFCOUNTED_STRUCT(IProducerSession)
+
DECLARE_REFCOUNTED_STRUCT(IPartitionReader)
DECLARE_REFCOUNTED_CLASS(TPartitionReaderConfig)
DECLARE_REFCOUNTED_CLASS(TQueueStaticExportDestinationConfig)
diff --git a/yt/yt/client/table_client/wire_protocol.cpp b/yt/yt/client/table_client/wire_protocol.cpp
index 6bb5446eca..32da27163e 100644
--- a/yt/yt/client/table_client/wire_protocol.cpp
+++ b/yt/yt/client/table_client/wire_protocol.cpp
@@ -87,7 +87,7 @@ public:
size_t GetByteSize() const override
{
- return Stream_.GetSize();
+ return Stream_.GetSize() + (Current_ - BeginPreallocated_);
}
void WriteCommand(EWireProtocolCommand command) override
@@ -206,17 +206,29 @@ public:
TRange<TUnversionedRow> rowset,
const TNameTableToSchemaIdMapping* idMapping) override
{
- WriteRowCount(rowset);
+ WriteRowCount(rowset.Size());
for (auto row : rowset) {
WriteUnversionedRow(row, idMapping);
}
}
+ void WriteSerializedRowset(
+ size_t rowCount,
+ const std::vector<TSharedRef>& serializedRowset) override
+ {
+ WriteRowCount(rowCount);
+
+ for (const auto& item : serializedRowset) {
+ EnsureCapacity(item.Size());
+ UnsafeWriteRaw(item.Data(), item.Size());
+ }
+ }
+
void WriteSchemafulRowset(
TRange<TUnversionedRow> rowset,
const TNameTableToSchemaIdMapping* idMapping) override
{
- WriteRowCount(rowset);
+ WriteRowCount(rowset.Size());
for (auto row : rowset) {
WriteSchemafulRow(row, idMapping);
}
@@ -224,7 +236,7 @@ public:
void WriteVersionedRowset(TRange<TVersionedRow> rowset) override
{
- WriteRowCount(rowset);
+ WriteRowCount(rowset.Size());
for (auto row : rowset) {
WriteVersionedRow(row);
}
@@ -326,10 +338,8 @@ private:
UnsafeWritePod(value);
}
- template <class TRow>
- void WriteRowCount(TRange<TRow> rowset)
+ void WriteRowCount(size_t rowCount)
{
- size_t rowCount = rowset.Size();
ValidateRowCount(rowCount);
WriteUint64(rowCount);
}
diff --git a/yt/yt/client/table_client/wire_protocol.h b/yt/yt/client/table_client/wire_protocol.h
index a38259b986..b83888b1ab 100644
--- a/yt/yt/client/table_client/wire_protocol.h
+++ b/yt/yt/client/table_client/wire_protocol.h
@@ -171,6 +171,10 @@ public:
NTableClient::TUnversionedValueRange valueRange,
const NTableClient::TNameTableToSchemaIdMapping* idMapping = nullptr) = 0;
+ virtual void WriteSerializedRowset(
+ size_t rowCount,
+ const std::vector<TSharedRef>& serializedRowset) = 0;
+
virtual void WriteUnversionedRowset(
TRange<NTableClient::TUnversionedRow> rowset,
const NTableClient::TNameTableToSchemaIdMapping* idMapping = nullptr) = 0;
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index d158b665e6..5d4ada47f9 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -222,6 +222,15 @@ public:
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
const TPushQueueProducerOptions& options), (override));
+
+ MOCK_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
+ const NYPath::TRichYPath& producerPath,
+ const NYPath::TRichYPath& queuePath,
+ const NQueueClient::TQueueProducerSessionId& sessionId,
+ NQueueClient::TQueueProducerEpoch epoch,
+ NTableClient::TNameTablePtr nameTable,
+ const std::vector<TSharedRef>& serializedRows,
+ const TPushQueueProducerOptions& options), (override));
};
DEFINE_REFCOUNTED_TYPE(TMockTransaction)
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index 1886973025..1d135c9366 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -152,6 +152,7 @@ SRCS(
queue_client/consumer_client.cpp
queue_client/helpers.cpp
queue_client/partition_reader.cpp
+ queue_client/producer_client.cpp
queue_client/queue_rowset.cpp
ypath/rich.cpp
diff --git a/yt/yt/core/test_framework/framework.cpp b/yt/yt/core/test_framework/framework.cpp
index 7ae48b9f68..4c1c4ea563 100644
--- a/yt/yt/core/test_framework/framework.cpp
+++ b/yt/yt/core/test_framework/framework.cpp
@@ -59,7 +59,18 @@ void WaitForPredicate(
}
}
}
- THROW_ERROR_EXCEPTION("Wait failed");
+ THROW_ERROR_EXCEPTION("Wait failed: %s", options.Message);
+}
+
+void WaitForPredicate(
+ std::function<bool()> predicate,
+ const TString& message)
+{
+ WaitForPredicate(
+ std::move(predicate),
+ TWaitForPredicateOptions{
+ .Message = message,
+ });
}
void WaitForPredicate(
diff --git a/yt/yt/core/test_framework/framework.h b/yt/yt/core/test_framework/framework.h
index f7bc3e0528..4255a3eb44 100644
--- a/yt/yt/core/test_framework/framework.h
+++ b/yt/yt/core/test_framework/framework.h
@@ -51,6 +51,7 @@ struct TWaitForPredicateOptions
int IterationCount = 300;
TDuration Period = TDuration::MilliSeconds(100);
bool IgnoreExceptions = false;
+ TString Message = "<no-message>";
};
void WaitForPredicate(
@@ -59,6 +60,10 @@ void WaitForPredicate(
void WaitForPredicate(
std::function<bool()> predicate,
+ const TString& message);
+
+void WaitForPredicate(
+ std::function<bool()> predicate,
int iterationCount = 300,
TDuration period = TDuration::MilliSeconds(100));