aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future/subscription/subscription_ut.cpp
diff options
context:
space:
mode:
authorlexeyo <lexeyo@yandex-team.ru>2022-02-10 16:50:52 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:52 +0300
commit7bd28c29ae2a1fba7a03bcf4c658af66fe1373bf (patch)
tree125a82183c08c617e85c03a2036a11878fe21fce /library/cpp/threading/future/subscription/subscription_ut.cpp
parent00b32f5b0810b417c619169d137c29a64b54d464 (diff)
downloadydb-7bd28c29ae2a1fba7a03bcf4c658af66fe1373bf.tar.gz
Restoring authorship annotation for <lexeyo@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/future/subscription/subscription_ut.cpp')
-rw-r--r--library/cpp/threading/future/subscription/subscription_ut.cpp862
1 files changed, 431 insertions, 431 deletions
diff --git a/library/cpp/threading/future/subscription/subscription_ut.cpp b/library/cpp/threading/future/subscription/subscription_ut.cpp
index d018ea15cc..ea553ae0c6 100644
--- a/library/cpp/threading/future/subscription/subscription_ut.cpp
+++ b/library/cpp/threading/future/subscription/subscription_ut.cpp
@@ -1,432 +1,432 @@
-#include "subscription.h"
-
+#include "subscription.h"
+
#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NThreading;
-
-Y_UNIT_TEST_SUITE(TSubscriptionManagerTest) {
-
- Y_UNIT_TEST(TestSubscribeUnsignaled) {
- auto m = TSubscriptionManager::NewInstance();
- auto p = NewPromise();
-
- size_t callCount = 0;
- auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
- UNIT_ASSERT(id.has_value());
- UNIT_ASSERT_EQUAL(callCount, 0);
-
- p.SetValue();
- UNIT_ASSERT_EQUAL(callCount, 1);
- }
-
- Y_UNIT_TEST(TestSubscribeSignaled) {
- auto m = TSubscriptionManager::NewInstance();
- auto f = MakeFuture();
-
- size_t callCount = 0;
- auto id = m->Subscribe(f, [&callCount](auto&&) { ++callCount; } );
- UNIT_ASSERT(!id.has_value());
- UNIT_ASSERT_EQUAL(callCount, 1);
- }
-
- Y_UNIT_TEST(TestSubscribeUnsignaledAndSignaled) {
- auto m = TSubscriptionManager::NewInstance();
- auto p = NewPromise();
-
- size_t callCount1 = 0;
- auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
- UNIT_ASSERT(id1.has_value());
- UNIT_ASSERT_EQUAL(callCount1, 0);
-
- p.SetValue();
- UNIT_ASSERT_EQUAL(callCount1, 1);
-
- size_t callCount2 = 0;
- auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
- UNIT_ASSERT(!id2.has_value());
- UNIT_ASSERT_EQUAL(callCount2, 1);
- UNIT_ASSERT_EQUAL(callCount1, 1);
- }
-
- Y_UNIT_TEST(TestSubscribeUnsubscribeUnsignaled) {
- auto m = TSubscriptionManager::NewInstance();
- auto p = NewPromise();
-
- size_t callCount = 0;
- auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
- UNIT_ASSERT(id.has_value());
- UNIT_ASSERT_EQUAL(callCount, 0);
-
- m->Unsubscribe(id.value());
-
- p.SetValue();
- UNIT_ASSERT_EQUAL(callCount, 0);
- }
-
- Y_UNIT_TEST(TestSubscribeUnsignaledUnsubscribeSignaled) {
- auto m = TSubscriptionManager::NewInstance();
- auto p = NewPromise();
-
- size_t callCount = 0;
- auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
- UNIT_ASSERT(id.has_value());
- UNIT_ASSERT_EQUAL(callCount, 0);
-
- p.SetValue();
- UNIT_ASSERT_EQUAL(callCount, 1);
-
- m->Unsubscribe(id.value());
- UNIT_ASSERT_EQUAL(callCount, 1);
- }
-
- Y_UNIT_TEST(TestUnsubscribeTwice) {
- auto m = TSubscriptionManager::NewInstance();
- auto p = NewPromise();
-
- size_t callCount = 0;
- auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
- UNIT_ASSERT(id.has_value());
- UNIT_ASSERT_EQUAL(callCount, 0);
-
- m->Unsubscribe(id.value());
- UNIT_ASSERT_EQUAL(callCount, 0);
- m->Unsubscribe(id.value());
- UNIT_ASSERT_EQUAL(callCount, 0);
-
- p.SetValue();
- UNIT_ASSERT_EQUAL(callCount, 0);
- }
-
- Y_UNIT_TEST(TestSubscribeOneUnsignaledManyTimes) {
- auto m = TSubscriptionManager::NewInstance();
- auto p = NewPromise();
-
- size_t callCount1 = 0;
- auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
- size_t callCount2 = 0;
- auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
- size_t callCount3 = 0;
- auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
-
- UNIT_ASSERT(id1.has_value());
- UNIT_ASSERT(id2.has_value());
- UNIT_ASSERT(id3.has_value());
- UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
- UNIT_ASSERT_UNEQUAL(id2.value(), id3.value());
- UNIT_ASSERT_UNEQUAL(id3.value(), id1.value());
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 0);
- UNIT_ASSERT_EQUAL(callCount3, 0);
-
- p.SetValue();
- UNIT_ASSERT_EQUAL(callCount1, 1);
- UNIT_ASSERT_EQUAL(callCount2, 1);
- UNIT_ASSERT_EQUAL(callCount3, 1);
- }
-
- Y_UNIT_TEST(TestSubscribeOneSignaledManyTimes) {
- auto m = TSubscriptionManager::NewInstance();
- auto f = MakeFuture();
-
- size_t callCount1 = 0;
- auto id1 = m->Subscribe(f, [&callCount1](auto&&) { ++callCount1; } );
- size_t callCount2 = 0;
- auto id2 = m->Subscribe(f, [&callCount2](auto&&) { ++callCount2; } );
- size_t callCount3 = 0;
- auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } );
-
- UNIT_ASSERT(!id1.has_value());
- UNIT_ASSERT(!id2.has_value());
- UNIT_ASSERT(!id3.has_value());
- UNIT_ASSERT_EQUAL(callCount1, 1);
- UNIT_ASSERT_EQUAL(callCount2, 1);
- UNIT_ASSERT_EQUAL(callCount3, 1);
- }
-
- Y_UNIT_TEST(TestSubscribeUnsubscribeOneUnsignaledManyTimes) {
- auto m = TSubscriptionManager::NewInstance();
- auto p = NewPromise();
-
- size_t callCount1 = 0;
- auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
- size_t callCount2 = 0;
- auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
- size_t callCount3 = 0;
- auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
- size_t callCount4 = 0;
- auto id4 = m->Subscribe(p.GetFuture(), [&callCount4](auto&&) { ++callCount4; } );
-
- UNIT_ASSERT(id1.has_value());
- UNIT_ASSERT(id2.has_value());
- UNIT_ASSERT(id3.has_value());
- UNIT_ASSERT(id4.has_value());
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 0);
- UNIT_ASSERT_EQUAL(callCount3, 0);
- UNIT_ASSERT_EQUAL(callCount4, 0);
-
- m->Unsubscribe(id3.value());
- m->Unsubscribe(id1.value());
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 0);
- UNIT_ASSERT_EQUAL(callCount3, 0);
- UNIT_ASSERT_EQUAL(callCount4, 0);
-
- p.SetValue();
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 1);
- UNIT_ASSERT_EQUAL(callCount3, 0);
- UNIT_ASSERT_EQUAL(callCount4, 1);
- }
-
- Y_UNIT_TEST(TestSubscribeManyUnsignaled) {
- auto m = TSubscriptionManager::NewInstance();
- auto p1 = NewPromise<int>();
- auto p2 = NewPromise<int>();
-
- size_t callCount1 = 0;
- auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
- size_t callCount2 = 0;
- auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
- size_t callCount3 = 0;
- auto id3 = m->Subscribe(p1.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
-
- UNIT_ASSERT(id1.has_value());
- UNIT_ASSERT(id2.has_value());
- UNIT_ASSERT(id3.has_value());
- UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
- UNIT_ASSERT_UNEQUAL(id2.value(), id3.value());
- UNIT_ASSERT_UNEQUAL(id3.value(), id1.value());
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 0);
- UNIT_ASSERT_EQUAL(callCount3, 0);
-
- p1.SetValue(33);
- UNIT_ASSERT_EQUAL(callCount1, 1);
- UNIT_ASSERT_EQUAL(callCount2, 0);
- UNIT_ASSERT_EQUAL(callCount3, 1);
-
- p2.SetValue(111);
- UNIT_ASSERT_EQUAL(callCount1, 1);
- UNIT_ASSERT_EQUAL(callCount2, 1);
- UNIT_ASSERT_EQUAL(callCount3, 1);
- }
-
- Y_UNIT_TEST(TestSubscribeManySignaled) {
- auto m = TSubscriptionManager::NewInstance();
- auto f1 = MakeFuture(0);
- auto f2 = MakeFuture(1);
-
- size_t callCount1 = 0;
- auto id1 = m->Subscribe(f1, [&callCount1](auto&&) { ++callCount1; } );
- size_t callCount2 = 0;
- auto id2 = m->Subscribe(f2, [&callCount2](auto&&) { ++callCount2; } );
- size_t callCount3 = 0;
- auto id3 = m->Subscribe(f2, [&callCount3](auto&&) { ++callCount3; } );
-
- UNIT_ASSERT(!id1.has_value());
- UNIT_ASSERT(!id2.has_value());
- UNIT_ASSERT(!id3.has_value());
- UNIT_ASSERT_EQUAL(callCount1, 1);
- UNIT_ASSERT_EQUAL(callCount2, 1);
- UNIT_ASSERT_EQUAL(callCount3, 1);
- }
-
- Y_UNIT_TEST(TestSubscribeManyMixed) {
- auto m = TSubscriptionManager::NewInstance();
- auto p1 = NewPromise<int>();
- auto p2 = NewPromise<int>();
- auto f = MakeFuture(42);
-
- size_t callCount1 = 0;
- auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
- size_t callCount2 = 0;
- auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
- size_t callCount3 = 0;
- auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } );
-
- UNIT_ASSERT(id1.has_value());
- UNIT_ASSERT(id2.has_value());
- UNIT_ASSERT(!id3.has_value());
- UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 0);
- UNIT_ASSERT_EQUAL(callCount3, 1);
-
- p1.SetValue(45);
- UNIT_ASSERT_EQUAL(callCount1, 1);
- UNIT_ASSERT_EQUAL(callCount2, 0);
- UNIT_ASSERT_EQUAL(callCount3, 1);
-
- p2.SetValue(-7);
- UNIT_ASSERT_EQUAL(callCount1, 1);
- UNIT_ASSERT_EQUAL(callCount2, 1);
- UNIT_ASSERT_EQUAL(callCount3, 1);
- }
-
- Y_UNIT_TEST(TestSubscribeUnsubscribeMany) {
- auto m = TSubscriptionManager::NewInstance();
- auto p1 = NewPromise<int>();
- auto p2 = NewPromise<int>();
- auto p3 = NewPromise<int>();
-
- size_t callCount1 = 0;
- auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
- size_t callCount2 = 0;
- auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
- size_t callCount3 = 0;
- auto id3 = m->Subscribe(p3.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
- size_t callCount4 = 0;
- auto id4 = m->Subscribe(p2.GetFuture(), [&callCount4](auto&&) { ++callCount4; } );
- size_t callCount5 = 0;
- auto id5 = m->Subscribe(p1.GetFuture(), [&callCount5](auto&&) { ++callCount5; } );
-
- UNIT_ASSERT(id1.has_value());
- UNIT_ASSERT(id2.has_value());
- UNIT_ASSERT(id3.has_value());
- UNIT_ASSERT(id4.has_value());
- UNIT_ASSERT(id5.has_value());
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 0);
- UNIT_ASSERT_EQUAL(callCount3, 0);
- UNIT_ASSERT_EQUAL(callCount4, 0);
- UNIT_ASSERT_EQUAL(callCount5, 0);
-
- m->Unsubscribe(id1.value());
- p1.SetValue(-1);
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 0);
- UNIT_ASSERT_EQUAL(callCount3, 0);
- UNIT_ASSERT_EQUAL(callCount4, 0);
- UNIT_ASSERT_EQUAL(callCount5, 1);
-
- m->Unsubscribe(id4.value());
- p2.SetValue(23);
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 1);
- UNIT_ASSERT_EQUAL(callCount3, 0);
- UNIT_ASSERT_EQUAL(callCount4, 0);
- UNIT_ASSERT_EQUAL(callCount5, 1);
-
- p3.SetValue(100500);
- UNIT_ASSERT_EQUAL(callCount1, 0);
- UNIT_ASSERT_EQUAL(callCount2, 1);
- UNIT_ASSERT_EQUAL(callCount3, 1);
- UNIT_ASSERT_EQUAL(callCount4, 0);
- UNIT_ASSERT_EQUAL(callCount5, 1);
- }
-
- Y_UNIT_TEST(TestBulkSubscribeManyUnsignaled) {
- auto m = TSubscriptionManager::NewInstance();
- auto p1 = NewPromise<int>();
- auto p2 = NewPromise<int>();
-
- size_t callCount = 0;
- auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), p1.GetFuture() }, [&callCount](auto&&) { ++callCount; });
-
- UNIT_ASSERT_EQUAL(ids.size(), 3);
- UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
- UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
- UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
- UNIT_ASSERT_EQUAL(callCount, 0);
-
- p1.SetValue(33);
- UNIT_ASSERT_EQUAL(callCount, 2);
-
- p2.SetValue(111);
- UNIT_ASSERT_EQUAL(callCount, 3);
- }
-
- Y_UNIT_TEST(TestBulkSubscribeManySignaledNoRevert) {
- auto m = TSubscriptionManager::NewInstance();
- auto f1 = MakeFuture(0);
- auto f2 = MakeFuture(1);
-
- size_t callCount = 0;
- auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; });
-
- UNIT_ASSERT_EQUAL(ids.size(), 3);
- UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
- UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
- UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
- UNIT_ASSERT_EQUAL(callCount, 3);
- }
-
- Y_UNIT_TEST(TestBulkSubscribeManySignaledRevert) {
- auto m = TSubscriptionManager::NewInstance();
- auto f1 = MakeFuture(0);
- auto f2 = MakeFuture(1);
-
- size_t callCount = 0;
- auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; }, true);
-
- UNIT_ASSERT(ids.empty());
- UNIT_ASSERT_EQUAL(callCount, 1);
- }
-
- Y_UNIT_TEST(TestBulkSubscribeManyMixedNoRevert) {
- auto m = TSubscriptionManager::NewInstance();
- auto p1 = NewPromise<int>();
- auto p2 = NewPromise<int>();
- auto f = MakeFuture(42);
-
- size_t callCount = 0;
- auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), f }, [&callCount](auto&&) { ++callCount; } );
-
- UNIT_ASSERT_EQUAL(ids.size(), 3);
- UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
- UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
- UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
- UNIT_ASSERT_EQUAL(callCount, 1);
-
- p1.SetValue(45);
- UNIT_ASSERT_EQUAL(callCount, 2);
-
- p2.SetValue(-7);
- UNIT_ASSERT_EQUAL(callCount, 3);
- }
-
- Y_UNIT_TEST(TestBulkSubscribeManyMixedRevert) {
- auto m = TSubscriptionManager::NewInstance();
- auto p1 = NewPromise();
- auto p2 = NewPromise();
- auto f = MakeFuture();
-
- size_t callCount = 0;
- auto ids = m->Subscribe({ p1.GetFuture(), f, p2.GetFuture() }, [&callCount](auto&&) { ++callCount; }, true);
-
- UNIT_ASSERT(ids.empty());
- UNIT_ASSERT_EQUAL(callCount, 1);
-
- p1.SetValue();
- p2.SetValue();
- UNIT_ASSERT_EQUAL(callCount, 1);
- }
-
- Y_UNIT_TEST(TestBulkSubscribeUnsubscribeMany) {
- auto m = TSubscriptionManager::NewInstance();
- auto p1 = NewPromise<int>();
- auto p2 = NewPromise<int>();
- auto p3 = NewPromise<int>();
-
- size_t callCount = 0;
- auto ids = m->Subscribe(
- TVector<TFuture<int>>{ p1.GetFuture(), p2.GetFuture(), p3.GetFuture(), p2.GetFuture(), p1.GetFuture() }
- , [&callCount](auto&&) { ++callCount; } );
-
- UNIT_ASSERT_EQUAL(ids.size(), 5);
- UNIT_ASSERT_EQUAL(callCount, 0);
-
- m->Unsubscribe(TVector<TSubscriptionId>{ ids[0], ids[3] });
- UNIT_ASSERT_EQUAL(callCount, 0);
-
- p1.SetValue(-1);
- UNIT_ASSERT_EQUAL(callCount, 1);
-
- p2.SetValue(23);
- UNIT_ASSERT_EQUAL(callCount, 2);
-
- p3.SetValue(100500);
- UNIT_ASSERT_EQUAL(callCount, 3);
- }
-}
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TSubscriptionManagerTest) {
+
+ Y_UNIT_TEST(TestSubscribeUnsignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeSignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f = MakeFuture();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(f, [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(!id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsignaledAndSignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ UNIT_ASSERT(!id2.has_value());
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsubscribeUnsignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ m->Unsubscribe(id.value());
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 0);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsignaledUnsubscribeSignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 1);
+
+ m->Unsubscribe(id.value());
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestUnsubscribeTwice) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ m->Unsubscribe(id.value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+ m->Unsubscribe(id.value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 0);
+ }
+
+ Y_UNIT_TEST(TestSubscribeOneUnsignaledManyTimes) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(id3.has_value());
+ UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
+ UNIT_ASSERT_UNEQUAL(id2.value(), id3.value());
+ UNIT_ASSERT_UNEQUAL(id3.value(), id1.value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeOneSignaledManyTimes) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f = MakeFuture();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(f, [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(f, [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(!id1.has_value());
+ UNIT_ASSERT(!id2.has_value());
+ UNIT_ASSERT(!id3.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsubscribeOneUnsignaledManyTimes) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+ size_t callCount4 = 0;
+ auto id4 = m->Subscribe(p.GetFuture(), [&callCount4](auto&&) { ++callCount4; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(id3.has_value());
+ UNIT_ASSERT(id4.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+
+ m->Unsubscribe(id3.value());
+ m->Unsubscribe(id1.value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeManyUnsignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(p1.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(id3.has_value());
+ UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
+ UNIT_ASSERT_UNEQUAL(id2.value(), id3.value());
+ UNIT_ASSERT_UNEQUAL(id3.value(), id1.value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+
+ p1.SetValue(33);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+
+ p2.SetValue(111);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeManySignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f1 = MakeFuture(0);
+ auto f2 = MakeFuture(1);
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(f1, [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(f2, [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(f2, [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(!id1.has_value());
+ UNIT_ASSERT(!id2.has_value());
+ UNIT_ASSERT(!id3.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeManyMixed) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto f = MakeFuture(42);
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(!id3.has_value());
+ UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+
+ p1.SetValue(45);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+
+ p2.SetValue(-7);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsubscribeMany) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto p3 = NewPromise<int>();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(p3.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+ size_t callCount4 = 0;
+ auto id4 = m->Subscribe(p2.GetFuture(), [&callCount4](auto&&) { ++callCount4; } );
+ size_t callCount5 = 0;
+ auto id5 = m->Subscribe(p1.GetFuture(), [&callCount5](auto&&) { ++callCount5; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(id3.has_value());
+ UNIT_ASSERT(id4.has_value());
+ UNIT_ASSERT(id5.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+ UNIT_ASSERT_EQUAL(callCount5, 0);
+
+ m->Unsubscribe(id1.value());
+ p1.SetValue(-1);
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+ UNIT_ASSERT_EQUAL(callCount5, 1);
+
+ m->Unsubscribe(id4.value());
+ p2.SetValue(23);
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+ UNIT_ASSERT_EQUAL(callCount5, 1);
+
+ p3.SetValue(100500);
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+ UNIT_ASSERT_EQUAL(callCount5, 1);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManyUnsignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), p1.GetFuture() }, [&callCount](auto&&) { ++callCount; });
+
+ UNIT_ASSERT_EQUAL(ids.size(), 3);
+ UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
+ UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
+ UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p1.SetValue(33);
+ UNIT_ASSERT_EQUAL(callCount, 2);
+
+ p2.SetValue(111);
+ UNIT_ASSERT_EQUAL(callCount, 3);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManySignaledNoRevert) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f1 = MakeFuture(0);
+ auto f2 = MakeFuture(1);
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; });
+
+ UNIT_ASSERT_EQUAL(ids.size(), 3);
+ UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
+ UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
+ UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
+ UNIT_ASSERT_EQUAL(callCount, 3);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManySignaledRevert) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f1 = MakeFuture(0);
+ auto f2 = MakeFuture(1);
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; }, true);
+
+ UNIT_ASSERT(ids.empty());
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManyMixedNoRevert) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto f = MakeFuture(42);
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), f }, [&callCount](auto&&) { ++callCount; } );
+
+ UNIT_ASSERT_EQUAL(ids.size(), 3);
+ UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
+ UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
+ UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
+ UNIT_ASSERT_EQUAL(callCount, 1);
+
+ p1.SetValue(45);
+ UNIT_ASSERT_EQUAL(callCount, 2);
+
+ p2.SetValue(-7);
+ UNIT_ASSERT_EQUAL(callCount, 3);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManyMixedRevert) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise();
+ auto p2 = NewPromise();
+ auto f = MakeFuture();
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ p1.GetFuture(), f, p2.GetFuture() }, [&callCount](auto&&) { ++callCount; }, true);
+
+ UNIT_ASSERT(ids.empty());
+ UNIT_ASSERT_EQUAL(callCount, 1);
+
+ p1.SetValue();
+ p2.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeUnsubscribeMany) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto p3 = NewPromise<int>();
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe(
+ TVector<TFuture<int>>{ p1.GetFuture(), p2.GetFuture(), p3.GetFuture(), p2.GetFuture(), p1.GetFuture() }
+ , [&callCount](auto&&) { ++callCount; } );
+
+ UNIT_ASSERT_EQUAL(ids.size(), 5);
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ m->Unsubscribe(TVector<TSubscriptionId>{ ids[0], ids[3] });
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p1.SetValue(-1);
+ UNIT_ASSERT_EQUAL(callCount, 1);
+
+ p2.SetValue(23);
+ UNIT_ASSERT_EQUAL(callCount, 2);
+
+ p3.SetValue(100500);
+ UNIT_ASSERT_EQUAL(callCount, 3);
+ }
+}