summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/helpers/flow_controlled_queue.cpp
diff options
context:
space:
mode:
authorxenoxeno <[email protected]>2023-03-09 12:10:01 +0300
committerxenoxeno <[email protected]>2023-03-09 12:10:01 +0300
commitad607bb887619f321dec03b02df8220e01b7f5aa (patch)
tree7d5c87352cbe835b56bb2bdac93b37cbdf8ead21 /library/cpp/actors/helpers/flow_controlled_queue.cpp
parent6324d075a5e80b6943b5de6b465b775050fe83df (diff)
light events for actor system
Diffstat (limited to 'library/cpp/actors/helpers/flow_controlled_queue.cpp')
-rw-r--r--library/cpp/actors/helpers/flow_controlled_queue.cpp12
1 files changed, 6 insertions, 6 deletions
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp
index 41e0944c628..4f67f85a3ba 100644
--- a/library/cpp/actors/helpers/flow_controlled_queue.cpp
+++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp
@@ -102,7 +102,7 @@ class TFlowControlledRequestQueue : public IActorCallback {
Subscribed = true;
}
- TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
+ TActivationContext::Send(new IEventHandleFat(Target, reqActorId, IEventHandleFat::GetFat(ev.Get())->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
}
void PumpQueue() {
@@ -123,7 +123,7 @@ class TFlowControlledRequestQueue : public IActorCallback {
if (reqActor) {
if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
TActivationContext::Send(
- new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
+ new IEventHandleFat(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
);
}
reqActor->PassAway();
@@ -136,7 +136,7 @@ class TFlowControlledRequestQueue : public IActorCallback {
const auto reason = TEvents::TEvUndelivered::Disconnected;
if (ev->Flags & IEventHandle::FlagTrackDelivery) {
TActivationContext::Send(
- new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
+ new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
);
}
}
@@ -175,8 +175,8 @@ public:
auto it = Find(RegisteredRequests, reqActor);
if (it == RegisteredRequests.end())
return;
-
- TActivationContext::Send(ev->Forward(reqActor->Source));
+ IEventHandle::Forward(ev, reqActor->Source);
+ TActivationContext::Send(ev);
const TDuration reqLatency = reqActor->AccumulatedLatency();
if (reqLatency < MinimalSeenLatency)
MinimalSeenLatency = reqLatency;
@@ -190,7 +190,7 @@ public:
if (it == RegisteredRequests.end())
return;
- TActivationContext::Send(ev->Forward(reqActor->Source));
+ TActivationContext::Send(ev->Forward(reqActor->Source).Release());
*it = nullptr;
PumpQueue();