aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future/subscription/subscription-inl.h
blob: a45d8999d3b652bf095edbd9eb0a7dfecd3c82eb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#pragma once

#if !defined(INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H)
#error "you should never include subscription-inl.h directly"
#endif

namespace NThreading {

namespace NPrivate {

// Returns the state id of `future`.
// Throws TFutureException when the future reports no defined state id.
template <typename T>
TFutureStateId CheckedStateId(TFuture<T> const& future) {
    auto const stateId = future.StateId();
    if (!stateId.Defined()) {
        ythrow TFutureException() << "Future state should be initialized";
    }
    return *stateId;
}

}

// Packs the future, the user callback and the executor into the stored Callback.
// Invoking Callback calls `executor(future, callback)`, with the future passed
// as a const reference. The closure is mutable, so the captured callback and
// executor are reached through non-const access.
template <typename T, typename F, typename TCallbackExecutor>
inline TSubscriptionManager::TSubscription::TSubscription(TFuture<T> future, F&& callback, TCallbackExecutor&& executor)
    : Callback(
          [capturedFuture = std::move(future),
           capturedCallback = std::forward<F>(callback),
           capturedExecutor = std::forward<TCallbackExecutor>(executor)]() mutable {
              capturedExecutor(std::as_const(capturedFuture), capturedCallback);
          })
{
}

// Subscribes `callback` (run through `executor`) to a single future.
//
// The state id is resolved before Lock is taken; NPrivate::CheckedStateId throws
// TFutureException when the future has no defined state id.
//
// Returns the id of the new subscription (state id + current Revision) when
// TrySubscribe reports Subscribed, and an empty optional when it reports
// ExecutedSynchronously. Any other status is treated as fatal via Y_FAIL.
template <typename T, typename F, typename TCallbackExecutor>
inline std::optional<TSubscriptionId> TSubscriptionManager::Subscribe(TFuture<T> const& future, F&& callback, TCallbackExecutor&& executor) {
    auto const stateId = NPrivate::CheckedStateId(future);
    with_lock(Lock) {
        auto const status = TrySubscribe(future, std::forward<F>(callback), stateId, std::forward<TCallbackExecutor>(executor));
        if (status == ECallbackStatus::Subscribed) {
            // Revision was advanced by TrySubscribe while Lock is still held.
            return TSubscriptionId(stateId, Revision);
        }
        if (status == ECallbackStatus::ExecutedSynchronously) {
            return std::nullopt;
        }
        Y_FAIL("Unexpected callback status");
    }
}

// Subscribes `callback` to every future of the container `futures`.
// Thin wrapper: all arguments are handed over to SubscribeImpl and its result
// is returned unchanged.
template <typename TFutures, typename F, typename TCallbackExecutor>
inline TVector<TSubscriptionId> TSubscriptionManager::Subscribe(
    TFutures const& futures,
    F&& callback,
    bool revertOnSignaled,
    TCallbackExecutor&& executor)
{
    return SubscribeImpl(
        futures,
        std::forward<F>(callback),
        revertOnSignaled,
        std::forward<TCallbackExecutor>(executor));
}

// Overload for a braced list of futures, e.g. Subscribe({ f1, f2 }, ...).
// Thin wrapper: all arguments are handed over to SubscribeImpl and its result
// is returned unchanged.
template <typename T, typename F, typename TCallbackExecutor>
inline TVector<TSubscriptionId> TSubscriptionManager::Subscribe(
    std::initializer_list<TFuture<T> const> futures,
    F&& callback,
    bool revertOnSignaled,
    TCallbackExecutor&& executor)
{
    return SubscribeImpl(
        futures,
        std::forward<F>(callback),
        revertOnSignaled,
        std::forward<TCallbackExecutor>(executor));
}

// Registers a subscription for `future` under `stateId`.
//
// Both callers visible in this file invoke it while holding Lock.
//
// Every call advances Revision; the new value is the key of the subscription
// inside the per-state map. When `stateId` has no entry yet, the entry is
// created and one callback forwarding to OnCallback(stateId) is attached to the
// future itself; later subscriptions for the same state only extend the map.
//
// Returns ExecutedSynchronously when the entry for `stateId` is no longer
// present right after the callback was attached to the future, and Subscribed
// otherwise.
// NOTE(review): presumably OnCallback removes the entry when the future is
// already signaled and runs the callback inline - confirm against OnCallback.
template <typename T, typename F, typename TCallbackExecutor>
inline TSubscriptionManager::ECallbackStatus TSubscriptionManager::TrySubscribe(TFuture<T> const& future, F&& callback, TFutureStateId stateId
                                                                                , TCallbackExecutor&& executor)
{
    TSubscription subscription(future, std::forward<F>(callback), std::forward<TCallbackExecutor>(executor));
    auto const it = Subscriptions.find(stateId);
    auto const revision = ++Revision;
    if (it == std::end(Subscriptions)) {
        // Fill the per-state map through emplace instead of a braced initializer
        // list: initializer_list elements are const, so the subscription (and the
        // callable it stores) would be copied even though it is passed via std::move.
        THashMap<ui64, TSubscription> perState;
        Y_VERIFY(perState.emplace(revision, std::move(subscription)).second);
        auto const success = Subscriptions.emplace(stateId, std::move(perState)).second;
        Y_VERIFY(success);
        // The captured pointer is used by the callback attached to the future.
        auto self = TSubscriptionManagerPtr(this);
        future.Subscribe([self, stateId](TFuture<T> const&) { self->OnCallback(stateId); });
        // Look the entry up again: it may have been removed during future.Subscribe.
        if (Subscriptions.find(stateId) == std::end(Subscriptions)) {
            return ECallbackStatus::ExecutedSynchronously;
        }
    } else {
        Y_VERIFY(it->second.emplace(revision, std::move(subscription)).second);
    }
    return ECallbackStatus::Subscribed;
}

// Subscribes `callback` (run through `executor`) to each future in `futures`.
//
// Returns one TSubscriptionId per future, in iteration order. When
// `revertOnSignaled` is set and TrySubscribe reports ExecutedSynchronously for
// some future, the subscriptions made earlier in this call are passed to
// UnsubscribeImpl and an empty vector is returned. NPrivate::CheckedStateId
// throws TFutureException for a future without a defined state id; this happens
// before Lock is taken.
template <typename TFutures, typename F, typename TCallbackExecutor>
inline TVector<TSubscriptionId> TSubscriptionManager::SubscribeImpl(TFutures const& futures, F const& callback, bool revertOnSignaled
                                                                    , TCallbackExecutor const& executor)
{
    TVector<TSubscriptionId> subscriptionIds;
    subscriptionIds.reserve(std::size(futures));
    // Collect the state ids up front so that less work is done under the lock;
    // the sub id part is a placeholder filled in below.
    for (auto const& future : futures) {
        subscriptionIds.push_back(TSubscriptionId(NPrivate::CheckedStateId(future), 0));
    }
    with_lock(Lock) {
        size_t index = 0;
        for (auto const& future : futures) {
            auto& current = subscriptionIds[index];
            auto const status = TrySubscribe(future, callback, current.StateId(), executor);
            if (status == ECallbackStatus::ExecutedSynchronously) {
                if (revertOnSignaled) {
                    // Keep only the ids registered before this future and undo them.
                    subscriptionIds.crop(index);
                    UnsubscribeImpl(subscriptionIds);
                    return {};
                }
            } else if (status != ECallbackStatus::Subscribed) {
                Y_FAIL("Unexpected callback status");
            }
            // Revision was advanced by the TrySubscribe call above.
            current.SetSubId(Revision);
            ++index;
        }
    }
    return subscriptionIds;
}

}