1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
#include "interconnect_proxy_wrapper.h"
#include "interconnect_tcp_proxy.h"
#include <library/cpp/actors/interconnect/mock/ic_mock.h>
namespace NActors {
class TInterconnectProxyWrapper : public IActorCallback {
TIntrusivePtr<TInterconnectProxyCommon> Common;
const ui32 NodeId;
TInterconnectMock *Mock;
IActor *Proxy = nullptr;
public:
TInterconnectProxyWrapper(TIntrusivePtr<TInterconnectProxyCommon> common, ui32 nodeId, TInterconnectMock *mock)
: IActorCallback(static_cast<TReceiveFunc>(&TInterconnectProxyWrapper::StateFunc), EActivityType::INTERCONNECT_PROXY_WRAPPER)
, Common(std::move(common))
, NodeId(nodeId)
, Mock(mock)
{}
STFUNC(StateFunc) {
if (ev->GetTypeRewrite() == TEvents::TSystem::Poison && !Proxy) {
PassAway();
} else {
if (!Proxy) {
IActor *actor = Mock
? Mock->CreateProxyMock(TActivationContext::ActorSystem()->NodeId, NodeId, Common)
: new TInterconnectProxyTCP(NodeId, Common, &Proxy);
RegisterWithSameMailbox(actor);
if (Mock) {
Proxy = actor;
}
Y_ABORT_UNLESS(Proxy);
}
InvokeOtherActor(*Proxy, &IActor::Receive, ev);
}
}
};
// Builds a factory callable. Each invocation with an actor system and a node id
// registers a fresh TInterconnectProxyWrapper (HTSwap mailbox, executor pool
// `poolId`) in that actor system and returns the new actor's id.
// `common` and `mock` are copied into the callable and passed to every wrapper.
TProxyWrapperFactory CreateProxyWrapperFactory(TIntrusivePtr<TInterconnectProxyCommon> common, ui32 poolId,
        TInterconnectMock *mock) {
    return [common, poolId, mock](TActorSystem *as, ui32 nodeId) -> TActorId {
        auto *wrapper = new TInterconnectProxyWrapper(common, nodeId, mock);
        return as->Register(wrapper, TMailboxType::HTSwap, poolId);
    };
}
} // NActors
|