diff options
author | xenoxeno <xeno@ydb.tech> | 2023-03-28 14:31:05 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-03-28 14:31:05 +0300 |
commit | 33421d638103cc382ba851d2491740e2db576307 (patch) | |
tree | 166e19b62c40deb088b62651e2a0cb86d4ed8f5c /library/cpp/actors/interconnect | |
parent | 8cf3b1d08aa8791cd5cb7ee2a11fbb712cd72d16 (diff) | |
download | ydb-33421d638103cc382ba851d2491740e2db576307.tar.gz |
revert light events
Diffstat (limited to 'library/cpp/actors/interconnect')
21 files changed, 90 insertions, 161 deletions
diff --git a/library/cpp/actors/interconnect/event_filter.h b/library/cpp/actors/interconnect/event_filter.h index de3fdc2a04..47dabf5f16 100644 --- a/library/cpp/actors/interconnect/event_filter.h +++ b/library/cpp/actors/interconnect/event_filter.h @@ -31,7 +31,7 @@ namespace NActors { evSpaceIndex[subtype] = routes; } - bool CheckIncomingEvent(const IEventHandleFat& ev, const TScopeId& localScopeId) const { + bool CheckIncomingEvent(const IEventHandle& ev, const TScopeId& localScopeId) const { TRouteMask routes = 0; if (const auto& evSpaceIndex = ScopeRoutes[ev.Type >> 16]) { const ui16 subtype = ev.Type & 65535; diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h index 112a0b1b6e..e2e714a213 100644 --- a/library/cpp/actors/interconnect/handshake_broker.h +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -12,14 +12,14 @@ namespace NActors { TBrokerLeaseHolder(TActorId waiterId, TActorId brokerId) : WaiterId(waiterId) , BrokerId(brokerId) { - if (TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) { + if (TActivationContext::Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) { LeaseRequested = true; } } ~TBrokerLeaseHolder() { if (LeaseRequested) { - TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerFree())); + TActivationContext::Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerFree())); } } diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index ce4adcdec2..312eff2666 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -59,12 +59,10 @@ namespace NActors { TEventHolder& event = Pool.Allocate(Queue); const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1)); OutputQueueSize += bytes; - if (ev.IsEventFat()) { - if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { - event.Span - .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) - .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); - } + if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { + event.Span + .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) + .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); } return std::make_pair(bytes, &event); } diff --git a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp index e917a27ea7..a450d16871 100644 --- a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp +++ b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp @@ -18,7 +18,7 @@ namespace NActors { , Mock(mock) {} - STATEFN(StateFunc) { + STFUNC(StateFunc) { if (ev->GetTypeRewrite() == TEvents::TSystem::Poison && !Proxy) { PassAway(); } else { @@ -32,7 +32,7 @@ namespace NActors { } Y_VERIFY(Proxy); } - InvokeOtherActor(*Proxy, &IActor::Receive, ev); + InvokeOtherActor(*Proxy, &IActor::Receive, ev, ctx); } } }; diff --git a/library/cpp/actors/interconnect/interconnect_resolve.cpp b/library/cpp/actors/interconnect/interconnect_resolve.cpp index 0b0b112628..d638ff830c 100644 --- a/library/cpp/actors/interconnect/interconnect_resolve.cpp +++ b/library/cpp/actors/interconnect/interconnect_resolve.cpp @@ -120,7 +120,7 @@ namespace NActors { LOG_DEBUG_IC("ICR03", "Host: %s, RESOLVED address", Host.c_str()); auto reply = new TEvAddressInfo; reply->Address = std::move(addr); - TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply)); + TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply)); PassAway(); } @@ -129,7 +129,7 @@ namespace NActors { auto reply = std::make_unique<TEvLocalNodeInfo>(); reply->NodeId = *NodeId; reply->Addresses = std::move(addresses); - TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply.release())); + TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply.release())); PassAway(); } @@ -138,7 +138,7 @@ namespace NActors { auto *event = new TEvResolveError; event->Explain = errorText; event->Host = Host; - TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, event)); + TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, event)); PassAway(); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 122f312fec..32f10c727b 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -334,31 +334,7 @@ namespace NActors { TEventSerializationInfo serializationInfo{ .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat), }; - auto es = GetEventSpace(descr.Type); - if (es < TEventFactories::EventFactories.size() && TEventFactories::EventFactories[es] != nullptr) { - const auto& estvec(*TEventFactories::EventFactories[es]); - auto est = GetEventSubType(descr.Type); - if (est < estvec.size() && estvec[est] != nullptr) { - IEventFactory* factory = estvec[est]; - TAutoPtr<IEventHandle> ev = factory->Construct({ - .Session = SessionId, - .Type = descr.Type, - .Flags = descr.Flags, - .Recipient = descr.Recipient, - .Sender = descr.Sender, - .Cookie = descr.Cookie, - .OriginScopeId = Params.PeerScopeId, - .TraceId = std::move(descr.TraceId), - .Data = std::move(data), - }); - if (ev) { - TActivationContext::Send(ev); - } - return; - } - } - - auto ev = std::make_unique<IEventHandleFat>(SessionId, + auto ev = std::make_unique<IEventHandle>(SessionId, descr.Type, descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 8ee93dfc7a..8562f6a440 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -47,7 +47,7 @@ namespace NActors { void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) { if (!DynamicPtr) { // perform usual bootstrap for static nodes - sys->Send(new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0)); + sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0)); } if (const auto& mon = Common->RegisterMonPage) { TString path = Sprintf("peer%04" PRIu32, PeerNodeId); @@ -591,7 +591,7 @@ namespace NActors { // we have found cancellation request for the pending handshake request; so simply remove it from the // deque, as we are not interested in failure reason; must likely it happens because of handshake timeout if (pendingEvent->GetTypeRewrite() == TEvHandshakeFail::EventType) { - TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(pendingEvent.Release())); + TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(pendingEvent.Release())); LogHandshakeFail(tmp, true); } PendingIncomingHandshakeEvents.erase(it); @@ -605,7 +605,7 @@ namespace NActors { Y_VERIFY(Session && SessionID); ValidateEvent(ev, "ForwardSessionEventToSession"); - InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev); + InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID)); } void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) { @@ -774,7 +774,7 @@ namespace NActors { for (auto& ev : PendingIncomingHandshakeEvents) { Send(ev->Sender, new TEvents::TEvPoisonPill); if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) { - TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(ev.Release())); + TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release())); LogHandshakeFail(tmp, true); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index ebf02c3f27..71edfccbe2 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -63,10 +63,10 @@ namespace NActors { TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr = nullptr); - STATEFN(StateInit) { + STFUNC(StateInit) { Bootstrap(); if (ev->Type != TEvents::TSystem::Bootstrap) { // for dynamic nodes we do not receive Bootstrap event - Receive(ev); + Receive(ev, ctx); } } @@ -180,7 +180,7 @@ namespace NActors { } else if (DynamicPtr) { PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15); if (!PassAwayScheduled) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(), + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), {}, nullptr, 0)); PassAwayScheduled = true; } @@ -205,7 +205,7 @@ namespace NActors { if (now >= PassAwayTimestamp) { PassAway(); } else if (PassAwayTimestamp != TMonotonic::Max()) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(), + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), {}, nullptr, 0)); } else { PassAwayScheduled = false; @@ -370,21 +370,6 @@ namespace NActors { ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func); } - void ValidateEvent(IEventHandle* ev, const char* func) { - if (SelfId().NodeId() == PeerNodeId) { - TString msg = Sprintf("Event Type# 0x%08" PRIx32 " TypeRewrite# 0x%08" PRIx32 - " from Sender# %s sent to the proxy for the node itself via Interconnect;" - " THIS IS NOT A BUG IN INTERCONNECT, check the event sender instead", - ev->Type, ev->GetTypeRewrite(), ev->Sender.ToString().data()); - LOG_ERROR_IC("ICP03", "%s", msg.data()); - Y_VERIFY_DEBUG(false, "%s", msg.data()); - } - - Y_VERIFY(ev->GetTypeRewrite() != TEvInterconnect::EvForward || ev->Recipient.NodeId() == PeerNodeId, - "Recipient/Proxy NodeId mismatch Recipient# %s Type# 0x%08" PRIx32 " PeerNodeId# %" PRIu32 " Func# %s", - ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func); - } - // Common with helpers // All proxy actors share the same information in the object // read only diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index 316c233af3..aad8677ca4 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -25,7 +25,7 @@ namespace NActors { } TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) { - return new IEventHandleFat(self, parentId, new TEvents::TEvBootstrap, 0); + return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0); } void TInterconnectListenerTCP::Die(const TActorContext& ctx) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 5a93bc0cc8..a336e4a89f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -112,8 +112,8 @@ namespace NActors { Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly"); } - void TInterconnectSessionTCP::Forward(TAutoPtr<IEventHandle>& ev) { - Proxy->ValidateEvent(ev.Get(), "Forward"); + void TInterconnectSessionTCP::Forward(STATEFN_SIG) { + Proxy->ValidateEvent(ev, "Forward"); LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data()); ++MessagesGot; @@ -126,7 +126,7 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev.Get()); + const auto [dataSize, event] = oChannel.Push(*ev); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; @@ -167,7 +167,7 @@ namespace NActors { } else if (!RamInQueue) { Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1); RamInQueue = new TEvRam(true); - auto *ev = new IEventHandleFat(SelfId(), {}, RamInQueue); + auto *ev = new IEventHandle(SelfId(), {}, RamInQueue); const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod; if (batchPeriod != TDuration()) { TActivationContext::Schedule(batchPeriod, ev); @@ -179,7 +179,7 @@ namespace NActors { } } - void TInterconnectSessionTCP::Subscribe(TAutoPtr<IEventHandle>& ev) { + void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data()); const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie); if (inserted) { @@ -190,7 +190,7 @@ namespace NActors { Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie); } - void TInterconnectSessionTCP::Unsubscribe(TEvents::TEvUnsubscribe::TPtr ev) { + void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data()); Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 9d8bb90ecd..598a5c9220 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -347,16 +347,16 @@ namespace NActors { void Terminate(TDisconnectReason reason); void PassAway() override; - void Forward(LIGHTFN_SIG); - void Subscribe(TAutoPtr<IEventHandle>& ev); - void Unsubscribe(TEvents::TEvUnsubscribe::TPtr); + void Forward(STATEFN_SIG); + void Subscribe(STATEFN_SIG); + void Unsubscribe(STATEFN_SIG); - STRICT_LIGHTFN(StateFunc, + STRICT_STFUNC(StateFunc, fFunc(TEvInterconnect::EvForward, Forward) cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison) fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe) fFunc(TEvents::TEvSubscribe::EventType, Subscribe) - hFunc(TEvents::TEvUnsubscribe, Unsubscribe) + fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe) cFunc(TEvFlush::EventType, HandleFlush) hFunc(TEvPollerReady, Handle) hFunc(TEvPollerRegisterResult, Handle) @@ -531,7 +531,7 @@ namespace NActors { auto sender = SelfId(); const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { auto ev = new TEvSessionBufferSizeRequest(); - return new IEventHandleFat(recp, sender, ev, IEventHandle::FlagTrackDelivery); + return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery); }; RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric); Become(&TInterconnectSessionKiller::StateFunc); diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp index 65e1d202ff..d460903f35 100644 --- a/library/cpp/actors/interconnect/load.cpp +++ b/library/cpp/actors/interconnect/load.cpp @@ -82,7 +82,7 @@ namespace NInterconnect { ) void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { - ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]).Release()); + ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex])); if (++SlaveIndex == Slaves.size()) { SlaveIndex = 0; } diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp index 5619f0f113..0aadc7ae35 100644 --- a/library/cpp/actors/interconnect/mock/ic_mock.cpp +++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp @@ -60,11 +60,11 @@ namespace NActors { TPeerInfo *peer = GetPeer(peerNodeId); auto guard = TReadGuard(peer->Mutex); if (peer->ActorSystem) { - peer->ActorSystem->Send(new IEventHandleFat(peer->ProxyId, TActorId(), new TEvInject(std::move(messages), + peer->ActorSystem->Send(new IEventHandle(peer->ProxyId, TActorId(), new TEvInject(std::move(messages), originScopeId, senderSessionId))); } else { for (auto&& ev : messages) { - TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected)); } } } @@ -78,7 +78,7 @@ namespace NActors { TPeerInfo *peer = GetPeer(peerNodeId); auto guard = TReadGuard(peer->Mutex); if (peer->ActorSystem) { - peer->ActorSystem->Send(new IEventHandleFat(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0)); + peer->ActorSystem->Send(new IEventHandle(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0)); } } @@ -114,7 +114,7 @@ namespace NActors { void Terminate() { for (auto&& ev : std::exchange(Queue, {})) { - TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected)); } for (const auto& kv : Subscribers) { Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second); @@ -130,7 +130,7 @@ namespace NActors { Subscribe(ev->Sender, ev->Cookie); } if (Queue.empty()) { - TActivationContext::Send(new IEventHandleFat(EvRam, 0, SelfId(), {}, {}, 0)); + TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0)); } Queue.emplace_back(ev.Release()); } @@ -193,7 +193,7 @@ namespace NActors { } template <typename TEvent> - bool CheckNodeStatus(TAutoPtr<TEventHandleFat<TEvent>>& ev) { + bool CheckNodeStatus(TAutoPtr<TEventHandle<TEvent>>& ev) { if (PeerNodeStatus != EPeerNodeStatus::EXISTS) { std::unique_ptr<IEventHandle> tmp(ev.Release()); CheckNonexistentNode(tmp); @@ -224,7 +224,7 @@ namespace NActors { if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie); } - TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected)); break; case TEvents::TEvSubscribe::EventType: @@ -252,7 +252,7 @@ namespace NActors { while (!WaitingConnections.empty()) { TAutoPtr<IEventHandle> tmp(WaitingConnections.front().release()); WaitingConnections.pop_front(); - Receive(tmp); + Receive(tmp, TActivationContext::AsActorContext()); } } }; @@ -287,28 +287,20 @@ namespace NActors { return; // drop messages from other sessions } if (auto *session = GetSession()) { - for (auto&& evb : ev->Get()->Messages) { - if (ev->IsEventLight()) { - // TODO(xenoxeno): - //if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) { - TActivationContext::Send(evb.release()); - //} - } else { - auto* ev = IEventHandleFat::GetFat(evb); - auto fw = std::make_unique<IEventHandleFat>( - session->SelfId(), - ev->Type, - ev->Flags & ~IEventHandle::FlagForwardOnNondelivery, - ev->Recipient, - ev->Sender, - ev->ReleaseChainBuffer(), - ev->Cookie, - msg->OriginScopeId, - std::move(ev->TraceId) - ); - if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) { - TActivationContext::Send(fw.release()); - } + for (auto&& ev : ev->Get()->Messages) { + auto fw = std::make_unique<IEventHandle>( + session->SelfId(), + ev->Type, + ev->Flags & ~IEventHandle::FlagForwardOnNondelivery, + ev->Recipient, + ev->Sender, + ev->ReleaseChainBuffer(), + ev->Cookie, + msg->OriginScopeId, + std::move(ev->TraceId) + ); + if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) { + TActivationContext::Send(fw.release()); } } } @@ -330,7 +322,8 @@ namespace NActors { void HandleSessionEvent(TAutoPtr<IEventHandle> ev) { auto *session = GetSession(); - InvokeOtherActor(*session, &TSessionMockActor::Receive, ev); + InvokeOtherActor(*session, &TSessionMockActor::Receive, ev, + TActivationContext::ActorContextFor(session->SelfId())); } void Disconnect() { @@ -351,7 +344,7 @@ namespace NActors { return State.Inject(PeerNodeId, std::move(messages), Common->LocalScopeId, Session->SessionId); } - STRICT_LIGHTFN(StateFunc, + STRICT_STFUNC(StateFunc, cFunc(TEvents::TSystem::Poison, PassAway) fFunc(TEvInterconnect::EvForward, HandleSessionEvent) fFunc(TEvInterconnect::EvConnectNode, HandleSessionEvent) diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp index 7b0bdf3e17..9ba173e330 100644 --- a/library/cpp/actors/interconnect/packet.cpp +++ b/library/cpp/actors/interconnect/packet.cpp @@ -17,25 +17,14 @@ ui32 TEventHolder::Fill(IEventHandle& ev) { EventActuallySerialized = 0; Descr.Checksum = 0; - if (ev.IsEventLight()) { - if (ev.IsEventSerializable()) { - NActors::IEventHandleLightSerializable& serializable(*NActors::IEventHandleLightSerializable::GetLightSerializable(&ev)); - EventSerializer = serializable.Serializer; - EventSerializedSize = 100; - } else { - EventSerializedSize = 0; - } + if (ev.HasBuffer()) { + Buffer = ev.ReleaseChainBuffer(); + EventSerializedSize = Buffer->GetSize(); + } else if (ev.HasEvent()) { + Event.Reset(ev.ReleaseBase()); + EventSerializedSize = Event->CalculateSerializedSize(); } else { - auto& evFat = *IEventHandleFat::GetFat(&ev); - if (evFat.HasBuffer()) { - Buffer = evFat.ReleaseChainBuffer(); - EventSerializedSize = Buffer->GetSize(); - } else if (evFat.HasEvent()) { - Event.Reset(evFat.ReleaseBase()); - EventSerializedSize = Event->CalculateSerializedSize(); - } else { - EventSerializedSize = 0; - } + EventSerializedSize = 0; } return EventSerializedSize; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index c06e648541..f3c506a663 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -113,7 +113,6 @@ struct TEventHolder : TNonCopyable { TActorId ForwardRecipient; THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; - NActors::TEventSerializer EventSerializer; ui64 Serial; ui32 EventSerializedSize; ui32 EventActuallySerialized; @@ -138,11 +137,10 @@ struct TEventHolder : TNonCopyable { const TActorId& s = d.Sender; const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr; Span.EndError("nondelivery"); - TAutoPtr<IEventHandle> ev = Event - ? new IEventHandleFat(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId()) - : new IEventHandleFat(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId()); - ev = IEventHandle::ForwardOnNondelivery(ev, NActors::TEvents::TEvUndelivered::Disconnected, unsure); - NActors::TActivationContext::Send(ev); + auto ev = Event + ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId()) + : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId()); + NActors::TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), NActors::TEvents::TEvUndelivered::Disconnected, unsure)); } void Clear() { diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index 7161c6ca90..e75cbcaef4 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -104,7 +104,7 @@ namespace NActors { protected: void Notify(TSocketRecord *record, bool read, bool write) { auto issue = [&](const TActorId& recipient) { - ActorSystem->Send(new IEventHandleFat(recipient, {}, new TEvPollerReady(record->Socket, read, write))); + ActorSystem->Send(new IEventHandle(recipient, {}, new TEvPollerReady(record->Socket, read, write))); }; if (read && record->ReadActorId) { issue(record->ReadActorId); diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index 9a541aeb86..b1d2e02f49 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -64,8 +64,6 @@ namespace NActors { using NActors::IEventBase; using NActors::IEventHandle; -using NActors::IEventHandleFat; -using NActors::IEventHandleLight; using NActors::TActorId; using NActors::TConstIoVec; using NActors::TEventSerializedData; diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 561248c3e5..32c8237b59 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandleFat>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp index 78158f07cc..3c474979dc 100644 --- a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp +++ b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp @@ -106,7 +106,7 @@ void SenderThread(TMutex& lock, TActorSystem *as, ui32 nodeId, ui32 queueId, ui3 const TActorId target = MakeResponderServiceId(nodeId); for (ui32 i = 0; i < count; ++i) { const ui32 flags = IEventHandle::FlagTrackDelivery; - as->Send(new IEventHandleFat(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i)); + as->Send(new IEventHandle(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i)); } } diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp index 000f5d4b3e..3596bffd5a 100644 --- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp +++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp @@ -46,30 +46,25 @@ public: } const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie); InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data))); - TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, + TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie)); // Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl); ++NextCookie; } } - void HandlePong(TAutoPtr<IEventHandle> e) { + void HandlePong(TAutoPtr<IEventHandle> ev) { // Cerr << (TStringBuilder() << "Receive# " << ev->Cookie << Endl); - if (e->IsEventFat()) { - auto ev = IEventHandleFat::GetFat(e); - if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) { - auto& [s2cIt, hash] = it->second; - Y_VERIFY(hash == ev->GetChainBuffer()->GetString()); - SessionToCookie.erase(s2cIt); - InFlight.erase(it); - } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) { - Y_VERIFY(it->second == ev->GetChainBuffer()->GetString()); - Tentative.erase(it); - } else { - Y_FAIL("Cookie# %" PRIu64, ev->Cookie); - } + if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) { + auto& [s2cIt, hash] = it->second; + Y_VERIFY(hash == ev->GetChainBuffer()->GetString()); + SessionToCookie.erase(s2cIt); + InFlight.erase(it); + } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) { + Y_VERIFY(it->second == ev->GetChainBuffer()->GetString()); + Tentative.erase(it); } else { - Y_FAIL("Pong is not fat"); + Y_FAIL("Cookie# %" PRIu64, ev->Cookie); } IssueQueries(); } @@ -128,13 +123,10 @@ public: {} void HandlePing(TAutoPtr<IEventHandle>& ev) { - if (ev->IsEventFat()) { - auto* evf = IEventHandleFat::GetFat(ev); - const TString& data = evf->GetChainBuffer()->GetString(); - const TString& response = MD5::CalcRaw(data); - TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Pong, 0, evf->Sender, SelfId(), - MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), evf->Cookie)); - } + const TString& data = ev->GetChainBuffer()->GetString(); + const TString& response = MD5::CalcRaw(data); + TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(), + MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie)); } STRICT_STFUNC(StateFunc, diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp index 745a020d2a..23d846a2fd 100644 --- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp +++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp @@ -253,7 +253,7 @@ public: private: void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) { auto ev = new TEvPollerRegister{socket, readActorId, writeActorId}; - ActorSystem_->Send(new IEventHandleFat(PollerId_, TActorId{}, ev)); + ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); } private: |