1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
#pragma once
#include "subscription.h"
#include <util/generic/vector.h>
#include <util/generic/yexception.h>
#include <util/system/spinlock.h>
#include <initializer_list>
namespace NThreading::NPrivate {
template <typename TDerived>
class TWait : public TThrRefBase {
private:
TSubscriptionManagerPtr Manager;
TVector<TSubscriptionId> Subscriptions;
bool Unsubscribed = false;
protected:
TAdaptiveLock Lock;
TPromise<void> Promise;
public:
template <typename TFutures, typename TCallbackExecutor>
static TFuture<void> Make(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
TIntrusivePtr<TDerived> w(new TDerived(std::move(manager)));
w->Subscribe(futures, std::forward<TCallbackExecutor>(executor));
return w->Promise.GetFuture();
}
protected:
TWait(TSubscriptionManagerPtr manager)
: Manager(std::move(manager))
, Subscriptions()
, Unsubscribed(false)
, Lock()
, Promise(NewPromise())
{
Y_ENSURE(Manager != nullptr);
}
protected:
//! Unsubscribes all existing subscriptions
/** Lock should be acquired!
**/
void Unsubscribe() noexcept {
if (Unsubscribed) {
return;
}
Unsubscribe(Subscriptions);
Subscriptions.clear();
}
private:
//! Performs a subscription to the given futures
/** Lock should not be acquired!
@param future - The futures to subscribe to
@param callback - The callback to call for each future
**/
template <typename TFutures, typename TCallbackExecutor>
void Subscribe(TFutures const& futures, TCallbackExecutor&& executor) {
auto self = TIntrusivePtr<TDerived>(static_cast<TDerived*>(this));
self->BeforeSubscribe(futures);
auto callback = [self = std::move(self)](const auto& future) mutable {
self->Set(future);
};
auto subscriptions = Manager->Subscribe(futures, callback, TDerived::RevertOnSignaled, std::forward<TCallbackExecutor>(executor));
if (subscriptions.empty()) {
return;
}
with_lock (Lock) {
if (Unsubscribed) {
Unsubscribe(subscriptions);
} else {
Subscriptions = std::move(subscriptions);
}
}
}
void Unsubscribe(TVector<TSubscriptionId>& subscriptions) noexcept {
Manager->Unsubscribe(subscriptions);
Unsubscribed = true;
}
};
template <typename TWaiter, typename TFutures, typename TCallbackExecutor>
TFuture<void> Wait(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
switch (std::size(futures)) {
case 0:
return MakeFuture();
case 1:
return std::begin(futures)->IgnoreResult();
default:
return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
}
}
template <typename TWaiter, typename T, typename TCallbackExecutor>
TFuture<void> Wait(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
switch (std::size(futures)) {
case 0:
return MakeFuture();
case 1:
return std::begin(futures)->IgnoreResult();
default:
return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
}
}
template <typename TWaiter, typename T, typename TCallbackExecutor>
TFuture<void> Wait(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
return TWaiter::Make(std::initializer_list<TFuture<T> const>({ future1, future2 }), std::move(manager)
, std::forward<TCallbackExecutor>(executor));
}
}
|