aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-03-28 14:31:05 +0300
committerxenoxeno <xeno@ydb.tech>2023-03-28 14:31:05 +0300
commit33421d638103cc382ba851d2491740e2db576307 (patch)
tree166e19b62c40deb088b62651e2a0cb86d4ed8f5c /library/cpp/actors/interconnect
parent8cf3b1d08aa8791cd5cb7ee2a11fbb712cd72d16 (diff)
downloadydb-33421d638103cc382ba851d2491740e2db576307.tar.gz
revert light events
Diffstat (limited to 'library/cpp/actors/interconnect')
-rw-r--r--library/cpp/actors/interconnect/event_filter.h2
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h4
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h10
-rw-r--r--library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_resolve.cpp6
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp26
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h23
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h12
-rw-r--r--library/cpp/actors/interconnect/load.cpp2
-rw-r--r--library/cpp/actors/interconnect/mock/ic_mock.cpp57
-rw-r--r--library/cpp/actors/interconnect/packet.cpp25
-rw-r--r--library/cpp/actors/interconnect/packet.h10
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp2
-rw-r--r--library/cpp/actors/interconnect/types.h2
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/interconnect_ut.cpp38
-rw-r--r--library/cpp/actors/interconnect/ut/poller_actor_ut.cpp2
21 files changed, 90 insertions, 161 deletions
diff --git a/library/cpp/actors/interconnect/event_filter.h b/library/cpp/actors/interconnect/event_filter.h
index de3fdc2a04..47dabf5f16 100644
--- a/library/cpp/actors/interconnect/event_filter.h
+++ b/library/cpp/actors/interconnect/event_filter.h
@@ -31,7 +31,7 @@ namespace NActors {
evSpaceIndex[subtype] = routes;
}
- bool CheckIncomingEvent(const IEventHandleFat& ev, const TScopeId& localScopeId) const {
+ bool CheckIncomingEvent(const IEventHandle& ev, const TScopeId& localScopeId) const {
TRouteMask routes = 0;
if (const auto& evSpaceIndex = ScopeRoutes[ev.Type >> 16]) {
const ui16 subtype = ev.Type & 65535;
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
index 112a0b1b6e..e2e714a213 100644
--- a/library/cpp/actors/interconnect/handshake_broker.h
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -12,14 +12,14 @@ namespace NActors {
TBrokerLeaseHolder(TActorId waiterId, TActorId brokerId)
: WaiterId(waiterId)
, BrokerId(brokerId) {
- if (TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) {
+ if (TActivationContext::Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) {
LeaseRequested = true;
}
}
~TBrokerLeaseHolder() {
if (LeaseRequested) {
- TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerFree()));
+ TActivationContext::Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerFree()));
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index ce4adcdec2..312eff2666 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -59,12 +59,10 @@ namespace NActors {
TEventHolder& event = Pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1));
OutputQueueSize += bytes;
- if (ev.IsEventFat()) {
- if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
- event.Span
- .Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
- .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize));
- }
+ if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
+ event.Span
+ .Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
+ .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize));
}
return std::make_pair(bytes, &event);
}
diff --git a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp
index e917a27ea7..a450d16871 100644
--- a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp
+++ b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp
@@ -18,7 +18,7 @@ namespace NActors {
, Mock(mock)
{}
- STATEFN(StateFunc) {
+ STFUNC(StateFunc) {
if (ev->GetTypeRewrite() == TEvents::TSystem::Poison && !Proxy) {
PassAway();
} else {
@@ -32,7 +32,7 @@ namespace NActors {
}
Y_VERIFY(Proxy);
}
- InvokeOtherActor(*Proxy, &IActor::Receive, ev);
+ InvokeOtherActor(*Proxy, &IActor::Receive, ev, ctx);
}
}
};
diff --git a/library/cpp/actors/interconnect/interconnect_resolve.cpp b/library/cpp/actors/interconnect/interconnect_resolve.cpp
index 0b0b112628..d638ff830c 100644
--- a/library/cpp/actors/interconnect/interconnect_resolve.cpp
+++ b/library/cpp/actors/interconnect/interconnect_resolve.cpp
@@ -120,7 +120,7 @@ namespace NActors {
LOG_DEBUG_IC("ICR03", "Host: %s, RESOLVED address", Host.c_str());
auto reply = new TEvAddressInfo;
reply->Address = std::move(addr);
- TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply));
+ TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply));
PassAway();
}
@@ -129,7 +129,7 @@ namespace NActors {
auto reply = std::make_unique<TEvLocalNodeInfo>();
reply->NodeId = *NodeId;
reply->Addresses = std::move(addresses);
- TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply.release()));
+ TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply.release()));
PassAway();
}
@@ -138,7 +138,7 @@ namespace NActors {
auto *event = new TEvResolveError;
event->Explain = errorText;
event->Host = Host;
- TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, event));
+ TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, event));
PassAway();
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 122f312fec..32f10c727b 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -334,31 +334,7 @@ namespace NActors {
TEventSerializationInfo serializationInfo{
.IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat),
};
- auto es = GetEventSpace(descr.Type);
- if (es < TEventFactories::EventFactories.size() && TEventFactories::EventFactories[es] != nullptr) {
- const auto& estvec(*TEventFactories::EventFactories[es]);
- auto est = GetEventSubType(descr.Type);
- if (est < estvec.size() && estvec[est] != nullptr) {
- IEventFactory* factory = estvec[est];
- TAutoPtr<IEventHandle> ev = factory->Construct({
- .Session = SessionId,
- .Type = descr.Type,
- .Flags = descr.Flags,
- .Recipient = descr.Recipient,
- .Sender = descr.Sender,
- .Cookie = descr.Cookie,
- .OriginScopeId = Params.PeerScopeId,
- .TraceId = std::move(descr.TraceId),
- .Data = std::move(data),
- });
- if (ev) {
- TActivationContext::Send(ev);
- }
- return;
- }
- }
-
- auto ev = std::make_unique<IEventHandleFat>(SessionId,
+ auto ev = std::make_unique<IEventHandle>(SessionId,
descr.Type,
descr.Flags & ~IEventHandle::FlagExtendedFormat,
descr.Recipient,
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 8ee93dfc7a..8562f6a440 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -47,7 +47,7 @@ namespace NActors {
void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) {
if (!DynamicPtr) {
// perform usual bootstrap for static nodes
- sys->Send(new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0));
+ sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0));
}
if (const auto& mon = Common->RegisterMonPage) {
TString path = Sprintf("peer%04" PRIu32, PeerNodeId);
@@ -591,7 +591,7 @@ namespace NActors {
// we have found cancellation request for the pending handshake request; so simply remove it from the
// deque, as we are not interested in failure reason; must likely it happens because of handshake timeout
if (pendingEvent->GetTypeRewrite() == TEvHandshakeFail::EventType) {
- TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(pendingEvent.Release()));
+ TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(pendingEvent.Release()));
LogHandshakeFail(tmp, true);
}
PendingIncomingHandshakeEvents.erase(it);
@@ -605,7 +605,7 @@ namespace NActors {
Y_VERIFY(Session && SessionID);
ValidateEvent(ev, "ForwardSessionEventToSession");
- InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev);
+ InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID));
}
void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) {
@@ -774,7 +774,7 @@ namespace NActors {
for (auto& ev : PendingIncomingHandshakeEvents) {
Send(ev->Sender, new TEvents::TEvPoisonPill);
if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) {
- TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(ev.Release()));
+ TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release()));
LogHandshakeFail(tmp, true);
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index ebf02c3f27..71edfccbe2 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -63,10 +63,10 @@ namespace NActors {
TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr = nullptr);
- STATEFN(StateInit) {
+ STFUNC(StateInit) {
Bootstrap();
if (ev->Type != TEvents::TSystem::Bootstrap) { // for dynamic nodes we do not receive Bootstrap event
- Receive(ev);
+ Receive(ev, ctx);
}
}
@@ -180,7 +180,7 @@ namespace NActors {
} else if (DynamicPtr) {
PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15);
if (!PassAwayScheduled) {
- TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(),
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
{}, nullptr, 0));
PassAwayScheduled = true;
}
@@ -205,7 +205,7 @@ namespace NActors {
if (now >= PassAwayTimestamp) {
PassAway();
} else if (PassAwayTimestamp != TMonotonic::Max()) {
- TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(),
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
{}, nullptr, 0));
} else {
PassAwayScheduled = false;
@@ -370,21 +370,6 @@ namespace NActors {
ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func);
}
- void ValidateEvent(IEventHandle* ev, const char* func) {
- if (SelfId().NodeId() == PeerNodeId) {
- TString msg = Sprintf("Event Type# 0x%08" PRIx32 " TypeRewrite# 0x%08" PRIx32
- " from Sender# %s sent to the proxy for the node itself via Interconnect;"
- " THIS IS NOT A BUG IN INTERCONNECT, check the event sender instead",
- ev->Type, ev->GetTypeRewrite(), ev->Sender.ToString().data());
- LOG_ERROR_IC("ICP03", "%s", msg.data());
- Y_VERIFY_DEBUG(false, "%s", msg.data());
- }
-
- Y_VERIFY(ev->GetTypeRewrite() != TEvInterconnect::EvForward || ev->Recipient.NodeId() == PeerNodeId,
- "Recipient/Proxy NodeId mismatch Recipient# %s Type# 0x%08" PRIx32 " PeerNodeId# %" PRIu32 " Func# %s",
- ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func);
- }
-
// Common with helpers
// All proxy actors share the same information in the object
// read only
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
index 316c233af3..aad8677ca4 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
@@ -25,7 +25,7 @@ namespace NActors {
}
TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
- return new IEventHandleFat(self, parentId, new TEvents::TEvBootstrap, 0);
+ return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0);
}
void TInterconnectListenerTCP::Die(const TActorContext& ctx) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 5a93bc0cc8..a336e4a89f 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -112,8 +112,8 @@ namespace NActors {
Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly");
}
- void TInterconnectSessionTCP::Forward(TAutoPtr<IEventHandle>& ev) {
- Proxy->ValidateEvent(ev.Get(), "Forward");
+ void TInterconnectSessionTCP::Forward(STATEFN_SIG) {
+ Proxy->ValidateEvent(ev, "Forward");
LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data());
++MessagesGot;
@@ -126,7 +126,7 @@ namespace NActors {
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
- const auto [dataSize, event] = oChannel.Push(*ev.Get());
+ const auto [dataSize, event] = oChannel.Push(*ev);
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
TotalOutputQueueSize += dataSize;
@@ -167,7 +167,7 @@ namespace NActors {
} else if (!RamInQueue) {
Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1);
RamInQueue = new TEvRam(true);
- auto *ev = new IEventHandleFat(SelfId(), {}, RamInQueue);
+ auto *ev = new IEventHandle(SelfId(), {}, RamInQueue);
const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod;
if (batchPeriod != TDuration()) {
TActivationContext::Schedule(batchPeriod, ev);
@@ -179,7 +179,7 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::Subscribe(TAutoPtr<IEventHandle>& ev) {
+ void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
if (inserted) {
@@ -190,7 +190,7 @@ namespace NActors {
Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie);
}
- void TInterconnectSessionTCP::Unsubscribe(TEvents::TEvUnsubscribe::TPtr ev) {
+ void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 9d8bb90ecd..598a5c9220 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -347,16 +347,16 @@ namespace NActors {
void Terminate(TDisconnectReason reason);
void PassAway() override;
- void Forward(LIGHTFN_SIG);
- void Subscribe(TAutoPtr<IEventHandle>& ev);
- void Unsubscribe(TEvents::TEvUnsubscribe::TPtr);
+ void Forward(STATEFN_SIG);
+ void Subscribe(STATEFN_SIG);
+ void Unsubscribe(STATEFN_SIG);
- STRICT_LIGHTFN(StateFunc,
+ STRICT_STFUNC(StateFunc,
fFunc(TEvInterconnect::EvForward, Forward)
cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison)
fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe)
fFunc(TEvents::TEvSubscribe::EventType, Subscribe)
- hFunc(TEvents::TEvUnsubscribe, Unsubscribe)
+ fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe)
cFunc(TEvFlush::EventType, HandleFlush)
hFunc(TEvPollerReady, Handle)
hFunc(TEvPollerRegisterResult, Handle)
@@ -531,7 +531,7 @@ namespace NActors {
auto sender = SelfId();
const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
auto ev = new TEvSessionBufferSizeRequest();
- return new IEventHandleFat(recp, sender, ev, IEventHandle::FlagTrackDelivery);
+ return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
};
RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric);
Become(&TInterconnectSessionKiller::StateFunc);
diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp
index 65e1d202ff..d460903f35 100644
--- a/library/cpp/actors/interconnect/load.cpp
+++ b/library/cpp/actors/interconnect/load.cpp
@@ -82,7 +82,7 @@ namespace NInterconnect {
)
void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
- ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]).Release());
+ ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]));
if (++SlaveIndex == Slaves.size()) {
SlaveIndex = 0;
}
diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp
index 5619f0f113..0aadc7ae35 100644
--- a/library/cpp/actors/interconnect/mock/ic_mock.cpp
+++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp
@@ -60,11 +60,11 @@ namespace NActors {
TPeerInfo *peer = GetPeer(peerNodeId);
auto guard = TReadGuard(peer->Mutex);
if (peer->ActorSystem) {
- peer->ActorSystem->Send(new IEventHandleFat(peer->ProxyId, TActorId(), new TEvInject(std::move(messages),
+ peer->ActorSystem->Send(new IEventHandle(peer->ProxyId, TActorId(), new TEvInject(std::move(messages),
originScopeId, senderSessionId)));
} else {
for (auto&& ev : messages) {
- TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected));
}
}
}
@@ -78,7 +78,7 @@ namespace NActors {
TPeerInfo *peer = GetPeer(peerNodeId);
auto guard = TReadGuard(peer->Mutex);
if (peer->ActorSystem) {
- peer->ActorSystem->Send(new IEventHandleFat(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0));
+ peer->ActorSystem->Send(new IEventHandle(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0));
}
}
@@ -114,7 +114,7 @@ namespace NActors {
void Terminate() {
for (auto&& ev : std::exchange(Queue, {})) {
- TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected));
}
for (const auto& kv : Subscribers) {
Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
@@ -130,7 +130,7 @@ namespace NActors {
Subscribe(ev->Sender, ev->Cookie);
}
if (Queue.empty()) {
- TActivationContext::Send(new IEventHandleFat(EvRam, 0, SelfId(), {}, {}, 0));
+ TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0));
}
Queue.emplace_back(ev.Release());
}
@@ -193,7 +193,7 @@ namespace NActors {
}
template <typename TEvent>
- bool CheckNodeStatus(TAutoPtr<TEventHandleFat<TEvent>>& ev) {
+ bool CheckNodeStatus(TAutoPtr<TEventHandle<TEvent>>& ev) {
if (PeerNodeStatus != EPeerNodeStatus::EXISTS) {
std::unique_ptr<IEventHandle> tmp(ev.Release());
CheckNonexistentNode(tmp);
@@ -224,7 +224,7 @@ namespace NActors {
if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie);
}
- TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected));
break;
case TEvents::TEvSubscribe::EventType:
@@ -252,7 +252,7 @@ namespace NActors {
while (!WaitingConnections.empty()) {
TAutoPtr<IEventHandle> tmp(WaitingConnections.front().release());
WaitingConnections.pop_front();
- Receive(tmp);
+ Receive(tmp, TActivationContext::AsActorContext());
}
}
};
@@ -287,28 +287,20 @@ namespace NActors {
return; // drop messages from other sessions
}
if (auto *session = GetSession()) {
- for (auto&& evb : ev->Get()->Messages) {
- if (ev->IsEventLight()) {
- // TODO(xenoxeno):
- //if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) {
- TActivationContext::Send(evb.release());
- //}
- } else {
- auto* ev = IEventHandleFat::GetFat(evb);
- auto fw = std::make_unique<IEventHandleFat>(
- session->SelfId(),
- ev->Type,
- ev->Flags & ~IEventHandle::FlagForwardOnNondelivery,
- ev->Recipient,
- ev->Sender,
- ev->ReleaseChainBuffer(),
- ev->Cookie,
- msg->OriginScopeId,
- std::move(ev->TraceId)
- );
- if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) {
- TActivationContext::Send(fw.release());
- }
+ for (auto&& ev : ev->Get()->Messages) {
+ auto fw = std::make_unique<IEventHandle>(
+ session->SelfId(),
+ ev->Type,
+ ev->Flags & ~IEventHandle::FlagForwardOnNondelivery,
+ ev->Recipient,
+ ev->Sender,
+ ev->ReleaseChainBuffer(),
+ ev->Cookie,
+ msg->OriginScopeId,
+ std::move(ev->TraceId)
+ );
+ if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) {
+ TActivationContext::Send(fw.release());
}
}
}
@@ -330,7 +322,8 @@ namespace NActors {
void HandleSessionEvent(TAutoPtr<IEventHandle> ev) {
auto *session = GetSession();
- InvokeOtherActor(*session, &TSessionMockActor::Receive, ev);
+ InvokeOtherActor(*session, &TSessionMockActor::Receive, ev,
+ TActivationContext::ActorContextFor(session->SelfId()));
}
void Disconnect() {
@@ -351,7 +344,7 @@ namespace NActors {
return State.Inject(PeerNodeId, std::move(messages), Common->LocalScopeId, Session->SessionId);
}
- STRICT_LIGHTFN(StateFunc,
+ STRICT_STFUNC(StateFunc,
cFunc(TEvents::TSystem::Poison, PassAway)
fFunc(TEvInterconnect::EvForward, HandleSessionEvent)
fFunc(TEvInterconnect::EvConnectNode, HandleSessionEvent)
diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp
index 7b0bdf3e17..9ba173e330 100644
--- a/library/cpp/actors/interconnect/packet.cpp
+++ b/library/cpp/actors/interconnect/packet.cpp
@@ -17,25 +17,14 @@ ui32 TEventHolder::Fill(IEventHandle& ev) {
EventActuallySerialized = 0;
Descr.Checksum = 0;
- if (ev.IsEventLight()) {
- if (ev.IsEventSerializable()) {
- NActors::IEventHandleLightSerializable& serializable(*NActors::IEventHandleLightSerializable::GetLightSerializable(&ev));
- EventSerializer = serializable.Serializer;
- EventSerializedSize = 100;
- } else {
- EventSerializedSize = 0;
- }
+ if (ev.HasBuffer()) {
+ Buffer = ev.ReleaseChainBuffer();
+ EventSerializedSize = Buffer->GetSize();
+ } else if (ev.HasEvent()) {
+ Event.Reset(ev.ReleaseBase());
+ EventSerializedSize = Event->CalculateSerializedSize();
} else {
- auto& evFat = *IEventHandleFat::GetFat(&ev);
- if (evFat.HasBuffer()) {
- Buffer = evFat.ReleaseChainBuffer();
- EventSerializedSize = Buffer->GetSize();
- } else if (evFat.HasEvent()) {
- Event.Reset(evFat.ReleaseBase());
- EventSerializedSize = Event->CalculateSerializedSize();
- } else {
- EventSerializedSize = 0;
- }
+ EventSerializedSize = 0;
}
return EventSerializedSize;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index c06e648541..f3c506a663 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -113,7 +113,6 @@ struct TEventHolder : TNonCopyable {
TActorId ForwardRecipient;
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
- NActors::TEventSerializer EventSerializer;
ui64 Serial;
ui32 EventSerializedSize;
ui32 EventActuallySerialized;
@@ -138,11 +137,10 @@ struct TEventHolder : TNonCopyable {
const TActorId& s = d.Sender;
const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
Span.EndError("nondelivery");
- TAutoPtr<IEventHandle> ev = Event
- ? new IEventHandleFat(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId())
- : new IEventHandleFat(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId());
- ev = IEventHandle::ForwardOnNondelivery(ev, NActors::TEvents::TEvUndelivered::Disconnected, unsure);
- NActors::TActivationContext::Send(ev);
+ auto ev = Event
+ ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId())
+ : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId());
+ NActors::TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), NActors::TEvents::TEvUndelivered::Disconnected, unsure));
}
void Clear() {
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index 7161c6ca90..e75cbcaef4 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -104,7 +104,7 @@ namespace NActors {
protected:
void Notify(TSocketRecord *record, bool read, bool write) {
auto issue = [&](const TActorId& recipient) {
- ActorSystem->Send(new IEventHandleFat(recipient, {}, new TEvPollerReady(record->Socket, read, write)));
+ ActorSystem->Send(new IEventHandle(recipient, {}, new TEvPollerReady(record->Socket, read, write)));
};
if (read && record->ReadActorId) {
issue(record->ReadActorId);
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index 9a541aeb86..b1d2e02f49 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -64,8 +64,6 @@ namespace NActors {
using NActors::IEventBase;
using NActors::IEventHandle;
-using NActors::IEventHandleFat;
-using NActors::IEventHandleLight;
using NActors::TActorId;
using NActors::TConstIoVec;
using NActors::TEventSerializedData;
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 561248c3e5..32c8237b59 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto pushEvent = [&](size_t size, int channel) {
TString payload(size, 'X');
- auto ev = MakeHolder<IEventHandleFat>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
+ auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev);
diff --git a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
index 78158f07cc..3c474979dc 100644
--- a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
@@ -106,7 +106,7 @@ void SenderThread(TMutex& lock, TActorSystem *as, ui32 nodeId, ui32 queueId, ui3
const TActorId target = MakeResponderServiceId(nodeId);
for (ui32 i = 0; i < count; ++i) {
const ui32 flags = IEventHandle::FlagTrackDelivery;
- as->Send(new IEventHandleFat(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i));
+ as->Send(new IEventHandle(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i));
}
}
diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
index 000f5d4b3e..3596bffd5a 100644
--- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
@@ -46,30 +46,25 @@ public:
}
const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie);
InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data)));
- TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
+ TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie));
// Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl);
++NextCookie;
}
}
- void HandlePong(TAutoPtr<IEventHandle> e) {
+ void HandlePong(TAutoPtr<IEventHandle> ev) {
// Cerr << (TStringBuilder() << "Receive# " << ev->Cookie << Endl);
- if (e->IsEventFat()) {
- auto ev = IEventHandleFat::GetFat(e);
- if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) {
- auto& [s2cIt, hash] = it->second;
- Y_VERIFY(hash == ev->GetChainBuffer()->GetString());
- SessionToCookie.erase(s2cIt);
- InFlight.erase(it);
- } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) {
- Y_VERIFY(it->second == ev->GetChainBuffer()->GetString());
- Tentative.erase(it);
- } else {
- Y_FAIL("Cookie# %" PRIu64, ev->Cookie);
- }
+ if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) {
+ auto& [s2cIt, hash] = it->second;
+ Y_VERIFY(hash == ev->GetChainBuffer()->GetString());
+ SessionToCookie.erase(s2cIt);
+ InFlight.erase(it);
+ } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) {
+ Y_VERIFY(it->second == ev->GetChainBuffer()->GetString());
+ Tentative.erase(it);
} else {
- Y_FAIL("Pong is not fat");
+ Y_FAIL("Cookie# %" PRIu64, ev->Cookie);
}
IssueQueries();
}
@@ -128,13 +123,10 @@ public:
{}
void HandlePing(TAutoPtr<IEventHandle>& ev) {
- if (ev->IsEventFat()) {
- auto* evf = IEventHandleFat::GetFat(ev);
- const TString& data = evf->GetChainBuffer()->GetString();
- const TString& response = MD5::CalcRaw(data);
- TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Pong, 0, evf->Sender, SelfId(),
- MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), evf->Cookie));
- }
+ const TString& data = ev->GetChainBuffer()->GetString();
+ const TString& response = MD5::CalcRaw(data);
+ TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(),
+ MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie));
}
STRICT_STFUNC(StateFunc,
diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
index 745a020d2a..23d846a2fd 100644
--- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
@@ -253,7 +253,7 @@ public:
private:
void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) {
auto ev = new TEvPollerRegister{socket, readActorId, writeActorId};
- ActorSystem_->Send(new IEventHandleFat(PollerId_, TActorId{}, ev));
+ ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
}
private: