aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-03-09 12:10:01 +0300
committerxenoxeno <xeno@ydb.tech>2023-03-09 12:10:01 +0300
commitad607bb887619f321dec03b02df8220e01b7f5aa (patch)
tree7d5c87352cbe835b56bb2bdac93b37cbdf8ead21 /library/cpp/actors/interconnect
parent6324d075a5e80b6943b5de6b465b775050fe83df (diff)
downloadydb-ad607bb887619f321dec03b02df8220e01b7f5aa.tar.gz
light events for actor system
Diffstat (limited to 'library/cpp/actors/interconnect')
-rw-r--r--library/cpp/actors/interconnect/event_filter.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h10
-rw-r--r--library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_resolve.cpp6
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp26
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp10
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h23
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h12
-rw-r--r--library/cpp/actors/interconnect/load.cpp2
-rw-r--r--library/cpp/actors/interconnect/mock/ic_mock.cpp67
-rw-r--r--library/cpp/actors/interconnect/packet.cpp25
-rw-r--r--library/cpp/actors/interconnect/packet.h10
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp2
-rw-r--r--library/cpp/actors/interconnect/types.h2
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/interconnect_ut.cpp38
-rw-r--r--library/cpp/actors/interconnect/ut/poller_actor_ut.cpp2
20 files changed, 165 insertions, 94 deletions
diff --git a/library/cpp/actors/interconnect/event_filter.h b/library/cpp/actors/interconnect/event_filter.h
index 47dabf5f16..de3fdc2a04 100644
--- a/library/cpp/actors/interconnect/event_filter.h
+++ b/library/cpp/actors/interconnect/event_filter.h
@@ -31,7 +31,7 @@ namespace NActors {
evSpaceIndex[subtype] = routes;
}
- bool CheckIncomingEvent(const IEventHandle& ev, const TScopeId& localScopeId) const {
+ bool CheckIncomingEvent(const IEventHandleFat& ev, const TScopeId& localScopeId) const {
TRouteMask routes = 0;
if (const auto& evSpaceIndex = ScopeRoutes[ev.Type >> 16]) {
const ui16 subtype = ev.Type & 65535;
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 312eff2666..ce4adcdec2 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -59,10 +59,12 @@ namespace NActors {
TEventHolder& event = Pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1));
OutputQueueSize += bytes;
- if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
- event.Span
- .Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
- .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize));
+ if (ev.IsEventFat()) {
+ if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
+ event.Span
+ .Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
+ .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize));
+ }
}
return std::make_pair(bytes, &event);
}
diff --git a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp
index a450d16871..e917a27ea7 100644
--- a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp
+++ b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp
@@ -18,7 +18,7 @@ namespace NActors {
, Mock(mock)
{}
- STFUNC(StateFunc) {
+ STATEFN(StateFunc) {
if (ev->GetTypeRewrite() == TEvents::TSystem::Poison && !Proxy) {
PassAway();
} else {
@@ -32,7 +32,7 @@ namespace NActors {
}
Y_VERIFY(Proxy);
}
- InvokeOtherActor(*Proxy, &IActor::Receive, ev, ctx);
+ InvokeOtherActor(*Proxy, &IActor::Receive, ev);
}
}
};
diff --git a/library/cpp/actors/interconnect/interconnect_resolve.cpp b/library/cpp/actors/interconnect/interconnect_resolve.cpp
index d638ff830c..0b0b112628 100644
--- a/library/cpp/actors/interconnect/interconnect_resolve.cpp
+++ b/library/cpp/actors/interconnect/interconnect_resolve.cpp
@@ -120,7 +120,7 @@ namespace NActors {
LOG_DEBUG_IC("ICR03", "Host: %s, RESOLVED address", Host.c_str());
auto reply = new TEvAddressInfo;
reply->Address = std::move(addr);
- TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply));
+ TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply));
PassAway();
}
@@ -129,7 +129,7 @@ namespace NActors {
auto reply = std::make_unique<TEvLocalNodeInfo>();
reply->NodeId = *NodeId;
reply->Addresses = std::move(addresses);
- TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply.release()));
+ TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply.release()));
PassAway();
}
@@ -138,7 +138,7 @@ namespace NActors {
auto *event = new TEvResolveError;
event->Explain = errorText;
event->Host = Host;
- TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, event));
+ TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, event));
PassAway();
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index b1212b8914..44abbb06bf 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -330,7 +330,31 @@ namespace NActors {
TEventSerializationInfo serializationInfo{
.IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat),
};
- auto ev = std::make_unique<IEventHandle>(SessionId,
+ auto es = GetEventSpace(descr.Type);
+ if (es < TEventFactories::EventFactories.size() && TEventFactories::EventFactories[es] != nullptr) {
+ const auto& estvec(*TEventFactories::EventFactories[es]);
+ auto est = GetEventSubType(descr.Type);
+ if (est < estvec.size() && estvec[est] != nullptr) {
+ IEventFactory* factory = estvec[est];
+ TAutoPtr<IEventHandle> ev = factory->Construct({
+ .Session = SessionId,
+ .Type = descr.Type,
+ .Flags = descr.Flags,
+ .Recipient = descr.Recipient,
+ .Sender = descr.Sender,
+ .Cookie = descr.Cookie,
+ .OriginScopeId = Params.PeerScopeId,
+ .TraceId = std::move(descr.TraceId),
+ .Data = std::move(data),
+ });
+ if (ev) {
+ TActivationContext::Send(ev);
+ }
+ return;
+ }
+ }
+
+ auto ev = std::make_unique<IEventHandleFat>(SessionId,
descr.Type,
descr.Flags & ~IEventHandle::FlagExtendedFormat,
descr.Recipient,
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 3edded47d6..8ee93dfc7a 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -47,7 +47,7 @@ namespace NActors {
void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) {
if (!DynamicPtr) {
// perform usual bootstrap for static nodes
- sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0));
+ sys->Send(new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0));
}
if (const auto& mon = Common->RegisterMonPage) {
TString path = Sprintf("peer%04" PRIu32, PeerNodeId);
@@ -509,7 +509,7 @@ namespace NActors {
if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie);
}
- TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
break;
case TEvInterconnect::TEvConnectNode::EventType:
@@ -591,7 +591,7 @@ namespace NActors {
// we have found cancellation request for the pending handshake request; so simply remove it from the
// deque, as we are not interested in failure reason; must likely it happens because of handshake timeout
if (pendingEvent->GetTypeRewrite() == TEvHandshakeFail::EventType) {
- TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(pendingEvent.Release()));
+ TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(pendingEvent.Release()));
LogHandshakeFail(tmp, true);
}
PendingIncomingHandshakeEvents.erase(it);
@@ -605,7 +605,7 @@ namespace NActors {
Y_VERIFY(Session && SessionID);
ValidateEvent(ev, "ForwardSessionEventToSession");
- InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID));
+ InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev);
}
void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) {
@@ -774,7 +774,7 @@ namespace NActors {
for (auto& ev : PendingIncomingHandshakeEvents) {
Send(ev->Sender, new TEvents::TEvPoisonPill);
if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) {
- TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release()));
+ TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(ev.Release()));
LogHandshakeFail(tmp, true);
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index 71edfccbe2..ebf02c3f27 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -63,10 +63,10 @@ namespace NActors {
TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr = nullptr);
- STFUNC(StateInit) {
+ STATEFN(StateInit) {
Bootstrap();
if (ev->Type != TEvents::TSystem::Bootstrap) { // for dynamic nodes we do not receive Bootstrap event
- Receive(ev, ctx);
+ Receive(ev);
}
}
@@ -180,7 +180,7 @@ namespace NActors {
} else if (DynamicPtr) {
PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15);
if (!PassAwayScheduled) {
- TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(),
{}, nullptr, 0));
PassAwayScheduled = true;
}
@@ -205,7 +205,7 @@ namespace NActors {
if (now >= PassAwayTimestamp) {
PassAway();
} else if (PassAwayTimestamp != TMonotonic::Max()) {
- TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(),
{}, nullptr, 0));
} else {
PassAwayScheduled = false;
@@ -370,6 +370,21 @@ namespace NActors {
ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func);
}
+ void ValidateEvent(IEventHandle* ev, const char* func) {
+ if (SelfId().NodeId() == PeerNodeId) {
+ TString msg = Sprintf("Event Type# 0x%08" PRIx32 " TypeRewrite# 0x%08" PRIx32
+ " from Sender# %s sent to the proxy for the node itself via Interconnect;"
+ " THIS IS NOT A BUG IN INTERCONNECT, check the event sender instead",
+ ev->Type, ev->GetTypeRewrite(), ev->Sender.ToString().data());
+ LOG_ERROR_IC("ICP03", "%s", msg.data());
+ Y_VERIFY_DEBUG(false, "%s", msg.data());
+ }
+
+ Y_VERIFY(ev->GetTypeRewrite() != TEvInterconnect::EvForward || ev->Recipient.NodeId() == PeerNodeId,
+ "Recipient/Proxy NodeId mismatch Recipient# %s Type# 0x%08" PRIx32 " PeerNodeId# %" PRIu32 " Func# %s",
+ ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func);
+ }
+
// Common with helpers
// All proxy actors share the same information in the object
// read only
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
index aad8677ca4..316c233af3 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
@@ -25,7 +25,7 @@ namespace NActors {
}
TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
- return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0);
+ return new IEventHandleFat(self, parentId, new TEvents::TEvBootstrap, 0);
}
void TInterconnectListenerTCP::Die(const TActorContext& ctx) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index dfc4d411d3..3d6d18d274 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -112,8 +112,8 @@ namespace NActors {
Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly");
}
- void TInterconnectSessionTCP::Forward(STATEFN_SIG) {
- Proxy->ValidateEvent(ev, "Forward");
+ void TInterconnectSessionTCP::Forward(TAutoPtr<IEventHandle>& ev) {
+ Proxy->ValidateEvent(ev.Get(), "Forward");
LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data());
++MessagesGot;
@@ -126,7 +126,7 @@ namespace NActors {
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
- const auto [dataSize, event] = oChannel.Push(*ev);
+ const auto [dataSize, event] = oChannel.Push(*ev.Get());
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
TotalOutputQueueSize += dataSize;
@@ -167,7 +167,7 @@ namespace NActors {
} else if (!RamInQueue) {
Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1);
RamInQueue = new TEvRam(true);
- auto *ev = new IEventHandle(SelfId(), {}, RamInQueue);
+ auto *ev = new IEventHandleFat(SelfId(), {}, RamInQueue);
const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod;
if (batchPeriod != TDuration()) {
TActivationContext::Schedule(batchPeriod, ev);
@@ -179,7 +179,7 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
+ void TInterconnectSessionTCP::Subscribe(TAutoPtr<IEventHandle>& ev) {
LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
if (inserted) {
@@ -190,7 +190,7 @@ namespace NActors {
Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie);
}
- void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
+ void TInterconnectSessionTCP::Unsubscribe(TEvents::TEvUnsubscribe::TPtr ev) {
LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 598a5c9220..9d8bb90ecd 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -347,16 +347,16 @@ namespace NActors {
void Terminate(TDisconnectReason reason);
void PassAway() override;
- void Forward(STATEFN_SIG);
- void Subscribe(STATEFN_SIG);
- void Unsubscribe(STATEFN_SIG);
+ void Forward(LIGHTFN_SIG);
+ void Subscribe(TAutoPtr<IEventHandle>& ev);
+ void Unsubscribe(TEvents::TEvUnsubscribe::TPtr);
- STRICT_STFUNC(StateFunc,
+ STRICT_LIGHTFN(StateFunc,
fFunc(TEvInterconnect::EvForward, Forward)
cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison)
fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe)
fFunc(TEvents::TEvSubscribe::EventType, Subscribe)
- fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe)
+ hFunc(TEvents::TEvUnsubscribe, Unsubscribe)
cFunc(TEvFlush::EventType, HandleFlush)
hFunc(TEvPollerReady, Handle)
hFunc(TEvPollerRegisterResult, Handle)
@@ -531,7 +531,7 @@ namespace NActors {
auto sender = SelfId();
const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
auto ev = new TEvSessionBufferSizeRequest();
- return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
+ return new IEventHandleFat(recp, sender, ev, IEventHandle::FlagTrackDelivery);
};
RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric);
Become(&TInterconnectSessionKiller::StateFunc);
diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp
index d460903f35..65e1d202ff 100644
--- a/library/cpp/actors/interconnect/load.cpp
+++ b/library/cpp/actors/interconnect/load.cpp
@@ -82,7 +82,7 @@ namespace NInterconnect {
)
void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
- ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]));
+ ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]).Release());
if (++SlaveIndex == Slaves.size()) {
SlaveIndex = 0;
}
diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp
index fb96a7f2ea..5619f0f113 100644
--- a/library/cpp/actors/interconnect/mock/ic_mock.cpp
+++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp
@@ -60,11 +60,11 @@ namespace NActors {
TPeerInfo *peer = GetPeer(peerNodeId);
auto guard = TReadGuard(peer->Mutex);
if (peer->ActorSystem) {
- peer->ActorSystem->Send(new IEventHandle(peer->ProxyId, TActorId(), new TEvInject(std::move(messages),
+ peer->ActorSystem->Send(new IEventHandleFat(peer->ProxyId, TActorId(), new TEvInject(std::move(messages),
originScopeId, senderSessionId)));
} else {
for (auto&& ev : messages) {
- TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
}
}
}
@@ -78,7 +78,7 @@ namespace NActors {
TPeerInfo *peer = GetPeer(peerNodeId);
auto guard = TReadGuard(peer->Mutex);
if (peer->ActorSystem) {
- peer->ActorSystem->Send(new IEventHandle(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0));
+ peer->ActorSystem->Send(new IEventHandleFat(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0));
}
}
@@ -114,7 +114,7 @@ namespace NActors {
void Terminate() {
for (auto&& ev : std::exchange(Queue, {})) {
- TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
}
for (const auto& kv : Subscribers) {
Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
@@ -130,7 +130,7 @@ namespace NActors {
Subscribe(ev->Sender, ev->Cookie);
}
if (Queue.empty()) {
- TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0));
+ TActivationContext::Send(new IEventHandleFat(EvRam, 0, SelfId(), {}, {}, 0));
}
Queue.emplace_back(ev.Release());
}
@@ -193,7 +193,7 @@ namespace NActors {
}
template <typename TEvent>
- bool CheckNodeStatus(TAutoPtr<TEventHandle<TEvent>>& ev) {
+ bool CheckNodeStatus(TAutoPtr<TEventHandleFat<TEvent>>& ev) {
if (PeerNodeStatus != EPeerNodeStatus::EXISTS) {
std::unique_ptr<IEventHandle> tmp(ev.Release());
CheckNonexistentNode(tmp);
@@ -201,7 +201,7 @@ namespace NActors {
}
return true;
}
-
+
bool CheckNodeStatus(TAutoPtr<IEventHandle>& ev) {
if (PeerNodeStatus != EPeerNodeStatus::EXISTS) {
std::unique_ptr<IEventHandle> tmp(ev.Release());
@@ -210,7 +210,7 @@ namespace NActors {
}
return true;
}
-
+
void CheckNonexistentNode(std::unique_ptr<IEventHandle>& ev) {
if (PeerNodeStatus == EPeerNodeStatus::UNKNOWN) {
WaitingConnections.emplace_back(ev.release());
@@ -224,15 +224,15 @@ namespace NActors {
if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie);
}
- TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
break;
-
- case TEvents::TEvSubscribe::EventType:
+
+ case TEvents::TEvSubscribe::EventType:
case TEvInterconnect::TEvConnectNode::EventType:
Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie);
break;
- case TEvents::TEvUnsubscribe::EventType:
+ case TEvents::TEvUnsubscribe::EventType:
break;
default:
@@ -252,7 +252,7 @@ namespace NActors {
while (!WaitingConnections.empty()) {
TAutoPtr<IEventHandle> tmp(WaitingConnections.front().release());
WaitingConnections.pop_front();
- Receive(tmp, TActivationContext::AsActorContext());
+ Receive(tmp);
}
}
};
@@ -287,20 +287,28 @@ namespace NActors {
return; // drop messages from other sessions
}
if (auto *session = GetSession()) {
- for (auto&& ev : ev->Get()->Messages) {
- auto fw = std::make_unique<IEventHandle>(
- session->SelfId(),
- ev->Type,
- ev->Flags & ~IEventHandle::FlagForwardOnNondelivery,
- ev->Recipient,
- ev->Sender,
- ev->ReleaseChainBuffer(),
- ev->Cookie,
- msg->OriginScopeId,
- std::move(ev->TraceId)
- );
- if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) {
- TActivationContext::Send(fw.release());
+ for (auto&& evb : ev->Get()->Messages) {
+ if (ev->IsEventLight()) {
+ // TODO(xenoxeno):
+ //if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) {
+ TActivationContext::Send(evb.release());
+ //}
+ } else {
+ auto* ev = IEventHandleFat::GetFat(evb);
+ auto fw = std::make_unique<IEventHandleFat>(
+ session->SelfId(),
+ ev->Type,
+ ev->Flags & ~IEventHandle::FlagForwardOnNondelivery,
+ ev->Recipient,
+ ev->Sender,
+ ev->ReleaseChainBuffer(),
+ ev->Cookie,
+ msg->OriginScopeId,
+ std::move(ev->TraceId)
+ );
+ if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) {
+ TActivationContext::Send(fw.release());
+ }
}
}
}
@@ -322,8 +330,7 @@ namespace NActors {
void HandleSessionEvent(TAutoPtr<IEventHandle> ev) {
auto *session = GetSession();
- InvokeOtherActor(*session, &TSessionMockActor::Receive, ev,
- TActivationContext::ActorContextFor(session->SelfId()));
+ InvokeOtherActor(*session, &TSessionMockActor::Receive, ev);
}
void Disconnect() {
@@ -344,7 +351,7 @@ namespace NActors {
return State.Inject(PeerNodeId, std::move(messages), Common->LocalScopeId, Session->SessionId);
}
- STRICT_STFUNC(StateFunc,
+ STRICT_LIGHTFN(StateFunc,
cFunc(TEvents::TSystem::Poison, PassAway)
fFunc(TEvInterconnect::EvForward, HandleSessionEvent)
fFunc(TEvInterconnect::EvConnectNode, HandleSessionEvent)
diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp
index 9ba173e330..7b0bdf3e17 100644
--- a/library/cpp/actors/interconnect/packet.cpp
+++ b/library/cpp/actors/interconnect/packet.cpp
@@ -17,14 +17,25 @@ ui32 TEventHolder::Fill(IEventHandle& ev) {
EventActuallySerialized = 0;
Descr.Checksum = 0;
- if (ev.HasBuffer()) {
- Buffer = ev.ReleaseChainBuffer();
- EventSerializedSize = Buffer->GetSize();
- } else if (ev.HasEvent()) {
- Event.Reset(ev.ReleaseBase());
- EventSerializedSize = Event->CalculateSerializedSize();
+ if (ev.IsEventLight()) {
+ if (ev.IsEventSerializable()) {
+ NActors::IEventHandleLightSerializable& serializable(*NActors::IEventHandleLightSerializable::GetLightSerializable(&ev));
+ EventSerializer = serializable.Serializer;
+ EventSerializedSize = 100;
+ } else {
+ EventSerializedSize = 0;
+ }
} else {
- EventSerializedSize = 0;
+ auto& evFat = *IEventHandleFat::GetFat(&ev);
+ if (evFat.HasBuffer()) {
+ Buffer = evFat.ReleaseChainBuffer();
+ EventSerializedSize = Buffer->GetSize();
+ } else if (evFat.HasEvent()) {
+ Event.Reset(evFat.ReleaseBase());
+ EventSerializedSize = Event->CalculateSerializedSize();
+ } else {
+ EventSerializedSize = 0;
+ }
}
return EventSerializedSize;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index c8909c08a7..c06e648541 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -113,6 +113,7 @@ struct TEventHolder : TNonCopyable {
TActorId ForwardRecipient;
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
+ NActors::TEventSerializer EventSerializer;
ui64 Serial;
ui32 EventSerializedSize;
ui32 EventActuallySerialized;
@@ -137,10 +138,11 @@ struct TEventHolder : TNonCopyable {
const TActorId& s = d.Sender;
const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
Span.EndError("nondelivery");
- auto ev = Event
- ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId())
- : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId());
- NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure));
+ TAutoPtr<IEventHandle> ev = Event
+ ? new IEventHandleFat(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId())
+ : new IEventHandleFat(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId());
+ ev = IEventHandle::ForwardOnNondelivery(ev, NActors::TEvents::TEvUndelivered::Disconnected, unsure);
+ NActors::TActivationContext::Send(ev);
}
void Clear() {
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index e75cbcaef4..7161c6ca90 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -104,7 +104,7 @@ namespace NActors {
protected:
void Notify(TSocketRecord *record, bool read, bool write) {
auto issue = [&](const TActorId& recipient) {
- ActorSystem->Send(new IEventHandle(recipient, {}, new TEvPollerReady(record->Socket, read, write)));
+ ActorSystem->Send(new IEventHandleFat(recipient, {}, new TEvPollerReady(record->Socket, read, write)));
};
if (read && record->ReadActorId) {
issue(record->ReadActorId);
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index b1d2e02f49..9a541aeb86 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -64,6 +64,8 @@ namespace NActors {
using NActors::IEventBase;
using NActors::IEventHandle;
+using NActors::IEventHandleFat;
+using NActors::IEventHandleLight;
using NActors::TActorId;
using NActors::TConstIoVec;
using NActors::TEventSerializedData;
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 32c8237b59..561248c3e5 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto pushEvent = [&](size_t size, int channel) {
TString payload(size, 'X');
- auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
+ auto ev = MakeHolder<IEventHandleFat>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev);
diff --git a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
index 3c474979dc..78158f07cc 100644
--- a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
@@ -106,7 +106,7 @@ void SenderThread(TMutex& lock, TActorSystem *as, ui32 nodeId, ui32 queueId, ui3
const TActorId target = MakeResponderServiceId(nodeId);
for (ui32 i = 0; i < count; ++i) {
const ui32 flags = IEventHandle::FlagTrackDelivery;
- as->Send(new IEventHandle(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i));
+ as->Send(new IEventHandleFat(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i));
}
}
diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
index 3596bffd5a..000f5d4b3e 100644
--- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
@@ -46,25 +46,30 @@ public:
}
const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie);
InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data)));
- TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
+ TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie));
// Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl);
++NextCookie;
}
}
- void HandlePong(TAutoPtr<IEventHandle> ev) {
+ void HandlePong(TAutoPtr<IEventHandle> e) {
// Cerr << (TStringBuilder() << "Receive# " << ev->Cookie << Endl);
- if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) {
- auto& [s2cIt, hash] = it->second;
- Y_VERIFY(hash == ev->GetChainBuffer()->GetString());
- SessionToCookie.erase(s2cIt);
- InFlight.erase(it);
- } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) {
- Y_VERIFY(it->second == ev->GetChainBuffer()->GetString());
- Tentative.erase(it);
+ if (e->IsEventFat()) {
+ auto ev = IEventHandleFat::GetFat(e);
+ if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) {
+ auto& [s2cIt, hash] = it->second;
+ Y_VERIFY(hash == ev->GetChainBuffer()->GetString());
+ SessionToCookie.erase(s2cIt);
+ InFlight.erase(it);
+ } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) {
+ Y_VERIFY(it->second == ev->GetChainBuffer()->GetString());
+ Tentative.erase(it);
+ } else {
+ Y_FAIL("Cookie# %" PRIu64, ev->Cookie);
+ }
} else {
- Y_FAIL("Cookie# %" PRIu64, ev->Cookie);
+ Y_FAIL("Pong is not fat");
}
IssueQueries();
}
@@ -123,10 +128,13 @@ public:
{}
void HandlePing(TAutoPtr<IEventHandle>& ev) {
- const TString& data = ev->GetChainBuffer()->GetString();
- const TString& response = MD5::CalcRaw(data);
- TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(),
- MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie));
+ if (ev->IsEventFat()) {
+ auto* evf = IEventHandleFat::GetFat(ev);
+ const TString& data = evf->GetChainBuffer()->GetString();
+ const TString& response = MD5::CalcRaw(data);
+ TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Pong, 0, evf->Sender, SelfId(),
+ MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), evf->Cookie));
+ }
}
STRICT_STFUNC(StateFunc,
diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
index 23d846a2fd..745a020d2a 100644
--- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
@@ -253,7 +253,7 @@ public:
private:
void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) {
auto ev = new TEvPollerRegister{socket, readActorId, writeActorId};
- ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
+ ActorSystem_->Send(new IEventHandleFat(PollerId_, TActorId{}, ev));
}
private: