aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future/subscription/subscription.h
blob: afe5eda71119846fc8f3f2afe31a6810301e2359 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#pragma once

#include <library/cpp/threading/future/future.h>

#include <util/generic/hash.h>
#include <util/generic/ptr.h>
#include <util/generic/vector.h>
#include <util/system/mutex.h>

#include <functional>
#include <optional>
#include <utility>

namespace NThreading {

namespace NPrivate {

//! The default callback executor: runs the callee in place, in the caller's context
/** The call operator is noexcept, so an exception escaping the callee ends in std::terminate.
    The callee's result (if any) is explicitly discarded, so callbacks with a non-void
    return type are accepted as well as void ones.
**/
struct TNoexceptExecutor {
    //! Calls callee(future) and ignores whatever it returns
    /** @param future - The future handed over to the callee
        @param callee - The callable to invoke; it is invoked as an lvalue
    **/
    template <typename T, typename F>
    void operator()(TFuture<T> const& future, F&& callee) const noexcept {
        // A `return <non-void expression>;` is ill-formed in a void function,
        // so the result is dropped via a cast instead of being returned.
        static_cast<void>(callee(future));
    }
};

}

class TSubscriptionManager;

using TSubscriptionManagerPtr = TIntrusivePtr<TSubscriptionManager>;

//! Identifies a single subscription
/** The id is a pair: the id of a future state plus a secondary number
    that makes the whole subscription id unique.
    Construction and modification are private; TSubscriptionManager is the only friend.
**/
class TSubscriptionId {
    friend class TSubscriptionManager;

public:
    //! The future state id part of this subscription id
    TFutureStateId StateId() const noexcept {
        return StateId_;
    }

    //! The secondary id part of this subscription id
    ui64 SubId() const noexcept {
        return SubId_;
    }

private:
    //! Builds an id from its two parts
    TSubscriptionId(TFutureStateId stateId, ui64 subId)
        : StateId_(stateId)
        , SubId_(subId)
    {
    }

    //! Overwrites the secondary id; the state id is left untouched
    void SetSubId(ui64 subId) noexcept {
        SubId_ = subId;
    }

private:
    TFutureStateId StateId_;
    ui64 SubId_; // Secondary id to make the whole subscription id unique
};

//! Equality comparison of two subscription ids
/** Only declared here; the definition is not visible in this header.
    NOTE(review): presumably compares both StateId() and SubId() - confirm against the definition.
**/
bool operator==(TSubscriptionId const& l, TSubscriptionId const& r) noexcept;
//! Inequality comparison of two subscription ids
/** Only declared here; NOTE(review): presumably the negation of operator== - confirm against the definition.
**/
bool operator!=(TSubscriptionId const& l, TSubscriptionId const& r) noexcept;

//! The subscription manager manages subscriptions to futures
/** It provides an ability to create (and drop) multiple subscriptions to any future
    with just a single underlying subscription per future.

    When a future is signaled all its subscriptions are removed.
    So, there is no need to call Unsubscribe for subscriptions to already signaled futures.

    Warning!!! For correct operation this class imposes the following requirement to futures/promises:
    Any used future must be signaled (value or exception set) before the future state destruction.
    Otherwise leaks of subscriptions and futures may happen.
    Current future design does not provide the required guarantee. But that should be fixed soon.
**/
class TSubscriptionManager final : public TAtomicRefCount<TSubscriptionManager> {
private:
    //! A single subscription
    /** Stores a type-erased, argument-less callable (std::function<void()>).
        It is constructed from a future, a callback and a callback executor;
        the definitions are not in this header.
    **/
    class TSubscription {
    private:
        std::function<void()> Callback;

    public:
        //! Builds a subscription from the future, the callback and the executor
        /** NOTE(review): presumably binds all three into Callback so that the executor
            invokes the callback with the future - confirm against subscription-inl.h.
        **/
        template <typename T, typename F, typename TCallbackExecutor>
        TSubscription(TFuture<T> future, F&& callback, TCallbackExecutor&& executor);

        //! Runs the subscription
        /** NOTE(review): presumably invokes the stored Callback - confirm against the definition.
        **/
        void operator()();
    };

