#pragma once #if !defined(INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H) #error "you should never include subscription-inl.h directly" #endif namespace NThreading { namespace NPrivate { template TFutureStateId CheckedStateId(TFuture const& future) { auto const id = future.StateId(); if (id.Defined()) { return *id; } ythrow TFutureException() << "Future state should be initialized"; } } template inline TSubscriptionManager::TSubscription::TSubscription(TFuture future, F&& callback, TCallbackExecutor&& executor) : Callback( [future = std::move(future), callback = std::forward(callback), executor = std::forward(executor)]() mutable { executor(std::as_const(future), callback); }) { } template inline std::optional TSubscriptionManager::Subscribe(TFuture const& future, F&& callback, TCallbackExecutor&& executor) { auto stateId = NPrivate::CheckedStateId(future); with_lock(Lock) { auto const status = TrySubscribe(future, std::forward(callback), stateId, std::forward(executor)); switch (status) { case ECallbackStatus::Subscribed: return TSubscriptionId(stateId, Revision); case ECallbackStatus::ExecutedSynchronously: return {}; default: Y_ABORT("Unexpected callback status"); } } } template inline TVector TSubscriptionManager::Subscribe(TFutures const& futures, F&& callback, bool revertOnSignaled , TCallbackExecutor&& executor) { return SubscribeImpl(futures, std::forward(callback), revertOnSignaled, std::forward(executor)); } template inline TVector TSubscriptionManager::Subscribe(std::initializer_list const> futures, F&& callback , bool revertOnSignaled, TCallbackExecutor&& executor) { return SubscribeImpl(futures, std::forward(callback), revertOnSignaled, std::forward(executor)); } template inline TSubscriptionManager::ECallbackStatus TSubscriptionManager::TrySubscribe(TFuture const& future, F&& callback, TFutureStateId stateId , TCallbackExecutor&& executor) { TSubscription subscription(future, std::forward(callback), std::forward(executor)); auto const it = Subscriptions.find(stateId); auto const revision = ++Revision; if (it == std::end(Subscriptions)) { auto const success = Subscriptions.emplace(stateId, THashMap{ { revision, std::move(subscription) } }).second; Y_ABORT_UNLESS(success); auto self = TSubscriptionManagerPtr(this); future.Subscribe([self, stateId](TFuture const&) { self->OnCallback(stateId); }); if (Subscriptions.find(stateId) == std::end(Subscriptions)) { return ECallbackStatus::ExecutedSynchronously; } } else { Y_ABORT_UNLESS(it->second.emplace(revision, std::move(subscription)).second); } return ECallbackStatus::Subscribed; } template inline TVector TSubscriptionManager::SubscribeImpl(TFutures const& futures, F const& callback, bool revertOnSignaled , TCallbackExecutor const& executor) { TVector results; results.reserve(std::size(futures)); // resolve all state ids to minimize processing under the lock for (auto const& f : futures) { results.push_back(TSubscriptionId(NPrivate::CheckedStateId(f), 0)); } with_lock(Lock) { size_t i = 0; for (auto const& f : futures) { auto& r = results[i]; auto const status = TrySubscribe(f, callback, r.StateId(), executor); switch (status) { case ECallbackStatus::Subscribed: break; case ECallbackStatus::ExecutedSynchronously: if (revertOnSignaled) { // revert results.crop(i); UnsubscribeImpl(results); return {}; } break; default: Y_ABORT("Unexpected callback status"); } r.SetSubId(Revision); ++i; } } return results; } }