#pragma once #include "subscription.h" #include #include #include #include namespace NThreading::NPrivate { template class TWait : public TThrRefBase { private: TSubscriptionManagerPtr Manager; TVector Subscriptions; bool Unsubscribed = false; protected: TAdaptiveLock Lock; TPromise Promise; public: template static TFuture Make(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { TIntrusivePtr w(new TDerived(std::move(manager))); w->Subscribe(futures, std::forward(executor)); return w->Promise.GetFuture(); } protected: TWait(TSubscriptionManagerPtr manager) : Manager(std::move(manager)) , Subscriptions() , Unsubscribed(false) , Lock() , Promise(NewPromise()) { Y_ENSURE(Manager != nullptr); } protected: //! Unsubscribes all existing subscriptions /** Lock should be acquired! **/ void Unsubscribe() noexcept { if (Unsubscribed) { return; } Unsubscribe(Subscriptions); Subscriptions.clear(); } private: //! Performs a subscription to the given futures /** Lock should not be acquired! @param future - The futures to subscribe to @param callback - The callback to call for each future **/ template void Subscribe(TFutures const& futures, TCallbackExecutor&& executor) { auto self = TIntrusivePtr(static_cast(this)); self->BeforeSubscribe(futures); auto callback = [self = std::move(self)](const auto& future) mutable { self->Set(future); }; auto subscriptions = Manager->Subscribe(futures, callback, TDerived::RevertOnSignaled, std::forward(executor)); if (subscriptions.empty()) { return; } with_lock (Lock) { if (Unsubscribed) { Unsubscribe(subscriptions); } else { Subscriptions = std::move(subscriptions); } } } void Unsubscribe(TVector& subscriptions) noexcept { Manager->Unsubscribe(subscriptions); Unsubscribed = true; } }; template TFuture Wait(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { switch (std::size(futures)) { case 0: return MakeFuture(); case 1: return std::begin(futures)->IgnoreResult(); default: return TWaiter::Make(futures, std::move(manager), std::forward(executor)); } } template TFuture Wait(std::initializer_list const> futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { switch (std::size(futures)) { case 0: return MakeFuture(); case 1: return std::begin(futures)->IgnoreResult(); default: return TWaiter::Make(futures, std::move(manager), std::forward(executor)); } } template TFuture Wait(TFuture const& future1, TFuture const& future2, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { return TWaiter::Make(std::initializer_list const>({ future1, future2 }), std::move(manager) , std::forward(executor)); } }