diff options
author | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-07-20 13:02:44 +0300 |
---|---|---|
committer | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-07-20 13:02:44 +0300 |
commit | c4a21555e38eadd62a62113b1935ea66e190ad3c (patch) | |
tree | 2589eb474cc4532fff472ff77bacbb2c6a067706 /library/cpp | |
parent | d4381e2b1f8dd1bd89561b49b36481a01843de6b (diff) | |
download | ydb-c4a21555e38eadd62a62113b1935ea66e190ad3c.tar.gz |
Add check for node existence in IC mock,
Add check for node availability in IC mock
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/interconnect/mock/ic_mock.cpp | 102 |
1 files changed, 93 insertions, 9 deletions
diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp index 884503e6022..54d919b650d 100644 --- a/library/cpp/actors/interconnect/mock/ic_mock.cpp +++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp @@ -2,6 +2,7 @@ #include <library/cpp/actors/core/interconnect.h> #include <util/system/yield.h> #include <thread> +#include <deque> namespace NActors { @@ -122,13 +123,15 @@ namespace NActors { } void HandleForward(TAutoPtr<IEventHandle> ev) { - if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { - Subscribe(ev->Sender, ev->Cookie); - } - if (Queue.empty()) { - TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0)); + if (CheckNodeStatus(ev)) { + if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { + Subscribe(ev->Sender, ev->Cookie); + } + if (Queue.empty()) { + TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0)); + } + Queue.emplace_back(ev.Release()); } - Queue.emplace_back(ev.Release()); } void HandleRam() { @@ -140,15 +143,21 @@ namespace NActors { } void Handle(TEvInterconnect::TEvConnectNode::TPtr ev) { - Subscribe(ev->Sender, ev->Cookie); + if (CheckNodeStatus(ev)) { + Subscribe(ev->Sender, ev->Cookie); + } } void Handle(TEvents::TEvSubscribe::TPtr ev) { - Subscribe(ev->Sender, ev->Cookie); + if (CheckNodeStatus(ev)) { + Subscribe(ev->Sender, ev->Cookie); + } } void Handle(TEvents::TEvUnsubscribe::TPtr ev) { - Subscribers.erase(ev->Sender); + if (CheckNodeStatus(ev)) { + Subscribers.erase(ev->Sender); + } } void HandlePoison() { @@ -160,15 +169,90 @@ namespace NActors { hFunc(TEvInterconnect::TEvConnectNode, Handle) hFunc(TEvents::TEvSubscribe, Handle) hFunc(TEvents::TEvUnsubscribe, Handle) + hFunc(TEvInterconnect::TEvNodeInfo, HandleNodeInfo) cFunc(TEvents::TSystem::Poison, HandlePoison) cFunc(EvRam, HandleRam) ) private: + enum EPeerNodeStatus { + UNKNOWN, + EXISTS, + MISSING + }; + + bool IsWaitingForNodeInfo = false; + std::deque<std::unique_ptr<IEventHandle>> WaitingConnections; + EPeerNodeStatus PeerNodeStatus = EPeerNodeStatus::UNKNOWN; + void Subscribe(const TActorId& actorId, ui64 cookie) { Subscribers[actorId] = cookie; Send(actorId, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, cookie); } + + template <typename TEvent> + bool CheckNodeStatus(TAutoPtr<TEventHandle<TEvent>>& ev) { + if (PeerNodeStatus != EPeerNodeStatus::EXISTS) { + std::unique_ptr<IEventHandle> tmp(ev.Release()); + CheckNonexistentNode(tmp); + return false; + } + return true; + } + + bool CheckNodeStatus(TAutoPtr<IEventHandle>& ev) { + if (PeerNodeStatus != EPeerNodeStatus::EXISTS) { + std::unique_ptr<IEventHandle> tmp(ev.Release()); + CheckNonexistentNode(tmp); + return false; + } + return true; + } + + void CheckNonexistentNode(std::unique_ptr<IEventHandle>& ev) { + if (PeerNodeStatus == EPeerNodeStatus::UNKNOWN) { + WaitingConnections.emplace_back(ev.release()); + if (!IsWaitingForNodeInfo) { + Send(Proxy->Common->NameserviceId, new TEvInterconnect::TEvGetNode(Proxy->PeerNodeId)); + IsWaitingForNodeInfo = true; + } + } else if (PeerNodeStatus == EPeerNodeStatus::MISSING) { + switch (ev->GetTypeRewrite()) { + case TEvInterconnect::EvForward: + if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { + Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie); + } + TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected)); + break; + + case TEvents::TEvSubscribe::EventType: + case TEvInterconnect::TEvConnectNode::EventType: + Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie); + break; + + case TEvents::TEvUnsubscribe::EventType: + break; + + default: + Y_FAIL(); + } + } + } + + void HandleNodeInfo(TEvInterconnect::TEvNodeInfo::TPtr ev) { + Y_VERIFY(IsWaitingForNodeInfo); + if (!ev->Get()->Node) { + PeerNodeStatus = EPeerNodeStatus::MISSING; + } else { + PeerNodeStatus = EPeerNodeStatus::EXISTS; + } + IsWaitingForNodeInfo = false; + while (!WaitingConnections.empty()) { + TAutoPtr<IEventHandle> tmp(WaitingConnections.front().release()); + WaitingConnections.pop_front(); + Receive(tmp, TActivationContext::AsActorContext()); + } + } }; friend class TSessionMockActor; |