aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/helpers
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-03-09 12:10:01 +0300
committerxenoxeno <xeno@ydb.tech>2023-03-09 12:10:01 +0300
commitad607bb887619f321dec03b02df8220e01b7f5aa (patch)
tree7d5c87352cbe835b56bb2bdac93b37cbdf8ead21 /library/cpp/actors/helpers
parent6324d075a5e80b6943b5de6b465b775050fe83df (diff)
downloadydb-ad607bb887619f321dec03b02df8220e01b7f5aa.tar.gz
light events for actor system
Diffstat (limited to 'library/cpp/actors/helpers')
-rw-r--r--library/cpp/actors/helpers/flow_controlled_queue.cpp12
-rw-r--r--library/cpp/actors/helpers/future_callback.h27
-rw-r--r--library/cpp/actors/helpers/selfping_actor_ut.cpp2
3 files changed, 33 insertions, 8 deletions
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp
index 41e0944c628..4f67f85a3ba 100644
--- a/library/cpp/actors/helpers/flow_controlled_queue.cpp
+++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp
@@ -102,7 +102,7 @@ class TFlowControlledRequestQueue : public IActorCallback {
Subscribed = true;
}
- TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
+ TActivationContext::Send(new IEventHandleFat(Target, reqActorId, IEventHandleFat::GetFat(ev.Get())->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
}
void PumpQueue() {
@@ -123,7 +123,7 @@ class TFlowControlledRequestQueue : public IActorCallback {
if (reqActor) {
if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
TActivationContext::Send(
- new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
+ new IEventHandleFat(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
);
}
reqActor->PassAway();
@@ -136,7 +136,7 @@ class TFlowControlledRequestQueue : public IActorCallback {
const auto reason = TEvents::TEvUndelivered::Disconnected;
if (ev->Flags & IEventHandle::FlagTrackDelivery) {
TActivationContext::Send(
- new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
+ new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
);
}
}
@@ -175,8 +175,8 @@ public:
auto it = Find(RegisteredRequests, reqActor);
if (it == RegisteredRequests.end())
return;
-
- TActivationContext::Send(ev->Forward(reqActor->Source));
+ IEventHandle::Forward(ev, reqActor->Source);
+ TActivationContext::Send(ev);
const TDuration reqLatency = reqActor->AccumulatedLatency();
if (reqLatency < MinimalSeenLatency)
MinimalSeenLatency = reqLatency;
@@ -190,7 +190,7 @@ public:
if (it == RegisteredRequests.end())
return;
- TActivationContext::Send(ev->Forward(reqActor->Source));
+ TActivationContext::Send(ev->Forward(reqActor->Source).Release());
*it = nullptr;
PumpQueue();
diff --git a/library/cpp/actors/helpers/future_callback.h b/library/cpp/actors/helpers/future_callback.h
index 8ca0d99fdae..4db11c73132 100644
--- a/library/cpp/actors/helpers/future_callback.h
+++ b/library/cpp/actors/helpers/future_callback.h
@@ -7,7 +7,7 @@ namespace NActors {
template <typename EventType>
struct TActorFutureCallback : TActor<TActorFutureCallback<EventType>> {
- using TCallback = std::function<void(TAutoPtr<TEventHandle<EventType>>&)>;
+ using TCallback = std::function<void(TAutoPtr<TEventHandleFat<EventType>>&)>;
using TBase = TActor<TActorFutureCallback<EventType>>;
TCallback Callback;
@@ -30,4 +30,29 @@ struct TActorFutureCallback : TActor<TActorFutureCallback<EventType>> {
}
};
+template <typename EventType>
+struct TActorFutureCallbackLight : TActor<TActorFutureCallbackLight<EventType>> {
+ using TCallback = std::function<void(TAutoPtr<EventType>&)>;
+ using TBase = TActor<TActorFutureCallbackLight<EventType>>;
+ TCallback Callback;
+
+ static constexpr IActor::EActivityType ActorActivityType() {
+ return IActor::ACTOR_FUTURE_CALLBACK;
+ }
+
+ TActorFutureCallbackLight(TCallback&& callback)
+ : TBase(&TActorFutureCallbackLight::StateWaitForEvent)
+ , Callback(std::move(callback))
+ {}
+
+ STRICT_LIGHTFN(StateWaitForEvent,
+ hFunc(EventType, Handle)
+ )
+
+ void Handle(typename EventType::TPtr ev) {
+ Callback(ev);
+ TBase::PassAway();
+ }
+};
+
} // NActors
diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp
index 542f817755c..ed4c0972fb1 100644
--- a/library/cpp/actors/helpers/selfping_actor_ut.cpp
+++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp
@@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) {
const TActorId actorId = runtime->Register(actor);
Y_UNUSED(actorId);
- //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0)));
+ //runtime.Send(new IEventHandleFat(actorId, sender, new TEvSelfPing::TEvPing(0.0)));
// TODO check after events are handled
//Sleep(TDuration::Seconds(1));