aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yt/core/concurrency/periodic_executor.h
blob: 7d14eed51b302e761a1ab81342ebbad0bfe87e02 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#pragma once

#include "public.h"
#include "delayed_executor.h"

#include <yt/yt/core/actions/callback.h>
#include <yt/yt/core/actions/future.h>

namespace NYT::NConcurrency {

////////////////////////////////////////////////////////////////////////////////

struct TPeriodicExecutorOptions
{
    static constexpr double DefaultJitter = 0.2;

    //! Interval between usual consequent invocations; if null then no invocations will be happening.
    std::optional<TDuration> Period;
    TDuration Splay;
    double Jitter = 0.0;

    //! Sets #Period and Applies set#DefaultJitter.
    static TPeriodicExecutorOptions WithJitter(TDuration period);
};

//! Helps to perform certain actions periodically.
class TPeriodicExecutor
    : public TRefCounted
{
public:
    //! Initializes an instance.
    /*!
     *  \note
     *  We must call #Start to activate the instance.
     *
     *  \param invoker Invoker used for wrapping actions.
     *  \param callback Callback to invoke periodically.
     *  \param options Period, splay, etc.
     */
    TPeriodicExecutor(
        IInvokerPtr invoker,
        TClosure callback,
        TPeriodicExecutorOptions options);

    TPeriodicExecutor(
        IInvokerPtr invoker,
        TClosure callback,
        std::optional<TDuration> period = {});

    //! Starts the instance.
    //! The first invocation happens with a random delay within splay time.
    void Start();

    //! Stops the instance, cancels all subsequent invocations.
    //! Returns a future that becomes set when all outstanding callback
    //! invocations are finished and no more invocations are expected to happen.
    TFuture<void> Stop();

    //! Requests an immediate invocation.
    void ScheduleOutOfBand();

    //! Changes execution period.
    void SetPeriod(std::optional<TDuration> period);

    //! Returns the future that become set when
    //! at least one action be fully executed from the moment of method call.
    //! Cancellation of the returned future will not affect the action
    //! or other futures returned by this method.
    TFuture<void> GetExecutedEvent();

private:
    const IInvokerPtr Invoker_;
    const TClosure Callback_;
    std::optional<TDuration> Period_;
    const TDuration Splay_;
    const double Jitter_;

    YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
    bool Started_ = false;
    bool Busy_ = false;
    bool OutOfBandRequested_ = false;
    bool ExecutingCallback_ = false;
    TCallback<void(const TError&)> ExecutionCanceler_;
    TDelayedExecutorCookie Cookie_;
    TPromise<void> IdlePromise_;
    TPromise<void> ExecutedPromise_;

    void DoStop(TGuard<NThreading::TSpinLock>& guard);

    static TError MakeStoppedError();

    void InitIdlePromise();
    void InitExecutedPromise();

    void PostDelayedCallback(TDuration delay);
    void PostCallback();

    void OnTimer(bool aborted);
    void OnCallbackSuccess();
    void OnCallbackFailure();

    TDuration NextDelay();
};

DEFINE_REFCOUNTED_TYPE(TPeriodicExecutor)

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NConcurrency