diff options
| -rw-r--r-- | yt/yt/client/queue_client/producer_client.cpp | 160 | ||||
| -rw-r--r-- | yt/yt/client/queue_client/producer_client.h | 20 |
2 files changed, 158 insertions, 22 deletions
diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp index 2115805de06..c42e9c26f52 100644 --- a/yt/yt/client/queue_client/producer_client.cpp +++ b/yt/yt/client/queue_client/producer_client.cpp @@ -1,5 +1,7 @@ #include "producer_client.h" +#include "private.h" + #include <yt/yt/client/api/client.h> #include <yt/yt/client/api/transaction.h> @@ -10,6 +12,9 @@ #include <yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h> +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/periodic_executor.h> + namespace NYT::NQueueClient { using namespace NApi; @@ -23,6 +28,10 @@ using namespace NYTree; //////////////////////////////////////////////////////////////////////////////// +static constexpr auto& Logger = QueueClientLogger; + +//////////////////////////////////////////////////////////////////////////////// + class TProducerSession : public IProducerSession { @@ -34,24 +43,41 @@ public: TNameTablePtr nameTable, TQueueProducerSessionId sessionId, TCreateQueueProducerSessionResult createSessionResult, - TProducerSessionOptions options) + TProducerSessionOptions options, + IInvokerPtr invoker) : Client_(std::move(client)) , ProducerPath_(std::move(producerPath)) , QueuePath_(std::move(queuePath)) , NameTable_(std::move(nameTable)) , SessionId_(std::move(sessionId)) , Options_(std::move(options)) + , Invoker_(std::move(invoker)) , Epoch_(createSessionResult.Epoch) , LastSequenceNumber_(createSessionResult.SequenceNumber) , UserMeta_(std::move(createSessionResult.UserMeta)) , BufferedRowWriter_(CreateWireProtocolWriter()) - { } + { + if (Options_.BackgroundFlushPeriod) { + if (!Invoker_) { + THROW_ERROR_EXCEPTION("Cannot create producer session with background flush without invoker"); + } + + FlushExecutor_ = New<TPeriodicExecutor>( + Invoker_, + BIND(&TProducerSession::OnFlush, NYT::MakeWeak(this)), + *Options_.BackgroundFlushPeriod); + } else { + if (!Options_.BatchOptions.RowCount && !Options_.BatchOptions.ByteSize) { + THROW_ERROR_EXCEPTION("One of batch row count or batch byte size should be specified"); + } + } + } // TODO(nadya73): add possibility to pass user meta. TQueueProducerSequenceNumber GetLastSequenceNumber() const override { - return LastSequenceNumber_; + return LastSequenceNumber_.load(); } const INodePtr& GetUserMeta() const override @@ -61,23 +87,71 @@ public: bool Write(TRange<TUnversionedRow> rows) override { + auto guard = Guard(SpinLock_); + + if (!Started_) { + if (FlushExecutor_) { + FlushExecutor_->Start(); + } + Started_ = true; + } + + if (Closed_) { + return false; + } + for (const auto& row : rows) { BufferedRowWriter_->WriteUnversionedRow(row); ++BufferedRowCount_; } - return BufferedRowWriter_->GetByteSize() < Options_.MaxBufferSize; + if (IsFlushNeeded()) { + if (!FlushExecutor_) { + return false; + } + FlushExecutor_->ScheduleOutOfBand(); + } + return true; } TFuture<void> GetReadyEvent() override { + if (FlushExecutor_) { + return VoidFuture; + } + + { + auto guard = Guard(SpinLock_); + if (Closed_) { + return MakeFuture<void>(TError("Producer session was closed")); + } + } return TryToFlush(); } TFuture<void> Close() override { - return Flush(); - } + { + auto guard = Guard(SpinLock_); + Closed_ = true; + } + + // Run one last flush will finish writing the remaining items and + // eventually lead to the stop promise being set. + // A single flush is enough since it is guaranteed that no new messages are added to the queue after the + // critical section above. + + if (!FlushExecutor_) { + return TryToFlush(); + } + + FlushExecutor_->ScheduleOutOfBand(); + + return StoppedPromise_.ToFuture() + .Apply(BIND([this, this_ = MakeStrong(this)] { + return FlushExecutor_->Stop(); + })); + } std::optional<TMD5Hash> GetDigest() const override { @@ -91,43 +165,92 @@ private: const TNameTablePtr NameTable_; const TQueueProducerSessionId SessionId_; const TProducerSessionOptions Options_; + const IInvokerPtr Invoker_; TQueueProducerEpoch Epoch_ = TQueueProducerEpoch{0}; - TQueueProducerSequenceNumber LastSequenceNumber_ = TQueueProducerSequenceNumber{0}; + std::atomic<TQueueProducerSequenceNumber> LastSequenceNumber_ = TQueueProducerSequenceNumber{0}; INodePtr UserMeta_; std::unique_ptr<IWireProtocolWriter> BufferedRowWriter_; i64 BufferedRowCount_ = 0; + bool Started_ = false; + TPeriodicExecutorPtr FlushExecutor_; + + bool Closed_ = false; + TPromise<void> StoppedPromise_ = NewPromise<void>(); + YT_DECLARE_SPIN_LOCK(TSpinLock, SpinLock_); + bool IsFlushNeeded() const + { + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + + return (Options_.BatchOptions.ByteSize && static_cast<i64>(BufferedRowWriter_->GetByteSize()) >= *Options_.BatchOptions.ByteSize) + || (Options_.BatchOptions.RowCount && BufferedRowCount_ >= *Options_.BatchOptions.RowCount); + } + TFuture<void> TryToFlush() { - if (BufferedRowWriter_->GetByteSize() >= Options_.MaxBufferSize) { - return Flush(); + std::vector<TSharedRef> serializedRows; + { + auto guard = Guard(SpinLock_); + if (!IsFlushNeeded() && !Closed_) { + return VoidFuture; + } + serializedRows = GetRowsToFlushAndResetBuffer(); } - return VoidFuture; + return Flush(std::move(serializedRows)); } - TFuture<void> Flush() + std::vector<TSharedRef> GetRowsToFlushAndResetBuffer() { - auto guard = Guard(SpinLock_); + VERIFY_SPINLOCK_AFFINITY(SpinLock_); auto writer = CreateWireProtocolWriter(); writer->WriteSerializedRowset(BufferedRowCount_, BufferedRowWriter_->Finish()); BufferedRowWriter_ = CreateWireProtocolWriter(); BufferedRowCount_ = 0; + return writer->Finish(); + } + + void OnFlush() + { + std::vector<TSharedRef> serializedRows; + { + auto guard = Guard(SpinLock_); + YT_LOG_DEBUG("Flushing rows (RowCount: %v)", BufferedRowCount_); + serializedRows = GetRowsToFlushAndResetBuffer(); + } + + WaitFor(Flush(std::move(serializedRows))) + .ThrowOnError(); + bool isStopped = false; + { + auto guard = Guard(SpinLock_); + if (Closed_ && BufferedRowCount_ == 0) { + isStopped = true; + } + } + + if (isStopped) { + StoppedPromise_.TrySet(); + } + } + + TFuture<void> Flush(std::vector<TSharedRef> serializedRows) + { return Client_->StartTransaction(ETransactionType::Tablet) - .Apply(BIND([writer = std::move(writer), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) { + .Apply(BIND([serializedRows = std::move(serializedRows), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) { TPushQueueProducerOptions pushQueueProducerOptions; if (Options_.AutoSequenceNumber) { - pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.Underlying() + 1}; + pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.load().Underlying() + 1}; } - return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, writer->Finish(), pushQueueProducerOptions) + return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, serializedRows, pushQueueProducerOptions) .Apply(BIND([=, this, this_ = MakeStrong(this)] (const TPushQueueProducerResult& pushQueueProducerResult) { - LastSequenceNumber_ = pushQueueProducerResult.LastSequenceNumber; + LastSequenceNumber_.store(pushQueueProducerResult.LastSequenceNumber); return transaction->Commit(); })); })).AsVoid(); @@ -149,12 +272,13 @@ public: const TRichYPath& queuePath, const TNameTablePtr& nameTable, const TQueueProducerSessionId& sessionId, - const TProducerSessionOptions& options) override + const TProducerSessionOptions& options, + const IInvokerPtr& invoker) override { return Client_->CreateQueueProducerSession(ProducerPath_, queuePath, sessionId) .Apply(BIND([=, this, this_ = MakeStrong(this)] (const TCreateQueueProducerSessionResult& createSessionResult) -> IProducerSessionPtr { - return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options); + return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options, invoker); })); } diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h index 4e1b46756a5..4fc6c2deb97 100644 --- a/yt/yt/client/queue_client/producer_client.h +++ b/yt/yt/client/queue_client/producer_client.h @@ -17,6 +17,14 @@ namespace NYT::NQueueClient { //////////////////////////////////////////////////////////////////////////////// +struct TProducerSessionBatchOptions +{ + //! Weight of serialized buffered rows when flush will be called regardless of the background flush period. + std::optional<i64> ByteSize; + //! Buffered rows count when flush will be called regardless of the background flush period. + std::optional<i64> RowCount; +}; + struct TProducerSessionOptions { //! If true, sequence numbers will be incremented automatically, @@ -24,8 +32,12 @@ struct TProducerSessionOptions //! If false, each row should contain value of $sequnce_number column. bool AutoSequenceNumber = false; - //! Size of buffer when rows will be flushed to server. - size_t MaxBufferSize = 1_MB; + //! Batch sizes when rows will be flushed to server regardless of the background flush period (if it is specified). + //! If there is no background flush, than one of `BatchOptions::ByteSize` or `BatchOptions::RowCount` should be specified. + TProducerSessionBatchOptions BatchOptions; + + //! If set, rows will be flushed in background with this period. + std::optional<TDuration> BackgroundFlushPeriod; }; struct IProducerSession @@ -47,12 +59,12 @@ struct IProducerClient : public virtual TRefCounted { //! Create a session (or increase its epoch) and return session writer. - //! NB: Session writer return by this method is NOT thread-safe. virtual TFuture<IProducerSessionPtr> CreateSession( const NYPath::TRichYPath& queuePath, const NTableClient::TNameTablePtr& nameTable, const NQueueClient::TQueueProducerSessionId& sessionId, - const TProducerSessionOptions& options = {}) = 0; + const TProducerSessionOptions& options = {}, + const IInvokerPtr& invoker = nullptr) = 0; }; DEFINE_REFCOUNTED_TYPE(IProducerClient) |
