diff options
author | xenoxeno <xeno@ydb.tech> | 2023-03-09 12:10:01 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-03-09 12:10:01 +0300 |
commit | ad607bb887619f321dec03b02df8220e01b7f5aa (patch) | |
tree | 7d5c87352cbe835b56bb2bdac93b37cbdf8ead21 /library/cpp/actors/helpers | |
parent | 6324d075a5e80b6943b5de6b465b775050fe83df (diff) | |
download | ydb-ad607bb887619f321dec03b02df8220e01b7f5aa.tar.gz |
light events for actor system
Diffstat (limited to 'library/cpp/actors/helpers')
-rw-r--r-- | library/cpp/actors/helpers/flow_controlled_queue.cpp | 12 | ||||
-rw-r--r-- | library/cpp/actors/helpers/future_callback.h | 27 | ||||
-rw-r--r-- | library/cpp/actors/helpers/selfping_actor_ut.cpp | 2 |
3 files changed, 33 insertions, 8 deletions
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp index 41e0944c628..4f67f85a3ba 100644 --- a/library/cpp/actors/helpers/flow_controlled_queue.cpp +++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp @@ -102,7 +102,7 @@ class TFlowControlledRequestQueue : public IActorCallback { Subscribed = true; } - TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie)); + TActivationContext::Send(new IEventHandleFat(Target, reqActorId, IEventHandleFat::GetFat(ev.Get())->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie)); } void PumpQueue() { @@ -123,7 +123,7 @@ class TFlowControlledRequestQueue : public IActorCallback { if (reqActor) { if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) { TActivationContext::Send( - new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie) + new IEventHandleFat(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie) ); } reqActor->PassAway(); @@ -136,7 +136,7 @@ class TFlowControlledRequestQueue : public IActorCallback { const auto reason = TEvents::TEvUndelivered::Disconnected; if (ev->Flags & IEventHandle::FlagTrackDelivery) { TActivationContext::Send( - new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie) + new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie) ); } } @@ -175,8 +175,8 @@ public: auto it = Find(RegisteredRequests, reqActor); if (it == RegisteredRequests.end()) return; - - TActivationContext::Send(ev->Forward(reqActor->Source)); + IEventHandle::Forward(ev, reqActor->Source); + TActivationContext::Send(ev); const TDuration reqLatency = reqActor->AccumulatedLatency(); if (reqLatency < MinimalSeenLatency) MinimalSeenLatency = reqLatency; @@ -190,7 +190,7 @@ public: if (it == RegisteredRequests.end()) return; - TActivationContext::Send(ev->Forward(reqActor->Source)); + TActivationContext::Send(ev->Forward(reqActor->Source).Release()); *it = nullptr; PumpQueue(); diff --git a/library/cpp/actors/helpers/future_callback.h b/library/cpp/actors/helpers/future_callback.h index 8ca0d99fdae..4db11c73132 100644 --- a/library/cpp/actors/helpers/future_callback.h +++ b/library/cpp/actors/helpers/future_callback.h @@ -7,7 +7,7 @@ namespace NActors { template <typename EventType> struct TActorFutureCallback : TActor<TActorFutureCallback<EventType>> { - using TCallback = std::function<void(TAutoPtr<TEventHandle<EventType>>&)>; + using TCallback = std::function<void(TAutoPtr<TEventHandleFat<EventType>>&)>; using TBase = TActor<TActorFutureCallback<EventType>>; TCallback Callback; @@ -30,4 +30,29 @@ struct TActorFutureCallback : TActor<TActorFutureCallback<EventType>> { } }; +template <typename EventType> +struct TActorFutureCallbackLight : TActor<TActorFutureCallbackLight<EventType>> { + using TCallback = std::function<void(TAutoPtr<EventType>&)>; + using TBase = TActor<TActorFutureCallbackLight<EventType>>; + TCallback Callback; + + static constexpr IActor::EActivityType ActorActivityType() { + return IActor::ACTOR_FUTURE_CALLBACK; + } + + TActorFutureCallbackLight(TCallback&& callback) + : TBase(&TActorFutureCallbackLight::StateWaitForEvent) + , Callback(std::move(callback)) + {} + + STRICT_LIGHTFN(StateWaitForEvent, + hFunc(EventType, Handle) + ) + + void Handle(typename EventType::TPtr ev) { + Callback(ev); + TBase::PassAway(); + } +}; + } // NActors diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp index 542f817755c..ed4c0972fb1 100644 --- a/library/cpp/actors/helpers/selfping_actor_ut.cpp +++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp @@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) { const TActorId actorId = runtime->Register(actor); Y_UNUSED(actorId); - //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0))); + //runtime.Send(new IEventHandleFat(actorId, sender, new TEvSelfPing::TEvPing(0.0))); // TODO check after events are handled //Sleep(TDuration::Seconds(1)); |