    //! Hash functor that allows TFutureStateId to be used as a THashMap key
    struct TFutureStateIdHash {
        size_t operator()(TFutureStateId const id) const noexcept {
            // Hash the raw value exposed by the state id with the library-wide ::hash
            auto const value = id.Value();
            return ::hash<decltype(value)>()(value);
        }
    };

private:
    //! Subscriptions grouped by future state id
    /** The inner map is keyed by a ui64.
        NOTE(review): presumably that key is the SubId part of TSubscriptionId - confirm.
    **/
    THashMap<TFutureStateId, THashMap<ui64, TSubscription>, TFutureStateIdHash> Subscriptions;
    //! A counter starting from zero
    /** NOTE(review): presumably the source of SubId values for new subscriptions - confirm against subscription-inl.h.
    **/
    ui64 Revision = 0;
    //! The mutex; several private methods below are documented as requiring it to be held
    TMutex Lock;

public:
    //! Creates a new subscription manager instance
    static TSubscriptionManagerPtr NewInstance();

    //! The default subscription manager instance
    static TSubscriptionManagerPtr Default();

    //! Attempts to subscribe the callback to the future
    /** Subscription should succeed if the future is not signaled yet.
        Otherwise the callback will be called synchronously and nullopt will be returned

        @param future - The future to subscribe to
        @param callback - The callback to attach
        @param executor - The callback executor (NPrivate::TNoexceptExecutor by default)
        @return The subscription id on success, nullopt if the future has been signaled already
    **/
    template <typename T, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor>
    std::optional<TSubscriptionId> Subscribe(TFuture<T> const& future, F&& callback
                                                , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor());

    //! Drops the subscription with the given id
    /** @param id - The subscription id
    **/
    void Unsubscribe(TSubscriptionId id);

    //! Attempts to subscribe the callback to the set of futures
    /** @param futures - The futures to subscribe to
        @param callback - The callback to attach
        @param revertOnSignaled - Shows whether to stop and revert the subscription process if one of the futures is in signaled state
        @param executor - The callback executor (NPrivate::TNoexceptExecutor by default)
        @return The vector of subscription ids if no revert happened or an empty vector otherwise
                A subscription id will be valid even if a corresponding future has been signaled
    **/
    template <typename TFutures, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor>
    TVector<TSubscriptionId> Subscribe(TFutures const& futures, F&& callback, bool revertOnSignaled = false
                                                    , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor());

    //! Attempts to subscribe the callback to the set of futures given as an initializer list
    /** @param futures - The futures to subscribe to
        @param callback - The callback to attach
        @param revertOnSignaled - Shows whether to stop and revert the subscription process if one of the futures is in signaled state
        @param executor - The callback executor (NPrivate::TNoexceptExecutor by default)
        @return The vector of subscription ids if no revert happened or an empty vector otherwise
                A subscription id will be valid even if a corresponding future has been signaled
    **/
    template <typename T, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor>
    TVector<TSubscriptionId> Subscribe(std::initializer_list<TFuture<T> const> futures, F&& callback, bool revertOnSignaled = false
                                                    , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor());

    //! Drops the subscriptions with the given ids
    /** @param ids - The subscription ids
    **/
    void Unsubscribe(TVector<TSubscriptionId> const& ids);

private:
    //! The outcome of an attempt to subscribe a callback
    enum class ECallbackStatus {
        Subscribed, //! A subscription has been created. The callback will be called asynchronously.
        ExecutedSynchronously //! A callback has been called synchronously. No subscription has been created
    };

private:
    //! Private default constructor; instances are obtained via NewInstance() or Default()
    TSubscriptionManager() = default;
    //! Processes a callback from a future
    /** @param stateId - The id of the future state the callback came from
    **/
    void OnCallback(TFutureStateId stateId) noexcept;
    //! Attempts to create a subscription
    /** This method should be called under the lock

        @param future - The future to subscribe to
        @param callback - The callback to attach
        @param stateId - The future state id (NOTE(review): presumably the state id of the given future - confirm)
        @param executor - The callback executor
        @return Whether a subscription has been created or the callback has been executed synchronously
    **/
    template <typename T, typename F, typename TCallbackExecutor>
    ECallbackStatus TrySubscribe(TFuture<T> const& future, F&& callback, TFutureStateId stateId, TCallbackExecutor&& executor);
    //! Batch subscribe implementation
    /** Unlike the public Subscribe overloads, the callback and the executor are taken by const reference here
    **/
    template <typename TFutures, typename F, typename TCallbackExecutor>
    TVector<TSubscriptionId> SubscribeImpl(TFutures const& futures, F const& callback, bool revertOnSignaled
                                                        , TCallbackExecutor const& executor);
    //! Unsubscribe implementation
    /** This method should be called under the lock
    **/
    void UnsubscribeImpl(TSubscriptionId id);
    //! Batch unsubscribe implementation
    /** This method should be called under the lock
    **/
    void UnsubscribeImpl(TVector<TSubscriptionId> const& ids);
};

}

#define INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H
#include "subscription-inl.h"
#undef INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H