aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/actors/helpers/flow_controlled_queue.cpp
blob: d75cc5402362c54e3e1e541b1b909069e420750f (plain) (tree)
1
2
3
4
                                  
                                                 














                                                              
                          


                            
                                                                                                                            























                                                                                              
                          







































































                                                                                                                                                 
                                                                                                                                             



























                                                                                                                                         
                                                                                                         




















































                                                                                                             
                                                                                                                      


                                                                       
#include "flow_controlled_queue.h"

#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/util/datetime.h>

#include <util/generic/deque.h>
#include <util/datetime/cputimer.h>
#include <util/generic/algorithm.h>

namespace NActors {

class TFlowControlledRequestQueue;

class TFlowControlledRequestActor : public IActor {
    TFlowControlledRequestQueue * const QueueActor;

    void HandleReply(TAutoPtr<IEventHandle> &ev);
    void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev);
public:
    const TActorId Source;
    const ui64 Cookie;
    const ui32 Flags;
    const ui64 StartCounter;

    TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags)
        : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity)
        , QueueActor(queue)
        , Source(source)
        , Cookie(cookie)
        , Flags(flags)
        , StartCounter(GetCycleCountFast())
    {}

    STATEFN(StateWait) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvents::TEvUndelivered, HandleUndelivered);
        default:
            HandleReply(ev);
        }
    }

    TDuration AccumulatedLatency() const {
        const ui64 cc = GetCycleCountFast() - StartCounter;
        return CyclesToDuration(cc);
    }

    using IActor::PassAway;
};

class TFlowControlledRequestQueue : public IActor {
    const TActorId Target;
    const TFlowControlledQueueConfig Config;

    TDeque<THolder<IEventHandle>> UnhandledRequests;
    TDeque<TFlowControlledRequestActor *> RegisteredRequests;

    bool Subscribed = false;

    TDuration MinimalSeenLatency;

    bool CanRegister() {
        const ui64 inFly = RegisteredRequests.size();
        if (inFly <= Config.MinAllowedInFly) // <= for handling minAllowed == 0
            return true;

        if (inFly >= Config.MaxAllowedInFly)
            return false;

        if (Config.TargetDynamicRate) {
            if (const ui64 dynMax = MinimalSeenLatency.MicroSeconds() * Config.TargetDynamicRate / 1000000) {
                if (inFly >= dynMax)
                    return false;
            }
        }

        const TDuration currentLatency = RegisteredRequests.front()->AccumulatedLatency();
        if (currentLatency <= Config.MinTrackedLatency)
            return true;

        if (currentLatency <= MinimalSeenLatency * Config.LatencyFactor)
            return true;

        return false;
    }

    void HandleForwardedEvent(TAutoPtr<IEventHandle> &ev) {
        if (CanRegister()) {
            RegisterReqActor(ev);
        } else {
            UnhandledRequests.emplace_back(ev.Release());
        }
    }

    void RegisterReqActor(THolder<IEventHandle> ev) {
        TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags);
        const TActorId reqActorId = RegisterWithSameMailbox(reqActor);
        RegisteredRequests.emplace_back(reqActor);

        if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) {
            Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery);
            Subscribed = true;
        }

        TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
    }

    void PumpQueue() {
        while (RegisteredRequests && RegisteredRequests.front() == nullptr)
            RegisteredRequests.pop_front();

        while (UnhandledRequests && CanRegister()) {
            RegisterReqActor(std::move(UnhandledRequests.front()));
            UnhandledRequests.pop_front();
        }
    }

    void HandleDisconnected() {
        Subscribed = false;

        const ui32 nodeid = Target.NodeId();
        for (TFlowControlledRequestActor *reqActor : RegisteredRequests) {
            if (reqActor) {
                if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
                    TActivationContext::Send(
                        new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
                    );
                }
                reqActor->PassAway();
            }
        }

        RegisteredRequests.clear();

        for (auto &ev : UnhandledRequests) {
            const auto reason = TEvents::TEvUndelivered::Disconnected;
            if (ev->Flags & IEventHandle::FlagTrackDelivery) {
                TActivationContext::Send(
                    new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
                );
            }
        }

        UnhandledRequests.clear();
    }

    void HandlePoison() {
        HandleDisconnected();

        if (SelfId().NodeId() != Target.NodeId())
            Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvUnsubscribe());

        PassAway();
    }
public:
    TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config)
        : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity)
        , Target(target)
        , Config(config)
        , MinimalSeenLatency(TDuration::Seconds(1))
    {}

    STATEFN(StateWork) {
        switch (ev->GetTypeRewrite()) {
            cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected);
            IgnoreFunc(TEvInterconnect::TEvNodeConnected);
            cFunc(TEvents::TEvUndelivered::EventType, HandleDisconnected);
            cFunc(TEvents::TEvPoison::EventType, HandlePoison);
        default:
            HandleForwardedEvent(ev);
        }
    }

    void HandleRequestReply(TAutoPtr<IEventHandle> &ev, TFlowControlledRequestActor *reqActor) {
        auto it = Find(RegisteredRequests, reqActor);
        if (it == RegisteredRequests.end())
            return;

        TActivationContext::Send(ev->Forward(reqActor->Source));
        const TDuration reqLatency = reqActor->AccumulatedLatency();
        if (reqLatency < MinimalSeenLatency)
            MinimalSeenLatency = reqLatency;

        *it = nullptr;
        PumpQueue();
    }

    void HandleRequestUndelivered(TEvents::TEvUndelivered::TPtr &ev, TFlowControlledRequestActor *reqActor) {
        auto it = Find(RegisteredRequests, reqActor);
        if (it == RegisteredRequests.end())
            return;

        TActivationContext::Send(ev->Forward(reqActor->Source));

        *it = nullptr;
        PumpQueue();
    }
};

void TFlowControlledRequestActor::HandleReply(TAutoPtr<IEventHandle> &ev) {
    QueueActor->HandleRequestReply(ev, this);
    PassAway();
}

void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev) {
    QueueActor->HandleRequestUndelivered(ev, this);
    PassAway();
}


IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) {
    return new TFlowControlledRequestQueue(targetId, activity, config);
}

}