aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future/subscription/wait.h
blob: 516a27bb731ed7b6225fcfbe8563bce89df6d3e8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#pragma once 
 
#include "subscription.h" 
 
#include <util/generic/vector.h> 
#include <util/generic/yexception.h> 
#include <util/system/spinlock.h> 
 
 
#include <initializer_list> 
 
namespace NThreading::NPrivate { 
 
template <typename TDerived> 
class TWait : public TThrRefBase { 
private: 
    TSubscriptionManagerPtr Manager; 
    TVector<TSubscriptionId> Subscriptions; 
    bool Unsubscribed = false; 
 
protected: 
    TAdaptiveLock Lock; 
    TPromise<void> Promise; 
 
public: 
    template <typename TFutures, typename TCallbackExecutor> 
    static TFuture<void> Make(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { 
        TIntrusivePtr<TDerived> w(new TDerived(std::move(manager))); 
        w->Subscribe(futures, std::forward<TCallbackExecutor>(executor)); 
        return w->Promise.GetFuture(); 
    } 
 
protected: 
    TWait(TSubscriptionManagerPtr manager) 
        : Manager(std::move(manager)) 
        , Subscriptions() 
        , Unsubscribed(false) 
        , Lock() 
        , Promise(NewPromise()) 
    { 
        Y_ENSURE(Manager != nullptr); 
    } 
 
protected: 
    //! Unsubscribes all existing subscriptions 
    /** Lock should be acquired! 
    **/ 
    void Unsubscribe() noexcept { 
        if (Unsubscribed) { 
            return; 
        } 
        Unsubscribe(Subscriptions); 
        Subscriptions.clear(); 
    } 
 
private: 
    //! Performs a subscription to the given futures 
    /** Lock should not be acquired! 
        @param future - The futures to subscribe to 
        @param callback - The callback to call for each future 
    **/ 
    template <typename TFutures, typename TCallbackExecutor> 
    void Subscribe(TFutures const& futures, TCallbackExecutor&& executor) { 
        auto self = TIntrusivePtr<TDerived>(static_cast<TDerived*>(this)); 
        self->BeforeSubscribe(futures); 
        auto callback = [self = std::move(self)](const auto& future) mutable { 
            self->Set(future); 
        }; 
        auto subscriptions = Manager->Subscribe(futures, callback, TDerived::RevertOnSignaled, std::forward<TCallbackExecutor>(executor)); 
        if (subscriptions.empty()) { 
            return; 
        } 
        with_lock (Lock) { 
            if (Unsubscribed) { 
                Unsubscribe(subscriptions); 
            } else { 
                Subscriptions = std::move(subscriptions); 
            } 
        } 
    } 
 
    void Unsubscribe(TVector<TSubscriptionId>& subscriptions) noexcept { 
        Manager->Unsubscribe(subscriptions); 
        Unsubscribed = true; 
    } 
}; 
 
//! Returns a void future for a collection of futures, using TWaiter only when it is needed
/** Dispatch by std::size(futures):
      - 0: the result of MakeFuture() is returned;
      - 1: the only future is returned with its result ignored (IgnoreResult());
      - otherwise: TWaiter::Make is called with the futures, the manager and the executor.
    @param futures - The collection of futures to wait for
    @param manager - The subscription manager, used only in the TWaiter::Make case
    @param executor - The callback executor, forwarded only in the TWaiter::Make case
**/
template <typename TWaiter, typename TFutures, typename TCallbackExecutor>
TFuture<void> Wait(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
    const auto count = std::size(futures);
    if (count == 0) {
        return MakeFuture();
    }
    if (count == 1) {
        return std::begin(futures)->IgnoreResult();
    }
    return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
}
 
//! Returns a void future for an initializer list of futures, using TWaiter only when it is needed
/** Dispatch by the number of listed futures:
      - 0: the result of MakeFuture() is returned;
      - 1: the only future is returned with its result ignored (IgnoreResult());
      - otherwise: TWaiter::Make is called with the list, the manager and the executor.
    @param futures - The futures to wait for
    @param manager - The subscription manager, used only in the TWaiter::Make case
    @param executor - The callback executor, forwarded only in the TWaiter::Make case
**/
template <typename TWaiter, typename T, typename TCallbackExecutor>
TFuture<void> Wait(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
    if (futures.size() == 0) {
        return MakeFuture();
    }
    if (futures.size() == 1) {
        return futures.begin()->IgnoreResult();
    }
    return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
}
 
 
//! Returns a void future for exactly two futures of the same value type
/** Always goes through TWaiter::Make; the two futures are passed as an initializer list.
    @param future1 - The first future to wait for
    @param future2 - The second future to wait for
    @param manager - The subscription manager passed to TWaiter::Make
    @param executor - The callback executor forwarded to TWaiter::Make
**/
template <typename TWaiter, typename T, typename TCallbackExecutor>
TFuture<void> Wait(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
    // The backing array of the list lives as long as this local, i.e. for the whole Make call
    std::initializer_list<TFuture<T> const> both = { future1, future2 };
    return TWaiter::Make(both, std::move(manager), std::forward<TCallbackExecutor>(executor));
}
 
}