diff options
author | xenoxeno <xeno@ydb.tech> | 2023-03-09 12:10:01 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-03-09 12:10:01 +0300 |
commit | ad607bb887619f321dec03b02df8220e01b7f5aa (patch) | |
tree | 7d5c87352cbe835b56bb2bdac93b37cbdf8ead21 /library/cpp/actors/interconnect | |
parent | 6324d075a5e80b6943b5de6b465b775050fe83df (diff) | |
download | ydb-ad607bb887619f321dec03b02df8220e01b7f5aa.tar.gz |
light events for actor system
Diffstat (limited to 'library/cpp/actors/interconnect')
20 files changed, 165 insertions, 94 deletions
diff --git a/library/cpp/actors/interconnect/event_filter.h b/library/cpp/actors/interconnect/event_filter.h index 47dabf5f16..de3fdc2a04 100644 --- a/library/cpp/actors/interconnect/event_filter.h +++ b/library/cpp/actors/interconnect/event_filter.h @@ -31,7 +31,7 @@ namespace NActors { evSpaceIndex[subtype] = routes; } - bool CheckIncomingEvent(const IEventHandle& ev, const TScopeId& localScopeId) const { + bool CheckIncomingEvent(const IEventHandleFat& ev, const TScopeId& localScopeId) const { TRouteMask routes = 0; if (const auto& evSpaceIndex = ScopeRoutes[ev.Type >> 16]) { const ui16 subtype = ev.Type & 65535; diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 312eff2666..ce4adcdec2 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -59,10 +59,12 @@ namespace NActors { TEventHolder& event = Pool.Allocate(Queue); const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1)); OutputQueueSize += bytes; - if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { - event.Span - .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) - .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); + if (ev.IsEventFat()) { + if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { + event.Span + .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) + .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); + } } return std::make_pair(bytes, &event); } diff --git a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp index a450d16871..e917a27ea7 100644 --- a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp +++ b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp @@ -18,7 +18,7 @@ namespace NActors { , Mock(mock) {} - STFUNC(StateFunc) { + STATEFN(StateFunc) { if (ev->GetTypeRewrite() == TEvents::TSystem::Poison && !Proxy) { PassAway(); } else { @@ -32,7 +32,7 @@ namespace NActors { } Y_VERIFY(Proxy); } - InvokeOtherActor(*Proxy, &IActor::Receive, ev, ctx); + InvokeOtherActor(*Proxy, &IActor::Receive, ev); } } }; diff --git a/library/cpp/actors/interconnect/interconnect_resolve.cpp b/library/cpp/actors/interconnect/interconnect_resolve.cpp index d638ff830c..0b0b112628 100644 --- a/library/cpp/actors/interconnect/interconnect_resolve.cpp +++ b/library/cpp/actors/interconnect/interconnect_resolve.cpp @@ -120,7 +120,7 @@ namespace NActors { LOG_DEBUG_IC("ICR03", "Host: %s, RESOLVED address", Host.c_str()); auto reply = new TEvAddressInfo; reply->Address = std::move(addr); - TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply)); + TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply)); PassAway(); } @@ -129,7 +129,7 @@ namespace NActors { auto reply = std::make_unique<TEvLocalNodeInfo>(); reply->NodeId = *NodeId; reply->Addresses = std::move(addresses); - TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply.release())); + TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply.release())); PassAway(); } @@ -138,7 +138,7 @@ namespace NActors { auto *event = new TEvResolveError; event->Explain = errorText; event->Host = Host; - TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, event)); + TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, event)); PassAway(); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index b1212b8914..44abbb06bf 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -330,7 +330,31 @@ namespace NActors { TEventSerializationInfo serializationInfo{ .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat), }; - auto ev = std::make_unique<IEventHandle>(SessionId, + auto es = GetEventSpace(descr.Type); + if (es < TEventFactories::EventFactories.size() && TEventFactories::EventFactories[es] != nullptr) { + const auto& estvec(*TEventFactories::EventFactories[es]); + auto est = GetEventSubType(descr.Type); + if (est < estvec.size() && estvec[est] != nullptr) { + IEventFactory* factory = estvec[est]; + TAutoPtr<IEventHandle> ev = factory->Construct({ + .Session = SessionId, + .Type = descr.Type, + .Flags = descr.Flags, + .Recipient = descr.Recipient, + .Sender = descr.Sender, + .Cookie = descr.Cookie, + .OriginScopeId = Params.PeerScopeId, + .TraceId = std::move(descr.TraceId), + .Data = std::move(data), + }); + if (ev) { + TActivationContext::Send(ev); + } + return; + } + } + + auto ev = std::make_unique<IEventHandleFat>(SessionId, descr.Type, descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 3edded47d6..8ee93dfc7a 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -47,7 +47,7 @@ namespace NActors { void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) { if (!DynamicPtr) { // perform usual bootstrap for static nodes - sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0)); + sys->Send(new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0)); } if (const auto& mon = Common->RegisterMonPage) { TString path = Sprintf("peer%04" PRIu32, PeerNodeId); @@ -509,7 +509,7 @@ namespace NActors { if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie); } - TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); break; case TEvInterconnect::TEvConnectNode::EventType: @@ -591,7 +591,7 @@ namespace NActors { // we have found cancellation request for the pending handshake request; so simply remove it from the // deque, as we are not interested in failure reason; must likely it happens because of handshake timeout if (pendingEvent->GetTypeRewrite() == TEvHandshakeFail::EventType) { - TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(pendingEvent.Release())); + TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(pendingEvent.Release())); LogHandshakeFail(tmp, true); } PendingIncomingHandshakeEvents.erase(it); @@ -605,7 +605,7 @@ namespace NActors { Y_VERIFY(Session && SessionID); ValidateEvent(ev, "ForwardSessionEventToSession"); - InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID)); + InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev); } void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) { @@ -774,7 +774,7 @@ namespace NActors { for (auto& ev : PendingIncomingHandshakeEvents) { Send(ev->Sender, new TEvents::TEvPoisonPill); if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) { - TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release())); + TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(ev.Release())); LogHandshakeFail(tmp, true); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 71edfccbe2..ebf02c3f27 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -63,10 +63,10 @@ namespace NActors { TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr = nullptr); - STFUNC(StateInit) { + STATEFN(StateInit) { Bootstrap(); if (ev->Type != TEvents::TSystem::Bootstrap) { // for dynamic nodes we do not receive Bootstrap event - Receive(ev, ctx); + Receive(ev); } } @@ -180,7 +180,7 @@ namespace NActors { } else if (DynamicPtr) { PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15); if (!PassAwayScheduled) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(), {}, nullptr, 0)); PassAwayScheduled = true; } @@ -205,7 +205,7 @@ namespace NActors { if (now >= PassAwayTimestamp) { PassAway(); } else if (PassAwayTimestamp != TMonotonic::Max()) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(), {}, nullptr, 0)); } else { PassAwayScheduled = false; @@ -370,6 +370,21 @@ namespace NActors { ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func); } + void ValidateEvent(IEventHandle* ev, const char* func) { + if (SelfId().NodeId() == PeerNodeId) { + TString msg = Sprintf("Event Type# 0x%08" PRIx32 " TypeRewrite# 0x%08" PRIx32 + " from Sender# %s sent to the proxy for the node itself via Interconnect;" + " THIS IS NOT A BUG IN INTERCONNECT, check the event sender instead", + ev->Type, ev->GetTypeRewrite(), ev->Sender.ToString().data()); + LOG_ERROR_IC("ICP03", "%s", msg.data()); + Y_VERIFY_DEBUG(false, "%s", msg.data()); + } + + Y_VERIFY(ev->GetTypeRewrite() != TEvInterconnect::EvForward || ev->Recipient.NodeId() == PeerNodeId, + "Recipient/Proxy NodeId mismatch Recipient# %s Type# 0x%08" PRIx32 " PeerNodeId# %" PRIu32 " Func# %s", + ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func); + } + // Common with helpers // All proxy actors share the same information in the object // read only diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index aad8677ca4..316c233af3 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -25,7 +25,7 @@ namespace NActors { } TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) { - return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0); + return new IEventHandleFat(self, parentId, new TEvents::TEvBootstrap, 0); } void TInterconnectListenerTCP::Die(const TActorContext& ctx) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index dfc4d411d3..3d6d18d274 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -112,8 +112,8 @@ namespace NActors { Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly"); } - void TInterconnectSessionTCP::Forward(STATEFN_SIG) { - Proxy->ValidateEvent(ev, "Forward"); + void TInterconnectSessionTCP::Forward(TAutoPtr<IEventHandle>& ev) { + Proxy->ValidateEvent(ev.Get(), "Forward"); LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data()); ++MessagesGot; @@ -126,7 +126,7 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev); + const auto [dataSize, event] = oChannel.Push(*ev.Get()); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; @@ -167,7 +167,7 @@ namespace NActors { } else if (!RamInQueue) { Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1); RamInQueue = new TEvRam(true); - auto *ev = new IEventHandle(SelfId(), {}, RamInQueue); + auto *ev = new IEventHandleFat(SelfId(), {}, RamInQueue); const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod; if (batchPeriod != TDuration()) { TActivationContext::Schedule(batchPeriod, ev); @@ -179,7 +179,7 @@ namespace NActors { } } - void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) { + void TInterconnectSessionTCP::Subscribe(TAutoPtr<IEventHandle>& ev) { LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data()); const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie); if (inserted) { @@ -190,7 +190,7 @@ namespace NActors { Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie); } - void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { + void TInterconnectSessionTCP::Unsubscribe(TEvents::TEvUnsubscribe::TPtr ev) { LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data()); Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 598a5c9220..9d8bb90ecd 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -347,16 +347,16 @@ namespace NActors { void Terminate(TDisconnectReason reason); void PassAway() override; - void Forward(STATEFN_SIG); - void Subscribe(STATEFN_SIG); - void Unsubscribe(STATEFN_SIG); + void Forward(LIGHTFN_SIG); + void Subscribe(TAutoPtr<IEventHandle>& ev); + void Unsubscribe(TEvents::TEvUnsubscribe::TPtr); - STRICT_STFUNC(StateFunc, + STRICT_LIGHTFN(StateFunc, fFunc(TEvInterconnect::EvForward, Forward) cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison) fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe) fFunc(TEvents::TEvSubscribe::EventType, Subscribe) - fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe) + hFunc(TEvents::TEvUnsubscribe, Unsubscribe) cFunc(TEvFlush::EventType, HandleFlush) hFunc(TEvPollerReady, Handle) hFunc(TEvPollerRegisterResult, Handle) @@ -531,7 +531,7 @@ namespace NActors { auto sender = SelfId(); const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { auto ev = new TEvSessionBufferSizeRequest(); - return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery); + return new IEventHandleFat(recp, sender, ev, IEventHandle::FlagTrackDelivery); }; RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric); Become(&TInterconnectSessionKiller::StateFunc); diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp index d460903f35..65e1d202ff 100644 --- a/library/cpp/actors/interconnect/load.cpp +++ b/library/cpp/actors/interconnect/load.cpp @@ -82,7 +82,7 @@ namespace NInterconnect { ) void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { - ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex])); + ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]).Release()); if (++SlaveIndex == Slaves.size()) { SlaveIndex = 0; } diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp index fb96a7f2ea..5619f0f113 100644 --- a/library/cpp/actors/interconnect/mock/ic_mock.cpp +++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp @@ -60,11 +60,11 @@ namespace NActors { TPeerInfo *peer = GetPeer(peerNodeId); auto guard = TReadGuard(peer->Mutex); if (peer->ActorSystem) { - peer->ActorSystem->Send(new IEventHandle(peer->ProxyId, TActorId(), new TEvInject(std::move(messages), + peer->ActorSystem->Send(new IEventHandleFat(peer->ProxyId, TActorId(), new TEvInject(std::move(messages), originScopeId, senderSessionId))); } else { for (auto&& ev : messages) { - TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); } } } @@ -78,7 +78,7 @@ namespace NActors { TPeerInfo *peer = GetPeer(peerNodeId); auto guard = TReadGuard(peer->Mutex); if (peer->ActorSystem) { - peer->ActorSystem->Send(new IEventHandle(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0)); + peer->ActorSystem->Send(new IEventHandleFat(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0)); } } @@ -114,7 +114,7 @@ namespace NActors { void Terminate() { for (auto&& ev : std::exchange(Queue, {})) { - TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); } for (const auto& kv : Subscribers) { Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second); @@ -130,7 +130,7 @@ namespace NActors { Subscribe(ev->Sender, ev->Cookie); } if (Queue.empty()) { - TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0)); + TActivationContext::Send(new IEventHandleFat(EvRam, 0, SelfId(), {}, {}, 0)); } Queue.emplace_back(ev.Release()); } @@ -193,7 +193,7 @@ namespace NActors { } template <typename TEvent> - bool CheckNodeStatus(TAutoPtr<TEventHandle<TEvent>>& ev) { + bool CheckNodeStatus(TAutoPtr<TEventHandleFat<TEvent>>& ev) { if (PeerNodeStatus != EPeerNodeStatus::EXISTS) { std::unique_ptr<IEventHandle> tmp(ev.Release()); CheckNonexistentNode(tmp); @@ -201,7 +201,7 @@ namespace NActors { } return true; } - + bool CheckNodeStatus(TAutoPtr<IEventHandle>& ev) { if (PeerNodeStatus != EPeerNodeStatus::EXISTS) { std::unique_ptr<IEventHandle> tmp(ev.Release()); @@ -210,7 +210,7 @@ namespace NActors { } return true; } - + void CheckNonexistentNode(std::unique_ptr<IEventHandle>& ev) { if (PeerNodeStatus == EPeerNodeStatus::UNKNOWN) { WaitingConnections.emplace_back(ev.release()); @@ -224,15 +224,15 @@ namespace NActors { if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie); } - TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); break; - - case TEvents::TEvSubscribe::EventType: + + case TEvents::TEvSubscribe::EventType: case TEvInterconnect::TEvConnectNode::EventType: Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie); break; - case TEvents::TEvUnsubscribe::EventType: + case TEvents::TEvUnsubscribe::EventType: break; default: @@ -252,7 +252,7 @@ namespace NActors { while (!WaitingConnections.empty()) { TAutoPtr<IEventHandle> tmp(WaitingConnections.front().release()); WaitingConnections.pop_front(); - Receive(tmp, TActivationContext::AsActorContext()); + Receive(tmp); } } }; @@ -287,20 +287,28 @@ namespace NActors { return; // drop messages from other sessions } if (auto *session = GetSession()) { - for (auto&& ev : ev->Get()->Messages) { - auto fw = std::make_unique<IEventHandle>( - session->SelfId(), - ev->Type, - ev->Flags & ~IEventHandle::FlagForwardOnNondelivery, - ev->Recipient, - ev->Sender, - ev->ReleaseChainBuffer(), - ev->Cookie, - msg->OriginScopeId, - std::move(ev->TraceId) - ); - if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) { - TActivationContext::Send(fw.release()); + for (auto&& evb : ev->Get()->Messages) { + if (ev->IsEventLight()) { + // TODO(xenoxeno): + //if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) { + TActivationContext::Send(evb.release()); + //} + } else { + auto* ev = IEventHandleFat::GetFat(evb); + auto fw = std::make_unique<IEventHandleFat>( + session->SelfId(), + ev->Type, + ev->Flags & ~IEventHandle::FlagForwardOnNondelivery, + ev->Recipient, + ev->Sender, + ev->ReleaseChainBuffer(), + ev->Cookie, + msg->OriginScopeId, + std::move(ev->TraceId) + ); + if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) { + TActivationContext::Send(fw.release()); + } } } } @@ -322,8 +330,7 @@ namespace NActors { void HandleSessionEvent(TAutoPtr<IEventHandle> ev) { auto *session = GetSession(); - InvokeOtherActor(*session, &TSessionMockActor::Receive, ev, - TActivationContext::ActorContextFor(session->SelfId())); + InvokeOtherActor(*session, &TSessionMockActor::Receive, ev); } void Disconnect() { @@ -344,7 +351,7 @@ namespace NActors { return State.Inject(PeerNodeId, std::move(messages), Common->LocalScopeId, Session->SessionId); } - STRICT_STFUNC(StateFunc, + STRICT_LIGHTFN(StateFunc, cFunc(TEvents::TSystem::Poison, PassAway) fFunc(TEvInterconnect::EvForward, HandleSessionEvent) fFunc(TEvInterconnect::EvConnectNode, HandleSessionEvent) diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp index 9ba173e330..7b0bdf3e17 100644 --- a/library/cpp/actors/interconnect/packet.cpp +++ b/library/cpp/actors/interconnect/packet.cpp @@ -17,14 +17,25 @@ ui32 TEventHolder::Fill(IEventHandle& ev) { EventActuallySerialized = 0; Descr.Checksum = 0; - if (ev.HasBuffer()) { - Buffer = ev.ReleaseChainBuffer(); - EventSerializedSize = Buffer->GetSize(); - } else if (ev.HasEvent()) { - Event.Reset(ev.ReleaseBase()); - EventSerializedSize = Event->CalculateSerializedSize(); + if (ev.IsEventLight()) { + if (ev.IsEventSerializable()) { + NActors::IEventHandleLightSerializable& serializable(*NActors::IEventHandleLightSerializable::GetLightSerializable(&ev)); + EventSerializer = serializable.Serializer; + EventSerializedSize = 100; + } else { + EventSerializedSize = 0; + } } else { - EventSerializedSize = 0; + auto& evFat = *IEventHandleFat::GetFat(&ev); + if (evFat.HasBuffer()) { + Buffer = evFat.ReleaseChainBuffer(); + EventSerializedSize = Buffer->GetSize(); + } else if (evFat.HasEvent()) { + Event.Reset(evFat.ReleaseBase()); + EventSerializedSize = Event->CalculateSerializedSize(); + } else { + EventSerializedSize = 0; + } } return EventSerializedSize; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index c8909c08a7..c06e648541 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -113,6 +113,7 @@ struct TEventHolder : TNonCopyable { TActorId ForwardRecipient; THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; + NActors::TEventSerializer EventSerializer; ui64 Serial; ui32 EventSerializedSize; ui32 EventActuallySerialized; @@ -137,10 +138,11 @@ struct TEventHolder : TNonCopyable { const TActorId& s = d.Sender; const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr; Span.EndError("nondelivery"); - auto ev = Event - ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId()) - : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId()); - NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure)); + TAutoPtr<IEventHandle> ev = Event + ? new IEventHandleFat(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId()) + : new IEventHandleFat(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId()); + ev = IEventHandle::ForwardOnNondelivery(ev, NActors::TEvents::TEvUndelivered::Disconnected, unsure); + NActors::TActivationContext::Send(ev); } void Clear() { diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index e75cbcaef4..7161c6ca90 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -104,7 +104,7 @@ namespace NActors { protected: void Notify(TSocketRecord *record, bool read, bool write) { auto issue = [&](const TActorId& recipient) { - ActorSystem->Send(new IEventHandle(recipient, {}, new TEvPollerReady(record->Socket, read, write))); + ActorSystem->Send(new IEventHandleFat(recipient, {}, new TEvPollerReady(record->Socket, read, write))); }; if (read && record->ReadActorId) { issue(record->ReadActorId); diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index b1d2e02f49..9a541aeb86 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -64,6 +64,8 @@ namespace NActors { using NActors::IEventBase; using NActors::IEventHandle; +using NActors::IEventHandleFat; +using NActors::IEventHandleLight; using NActors::TActorId; using NActors::TConstIoVec; using NActors::TEventSerializedData; diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 32c8237b59..561248c3e5 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); + auto ev = MakeHolder<IEventHandleFat>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp index 3c474979dc..78158f07cc 100644 --- a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp +++ b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp @@ -106,7 +106,7 @@ void SenderThread(TMutex& lock, TActorSystem *as, ui32 nodeId, ui32 queueId, ui3 const TActorId target = MakeResponderServiceId(nodeId); for (ui32 i = 0; i < count; ++i) { const ui32 flags = IEventHandle::FlagTrackDelivery; - as->Send(new IEventHandle(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i)); + as->Send(new IEventHandleFat(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i)); } } diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp index 3596bffd5a..000f5d4b3e 100644 --- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp +++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp @@ -46,25 +46,30 @@ public: } const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie); InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data))); - TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, + TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie)); // Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl); ++NextCookie; } } - void HandlePong(TAutoPtr<IEventHandle> ev) { + void HandlePong(TAutoPtr<IEventHandle> e) { // Cerr << (TStringBuilder() << "Receive# " << ev->Cookie << Endl); - if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) { - auto& [s2cIt, hash] = it->second; - Y_VERIFY(hash == ev->GetChainBuffer()->GetString()); - SessionToCookie.erase(s2cIt); - InFlight.erase(it); - } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) { - Y_VERIFY(it->second == ev->GetChainBuffer()->GetString()); - Tentative.erase(it); + if (e->IsEventFat()) { + auto ev = IEventHandleFat::GetFat(e); + if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) { + auto& [s2cIt, hash] = it->second; + Y_VERIFY(hash == ev->GetChainBuffer()->GetString()); + SessionToCookie.erase(s2cIt); + InFlight.erase(it); + } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) { + Y_VERIFY(it->second == ev->GetChainBuffer()->GetString()); + Tentative.erase(it); + } else { + Y_FAIL("Cookie# %" PRIu64, ev->Cookie); + } } else { - Y_FAIL("Cookie# %" PRIu64, ev->Cookie); + Y_FAIL("Pong is not fat"); } IssueQueries(); } @@ -123,10 +128,13 @@ public: {} void HandlePing(TAutoPtr<IEventHandle>& ev) { - const TString& data = ev->GetChainBuffer()->GetString(); - const TString& response = MD5::CalcRaw(data); - TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(), - MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie)); + if (ev->IsEventFat()) { + auto* evf = IEventHandleFat::GetFat(ev); + const TString& data = evf->GetChainBuffer()->GetString(); + const TString& response = MD5::CalcRaw(data); + TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Pong, 0, evf->Sender, SelfId(), + MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), evf->Cookie)); + } } STRICT_STFUNC(StateFunc, diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp index 23d846a2fd..745a020d2a 100644 --- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp +++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp @@ -253,7 +253,7 @@ public: private: void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) { auto ev = new TEvPollerRegister{socket, readActorId, writeActorId}; - ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); + ActorSystem_->Send(new IEventHandleFat(PollerId_, TActorId{}, ev)); } private: |