diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-08-06 16:10:12 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-08-06 16:32:14 +0300 |
commit | 0322f8aa6f5794b8dca79988173ee93dd29c49f6 (patch) | |
tree | 7829a81314be7e35a77ff95cc2cee28feafdcd86 | |
parent | 62f2557217f2b1f9e00011d5b328d7ac85fa1d9e (diff) | |
download | ydb-0322f8aa6f5794b8dca79988173ee93dd29c49f6.tar.gz |
[queues] YT-21996: Producer client with batching
f005b79bfccc891d618b2f69e38fa2e322f352dc
-rw-r--r-- | yt/yt/client/api/delegating_transaction.cpp | 10 | ||||
-rw-r--r-- | yt/yt/client/api/delegating_transaction.h | 9 | ||||
-rw-r--r-- | yt/yt/client/api/queue_transaction.h | 15 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.cpp | 39 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.h | 9 | ||||
-rw-r--r-- | yt/yt/client/federated/client.cpp | 10 | ||||
-rw-r--r-- | yt/yt/client/queue_client/producer_client.cpp | 177 | ||||
-rw-r--r-- | yt/yt/client/queue_client/producer_client.h | 67 | ||||
-rw-r--r-- | yt/yt/client/queue_client/public.h | 3 | ||||
-rw-r--r-- | yt/yt/client/table_client/wire_protocol.cpp | 24 | ||||
-rw-r--r-- | yt/yt/client/table_client/wire_protocol.h | 4 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/transaction.h | 9 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 1 | ||||
-rw-r--r-- | yt/yt/core/test_framework/framework.cpp | 13 | ||||
-rw-r--r-- | yt/yt/core/test_framework/framework.h | 5 |
15 files changed, 380 insertions, 15 deletions
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp index 2b1cb55d32..3e539bf9f7 100644 --- a/yt/yt/client/api/delegating_transaction.cpp +++ b/yt/yt/client/api/delegating_transaction.cpp @@ -302,6 +302,16 @@ DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( const TPushQueueProducerOptions& options), (producerPath, queuePath, sessionId, epoch, nameTable, rows, options)) +DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, + NTableClient::TNameTablePtr nameTable, + const std::vector<TSharedRef>& serializedRows, + const TPushQueueProducerOptions& options), + (producerPath, queuePath, sessionId, epoch, nameTable, serializedRows, options)) + #undef DELEGATE_METHOD //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h index 37aeb46f4a..533a048204 100644 --- a/yt/yt/client/api/delegating_transaction.h +++ b/yt/yt/client/api/delegating_transaction.h @@ -247,6 +247,15 @@ public: TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options) override; + TFuture<TPushQueueProducerResult> PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, + NTableClient::TNameTablePtr nameTable, + const std::vector<TSharedRef>& serializedRows, + const TPushQueueProducerOptions& options) override; + protected: const ITransactionPtr Underlying_; }; diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h index 4c47445619..f01beae541 100644 --- a/yt/yt/client/api/queue_transaction.h +++ b/yt/yt/client/api/queue_transaction.h @@ -87,6 +87,21 @@ struct IQueueTransaction NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options = {}) = 0; + + //! Write rows in the queue with checking their sequence number. + /*! + * If row sequence number is less than sequence number saved in producer table, then this row will not be written. + * #sessionId - an identificator of write session, for example, `<host>-<filename>`. + * #epoch - a number of producer epoch. All calls with an epoch less than the current epoch will fail. + */ + virtual TFuture<TPushQueueProducerResult> PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, + NTableClient::TNameTablePtr nameTable, + const std::vector<TSharedRef>& serializedRows, + const TPushQueueProducerOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index a5ed64fa5c..6d32ba3995 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -4,11 +4,14 @@ #include "config.h" #include "private.h" -#include <yt/yt/client/transaction_client/helpers.h> +#include <yt/yt/client/api/transaction.h> + +#include <yt/yt/client/table_client/name_table.h> +#include <yt/yt/client/table_client/wire_protocol.h> #include <yt/yt/client/tablet_client/table_mount_cache.h> -#include <yt/yt/client/api/transaction.h> +#include <yt/yt/client/transaction_client/helpers.h> namespace NYT::NApi::NRpcProxy { @@ -534,7 +537,7 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer( const TQueueProducerSessionId& sessionId, TQueueProducerEpoch epoch, NTableClient::TNameTablePtr nameTable, - TSharedRange<NTableClient::TUnversionedRow> rows, + const std::vector<TSharedRef>& serializedRows, const TPushQueueProducerOptions& options) { ValidateTabletTransactionId(GetId()); @@ -569,10 +572,16 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer( ToProto(req->mutable_user_meta(), ConvertToYsonString(options.UserMeta).ToString()); } - req->Attachments() = SerializeRowset( - nameTable, - TRange(rows), - req->mutable_rowset_descriptor()); + auto* descriptor = req->mutable_rowset_descriptor(); + descriptor->Clear(); + descriptor->set_wire_format_version(NApi::NRpcProxy::CurrentWireFormatVersion); + descriptor->set_rowset_kind(NProto::RK_UNVERSIONED); + for (int id = 0; id < nameTable->GetSize(); ++id) { + auto* entry = descriptor->add_name_table_entries(); + entry->set_name(TString(nameTable->GetName(id))); + } + + req->Attachments() = serializedRows; return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPushQueueProducerPtr& rsp) { return TPushQueueProducerResult{ @@ -582,6 +591,22 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer( })); } +TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TQueueProducerSessionId& sessionId, + TQueueProducerEpoch epoch, + NTableClient::TNameTablePtr nameTable, + TSharedRange<NTableClient::TUnversionedRow> rows, + const TPushQueueProducerOptions& options) +{ + auto writer = CreateWireProtocolWriter(); + writer->WriteUnversionedRowset(rows); + auto serializedRows = writer->Finish(); + + return PushQueueProducer(producerPath, queuePath, sessionId, epoch, nameTable, serializedRows, options); +} + TFuture<ITransactionPtr> TTransaction::StartTransaction( ETransactionType type, const TTransactionStartOptions& options) diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h index 588469d752..636b8fe2e2 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.h +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h @@ -93,6 +93,15 @@ public: const NQueueClient::TQueueProducerSessionId& sessionId, NQueueClient::TQueueProducerEpoch epoch, NTableClient::TNameTablePtr nameTable, + const std::vector<TSharedRef>& serializedRows, + const TPushQueueProducerOptions& options) override; + + TFuture<TPushQueueProducerResult> PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, + NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options) override; diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index c5b200d409..3302fcd923 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -102,6 +102,15 @@ public: TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options) override; + TFuture<TPushQueueProducerResult> PushQueueProducer( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const TQueueProducerSessionId& sessionId, + TQueueProducerEpoch epoch, + NTableClient::TNameTablePtr nameTable, + const std::vector<TSharedRef>& serializedRows, + const TPushQueueProducerOptions& options) override; + TFuture<TTransactionFlushResult> Flush() override; TFuture<void> Ping(const NApi::TTransactionPingOptions& options = {}) override; @@ -527,6 +536,7 @@ TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRo TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); TRANSACTION_METHOD_IMPL(void, AdvanceQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceQueueConsumerOptions&)); TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, TQueueProducerEpoch, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&)); +TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TQueueProducerSessionId&, TQueueProducerEpoch, NTableClient::TNameTablePtr, const std::vector<TSharedRef>&, const TPushQueueProducerOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&)); diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp new file mode 100644 index 0000000000..2115805de0 --- /dev/null +++ b/yt/yt/client/queue_client/producer_client.cpp @@ -0,0 +1,177 @@ +#include "producer_client.h" + +#include <yt/yt/client/api/client.h> +#include <yt/yt/client/api/transaction.h> + +#include <yt/yt/client/table_client/name_table.h> +#include <yt/yt/client/table_client/wire_protocol.h> + +#include <yt/yt/client/ypath/rich.h> + +#include <yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h> + +namespace NYT::NQueueClient { + +using namespace NApi; +using namespace NConcurrency; +using namespace NCrypto; +using namespace NTableClient; +using namespace NThreading; +using namespace NTransactionClient; +using namespace NYPath; +using namespace NYTree; + +//////////////////////////////////////////////////////////////////////////////// + +class TProducerSession + : public IProducerSession +{ +public: + TProducerSession( + IClientPtr client, + TRichYPath producerPath, + TRichYPath queuePath, + TNameTablePtr nameTable, + TQueueProducerSessionId sessionId, + TCreateQueueProducerSessionResult createSessionResult, + TProducerSessionOptions options) + : Client_(std::move(client)) + , ProducerPath_(std::move(producerPath)) + , QueuePath_(std::move(queuePath)) + , NameTable_(std::move(nameTable)) + , SessionId_(std::move(sessionId)) + , Options_(std::move(options)) + , Epoch_(createSessionResult.Epoch) + , LastSequenceNumber_(createSessionResult.SequenceNumber) + , UserMeta_(std::move(createSessionResult.UserMeta)) + , BufferedRowWriter_(CreateWireProtocolWriter()) + { } + + // TODO(nadya73): add possibility to pass user meta. + + TQueueProducerSequenceNumber GetLastSequenceNumber() const override + { + return LastSequenceNumber_; + } + + const INodePtr& GetUserMeta() const override + { + return UserMeta_; + } + + bool Write(TRange<TUnversionedRow> rows) override + { + for (const auto& row : rows) { + BufferedRowWriter_->WriteUnversionedRow(row); + ++BufferedRowCount_; + } + return BufferedRowWriter_->GetByteSize() < Options_.MaxBufferSize; + } + + TFuture<void> GetReadyEvent() override + { + return TryToFlush(); + } + + TFuture<void> Close() override + { + return Flush(); + } + + + std::optional<TMD5Hash> GetDigest() const override + { + return std::nullopt; + } + +private: + const IClientPtr Client_; + const TRichYPath ProducerPath_; + const TRichYPath QueuePath_; + const TNameTablePtr NameTable_; + const TQueueProducerSessionId SessionId_; + const TProducerSessionOptions Options_; + + TQueueProducerEpoch Epoch_ = TQueueProducerEpoch{0}; + TQueueProducerSequenceNumber LastSequenceNumber_ = TQueueProducerSequenceNumber{0}; + INodePtr UserMeta_; + + std::unique_ptr<IWireProtocolWriter> BufferedRowWriter_; + i64 BufferedRowCount_ = 0; + + YT_DECLARE_SPIN_LOCK(TSpinLock, SpinLock_); + + TFuture<void> TryToFlush() + { + if (BufferedRowWriter_->GetByteSize() >= Options_.MaxBufferSize) { + return Flush(); + } + return VoidFuture; + } + + TFuture<void> Flush() + { + auto guard = Guard(SpinLock_); + + auto writer = CreateWireProtocolWriter(); + writer->WriteSerializedRowset(BufferedRowCount_, BufferedRowWriter_->Finish()); + BufferedRowWriter_ = CreateWireProtocolWriter(); + BufferedRowCount_ = 0; + + return Client_->StartTransaction(ETransactionType::Tablet) + .Apply(BIND([writer = std::move(writer), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) { + TPushQueueProducerOptions pushQueueProducerOptions; + if (Options_.AutoSequenceNumber) { + pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.Underlying() + 1}; + } + + return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, writer->Finish(), pushQueueProducerOptions) + .Apply(BIND([=, this, this_ = MakeStrong(this)] (const TPushQueueProducerResult& pushQueueProducerResult) { + LastSequenceNumber_ = pushQueueProducerResult.LastSequenceNumber; + return transaction->Commit(); + })); + })).AsVoid(); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TProducerClient + : public IProducerClient +{ +public: + TProducerClient(IClientPtr client, TRichYPath producerPath) + : Client_(std::move(client)) + , ProducerPath_(std::move(producerPath)) + { } + + TFuture<IProducerSessionPtr> CreateSession( + const TRichYPath& queuePath, + const TNameTablePtr& nameTable, + const TQueueProducerSessionId& sessionId, + const TProducerSessionOptions& options) override + { + return Client_->CreateQueueProducerSession(ProducerPath_, queuePath, sessionId) + .Apply(BIND([=, this, this_ = MakeStrong(this)] + (const TCreateQueueProducerSessionResult& createSessionResult) -> IProducerSessionPtr { + return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options); + })); + } + +private: + const IClientPtr Client_; + const TRichYPath ProducerPath_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IProducerClientPtr CreateProducerClient( + const IClientPtr& client, + const TRichYPath& producerPath) +{ + return New<TProducerClient>(client, producerPath); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NQueueClient diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h new file mode 100644 index 0000000000..4e1b46756a --- /dev/null +++ b/yt/yt/client/queue_client/producer_client.h @@ -0,0 +1,67 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/client/api/public.h> + +#include <yt/yt/client/table_client/public.h> +#include <yt/yt/client/table_client/unversioned_writer.h> + +#include <yt/yt/client/ypath/public.h> + +#include <yt/yt/core/actions/future.h> + +#include <yt/yt/core/ytree/yson_struct.h> + +namespace NYT::NQueueClient { + +//////////////////////////////////////////////////////////////////////////////// + +struct TProducerSessionOptions +{ + //! If true, sequence numbers will be incremented automatically, + //! and rows should not contain values of $sequence_number column. + //! If false, each row should contain value of $sequnce_number column. + bool AutoSequenceNumber = false; + + //! Size of buffer when rows will be flushed to server. + size_t MaxBufferSize = 1_MB; +}; + +struct IProducerSession + : public NTableClient::IUnversionedRowsetWriter +{ + //! Get sequence number of last pushed row. + //! For example, it can be gotten right after creating session + //! to understand what rows should be written now. + virtual TQueueProducerSequenceNumber GetLastSequenceNumber() const = 0; + + //! Get user meta saved in the producer session. + virtual const NYTree::INodePtr& GetUserMeta() const = 0; +}; +DEFINE_REFCOUNTED_TYPE(IProducerSession) + +//////////////////////////////////////////////////////////////////////////////// + +struct IProducerClient + : public virtual TRefCounted +{ + //! Create a session (or increase its epoch) and return session writer. + //! NB: Session writer return by this method is NOT thread-safe. + virtual TFuture<IProducerSessionPtr> CreateSession( + const NYPath::TRichYPath& queuePath, + const NTableClient::TNameTablePtr& nameTable, + const NQueueClient::TQueueProducerSessionId& sessionId, + const TProducerSessionOptions& options = {}) = 0; +}; +DEFINE_REFCOUNTED_TYPE(IProducerClient) + +//////////////////////////////////////////////////////////////////////////////// + +IProducerClientPtr CreateProducerClient( + const NApi::IClientPtr& client, + const NYPath::TRichYPath& producerPath); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NQueueClient diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h index 12f0378140..f9a0e2e132 100644 --- a/yt/yt/client/queue_client/public.h +++ b/yt/yt/client/queue_client/public.h @@ -25,6 +25,9 @@ DECLARE_REFCOUNTED_STRUCT(IPersistentQueueRowset) DECLARE_REFCOUNTED_STRUCT(IConsumerClient) DECLARE_REFCOUNTED_STRUCT(ISubConsumerClient) +DECLARE_REFCOUNTED_STRUCT(IProducerClient) +DECLARE_REFCOUNTED_STRUCT(IProducerSession) + DECLARE_REFCOUNTED_STRUCT(IPartitionReader) DECLARE_REFCOUNTED_CLASS(TPartitionReaderConfig) DECLARE_REFCOUNTED_CLASS(TQueueStaticExportDestinationConfig) diff --git a/yt/yt/client/table_client/wire_protocol.cpp b/yt/yt/client/table_client/wire_protocol.cpp index 6bb5446eca..32da27163e 100644 --- a/yt/yt/client/table_client/wire_protocol.cpp +++ b/yt/yt/client/table_client/wire_protocol.cpp @@ -87,7 +87,7 @@ public: size_t GetByteSize() const override { - return Stream_.GetSize(); + return Stream_.GetSize() + (Current_ - BeginPreallocated_); } void WriteCommand(EWireProtocolCommand command) override @@ -206,17 +206,29 @@ public: TRange<TUnversionedRow> rowset, const TNameTableToSchemaIdMapping* idMapping) override { - WriteRowCount(rowset); + WriteRowCount(rowset.Size()); for (auto row : rowset) { WriteUnversionedRow(row, idMapping); } } + void WriteSerializedRowset( + size_t rowCount, + const std::vector<TSharedRef>& serializedRowset) override + { + WriteRowCount(rowCount); + + for (const auto& item : serializedRowset) { + EnsureCapacity(item.Size()); + UnsafeWriteRaw(item.Data(), item.Size()); + } + } + void WriteSchemafulRowset( TRange<TUnversionedRow> rowset, const TNameTableToSchemaIdMapping* idMapping) override { - WriteRowCount(rowset); + WriteRowCount(rowset.Size()); for (auto row : rowset) { WriteSchemafulRow(row, idMapping); } @@ -224,7 +236,7 @@ public: void WriteVersionedRowset(TRange<TVersionedRow> rowset) override { - WriteRowCount(rowset); + WriteRowCount(rowset.Size()); for (auto row : rowset) { WriteVersionedRow(row); } @@ -326,10 +338,8 @@ private: UnsafeWritePod(value); } - template <class TRow> - void WriteRowCount(TRange<TRow> rowset) + void WriteRowCount(size_t rowCount) { - size_t rowCount = rowset.Size(); ValidateRowCount(rowCount); WriteUint64(rowCount); } diff --git a/yt/yt/client/table_client/wire_protocol.h b/yt/yt/client/table_client/wire_protocol.h index a38259b986..b83888b1ab 100644 --- a/yt/yt/client/table_client/wire_protocol.h +++ b/yt/yt/client/table_client/wire_protocol.h @@ -171,6 +171,10 @@ public: NTableClient::TUnversionedValueRange valueRange, const NTableClient::TNameTableToSchemaIdMapping* idMapping = nullptr) = 0; + virtual void WriteSerializedRowset( + size_t rowCount, + const std::vector<TSharedRef>& serializedRowset) = 0; + virtual void WriteUnversionedRowset( TRange<NTableClient::TUnversionedRow> rowset, const NTableClient::TNameTableToSchemaIdMapping* idMapping = nullptr) = 0; diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index d158b665e6..5d4ada47f9 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ b/yt/yt/client/unittests/mock/transaction.h @@ -222,6 +222,15 @@ public: NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, const TPushQueueProducerOptions& options), (override)); + + MOCK_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( + const NYPath::TRichYPath& producerPath, + const NYPath::TRichYPath& queuePath, + const NQueueClient::TQueueProducerSessionId& sessionId, + NQueueClient::TQueueProducerEpoch epoch, + NTableClient::TNameTablePtr nameTable, + const std::vector<TSharedRef>& serializedRows, + const TPushQueueProducerOptions& options), (override)); }; DEFINE_REFCOUNTED_TYPE(TMockTransaction) diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index 1886973025..1d135c9366 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -152,6 +152,7 @@ SRCS( queue_client/consumer_client.cpp queue_client/helpers.cpp queue_client/partition_reader.cpp + queue_client/producer_client.cpp queue_client/queue_rowset.cpp ypath/rich.cpp diff --git a/yt/yt/core/test_framework/framework.cpp b/yt/yt/core/test_framework/framework.cpp index 7ae48b9f68..4c1c4ea563 100644 --- a/yt/yt/core/test_framework/framework.cpp +++ b/yt/yt/core/test_framework/framework.cpp @@ -59,7 +59,18 @@ void WaitForPredicate( } } } - THROW_ERROR_EXCEPTION("Wait failed"); + THROW_ERROR_EXCEPTION("Wait failed: %s", options.Message); +} + +void WaitForPredicate( + std::function<bool()> predicate, + const TString& message) +{ + WaitForPredicate( + std::move(predicate), + TWaitForPredicateOptions{ + .Message = message, + }); } void WaitForPredicate( diff --git a/yt/yt/core/test_framework/framework.h b/yt/yt/core/test_framework/framework.h index f7bc3e0528..4255a3eb44 100644 --- a/yt/yt/core/test_framework/framework.h +++ b/yt/yt/core/test_framework/framework.h @@ -51,6 +51,7 @@ struct TWaitForPredicateOptions int IterationCount = 300; TDuration Period = TDuration::MilliSeconds(100); bool IgnoreExceptions = false; + TString Message = "<no-message>"; }; void WaitForPredicate( @@ -59,6 +60,10 @@ void WaitForPredicate( void WaitForPredicate( std::function<bool()> predicate, + const TString& message); + +void WaitForPredicate( + std::function<bool()> predicate, int iterationCount = 300, TDuration period = TDuration::MilliSeconds(100)); |