diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-10-11 14:31:49 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-10-11 14:42:20 +0300 |
commit | d657fe6264cf660e2ebdd4b30541f5af72c4a3cf (patch) | |
tree | 3af1ce19a7ea668e3c2dd8cc6f20a3c848908652 | |
parent | 759d217781debc6eae3de2a2bce5c76c452ff6c4 (diff) | |
download | ydb-d657fe6264cf660e2ebdd4b30541f5af72c4a3cf.tar.gz |
[queues] YT-21996: Add Flush() in producer client
commit_hash:27868d66efa2608a6ce4e074e1c2d6516b4c5590
-rw-r--r-- | yt/yt/client/queue_client/producer_client.cpp | 32 | ||||
-rw-r--r-- | yt/yt/client/queue_client/producer_client.h | 5 |
2 files changed, 29 insertions, 8 deletions
diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp index c42e9c26f5..4040bd337c 100644 --- a/yt/yt/client/queue_client/producer_client.cpp +++ b/yt/yt/client/queue_client/producer_client.cpp @@ -50,8 +50,8 @@ public: , QueuePath_(std::move(queuePath)) , NameTable_(std::move(nameTable)) , SessionId_(std::move(sessionId)) - , Options_(std::move(options)) , Invoker_(std::move(invoker)) + , Options_(std::move(options)) , Epoch_(createSessionResult.Epoch) , LastSequenceNumber_(createSessionResult.SequenceNumber) , UserMeta_(std::move(createSessionResult.UserMeta)) @@ -68,7 +68,8 @@ public: *Options_.BackgroundFlushPeriod); } else { if (!Options_.BatchOptions.RowCount && !Options_.BatchOptions.ByteSize) { - THROW_ERROR_EXCEPTION("One of batch row count or batch byte size should be specified"); + YT_LOG_DEBUG("None of batch row count or batch byte size are specified, batch byte size will be equal to 16 MB"); + Options_.BatchOptions.ByteSize = 16_MB; } } } @@ -128,6 +129,23 @@ public: return TryToFlush(); } + TFuture<void> Flush() override + { + if (!FlushExecutor_) { + std::vector<TSharedRef> serializedRows; + { + auto guard = Guard(SpinLock_); + YT_LOG_DEBUG("Flushing rows (RowCount: %v)", BufferedRowCount_); + serializedRows = GetRowsToFlushAndResetBuffer(); + } + + return FlushImpl(serializedRows); + } + + FlushExecutor_->ScheduleOutOfBand(); + return FlushExecutor_->GetExecutedEvent(); + } + TFuture<void> Close() override { { @@ -135,7 +153,6 @@ public: Closed_ = true; } - // Run one last flush will finish writing the remaining items and // eventually lead to the stop promise being set. 
// A single flush is enough since it is guaranteed that no new messages are added to the queue after the @@ -164,9 +181,10 @@ private: const TRichYPath QueuePath_; const TNameTablePtr NameTable_; const TQueueProducerSessionId SessionId_; - const TProducerSessionOptions Options_; const IInvokerPtr Invoker_; + TProducerSessionOptions Options_; + TQueueProducerEpoch Epoch_ = TQueueProducerEpoch{0}; std::atomic<TQueueProducerSequenceNumber> LastSequenceNumber_ = TQueueProducerSequenceNumber{0}; INodePtr UserMeta_; @@ -200,7 +218,7 @@ private: } serializedRows = GetRowsToFlushAndResetBuffer(); } - return Flush(std::move(serializedRows)); + return FlushImpl(std::move(serializedRows)); } std::vector<TSharedRef> GetRowsToFlushAndResetBuffer() @@ -223,7 +241,7 @@ private: serializedRows = GetRowsToFlushAndResetBuffer(); } - WaitFor(Flush(std::move(serializedRows))) + WaitFor(FlushImpl(std::move(serializedRows))) .ThrowOnError(); bool isStopped = false; @@ -239,7 +257,7 @@ private: } } - TFuture<void> Flush(std::vector<TSharedRef> serializedRows) + TFuture<void> FlushImpl(std::vector<TSharedRef> serializedRows) { return Client_->StartTransaction(ETransactionType::Tablet) .Apply(BIND([serializedRows = std::move(serializedRows), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) { diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h index 4fc6c2deb9..3fc73ed3a8 100644 --- a/yt/yt/client/queue_client/producer_client.h +++ b/yt/yt/client/queue_client/producer_client.h @@ -33,7 +33,7 @@ struct TProducerSessionOptions bool AutoSequenceNumber = false; //! Batch sizes when rows will be flushed to server regardless of the background flush period (if it is specified). - //! If there is no background flush, than one of `BatchOptions::ByteSize` or `BatchOptions::RowCount` should be specified. + //! If there is no background flush, rows will be flushed when `BatchOptions::ByteSize` or `BatchOptions::RowCount` is reached. 
If none of them are specified, `BatchOptions::ByteSize` will be equal to 16 MB. TProducerSessionBatchOptions BatchOptions; //! If set, rows will be flushed in background with this period. @@ -50,6 +50,9 @@ struct IProducerSession //! Get user meta saved in the producer session. virtual const NYTree::INodePtr& GetUserMeta() const = 0; + + //! Flush all written rows. + virtual TFuture<void> Flush() = 0; }; DEFINE_REFCOUNTED_TYPE(IProducerSession) |