aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2022-07-20 13:02:44 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2022-07-20 13:02:44 +0300
commitc4a21555e38eadd62a62113b1935ea66e190ad3c (patch)
tree2589eb474cc4532fff472ff77bacbb2c6a067706 /library/cpp
parentd4381e2b1f8dd1bd89561b49b36481a01843de6b (diff)
downloadydb-c4a21555e38eadd62a62113b1935ea66e190ad3c.tar.gz
Add check for node existence in IC mock,
Add check for node availability in IC mock
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/mock/ic_mock.cpp102
1 files changed, 93 insertions, 9 deletions
diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp
index 884503e6022..54d919b650d 100644
--- a/library/cpp/actors/interconnect/mock/ic_mock.cpp
+++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp
@@ -2,6 +2,7 @@
#include <library/cpp/actors/core/interconnect.h>
#include <util/system/yield.h>
#include <thread>
+#include <deque>
namespace NActors {
@@ -122,13 +123,15 @@ namespace NActors {
}
void HandleForward(TAutoPtr<IEventHandle> ev) {
- if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
- Subscribe(ev->Sender, ev->Cookie);
- }
- if (Queue.empty()) {
- TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0));
+ if (CheckNodeStatus(ev)) {
+ if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
+ Subscribe(ev->Sender, ev->Cookie);
+ }
+ if (Queue.empty()) {
+ TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0));
+ }
+ Queue.emplace_back(ev.Release());
}
- Queue.emplace_back(ev.Release());
}
void HandleRam() {
@@ -140,15 +143,21 @@ namespace NActors {
}
void Handle(TEvInterconnect::TEvConnectNode::TPtr ev) {
- Subscribe(ev->Sender, ev->Cookie);
+ if (CheckNodeStatus(ev)) {
+ Subscribe(ev->Sender, ev->Cookie);
+ }
}
void Handle(TEvents::TEvSubscribe::TPtr ev) {
- Subscribe(ev->Sender, ev->Cookie);
+ if (CheckNodeStatus(ev)) {
+ Subscribe(ev->Sender, ev->Cookie);
+ }
}
void Handle(TEvents::TEvUnsubscribe::TPtr ev) {
- Subscribers.erase(ev->Sender);
+ if (CheckNodeStatus(ev)) {
+ Subscribers.erase(ev->Sender);
+ }
}
void HandlePoison() {
@@ -160,15 +169,90 @@ namespace NActors {
hFunc(TEvInterconnect::TEvConnectNode, Handle)
hFunc(TEvents::TEvSubscribe, Handle)
hFunc(TEvents::TEvUnsubscribe, Handle)
+ hFunc(TEvInterconnect::TEvNodeInfo, HandleNodeInfo)
cFunc(TEvents::TSystem::Poison, HandlePoison)
cFunc(EvRam, HandleRam)
)
private:
+ enum EPeerNodeStatus {
+ UNKNOWN,
+ EXISTS,
+ MISSING
+ };
+
+ bool IsWaitingForNodeInfo = false;
+ std::deque<std::unique_ptr<IEventHandle>> WaitingConnections;
+ EPeerNodeStatus PeerNodeStatus = EPeerNodeStatus::UNKNOWN;
+
void Subscribe(const TActorId& actorId, ui64 cookie) {
Subscribers[actorId] = cookie;
Send(actorId, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, cookie);
}
+
+ template <typename TEvent>
+ bool CheckNodeStatus(TAutoPtr<TEventHandle<TEvent>>& ev) {
+ if (PeerNodeStatus != EPeerNodeStatus::EXISTS) {
+ std::unique_ptr<IEventHandle> tmp(ev.Release());
+ CheckNonexistentNode(tmp);
+ return false;
+ }
+ return true;
+ }
+
+ bool CheckNodeStatus(TAutoPtr<IEventHandle>& ev) {
+ if (PeerNodeStatus != EPeerNodeStatus::EXISTS) {
+ std::unique_ptr<IEventHandle> tmp(ev.Release());
+ CheckNonexistentNode(tmp);
+ return false;
+ }
+ return true;
+ }
+
+ void CheckNonexistentNode(std::unique_ptr<IEventHandle>& ev) {
+ if (PeerNodeStatus == EPeerNodeStatus::UNKNOWN) {
+ WaitingConnections.emplace_back(ev.release());
+ if (!IsWaitingForNodeInfo) {
+ Send(Proxy->Common->NameserviceId, new TEvInterconnect::TEvGetNode(Proxy->PeerNodeId));
+ IsWaitingForNodeInfo = true;
+ }
+ } else if (PeerNodeStatus == EPeerNodeStatus::MISSING) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvInterconnect::EvForward:
+ if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
+ Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie);
+ }
+ TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected));
+ break;
+
+ case TEvents::TEvSubscribe::EventType:
+ case TEvInterconnect::TEvConnectNode::EventType:
+ Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie);
+ break;
+
+ case TEvents::TEvUnsubscribe::EventType:
+ break;
+
+ default:
+ Y_FAIL();
+ }
+ }
+ }
+
+ void HandleNodeInfo(TEvInterconnect::TEvNodeInfo::TPtr ev) {
+ Y_VERIFY(IsWaitingForNodeInfo);
+ if (!ev->Get()->Node) {
+ PeerNodeStatus = EPeerNodeStatus::MISSING;
+ } else {
+ PeerNodeStatus = EPeerNodeStatus::EXISTS;
+ }
+ IsWaitingForNodeInfo = false;
+ while (!WaitingConnections.empty()) {
+ TAutoPtr<IEventHandle> tmp(WaitingConnections.front().release());
+ WaitingConnections.pop_front();
+ Receive(tmp, TActivationContext::AsActorContext());
+ }
+ }
};
friend class TSessionMockActor;