diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-05-29 19:36:48 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-05-29 19:36:48 +0300 |
commit | 1f91bff6f709b6f762453f020dce570db6d2b180 (patch) | |
tree | 5b4bb56f40ae41dc9aaf455696afd4b0282582ee | |
parent | 17e7defc44375d4dc7ef3466536d5321c27f81b9 (diff) | |
download | ydb-1f91bff6f709b6f762453f020dce570db6d2b180.tar.gz |
reconnect replicas for subscriber
-rw-r--r-- | ydb/core/base/board_lookup.cpp | 270 | ||||
-rw-r--r-- | ydb/core/base/board_replica.cpp | 38 | ||||
-rw-r--r-- | ydb/core/base/board_subscriber_ut.cpp | 208 | ||||
-rw-r--r-- | ydb/core/base/statestorage.h | 9 | ||||
-rw-r--r-- | ydb/core/base/statestorage_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/protos/statestorage.proto | 2 |
6 files changed, 416 insertions, 114 deletions
diff --git a/ydb/core/base/board_lookup.cpp b/ydb/core/base/board_lookup.cpp index 28959ab861..d991cbc810 100644 --- a/ydb/core/base/board_lookup.cpp +++ b/ydb/core/base/board_lookup.cpp @@ -1,5 +1,7 @@ #include "statestorage_impl.h" #include "tabletid.h" + +#include <ydb/core/base/appdata.h> #include <ydb/core/protos/services.pb.h> #include <library/cpp/actors/core/interconnect.h> @@ -26,17 +28,38 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { const ui32 StateStorageGroupId; const bool Subscriber; + static constexpr int MAX_REPLICAS_COUNT_EXP = 32; // Replicas.size() <= 2**MAX_REPLICAS_COUNT_EXP + + struct TEvPrivate { + enum EEv { + EvReconnectReplicas = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + + struct TEvReconnectReplicas : + public TEventLocal<TEvReconnectReplicas, EEv::EvReconnectReplicas> { + ui32 ReplicaIdx; + + TEvReconnectReplicas(ui32 replicaIdx) : ReplicaIdx(replicaIdx) { + } + }; + }; + enum class EReplicaState { Unknown, NotAvailable, NoInfo, Ready, + Reconnect, }; struct TReplica { TActorId Replica; EReplicaState State = EReplicaState::Unknown; THashSet<TActorId> Infos; + bool IsScheduled = false; + ui32 ReconnectNumber = 0; + NMonotonic::TMonotonic LastReconnectAt = TMonotonic::Zero(); + TDuration CurrentDelay = TDuration::MilliSeconds(100); }; TVector<TReplica> Replicas; @@ -46,6 +69,19 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { ui32 WaitForReplicasToSuccess; + TDuration GetReconnectDelayForReplica(TReplica& replica) { + auto ret = replica.CurrentDelay; + auto newDelay = replica.CurrentDelay; + newDelay *= 2; + if (newDelay > TDuration::Seconds(5)) { + newDelay = TDuration::Seconds(5); + } + newDelay *= AppData()->RandomProvider->Uniform(100, 115); + newDelay /= 100; + replica.CurrentDelay = newDelay; + return ret; + } + struct { ui32 Replied = 0; ui32 NoInfo = 0; @@ -121,8 +157,9 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { for (auto idx : xrange(msg->Replicas.size())) { const TActorId &replica = msg->Replicas[idx]; Send(replica, - new TEvStateStorage::TEvReplicaBoardLookup(Path, TActorId(), Subscriber), - IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, idx); + new TEvStateStorage::TEvReplicaBoardLookup(Path, Subscriber), + IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, + EncodeCookie(idx, 0)); Replicas[idx].Replica = replica; Replicas[idx].State = EReplicaState::Unknown; } @@ -149,18 +186,20 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { } void Handle(TEvStateStorage::TEvReplicaBoardInfoUpdate::TPtr &ev) { - const auto &record = ev->Get()->Record; - const ui32 idx = ev->Cookie; - if (idx >= Replicas.size()) { + const auto [idx, reconnectNumber] = DecodeCookie(ev->Cookie); + + if (idx >= Replicas.size()) return; - } auto &replica = Replicas[idx]; - if (replica.State == EReplicaState::NotAvailable) { + + if (reconnectNumber != replica.ReconnectNumber) { + return; + } + if (replica.State != EReplicaState::Ready && replica.State != EReplicaState::NoInfo) { return; } - replica.State = EReplicaState::Ready; - + const auto &record = ev->Get()->Record; auto& info = record.GetInfo(); const TActorId oid = ActorIdFromProto(info.GetOwner()); @@ -174,26 +213,27 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { } if (CurrentStateFunc() == &TThis::StateSubscribe) { - TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate update; - update.Owner = oid; + std::optional<TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> update; if (info.GetDropped()) { if (!replicas.empty()) { return; } InfoReplicas.erase(oid); Info.erase(oid); - update.Dropped = true; + update = { "", true }; } else { - if (Info[oid].Payload != info.GetPayload()) { - Info[oid].Payload = info.GetPayload(); - update.Payload = std::move(info.GetPayload()); + auto& currentInfo = Info[oid]; + if (currentInfo.Payload != info.GetPayload()) { + currentInfo.Payload = info.GetPayload(); + update = { info.GetPayload(), false }; } } - - auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>( - TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); - reply->Update = std::move(update); - Send(Owner, std::move(reply)); + if (update.has_value()) { + auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>( + TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); + reply->Updates = { { oid, std::move(update.value()) } }; + Send(Owner, std::move(reply)); + } } else { if (info.GetDropped()) { if (!replicas.empty()) { @@ -202,20 +242,34 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { InfoReplicas.erase(oid); Info.erase(oid); } else { - Info[oid].Payload = std::move(info.GetPayload()); + Info[oid].Payload = info.GetPayload(); } } } void Handle(TEvStateStorage::TEvReplicaBoardInfo::TPtr &ev) { - const auto &record = ev->Get()->Record; - const ui32 idx = ev->Cookie; + const auto [idx, reconnectNumber] = DecodeCookie(ev->Cookie); + if (idx >= Replicas.size()) return; auto &replica = Replicas[idx]; - if (replica.State != EReplicaState::Unknown) + + if (reconnectNumber != replica.ReconnectNumber) { + return; + } + if (replica.State != EReplicaState::Unknown && replica.State != EReplicaState::Reconnect) { return; - ++Stats.Replied; + } + + const auto &record = ev->Get()->Record; + + if (replica.State == EReplicaState::Unknown) { + Stats.Replied++; + } else { + Y_VERIFY(Stats.NotAvailable); + Stats.NotAvailable--; + } + if (record.GetDropped()) { replica.State = EReplicaState::NoInfo; ++Stats.NoInfo; @@ -224,12 +278,30 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { replica.State = EReplicaState::Ready; ++Stats.HasInfo; - for (auto &x : record.GetInfo()) { + bool isStateSubscribe = (CurrentStateFunc() == &TThis::StateSubscribe); + TMap<TActorId, TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> updates; + + for (const auto &x : record.GetInfo()) { const TActorId oid = ActorIdFromProto(x.GetOwner()); - Info[oid].Payload = std::move(x.GetPayload()); + + auto& currentInfo = Info[oid]; + if (currentInfo.Payload != x.GetPayload()) { + currentInfo.Payload = x.GetPayload(); + if (isStateSubscribe) { + updates[oid] = {x.GetPayload(), false}; + } + } + InfoReplicas[oid].insert(idx); replica.Infos.insert(oid); } + + if (isStateSubscribe && !updates.empty()) { + auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>( + TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); + reply->Updates = std::move(updates); + Send(Owner, std::move(reply)); + } } CheckCompletion(); @@ -244,13 +316,23 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { ++Stats.Replied; } if (replica.State != EReplicaState::NotAvailable) { + if (replica.State == EReplicaState::Ready) { + Y_VERIFY(Stats.HasInfo); + Stats.HasInfo--; + } else if (replica.State == EReplicaState::NoInfo) { + Y_VERIFY(Stats.NoInfo); + Stats.NoInfo--; + } + if (replica.State != EReplicaState::Reconnect) { + ++Stats.NotAvailable; + } replica.State = EReplicaState::NotAvailable; - ++Stats.NotAvailable; } - for (auto infoId : replica.Infos) { - InfoReplicas[infoId].erase(idx); - } + ClearInfosByReplica(idx); + replica.Infos.clear(); + + ReconnectReplica(idx); } } @@ -258,24 +340,34 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { } void Handle(TEvStateStorage::TEvReplicaShutdown::TPtr &ev) { - const ui32 idx = ev->Cookie; + const auto [idx, reconnectNumber] = DecodeCookie(ev->Cookie); + if (idx >= Replicas.size()) return; auto &replica = Replicas[idx]; - Y_VERIFY(replica.Replica == ev->Sender); - if (replica.State == EReplicaState::Unknown) { - ++Stats.Replied; + if (reconnectNumber != replica.ReconnectNumber) { + return; } - if (replica.State != EReplicaState::NotAvailable) { - replica.State = EReplicaState::NotAvailable; - ++Stats.NotAvailable; + + if (replica.State != EReplicaState::Ready && replica.State != EReplicaState::NoInfo) { + return; } - for (auto infoId : replica.Infos) { - InfoReplicas[infoId].erase(idx); + if (replica.State == EReplicaState::Ready) { + Y_VERIFY(Stats.HasInfo); + --Stats.HasInfo; + } else if (replica.State == EReplicaState::NoInfo) { + Y_VERIFY(Stats.NoInfo); + --Stats.NoInfo; } + replica.State = EReplicaState::NotAvailable; + ++Stats.NotAvailable; + + ClearInfosByReplica(idx); + replica.Infos.clear(); + CheckCompletion(); } @@ -283,22 +375,103 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { auto *msg = ev->Get(); if (msg->SourceType != TEvStateStorage::TEvReplicaBoardLookup::EventType) return; - const ui32 idx = ev->Cookie; + + const auto [idx, reconnectNumber] = DecodeCookie(ev->Cookie); + if (idx >= Replicas.size()) return; auto &replica = Replicas[idx]; - if (replica.State != EReplicaState::Unknown) + + if (reconnectNumber != replica.ReconnectNumber) { return; - replica.State = EReplicaState::NotAvailable; - ++Stats.Replied; - ++Stats.NotAvailable; + } + if (replica.State != EReplicaState::Reconnect && replica.State != EReplicaState::Unknown) { + return; + } - for (auto infoId : replica.Infos) { - InfoReplicas[infoId].erase(idx); + if (replica.State == EReplicaState::Unknown) { + ++Stats.Replied; + ++Stats.NotAvailable; } + replica.State = EReplicaState::NotAvailable; + + ClearInfosByReplica(idx); + replica.Infos.clear(); + CheckCompletion(); } + + void ReconnectReplica(ui32 replicaIdx) { + auto& replica = Replicas[replicaIdx]; + + if (!Subscriber) { + return; + } + if (replica.IsScheduled || replica.State != EReplicaState::NotAvailable) { + return; + } + + auto now = TlsActivationContext->Monotonic(); + if (now - replica.LastReconnectAt < replica.CurrentDelay) { + auto at = replica.LastReconnectAt + GetReconnectDelayForReplica(replica); + replica.IsScheduled = true; + Schedule(at - now, new TEvPrivate::TEvReconnectReplicas(replicaIdx)); + return; + } + + replica.ReconnectNumber++; + replica.State = EReplicaState::Reconnect; + Send(replica.Replica, + new TEvStateStorage::TEvReplicaBoardLookup(Path, Subscriber), + IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, + EncodeCookie(replicaIdx, replica.ReconnectNumber)); + + replica.LastReconnectAt = now; + } + + void Handle(TEvPrivate::TEvReconnectReplicas::TPtr& ev) { + const auto& idx = ev->Get()->ReplicaIdx; + Replicas[idx].IsScheduled = false; + ReconnectReplica(idx); + } + + std::pair<ui64, ui64> DecodeCookie(ui64 cookie) { + return {((1ULL << MAX_REPLICAS_COUNT_EXP) - 1) & cookie, cookie >> MAX_REPLICAS_COUNT_EXP}; + } + + ui64 EncodeCookie(ui64 idx, ui64 reconnectNumber) { + return idx | (reconnectNumber << MAX_REPLICAS_COUNT_EXP); + } + + void ClearInfosByReplica(ui32 replicaIdx) { + bool isStateSubscribe = (CurrentStateFunc() == &TThis::StateSubscribe); + TMap<TActorId, TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> updates; + + const auto& replica = Replicas[replicaIdx]; + for (auto infoId : replica.Infos) { + auto infoReplicasIt = InfoReplicas.find(infoId); + if (infoReplicasIt == InfoReplicas.end()) { + continue; + } + infoReplicasIt->second.erase(replicaIdx); + if (infoReplicasIt->second.empty()) { + if (isStateSubscribe) { + auto& update = updates[infoId]; + update.Dropped = true; + } + InfoReplicas.erase(infoId); + Info.erase(infoId); + } + } + if (isStateSubscribe && !updates.empty()) { + auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>( + TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); + reply->Updates = std::move(updates); + Send(Owner, std::move(reply)); + } + } + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::BOARD_LOOKUP_ACTOR; @@ -333,16 +506,19 @@ public: hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); hFunc(TEvStateStorage::TEvReplicaShutdown, Handle); + hFunc(TEvPrivate::TEvReconnectReplicas, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } STATEFN(StateSubscribe) { switch (ev->GetTypeRewrite()) { + hFunc(TEvStateStorage::TEvReplicaBoardInfo, Handle); hFunc(TEvStateStorage::TEvReplicaBoardInfoUpdate, Handle); hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); hFunc(TEvStateStorage::TEvReplicaShutdown, Handle); + hFunc(TEvPrivate::TEvReconnectReplicas, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } diff --git a/ydb/core/base/board_replica.cpp b/ydb/core/base/board_replica.cpp index c82e1debd8..a01260debc 100644 --- a/ydb/core/base/board_replica.cpp +++ b/ydb/core/base/board_replica.cpp @@ -31,7 +31,7 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { }; struct TPathSubscribeData { - THashMap<TActorId, ui32> Subscribers; // Subcriber -> Cookie + THashMap<TActorId, ui64> Subscribers; // Subcriber -> Cookie }; struct TSubscriber { @@ -253,6 +253,7 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { auto &record = ev->Get()->Record; const auto &path = record.GetPath(); + ui32 flags = 0; if (record.GetSubscribe()) { auto& pathSubscribeData = PathToSubscribers[path]; pathSubscribeData.Subscribers[ev->Sender] = ev->Cookie; @@ -268,37 +269,28 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { sessionsIt->second.Subscribers.insert(ev->Sender); } } - } - - ui32 flags = 0; - if (record.GetSubscribe()) { flags = IEventHandle::FlagTrackDelivery; } + std::unique_ptr<TEvStateStorage::TEvReplicaBoardInfo> reply; + auto pathIt = IndexPath.find(path); if (pathIt == IndexPath.end()) { - auto reply = std::make_unique<TEvStateStorage::TEvReplicaBoardInfo>(path, true); - auto resp = std::make_unique<IEventHandle>( - ev->Sender, ev->Recipient, reply.release(), flags, ev->Cookie); - if (ev->InterconnectSession) { - resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); + reply = std::make_unique<TEvStateStorage::TEvReplicaBoardInfo>(path, true); + } else { + reply = std::make_unique<TEvStateStorage::TEvReplicaBoardInfo>(path, false); + auto *info = reply->Record.MutableInfo(); + info->Reserve(pathIt->second.size()); + for (ui32 entryIndex : pathIt->second) { + const TEntry &entry = Entries[entryIndex]; + auto *ex = info->Add(); + ActorIdToProto(entry.Owner, ex->MutableOwner()); + ex->SetPayload(entry.Payload); } - TActivationContext::Send(resp.release()); - return; - } - - auto reply = MakeHolder<TEvStateStorage::TEvReplicaBoardInfo>(path, false); - auto *info = reply->Record.MutableInfo(); - info->Reserve(pathIt->second.size()); - for (ui32 entryIndex : pathIt->second) { - const TEntry &entry = Entries[entryIndex]; - auto *ex = info->Add(); - ActorIdToProto(entry.Owner, ex->MutableOwner()); - ex->SetPayload(entry.Payload); } auto resp = std::make_unique<IEventHandle>( - ev->Sender, SelfId(), reply.Release(), flags, ev->Cookie); + ev->Sender, SelfId(), reply.release(), flags, ev->Cookie); if (ev->InterconnectSession) { resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); } diff --git a/ydb/core/base/board_subscriber_ut.cpp b/ydb/core/base/board_subscriber_ut.cpp index 82432dde09..404432b979 100644 --- a/ydb/core/base/board_subscriber_ut.cpp +++ b/ydb/core/base/board_subscriber_ut.cpp @@ -60,13 +60,23 @@ class TBoardSubscriberTest: public NUnitTest::TTestBase { Context->DispatchEvents(options); } + TVector<TActorId> ResolveReplicas() { + const TActorId proxy = MakeStateStorageProxyID(0); + const TActorId edge = Context->AllocateEdgeActor(); + + Context->Send(proxy, edge, new TEvStateStorage::TEvResolveBoard("path")); + auto ev = Context->GrabEdgeEvent<TEvStateStorage::TEvResolveReplicasList>(edge); + + auto allReplicas = ev->Get()->Replicas; + return TVector<TActorId>(allReplicas.begin(), allReplicas.end()); + } + + public: void SetUp() override { - Context = MakeHolder<TTestBasicRuntime>(2); + Context = MakeHolder<TTestBasicRuntime>(3); - for (ui32 i : xrange(Context->GetNodeCount())) { - SetupStateStorage(*Context, i, 0); - } + SetupCustomStateStorage(*Context, 3, 3, 1, 0); Context->Initialize(TAppPrepare().Unwrap()); } @@ -78,12 +88,16 @@ public: UNIT_TEST_SUITE(TBoardSubscriberTest); UNIT_TEST(SimpleSubscriber); UNIT_TEST(ManySubscribersManyPublisher); - UNIT_TEST(DisconnectReplica); + UNIT_TEST(NotAvailableByShutdown); + UNIT_TEST(ReconnectReplica); + UNIT_TEST(DropByDisconnect); UNIT_TEST_SUITE_END(); void SimpleSubscriber(); void ManySubscribersManyPublisher(); - void DisconnectReplica(); + void NotAvailableByShutdown(); + void ReconnectReplica(); + void DropByDisconnect(); private: THolder<TTestBasicRuntime> Context; @@ -109,20 +123,26 @@ void TBoardSubscriberTest::SimpleSubscriber() { auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber); UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); - const auto &update = event->Get()->Update; + const auto &updates = event->Get()->Updates; + UNIT_ASSERT(updates.size() == 1); + + auto updatesIt = updates.find(publisher); + UNIT_ASSERT(updatesIt != updates.end()); - UNIT_ASSERT(!update.Dropped); - UNIT_ASSERT_EQUAL(update.Owner, publisher); - UNIT_ASSERT_EQUAL(update.Payload, "test"); + UNIT_ASSERT(!updatesIt->second.Dropped); + UNIT_ASSERT_EQUAL(updatesIt->second.Payload, "test"); } KillPublisher(edgePublisher, publisher, 1); { auto event = Context->GrabEdgeEventIf<TEvStateStorage::TEvBoardInfoUpdate>( - {edgeSubscriber}, [edgeSubscriber](const auto& ev){ - const auto &update = ev->Get()->Update; - if (ev->Recipient == edgeSubscriber && update.Dropped) { + {edgeSubscriber}, [edgeSubscriber, publisher](const auto& ev){ + const auto &updates = ev->Get()->Updates; + UNIT_ASSERT(updates.size() == 1); + auto updatesIt = updates.find(publisher); + UNIT_ASSERT(updatesIt != updates.end()); + if (ev->Recipient == edgeSubscriber && updatesIt->second.Dropped) { return true; } return false; @@ -130,10 +150,11 @@ void TBoardSubscriberTest::SimpleSubscriber() { UNIT_ASSERT(event); UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); - const auto &update = event->Get()->Update; + const auto &updates = event->Get()->Updates; - UNIT_ASSERT(update.Dropped); - UNIT_ASSERT_EQUAL(update.Owner, publisher); + auto updatesIt = updates.find(publisher); + UNIT_ASSERT(updatesIt != updates.end()); + UNIT_ASSERT(updatesIt->second.Dropped); } } @@ -144,8 +165,8 @@ void TBoardSubscriberTest::ManySubscribersManyPublisher() { THashSet<TActorId> subscribers; for (size_t i = 0; i < subscribersCount; i++) { - const auto edgeSubscriber = Context->AllocateEdgeActor(i % 2); - CreateSubscriber("path", edgeSubscriber, i % 2); + const auto edgeSubscriber = Context->AllocateEdgeActor(i % 3); + CreateSubscriber("path", edgeSubscriber, i % 3); subscribers.insert(edgeSubscriber); { auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber); @@ -156,41 +177,162 @@ void TBoardSubscriberTest::ManySubscribersManyPublisher() { } for (size_t i = 0; i < publishersCount; i++) { - const auto edgePublisher = Context->AllocateEdgeActor(i % 2); - const auto publisher = CreatePublisher("path", ToString(i), edgePublisher, i % 2); + const auto edgePublisher = Context->AllocateEdgeActor(i % 3); + const auto publisher = CreatePublisher("path", ToString(i), edgePublisher, i % 3); for (const auto& subscriber : subscribers) { auto event = Context->GrabEdgeEventIf<TEvStateStorage::TEvBoardInfoUpdate>( {subscriber}, [publisher](const auto& ev) mutable { - if (ev->Get()->Update.Owner == publisher) { + const auto &updates = ev->Get()->Updates; + UNIT_ASSERT(updates.size() == 1); + auto updatesIt = updates.find(publisher); + if (updatesIt != updates.end()) { return true; } return false; }); UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); - UNIT_ASSERT_EQUAL(event->Get()->Update.Payload, ToString(i)); + + const auto &updates = event->Get()->Updates; + UNIT_ASSERT(updates.size() == 1); + + auto updatesIt = updates.find(publisher); + UNIT_ASSERT(updatesIt != updates.end()); + + UNIT_ASSERT_EQUAL(updatesIt->second.Payload, ToString(i)); } } } -void TBoardSubscriberTest::DisconnectReplica() { +void TBoardSubscriberTest::NotAvailableByShutdown() { - std::vector<TActorId> replicas; + auto replicas = ResolveReplicas(); + + const auto edgeSubscriber = Context->AllocateEdgeActor(1); + CreateSubscriber("path", edgeSubscriber, 1); { - const auto edgeSubscriber = Context->AllocateEdgeActor(1); - CreateSubscriber("path", edgeSubscriber, 1); + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + UNIT_ASSERT(event->Get()->InfoEntries.empty()); + } - { - auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber); - UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); - UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); - UNIT_ASSERT(event->Get()->InfoEntries.empty()); + Send(replicas[0], TActorId(), new TEvents::TEvPoisonPill(), 0, true); + Send(replicas[1], TActorId(), new TEvents::TEvPoisonPill(), 0, true); + + + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable); +} + +void TBoardSubscriberTest::ReconnectReplica() { + + const auto edgeSubscriber = Context->AllocateEdgeActor(1); + CreateSubscriber("path", edgeSubscriber, 1); + + { + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + UNIT_ASSERT(event->Get()->InfoEntries.empty()); + } + + Disconnect(1, 0); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvStateStorage::EvReplicaBoardInfo); + Context->DispatchEvents(options); + + Disconnect(1, 2); + + const auto edgePublisher = Context->AllocateEdgeActor(0); + const TActorId publisher = CreatePublisher("path", "test", edgePublisher, 0); + + { + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + const auto &updates = event->Get()->Updates; + UNIT_ASSERT(updates.size() == 1); + + auto updatesIt = updates.find(publisher); + UNIT_ASSERT(updatesIt != updates.end()); + + UNIT_ASSERT_EQUAL(updatesIt->second.Payload, "test"); + } +} + +void TBoardSubscriberTest::DropByDisconnect() { + auto replicas = ResolveReplicas(); + + const auto edgeSubscriber = Context->AllocateEdgeActor(1); + CreateSubscriber("path", edgeSubscriber, 1); + + { + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + UNIT_ASSERT(event->Get()->InfoEntries.empty()); + } + + auto prevObserverFunc = Context->SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvStateStorage::TEvReplicaBoardPublish::EventType: { + if (ev->Recipient != replicas[0]) { + return TTestActorRuntime::EEventAction::DROP; + } + break; + } } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + const auto edgePublisher = Context->AllocateEdgeActor(0); + const TActorId publisher = CreatePublisher("path", "test", edgePublisher, 0); + + { + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + const auto &updates = event->Get()->Updates; + UNIT_ASSERT(updates.size() == 1); + + auto updatesIt = updates.find(publisher); + UNIT_ASSERT(updatesIt != updates.end()); + + UNIT_ASSERT_EQUAL(updatesIt->second.Payload, "test"); + } + + Disconnect(1, 0); + + { + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + const auto &updates = event->Get()->Updates; + UNIT_ASSERT(updates.size() == 1); + + auto updatesIt = updates.find(publisher); + UNIT_ASSERT(updatesIt != updates.end()); + + UNIT_ASSERT(updatesIt->second.Dropped); + } - Disconnect(1, 0); + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvStateStorage::EvReplicaBoardInfo); + Context->DispatchEvents(options); + { auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber); - UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + const auto &updates = event->Get()->Updates; + UNIT_ASSERT(updates.size() == 1); + + auto updatesIt = updates.find(publisher); + UNIT_ASSERT(updatesIt != updates.end()); + + UNIT_ASSERT_EQUAL(updatesIt->second.Payload, "test"); } } diff --git a/ydb/core/base/statestorage.h b/ydb/core/base/statestorage.h index f3a7c91608..b7ac8c124a 100644 --- a/ydb/core/base/statestorage.h +++ b/ydb/core/base/statestorage.h @@ -459,25 +459,18 @@ struct TEvStateStorage { struct TEvBoardInfoUpdate : public TEventLocal<TEvBoardInfoUpdate, EvBoardInfoUpdate> { struct TInfoEntryUpdate { - TActorId Owner; TString Payload; bool Dropped = false; }; const TEvBoardInfo::EStatus Status; const TString Path; - TInfoEntryUpdate Update; + TMap<TActorId,TInfoEntryUpdate> Updates; TEvBoardInfoUpdate(TEvBoardInfo::EStatus status, const TString &path) : Status(status) , Path(path) {} - - TEvBoardInfoUpdate(const TEvBoardInfoUpdate &x) - : Status(x.Status) - , Path(x.Path) - , Update(x.Update) - {} }; }; diff --git a/ydb/core/base/statestorage_impl.h b/ydb/core/base/statestorage_impl.h index 669518eea9..639108fc1e 100644 --- a/ydb/core/base/statestorage_impl.h +++ b/ydb/core/base/statestorage_impl.h @@ -292,10 +292,9 @@ struct TEvStateStorage::TEvReplicaBoardLookup : public TEventPB<TEvStateStorage: TEvReplicaBoardLookup() {} - TEvReplicaBoardLookup(const TString &path, TActorId owner, bool sub) + TEvReplicaBoardLookup(const TString &path, bool sub) { Record.SetPath(path); - ActorIdToProto(owner, Record.MutableOwner()); Record.SetSubscribe(sub); } }; diff --git a/ydb/core/protos/statestorage.proto b/ydb/core/protos/statestorage.proto index d746ca2917..7bd4a86ebe 100644 --- a/ydb/core/protos/statestorage.proto +++ b/ydb/core/protos/statestorage.proto @@ -96,7 +96,7 @@ message TEvReplicaBoardPublishAck { message TEvReplicaBoardLookup { optional string Path = 1; - optional NActorsProto.TActorId Owner = 2; + reserved 2; optional bool Subscribe = 3; }; |