blob: 5681f27b135f2e9b61041fc04e19eebcf03bd07d (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
#pragma once
#include <yt/yt/core/actions/future.h>
#include <concepts>
#include <deque>
#include <tuple>
#include <vector>
namespace NYT::NConcurrency {
////////////////////////////////////////////////////////////////////////////////
//! State of the batch flush timer kept by TNonblockingBatcher in its
//! TimerState_ member, which is initialized to Initial.
// NOTE(review): the transitions between states are implemented outside this
// header; presumably Started means "timer armed" and Finished means
// "timer fired" -- confirm against nonblocking_batcher-inl.h.
DEFINE_ENUM(ETNonblockingBatcherTimerState,
(Initial)
(Started)
(Finished)
);
////////////////////////////////////////////////////////////////////////////////
//! Requirements on a batch limiter type that can be used with elements of type T.
/*!
* Given a non-const lvalue |limiter| of type TBatchLimiter and an |element| of type |const T&|:
* - |limiter.IsFull()| must be well-formed and its type must be exactly |bool|;
* - |limiter.Add(element)| must be well-formed and its type must be exactly |void|.
* Return types that are merely convertible to |bool| do not satisfy the concept.
*/
template <class TBatchLimiter, class T>
concept CBatchLimiter = requires(TBatchLimiter& limiter, const T& element) {
{ limiter.IsFull() } -> std::same_as<bool>;
{ limiter.Add(element) } -> std::same_as<void>;
};
////////////////////////////////////////////////////////////////////////////////
//! Example batch limiter and the default TBatchLimiter argument of TNonblockingBatcher.
// NOTE(review): judging by its members it bounds a batch by element count;
// the member definitions are not in this header (presumably they live in
// nonblocking_batcher-inl.h), so the exact comparison used by IsFull() should
// be confirmed there.
class TBatchSizeLimiter
{
public:
// Explicit, so an int is never implicitly converted into a limiter.
explicit TBatchSizeLimiter(int maxBatchSize);
// Should return true only if batch is ready for flush.
bool IsFull() const;
// Invoked for each element added to the current batch.
// Templated on the element type, so one limiter type can serve any T.
template <class T>
void Add(const T& element);
private:
// Zero for a freshly constructed limiter.
int CurrentBatchSize_ = 0;
// Has no default member initializer; presumably set from |maxBatchSize|
// by the constructor -- confirm in the implementation.
int MaxBatchSize_;
};
////////////////////////////////////////////////////////////////////////////////
//! Batch limiter that stores several limiters of types |TLimiters...| in a tuple
//! and declares the same IsFull()/Add() interface as a single limiter.
// NOTE(review): how the results of the individual limiters are combined
// (e.g. "any is full" vs. "all are full") is decided by the implementation,
// which is not visible in this header -- confirm before relying on it.
template <class... TLimiters>
class TCompositeBatchLimiter
{
public:
//! Tuple type holding one instance of every limiter.
using TLimitersTuple = std::tuple<TLimiters...>;
// Takes the limiters by value.
explicit TCompositeBatchLimiter(TLimiters... limiters);
// Same signature as TBatchSizeLimiter::IsFull().
bool IsFull() const;
// Same signature as TBatchSizeLimiter::Add(); presumably forwards |element|
// to every stored limiter -- confirm in the implementation.
template <class T>
void Add(const T& element);
private:
TLimitersTuple Limiters_;
};
////////////////////////////////////////////////////////////////////////////////
//! Nonblocking MPMC queue that supports batching.
/*!
* TNonblockingBatcher is configured as follows:
* - batchLimiter is a custom batch limiter. See TBatchSizeLimiter for an example.
* - batchDuration is the time period allotted for assembling a batch.
* If the producer exceeds batchDuration, the consumer receives the awaited batch.
* If there is no consumer, the batch is limited by batchLimiter.
* - allowEmptyBatches defaults to false.
* NOTE(review): presumably it controls whether a timeout may hand an empty
* batch to a waiting consumer -- confirm in the implementation.
*
* TBatchLimiter must satisfy CBatchLimiter<T>; it defaults to TBatchSizeLimiter.
*/
template <class T, CBatchLimiter<T> TBatchLimiter = TBatchSizeLimiter>
class TNonblockingBatcher
: public TRefCounted
{
public:
//! A batch is a plain vector of elements.
using TBatch = std::vector<T>;
TNonblockingBatcher(TBatchLimiter batchLimiter, TDuration batchDuration, bool allowEmptyBatches = false);
~TNonblockingBatcher();
//! Adds an element to the batcher.
// NOTE(review): the variadic forwarding signature suggests the element is
// constructed in place from |u...| -- confirm in the implementation.
template <class... U>
void Enqueue(U&& ... u);
//! Returns a future for the next batch.
TFuture<TBatch> DequeueBatch();
// NOTE(review): semantics are not visible here; presumably discards the
// accumulated data -- confirm in the implementation before use.
void Drop();
// Setters for the individual constructor settings; UpdateSettings takes all
// three at once (note its argument order differs from the constructor's).
void UpdateBatchDuration(TDuration batchDuration);
void UpdateBatchLimiter(TBatchLimiter batchLimiter);
void UpdateAllowEmptyBatches(bool allowEmptyBatches);
void UpdateSettings(TDuration batchDuration, TBatchLimiter batchLimiter, bool allowEmptyBatches);
//! Flush all prepared and in-progress batches and set active promises with error.
//! Used to clear batcher at the end of its lifetime.
std::vector<TBatch> Drain();
private:
using ETimerState = ETNonblockingBatcherTimerState;
// Current settings (see the constructor and the Update* methods).
TBatchLimiter BatchLimiter_;
TDuration BatchDuration_;
bool AllowEmptyBatches_;
// NOTE(review): presumably guards all the mutable state below; the private
// helpers take a TGuard over this lock type, which suggests they must be
// called with the lock held -- confirm.
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
// Batch currently being assembled.
TBatch CurrentBatch_;
// NOTE(review): presumably a working copy of BatchLimiter_ that accounts for
// the elements in CurrentBatch_ -- confirm.
TBatchLimiter CurrentBatchLimiter_;
ETimerState TimerState_ = ETimerState::Initial;
// NOTE(review): presumably completed batches awaiting a consumer and
// promises of consumers awaiting a batch, respectively -- confirm.
std::deque<TBatch> Batches_;
std::deque<TPromise<TBatch>> Promises_;
TDelayedExecutorCookie BatchFlushCookie_;
// NOTE(review): OnBatchTimeout receives a generation value; presumably it is
// compared with this counter to ignore stale timer callbacks -- confirm.
ui64 FlushGeneration_ = 0;
void ResetTimer(TGuard<NThreading::TSpinLock>& guard);
void StartTimer(TGuard<NThreading::TSpinLock>& guard);
bool IsFlushNeeded(TGuard<NThreading::TSpinLock>& guard) const;
void CheckFlush(TGuard<NThreading::TSpinLock>& guard);
void CheckReturn(TGuard<NThreading::TSpinLock>& guard);
void OnBatchTimeout(ui64 generation);
};
//! Intrusive pointer to a TNonblockingBatcher<T> that uses the default batch
//! limiter (TBatchSizeLimiter).
// Batchers instantiated with a custom TBatchLimiter are not covered by this
// alias; spell out TIntrusivePtr<TNonblockingBatcher<T, TLimiter>> for those.
template <class T>
using TNonblockingBatcherPtr = TIntrusivePtr<TNonblockingBatcher<T>>;
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NConcurrency
#define NONBLOCKING_BATCHER_INL_H_
#include "nonblocking_batcher-inl.h"
#undef NONBLOCKING_BATCHER_INL_H_
|