summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yt/yt/client/queue_client/producer_client.cpp160
-rw-r--r--yt/yt/client/queue_client/producer_client.h20
2 files changed, 158 insertions, 22 deletions
diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp
index 2115805de06..c42e9c26f52 100644
--- a/yt/yt/client/queue_client/producer_client.cpp
+++ b/yt/yt/client/queue_client/producer_client.cpp
@@ -1,5 +1,7 @@
#include "producer_client.h"
+#include "private.h"
+
#include <yt/yt/client/api/client.h>
#include <yt/yt/client/api/transaction.h>
@@ -10,6 +12,9 @@
#include <yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h>
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/periodic_executor.h>
+
namespace NYT::NQueueClient {
using namespace NApi;
@@ -23,6 +28,10 @@ using namespace NYTree;
////////////////////////////////////////////////////////////////////////////////
+static constexpr auto& Logger = QueueClientLogger;
+
+////////////////////////////////////////////////////////////////////////////////
+
class TProducerSession
: public IProducerSession
{
@@ -34,24 +43,41 @@ public:
TNameTablePtr nameTable,
TQueueProducerSessionId sessionId,
TCreateQueueProducerSessionResult createSessionResult,
- TProducerSessionOptions options)
+ TProducerSessionOptions options,
+ IInvokerPtr invoker)
: Client_(std::move(client))
, ProducerPath_(std::move(producerPath))
, QueuePath_(std::move(queuePath))
, NameTable_(std::move(nameTable))
, SessionId_(std::move(sessionId))
, Options_(std::move(options))
+ , Invoker_(std::move(invoker))
, Epoch_(createSessionResult.Epoch)
, LastSequenceNumber_(createSessionResult.SequenceNumber)
, UserMeta_(std::move(createSessionResult.UserMeta))
, BufferedRowWriter_(CreateWireProtocolWriter())
- { }
+ {
+ if (Options_.BackgroundFlushPeriod) {
+ if (!Invoker_) {
+ THROW_ERROR_EXCEPTION("Cannot create producer session with background flush without invoker");
+ }
+
+ FlushExecutor_ = New<TPeriodicExecutor>(
+ Invoker_,
+ BIND(&TProducerSession::OnFlush, NYT::MakeWeak(this)),
+ *Options_.BackgroundFlushPeriod);
+ } else {
+ if (!Options_.BatchOptions.RowCount && !Options_.BatchOptions.ByteSize) {
+ THROW_ERROR_EXCEPTION("One of batch row count or batch byte size should be specified");
+ }
+ }
+ }
// TODO(nadya73): add possibility to pass user meta.
TQueueProducerSequenceNumber GetLastSequenceNumber() const override
{
- return LastSequenceNumber_;
+ return LastSequenceNumber_.load();
}
const INodePtr& GetUserMeta() const override
@@ -61,23 +87,71 @@ public:
bool Write(TRange<TUnversionedRow> rows) override
{
+ auto guard = Guard(SpinLock_);
+
+ if (!Started_) {
+ if (FlushExecutor_) {
+ FlushExecutor_->Start();
+ }
+ Started_ = true;
+ }
+
+ if (Closed_) {
+ return false;
+ }
+
for (const auto& row : rows) {
BufferedRowWriter_->WriteUnversionedRow(row);
++BufferedRowCount_;
}
- return BufferedRowWriter_->GetByteSize() < Options_.MaxBufferSize;
+ if (IsFlushNeeded()) {
+ if (!FlushExecutor_) {
+ return false;
+ }
+ FlushExecutor_->ScheduleOutOfBand();
+ }
+ return true;
}
TFuture<void> GetReadyEvent() override
{
+ if (FlushExecutor_) {
+ return VoidFuture;
+ }
+
+ {
+ auto guard = Guard(SpinLock_);
+ if (Closed_) {
+ return MakeFuture<void>(TError("Producer session was closed"));
+ }
+ }
return TryToFlush();
}
TFuture<void> Close() override
{
- return Flush();
- }
+ {
+ auto guard = Guard(SpinLock_);
+ Closed_ = true;
+ }
+
+ // Run one last flush will finish writing the remaining items and
+ // eventually lead to the stop promise being set.
+ // A single flush is enough since it is guaranteed that no new messages are added to the queue after the
+ // critical section above.
+
+ if (!FlushExecutor_) {
+ return TryToFlush();
+ }
+
+ FlushExecutor_->ScheduleOutOfBand();
+
+ return StoppedPromise_.ToFuture()
+ .Apply(BIND([this, this_ = MakeStrong(this)] {
+ return FlushExecutor_->Stop();
+ }));
+ }
std::optional<TMD5Hash> GetDigest() const override
{
@@ -91,43 +165,92 @@ private:
const TNameTablePtr NameTable_;
const TQueueProducerSessionId SessionId_;
const TProducerSessionOptions Options_;
+ const IInvokerPtr Invoker_;
TQueueProducerEpoch Epoch_ = TQueueProducerEpoch{0};
- TQueueProducerSequenceNumber LastSequenceNumber_ = TQueueProducerSequenceNumber{0};
+ std::atomic<TQueueProducerSequenceNumber> LastSequenceNumber_ = TQueueProducerSequenceNumber{0};
INodePtr UserMeta_;
std::unique_ptr<IWireProtocolWriter> BufferedRowWriter_;
i64 BufferedRowCount_ = 0;
+ bool Started_ = false;
+ TPeriodicExecutorPtr FlushExecutor_;
+
+ bool Closed_ = false;
+ TPromise<void> StoppedPromise_ = NewPromise<void>();
+
YT_DECLARE_SPIN_LOCK(TSpinLock, SpinLock_);
+ bool IsFlushNeeded() const
+ {
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
+ return (Options_.BatchOptions.ByteSize && static_cast<i64>(BufferedRowWriter_->GetByteSize()) >= *Options_.BatchOptions.ByteSize)
+ || (Options_.BatchOptions.RowCount && BufferedRowCount_ >= *Options_.BatchOptions.RowCount);
+ }
+
TFuture<void> TryToFlush()
{
- if (BufferedRowWriter_->GetByteSize() >= Options_.MaxBufferSize) {
- return Flush();
+ std::vector<TSharedRef> serializedRows;
+ {
+ auto guard = Guard(SpinLock_);
+ if (!IsFlushNeeded() && !Closed_) {
+ return VoidFuture;
+ }
+ serializedRows = GetRowsToFlushAndResetBuffer();
}
- return VoidFuture;
+ return Flush(std::move(serializedRows));
}
- TFuture<void> Flush()
+ std::vector<TSharedRef> GetRowsToFlushAndResetBuffer()
{
- auto guard = Guard(SpinLock_);
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
auto writer = CreateWireProtocolWriter();
writer->WriteSerializedRowset(BufferedRowCount_, BufferedRowWriter_->Finish());
BufferedRowWriter_ = CreateWireProtocolWriter();
BufferedRowCount_ = 0;
+ return writer->Finish();
+ }
+
+ void OnFlush()
+ {
+ std::vector<TSharedRef> serializedRows;
+ {
+ auto guard = Guard(SpinLock_);
+ YT_LOG_DEBUG("Flushing rows (RowCount: %v)", BufferedRowCount_);
+ serializedRows = GetRowsToFlushAndResetBuffer();
+ }
+
+ WaitFor(Flush(std::move(serializedRows)))
+ .ThrowOnError();
+ bool isStopped = false;
+ {
+ auto guard = Guard(SpinLock_);
+ if (Closed_ && BufferedRowCount_ == 0) {
+ isStopped = true;
+ }
+ }
+
+ if (isStopped) {
+ StoppedPromise_.TrySet();
+ }
+ }
+
+ TFuture<void> Flush(std::vector<TSharedRef> serializedRows)
+ {
return Client_->StartTransaction(ETransactionType::Tablet)
- .Apply(BIND([writer = std::move(writer), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) {
+ .Apply(BIND([serializedRows = std::move(serializedRows), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) {
TPushQueueProducerOptions pushQueueProducerOptions;
if (Options_.AutoSequenceNumber) {
- pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.Underlying() + 1};
+ pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.load().Underlying() + 1};
}
- return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, writer->Finish(), pushQueueProducerOptions)
+ return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, serializedRows, pushQueueProducerOptions)
.Apply(BIND([=, this, this_ = MakeStrong(this)] (const TPushQueueProducerResult& pushQueueProducerResult) {
- LastSequenceNumber_ = pushQueueProducerResult.LastSequenceNumber;
+ LastSequenceNumber_.store(pushQueueProducerResult.LastSequenceNumber);
return transaction->Commit();
}));
})).AsVoid();
@@ -149,12 +272,13 @@ public:
const TRichYPath& queuePath,
const TNameTablePtr& nameTable,
const TQueueProducerSessionId& sessionId,
- const TProducerSessionOptions& options) override
+ const TProducerSessionOptions& options,
+ const IInvokerPtr& invoker) override
{
return Client_->CreateQueueProducerSession(ProducerPath_, queuePath, sessionId)
.Apply(BIND([=, this, this_ = MakeStrong(this)]
(const TCreateQueueProducerSessionResult& createSessionResult) -> IProducerSessionPtr {
- return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options);
+ return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options, invoker);
}));
}
diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h
index 4e1b46756a5..4fc6c2deb97 100644
--- a/yt/yt/client/queue_client/producer_client.h
+++ b/yt/yt/client/queue_client/producer_client.h
@@ -17,6 +17,14 @@ namespace NYT::NQueueClient {
////////////////////////////////////////////////////////////////////////////////
+struct TProducerSessionBatchOptions
+{
+ //! Weight of serialized buffered rows when flush will be called regardless of the background flush period.
+ std::optional<i64> ByteSize;
+ //! Buffered rows count when flush will be called regardless of the background flush period.
+ std::optional<i64> RowCount;
+};
+
struct TProducerSessionOptions
{
//! If true, sequence numbers will be incremented automatically,
@@ -24,8 +32,12 @@ struct TProducerSessionOptions
//! If false, each row should contain value of $sequnce_number column.
bool AutoSequenceNumber = false;
- //! Size of buffer when rows will be flushed to server.
- size_t MaxBufferSize = 1_MB;
+ //! Batch sizes when rows will be flushed to server regardless of the background flush period (if it is specified).
+ //! If there is no background flush, than one of `BatchOptions::ByteSize` or `BatchOptions::RowCount` should be specified.
+ TProducerSessionBatchOptions BatchOptions;
+
+ //! If set, rows will be flushed in background with this period.
+ std::optional<TDuration> BackgroundFlushPeriod;
};
struct IProducerSession
@@ -47,12 +59,12 @@ struct IProducerClient
: public virtual TRefCounted
{
//! Create a session (or increase its epoch) and return session writer.
- //! NB: Session writer return by this method is NOT thread-safe.
virtual TFuture<IProducerSessionPtr> CreateSession(
const NYPath::TRichYPath& queuePath,
const NTableClient::TNameTablePtr& nameTable,
const NQueueClient::TQueueProducerSessionId& sessionId,
- const TProducerSessionOptions& options = {}) = 0;
+ const TProducerSessionOptions& options = {},
+ const IInvokerPtr& invoker = nullptr) = 0;
};
DEFINE_REFCOUNTED_TYPE(IProducerClient)