blob: 7d14eed51b302e761a1ab81342ebbad0bfe87e02 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
#pragma once
#include "public.h"
#include "delayed_executor.h"
#include <yt/yt/core/actions/callback.h>
#include <yt/yt/core/actions/future.h>
namespace NYT::NConcurrency {
////////////////////////////////////////////////////////////////////////////////
//! Scheduling options accepted by the TPeriodicExecutor constructor.
struct TPeriodicExecutorOptions
{
//! Jitter value applied by #WithJitter.
static constexpr double DefaultJitter = 0.2;
//! Interval between usual consecutive invocations; if null then no invocations will be happening.
std::optional<TDuration> Period;
//! Splay time: the first invocation after TPeriodicExecutor::Start
//! happens with a random delay within this duration.
TDuration Splay;
//! Jitter setting; zero unless set explicitly or via #WithJitter.
//! NB(review): presumably a relative fraction used to randomize the delay
//! between invocations -- confirm against the implementation.
double Jitter = 0.0;
//! Returns options with #Period set to #period and #Jitter set to #DefaultJitter.
static TPeriodicExecutorOptions WithJitter(TDuration period);
};
//! Helps to perform certain actions periodically.
/*!
* The instance is ref-counted (derives from TRefCounted) and is inactive
* until #Start is called.
*/
class TPeriodicExecutor
: public TRefCounted
{
public:
//! Initializes an instance.
/*!
* \note
* #Start must be called to activate the instance.
*
* \param invoker Invoker used for wrapping actions.
* \param callback Callback to invoke periodically.
* \param options Period, splay, etc.
*/
TPeriodicExecutor(
IInvokerPtr invoker,
TClosure callback,
TPeriodicExecutorOptions options);
//! Initializes an instance given only an optional period (null by default).
/*!
* \note
* #Start must be called to activate the instance.
* NB(review): splay and jitter presumably take their default values
* in this overload -- confirm against the implementation.
*
* \param invoker Invoker used for wrapping actions.
* \param callback Callback to invoke periodically.
* \param period Interval between invocations.
*/
TPeriodicExecutor(
IInvokerPtr invoker,
TClosure callback,
std::optional<TDuration> period = {});
//! Starts the instance.
//! The first invocation happens with a random delay within splay time.
void Start();
//! Stops the instance, cancels all subsequent invocations.
//! Returns a future that becomes set when all outstanding callback
//! invocations are finished and no more invocations are expected to happen.
TFuture<void> Stop();
//! Requests an immediate invocation.
void ScheduleOutOfBand();
//! Changes execution period.
void SetPeriod(std::optional<TDuration> period);
//! Returns a future that becomes set once at least one action
//! has been fully executed since the moment of this call.
//! Cancellation of the returned future will not affect the action
//! or other futures returned by this method.
TFuture<void> GetExecutedEvent();
private:
// Construction-time parameters; all const except the period.
const IInvokerPtr Invoker_;
const TClosure Callback_;
// Non-const, unlike its neighbors; presumably updated by #SetPeriod.
std::optional<TDuration> Period_;
const TDuration Splay_;
const double Jitter_;
// NB(review): presumably guards the mutable state declared below --
// confirm against the implementation.
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
// State flags; all start out false.
bool Started_ = false;
bool Busy_ = false;
bool OutOfBandRequested_ = false;
bool ExecutingCallback_ = false;
TCallback<void(const TError&)> ExecutionCanceler_;
TDelayedExecutorCookie Cookie_;
// NB(review): by their names these presumably back the futures returned
// by #Stop and #GetExecutedEvent respectively -- confirm.
TPromise<void> IdlePromise_;
TPromise<void> ExecutedPromise_;
// Takes the caller's spin lock guard by reference.
// NB(review): presumably so that the lock can be released inside -- confirm.
void DoStop(TGuard<NThreading::TSpinLock>& guard);
static TError MakeStoppedError();
void InitIdlePromise();
void InitExecutedPromise();
void PostDelayedCallback(TDuration delay);
void PostCallback();
void OnTimer(bool aborted);
void OnCallbackSuccess();
void OnCallbackFailure();
// NB(review): presumably computes the delay before the next invocation
// from the period and jitter -- confirm against the implementation.
TDuration NextDelay();
};
DEFINE_REFCOUNTED_TYPE(TPeriodicExecutor)
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NConcurrency
|