diff options
| author | kuzin57 <[email protected]> | 2026-01-26 12:22:41 +0300 |
|---|---|---|
| committer | kuzin57 <[email protected]> | 2026-01-26 12:59:27 +0300 |
| commit | 4a70eca87d0fea9a35044ebf904be96b86e2b2c8 (patch) | |
| tree | a1e1c536ca67b0dd6e7d0f29b4d5a0f9d6523384 /library/cpp/threading/future/subscription/subscription.cpp | |
| parent | 1d9c3ef1247f5847171b1afab464c01a8362c1cc (diff) | |
feat contrib: add futures subscription from contrib
<https://nda.ya.ru/t/nesU8Ssd7StzZd>
commit_hash:3b5733332bec71a4f2f26e6e878afb23dde01fe7
Diffstat (limited to 'library/cpp/threading/future/subscription/subscription.cpp')
| -rw-r--r-- | library/cpp/threading/future/subscription/subscription.cpp | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/library/cpp/threading/future/subscription/subscription.cpp b/library/cpp/threading/future/subscription/subscription.cpp new file mode 100644 index 00000000000..e3cb3052c8d --- /dev/null +++ b/library/cpp/threading/future/subscription/subscription.cpp @@ -0,0 +1,65 @@ +#include "subscription.h" + +namespace NThreading { + +bool operator==(TSubscriptionId const& l, TSubscriptionId const& r) noexcept { + return l.StateId() == r.StateId() && l.SubId() == r.SubId(); +} + +bool operator!=(TSubscriptionId const& l, TSubscriptionId const& r) noexcept { + return !(l == r); +} + +void TSubscriptionManager::TSubscription::operator()() { + Callback(); +} + +TSubscriptionManagerPtr TSubscriptionManager::NewInstance() { + return new TSubscriptionManager(); +} + +TSubscriptionManagerPtr TSubscriptionManager::Default() { + static auto instance = NewInstance(); + return instance; +} + +void TSubscriptionManager::Unsubscribe(TSubscriptionId id) { + with_lock(Lock) { + UnsubscribeImpl(id); + } +} + +void TSubscriptionManager::Unsubscribe(TVector<TSubscriptionId> const& ids) { + with_lock(Lock) { + UnsubscribeImpl(ids); + } +} + +void TSubscriptionManager::OnCallback(TFutureStateId stateId) noexcept { + THashMap<ui64, TSubscription> subscriptions; + with_lock(Lock) { + auto const it = Subscriptions.find(stateId); + Y_ABORT_UNLESS(it != Subscriptions.end(), "The callback has been triggered more than once"); + subscriptions.swap(it->second); + Subscriptions.erase(it); + } + for (auto& [_, subscription] : subscriptions) { + subscription(); + } +} + +void TSubscriptionManager::UnsubscribeImpl(TSubscriptionId id) { + auto const it = Subscriptions.find(id.StateId()); + if (it == std::end(Subscriptions)) { + return; + } + it->second.erase(id.SubId()); +} + +void TSubscriptionManager::UnsubscribeImpl(TVector<TSubscriptionId> const& ids) { + for (auto const& id : ids) { + UnsubscribeImpl(id); + } +} + +} |
