diff options
author | xenoxeno <xeno@ydb.tech> | 2023-03-09 12:10:01 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-03-09 12:10:01 +0300 |
commit | ad607bb887619f321dec03b02df8220e01b7f5aa (patch) | |
tree | 7d5c87352cbe835b56bb2bdac93b37cbdf8ead21 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp | |
parent | 6324d075a5e80b6943b5de6b465b775050fe83df (diff) | |
download | ydb-ad607bb887619f321dec03b02df8220e01b7f5aa.tar.gz |
light events for actor system
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index dfc4d411d3..3d6d18d274 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -112,8 +112,8 @@ namespace NActors { Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly"); } - void TInterconnectSessionTCP::Forward(STATEFN_SIG) { - Proxy->ValidateEvent(ev, "Forward"); + void TInterconnectSessionTCP::Forward(TAutoPtr<IEventHandle>& ev) { + Proxy->ValidateEvent(ev.Get(), "Forward"); LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data()); ++MessagesGot; @@ -126,7 +126,7 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev); + const auto [dataSize, event] = oChannel.Push(*ev.Get()); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; @@ -167,7 +167,7 @@ namespace NActors { } else if (!RamInQueue) { Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1); RamInQueue = new TEvRam(true); - auto *ev = new IEventHandle(SelfId(), {}, RamInQueue); + auto *ev = new IEventHandleFat(SelfId(), {}, RamInQueue); const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod; if (batchPeriod != TDuration()) { TActivationContext::Schedule(batchPeriod, ev); @@ -179,7 +179,7 @@ namespace NActors { } } - void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) { + void TInterconnectSessionTCP::Subscribe(TAutoPtr<IEventHandle>& ev) { LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data()); const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie); if (inserted) { @@ -190,7 +190,7 @@ namespace NActors { Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie); } - void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { + void TInterconnectSessionTCP::Unsubscribe(TEvents::TEvUnsubscribe::TPtr ev) { LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data()); Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); } |