diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/future/subscription/wait.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/future/subscription/wait.h')
-rw-r--r-- | library/cpp/threading/future/subscription/wait.h | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/library/cpp/threading/future/subscription/wait.h b/library/cpp/threading/future/subscription/wait.h new file mode 100644 index 0000000000..533bab9d8d --- /dev/null +++ b/library/cpp/threading/future/subscription/wait.h @@ -0,0 +1,119 @@ +#pragma once + +#include "subscription.h" + +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/system/spinlock.h> + + +#include <initializer_list> + +namespace NThreading::NPrivate { + +template <typename TDerived> +class TWait : public TThrRefBase { +private: + TSubscriptionManagerPtr Manager; + TVector<TSubscriptionId> Subscriptions; + bool Unsubscribed = false; + +protected: + TAdaptiveLock Lock; + TPromise<void> Promise; + +public: + template <typename TFutures, typename TCallbackExecutor> + static TFuture<void> Make(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { + TIntrusivePtr<TDerived> w(new TDerived(std::move(manager))); + w->Subscribe(futures, std::forward<TCallbackExecutor>(executor)); + return w->Promise.GetFuture(); + } + +protected: + TWait(TSubscriptionManagerPtr manager) + : Manager(std::move(manager)) + , Subscriptions() + , Unsubscribed(false) + , Lock() + , Promise(NewPromise()) + { + Y_ENSURE(Manager != nullptr); + } + +protected: + //! Unsubscribes all existing subscriptions + /** Lock should be acquired! + **/ + void Unsubscribe() noexcept { + if (Unsubscribed) { + return; + } + Unsubscribe(Subscriptions); + Subscriptions.clear(); + } + +private: + //! Performs a subscription to the given futures + /** Lock should not be acquired! + @param future - The futures to subscribe to + @param callback - The callback to call for each future + **/ + template <typename TFutures, typename TCallbackExecutor> + void Subscribe(TFutures const& futures, TCallbackExecutor&& executor) { + auto self = TIntrusivePtr<TDerived>(static_cast<TDerived*>(this)); + self->BeforeSubscribe(futures); + auto callback = [self = std::move(self)](const auto& future) mutable { + self->Set(future); + }; + auto subscriptions = Manager->Subscribe(futures, callback, TDerived::RevertOnSignaled, std::forward<TCallbackExecutor>(executor)); + if (subscriptions.empty()) { + return; + } + with_lock (Lock) { + if (Unsubscribed) { + Unsubscribe(subscriptions); + } else { + Subscriptions = std::move(subscriptions); + } + } + } + + void Unsubscribe(TVector<TSubscriptionId>& subscriptions) noexcept { + Manager->Unsubscribe(subscriptions); + Unsubscribed = true; + } +}; + +template <typename TWaiter, typename TFutures, typename TCallbackExecutor> +TFuture<void> Wait(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { + switch (std::size(futures)) { + case 0: + return MakeFuture(); + case 1: + return std::begin(futures)->IgnoreResult(); + default: + return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor)); + } +} + +template <typename TWaiter, typename T, typename TCallbackExecutor> +TFuture<void> Wait(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { + switch (std::size(futures)) { + case 0: + return MakeFuture(); + case 1: + return std::begin(futures)->IgnoreResult(); + default: + return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor)); + } +} + + +template <typename TWaiter, typename T, typename TCallbackExecutor> +TFuture<void> Wait(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { + return TWaiter::Make(std::initializer_list<TFuture<T> const>({ future1, future2 }), std::move(manager) + , std::forward<TCallbackExecutor>(executor)); +} + +} |