aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-10-11 14:31:49 +0300
committernadya73 <nadya73@yandex-team.com>2024-10-11 14:42:20 +0300
commitd657fe6264cf660e2ebdd4b30541f5af72c4a3cf (patch)
tree3af1ce19a7ea668e3c2dd8cc6f20a3c848908652
parent759d217781debc6eae3de2a2bce5c76c452ff6c4 (diff)
downloadydb-d657fe6264cf660e2ebdd4b30541f5af72c4a3cf.tar.gz
[queues] YT-21996: Add Flush() in producer client
commit_hash:27868d66efa2608a6ce4e074e1c2d6516b4c5590
-rw-r--r--yt/yt/client/queue_client/producer_client.cpp32
-rw-r--r--yt/yt/client/queue_client/producer_client.h5
2 files changed, 29 insertions, 8 deletions
diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp
index c42e9c26f5..4040bd337c 100644
--- a/yt/yt/client/queue_client/producer_client.cpp
+++ b/yt/yt/client/queue_client/producer_client.cpp
@@ -50,8 +50,8 @@ public:
, QueuePath_(std::move(queuePath))
, NameTable_(std::move(nameTable))
, SessionId_(std::move(sessionId))
- , Options_(std::move(options))
, Invoker_(std::move(invoker))
+ , Options_(std::move(options))
, Epoch_(createSessionResult.Epoch)
, LastSequenceNumber_(createSessionResult.SequenceNumber)
, UserMeta_(std::move(createSessionResult.UserMeta))
@@ -68,7 +68,8 @@ public:
*Options_.BackgroundFlushPeriod);
} else {
if (!Options_.BatchOptions.RowCount && !Options_.BatchOptions.ByteSize) {
- THROW_ERROR_EXCEPTION("One of batch row count or batch byte size should be specified");
+ YT_LOG_DEBUG("None of batch row count or batch byte size are specified, batch byte size will be equal to 16 MB");
+ Options_.BatchOptions.ByteSize = 16_MB;
}
}
}
@@ -128,6 +129,23 @@ public:
return TryToFlush();
}
+ TFuture<void> Flush() override
+ {
+ if (!FlushExecutor_) {
+ std::vector<TSharedRef> serializedRows;
+ {
+ auto guard = Guard(SpinLock_);
+ YT_LOG_DEBUG("Flushing rows (RowCount: %v)", BufferedRowCount_);
+ serializedRows = GetRowsToFlushAndResetBuffer();
+ }
+
+ return FlushImpl(serializedRows);
+ }
+
+ FlushExecutor_->ScheduleOutOfBand();
+ return FlushExecutor_->GetExecutedEvent();
+ }
+
TFuture<void> Close() override
{
{
@@ -135,7 +153,6 @@ public:
Closed_ = true;
}
-
// Run one last flush will finish writing the remaining items and
// eventually lead to the stop promise being set.
// A single flush is enough since it is guaranteed that no new messages are added to the queue after the
@@ -164,9 +181,10 @@ private:
const TRichYPath QueuePath_;
const TNameTablePtr NameTable_;
const TQueueProducerSessionId SessionId_;
- const TProducerSessionOptions Options_;
const IInvokerPtr Invoker_;
+ TProducerSessionOptions Options_;
+
TQueueProducerEpoch Epoch_ = TQueueProducerEpoch{0};
std::atomic<TQueueProducerSequenceNumber> LastSequenceNumber_ = TQueueProducerSequenceNumber{0};
INodePtr UserMeta_;
@@ -200,7 +218,7 @@ private:
}
serializedRows = GetRowsToFlushAndResetBuffer();
}
- return Flush(std::move(serializedRows));
+ return FlushImpl(std::move(serializedRows));
}
std::vector<TSharedRef> GetRowsToFlushAndResetBuffer()
@@ -223,7 +241,7 @@ private:
serializedRows = GetRowsToFlushAndResetBuffer();
}
- WaitFor(Flush(std::move(serializedRows)))
+ WaitFor(FlushImpl(std::move(serializedRows)))
.ThrowOnError();
bool isStopped = false;
@@ -239,7 +257,7 @@ private:
}
}
- TFuture<void> Flush(std::vector<TSharedRef> serializedRows)
+ TFuture<void> FlushImpl(std::vector<TSharedRef> serializedRows)
{
return Client_->StartTransaction(ETransactionType::Tablet)
.Apply(BIND([serializedRows = std::move(serializedRows), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) {
diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h
index 4fc6c2deb9..3fc73ed3a8 100644
--- a/yt/yt/client/queue_client/producer_client.h
+++ b/yt/yt/client/queue_client/producer_client.h
@@ -33,7 +33,7 @@ struct TProducerSessionOptions
bool AutoSequenceNumber = false;
//! Batch sizes when rows will be flushed to server regardless of the background flush period (if it is specified).
- //! If there is no background flush, than one of `BatchOptions::ByteSize` or `BatchOptions::RowCount` should be specified.
+ //! If there is no background flush, rows will be flush when `BatchOptions::ByteSize` or `BatchOptions::RowCount` is reached. If none of them are specified, `BatchOptions::ByteSize` will be equal to 16 MB.
TProducerSessionBatchOptions BatchOptions;
//! If set, rows will be flushed in background with this period.
@@ -50,6 +50,9 @@ struct IProducerSession
//! Get user meta saved in the producer session.
virtual const NYTree::INodePtr& GetUserMeta() const = 0;
+
+ //! Flush all written rows.
+ virtual TFuture<void> Flush() = 0;
};
DEFINE_REFCOUNTED_TYPE(IProducerSession)