#include "multiclient.h"
#include "utils.h"

#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>

#include <atomic>
#include <functional>

namespace {
    using namespace NNeh;

    //Ordering used by the intrusive rb-tree: items are sorted by Deadline(), and items with an equal
    //deadline are ordered by their address, so distinct objects never compare as equivalent.
    struct TCompareDeadline {
        //Returns true when l must be placed before r.
        //T only has to provide a const Deadline() whose result supports < and ==.
        template <class T>
        static inline bool Compare(const T& l, const T& r) noexcept {
            const auto lhs = l.Deadline();
            const auto rhs = r.Deadline();
            if (lhs < rhs) {
                return true;
            }
            //The built-in < on pointers to unrelated objects has an unspecified result;
            //std::less is guaranteed to give a strict total order over pointers.
            return lhs == rhs && std::less<const T*>()(&l, &r);
        }
    };

    class TMultiClient: public IMultiClient, public TThrRefBase {
        //Tracks one request made through TMultiClient: stores the request description and its handle,
        //is the IOnRecv callback handed to NNeh::Request(), and is the deadline-ordered item kept in
        //the client's rb-tree of registered requests.
        class TRequestSupervisor: public TRbTreeItem<TRequestSupervisor, TCompareDeadline>, public IOnRecv, public TThrRefBase, public TNonCopyable {
        public:
            //a supervisor always needs a request and an owning client, so default construction is forbidden
            TRequestSupervisor() = delete;

            //request - copied into the supervisor; mc - owning client, kept alive through a strong reference
            inline TRequestSupervisor(const TRequest& request, TMultiClient* mc) noexcept
                : MC_(mc)
                , Request_(request)
                , Maked_(0)
                , FinishOnMakeRequest_(0)
                , Handled_(0)
                , Dequeued_(false)
            {
            }

            //deadline of the supervised request (the key used by TCompareDeadline)
            inline TInstant Deadline() const noexcept {
                return Request_.Deadline;
            }

            //Stores the request handle exactly once.
            //May be called at the same time from TMultiClient::Request() and from OnNotify():
            //the CAS on Maked_ lets only the first caller store the handle, the other caller
            //spins until the winner has finished storing it.
            void OnMakeRequest(THandleRef h) noexcept {
                if (AtomicCas(&Maked_, 1, 0)) {
                    H_.Swap(h);
                    //[paranoid mode on] publish that the handle is stored before anybody relies on it
                    AtomicSet(FinishOnMakeRequest_, 1);
                } else {
                    while (!AtomicGet(FinishOnMakeRequest_)) {
                        SpinLockPause();
                    }
                    //[paranoid mode off]
                }
            }

            //fills both the handle and the user data of the event
            void FillEvent(TEvent& ev) noexcept {
                ev.Hndl = H_;
                FillEventUserData(ev);
            }

            //fills only the user data of the event (taken from the stored request)
            void FillEventUserData(TEvent& ev) noexcept {
                ev.UserData = Request_.UserData;
            }

            //drops the stored handle, destroying the keep-alive cross reference TRequestSupervisor<->THandle
            void ResetRequest() noexcept {
                H_.Drop();
            }

            //OnEndProcessRequest() & OnEndProcessResponse() are executed from the Wait() context (thread),
            //via the Process() methods of the queued jobs

            //called when the 'new request' job is taken from the queue
            void OnEndProcessRequest() {
                Dequeued_ = true;
                if (Y_UNLIKELY(IsHandled())) {
                    ResetRequest(); //race - response was already handled before the request job was dequeued
                } else {
                    MC_->RegisterRequest(this);
                }
            }

            //called when the 'new response' job is taken from the queue
            void OnEndProcessResponse() {
                if (Y_LIKELY(Dequeued_)) {
                    UnLink();
                    ResetRequest();
                } //else the request is not dequeued/registered yet, so there is nothing to unlink
                  //(when it is dequeued later, OnEndProcessRequest() sees IsHandled() == true and resets the request)
            }

            //IOnRecv interface

            //the first caller that marks the request as handled schedules the response job
            void OnNotify(THandle& h) override {
                if (Y_LIKELY(MarkAsHandled())) {
                    THandleRef hr(&h);
                    OnMakeRequest(hr); //fix race: response received before NNeh::Request() returned control
                    MC_->ScheduleResponse(this, hr);
                }
            }

            //releases one reference to this supervisor
            //(presumably the one taken by TMultiClient::Request() before NNeh::Request())
            void OnRecv(THandle&) noexcept override {
                UnRef();
            }

            //releases one reference to this supervisor (see OnRecv())
            void OnEnd() noexcept override {
                UnRef();
            }

            //a request can be handled only once: only the first call of MarkAsHandled() returns true
            bool MarkAsHandled() noexcept {
                return AtomicCas(&Handled_, 1, 0);
            }

            //true after a successful MarkAsHandled()
            bool IsHandled() const noexcept {
                return AtomicGet(Handled_) != 0;
            }

        private:
            TIntrusivePtr<TMultiClient> MC_; //owning client (strong reference)
            TRequest Request_;               //copy of the supervised request
            THandleRef H_;                   //request handle, stored by OnMakeRequest(), dropped by ResetRequest()
            TAtomic Maked_;                  //1 after the first OnMakeRequest() call claimed the right to store H_
            TAtomic FinishOnMakeRequest_;    //1 after H_ has actually been stored
            TAtomic Handled_;                //1 after the first MarkAsHandled() call
            bool Dequeued_;                  //written by OnEndProcessRequest(), read by OnEndProcessResponse()
        };

        //deadline-ordered tree of supervisors and the ref-counted pointer type used to pass them around
        using TRequestsSupervisors = TRbTree<TRequestSupervisor, TCompareDeadline>;
        using TRequestSupervisorRef = TIntrusivePtr<TRequestSupervisor>;

    public:
        //Creates an idle client: the interrupt and shutdown flags are cleared, the stored nearest
        //deadline starts at TInstant::Max(), and the wake-up event is created with ::TSystemEvent::rAuto.
        TMultiClient()
            : Interrupt_(false)
            , NearDeadline_(TInstant::Max().GetValue())
            , E_(::TSystemEvent::rAuto)
            , Shutdown_(false)
        {
        }

        struct TResetRequest {
            inline void operator()(TRequestSupervisor& rs) const noexcept {
                rs.ResetRequest();
            }
        };

        void Shutdown() {
            //reset THandleRef's for all exist supervisors and jobs queue (+prevent creating new)
            //- so we break crossref-chain, which prevent destroy this object THande->TRequestSupervisor->TMultiClient)
            Shutdown_ = true;
            RS_.ForEachNoOrder(TResetRequest());
            RS_.Clear();
            CleanQueue();
        }

    private:
        //Unit of work passed through the jobs queue to the thread running Wait().
        class IJob {
        public:
            virtual ~IJob() = default;

            //returns true when the event passed in has been filled and must be returned from Wait()
            virtual bool Process(TEvent&) = 0;

            //called instead of Process() when the queue is cleaned
            virtual void Cancel() = 0;
        };
        using TJobPtr = TAutoPtr<IJob>;

        class TNewRequest: public IJob {
        public:
            TNewRequest(TRequestSupervisorRef& rs)
                : RS_(rs)
            {
            }

        private:
            bool Process(TEvent&) override {
                RS_->OnEndProcessRequest();
                return false;
            }

            void Cancel() override {
                RS_->ResetRequest();
            }

            TRequestSupervisorRef RS_;
        };

        //Job enqueued by ScheduleResponse(): carries the handle of a request that was notified.
        class TNewResponse: public IJob {
        public:
            //rs - supervisor of the notified request, h - its handle; the job holds references to both
            TNewResponse(TRequestSupervisor* rs, THandleRef& h) noexcept
                : RS_(rs)
                , H_(h)
            {
            }

        private:
            //turns the job into a Response event; always returns true so that Wait() returns the event
            bool Process(TEvent& ev) override {
                RS_->FillEventUserData(ev);
                ev.Hndl = H_;
                ev.Type = TEvent::Response;
                RS_->OnEndProcessResponse();
                return true;
            }

            //queue is being cleaned: only make the supervisor drop its handle
            void Cancel() override {
                RS_->ResetRequest();
            }

            TRequestSupervisorRef RS_;
            THandleRef H_;
        };

    public:
        //Makes the request through NNeh::Request() with a fresh supervisor as the receive callback,
        //then enqueues a 'new request' job for the Wait() thread. Returns the request handle.
        //Exceptions thrown by NNeh::Request() are rethrown after the extra supervisor reference is released.
        THandleRef Request(const TRequest& request) override {
            TRequestSupervisorRef supervisor(new TRequestSupervisor(request, this));
            THandleRef handle;
            try {
                //extra reference for the callback side (OnRecv()/OnEnd() call UnRef())
                supervisor->Ref();
                handle = NNeh::Request(request.Msg, supervisor.Get());
            } catch (...) {
                supervisor->UnRef();
                throw;
            }
            //the response may already have arrived (OnNotify() called) before the information about
            //the new request is scheduled here; OnMakeRequest() handles this race accurately
            supervisor->OnMakeRequest(handle);
            ScheduleRequest(supervisor, handle, request.Deadline);
            return handle;
        }

        //Waits for the next event.
        //  ev        - filled when the method returns true: a Response event produced by a queued job,
        //              or a Timeout event for the registered request with the nearest expired deadline
        //  deadline_ - moment at which waiting stops; TInstant::Max() (the default) means no limit
        //Returns true when ev was filled; returns false when the deadline has passed or Interrupt()
        //was called (the interrupt flag is cleared before returning false).
        bool Wait(TEvent& ev, const TInstant deadline_ = TInstant::Max()) override {
            while (!Interrupt_) {
                //wake-up time for this iteration; may be shortened below to the nearest request deadline
                TInstant deadline = deadline_;
                const TInstant now = TInstant::Now();
                if (deadline != TInstant::Max() && now >= deadline) {
                    break;
                }

                { //process jobs queue (requests/responses info)
                    TAutoPtr<IJob> j;
                    while (JQ_.Dequeue(&j)) {
                        if (j->Process(ev)) {
                            return true;
                        }
                    }
                }

                if (!RS_.Empty()) {
                    //first tree item = registered request with the nearest deadline
                    TRequestSupervisor* nearRS = &*RS_.Begin();
                    if (nearRS->Deadline() <= now) {
                        if (!nearRS->MarkAsHandled()) {
                            //race with notify - a response job for this request must now be in the queue
                            continue;
                        }
                        ev.Type = TEvent::Timeout;
                        //FillEvent() copies the handle into ev before ResetRequest() drops the supervisor's copy
                        nearRS->FillEvent(ev);
                        nearRS->ResetRequest();
                        nearRS->UnLink();
                        return true;
                    }
                    deadline = Min(nearRS->Deadline(), deadline);
                }

                if (SetNearDeadline(deadline)) {
                    continue; //deadline was moved to a later time, so re-check the queue to avoid a race
                }

                //presumably blocks until E_ is signalled or the computed wake-up time is reached
                E_.WaitD(deadline);
            }
            Interrupt_ = false;
            return false;
        }

        //Asks the Wait() loop to stop: raises the interrupt flag, then signals the wake-up event.
        //Wait() clears the flag again before it returns false.
        void Interrupt() override {
            Interrupt_ = true;
            Signal();
        }

        //Reports the size of the jobs queue.
        size_t QueueSize() override {
            const size_t pendingJobs = JQ_.Size();
            return pendingJobs;
        }

    private:
        //Signals E_, the event Wait() blocks on.
        void Signal() {
            //TODO: try to optimize - skip signalling when there are no waiters (reduces mutex usage)
            E_.Signal();
        }

        //Enqueues a 'new request' job for the Wait() thread.
        //The waiter is signalled only when the handle is not signalled yet and the request deadline
        //is earlier than the currently stored nearest deadline.
        void ScheduleRequest(TIntrusivePtr<TRequestSupervisor>& rs, const THandleRef& h, const TInstant& deadline) {
            TJobPtr job(new TNewRequest(rs));
            JQ_.Enqueue(job);

            const bool needWakeUp = !h->Signalled() && deadline.GetValue() < GetNearDeadline_();
            if (needWakeUp) {
                Signal();
            }
        }

        //Enqueues a 'new response' job. Normally the waiter is signalled afterwards; when the client
        //is shut down the queue is cleaned instead, which cancels the job that was just enqueued.
        void ScheduleResponse(TRequestSupervisor* rs, THandleRef& h) {
            TJobPtr job(new TNewResponse(rs, h));
            JQ_.Enqueue(job);

            if (Y_UNLIKELY(Shutdown_)) {
                CleanQueue();
                return;
            }
            Signal();
        }

        //return true, if deadline re-installed to more late time
        bool SetNearDeadline(const TInstant& deadline) {
            bool deadlineMovedFurther = deadline.GetValue() > GetNearDeadline_();
            SetNearDeadline_(deadline.GetValue());
            return deadlineMovedFurther;
        }

        //Used only from the Wait() context (reached through TRequestSupervisor::OnEndProcessRequest()).
        //Requests with a finite deadline are put into the deadline tree; 'endless' requests
        //(deadline == TInstant::Max()) only drop their handle, so that they do not block destruction.
        void RegisterRequest(TRequestSupervisor* rs) {
            if (rs->Deadline() == TInstant::Max()) {
                rs->ResetRequest();
                return;
            }
            RS_.Insert(rs);
        }

        void CleanQueue() {
            TAutoPtr<IJob> j;
            while (JQ_.Dequeue(&j)) {
                j->Cancel();
            }
        }

    private:
        //Low-level setter: stores the raw deadline value while holding NDLock_.
        void SetNearDeadline_(const TInstant::TValue& v) noexcept {
            TGuard<TAdaptiveLock> lock(NDLock_);
            NearDeadline_.store(v, std::memory_order_release);
        }

        //Low-level getter: loads the raw deadline value while holding NDLock_.
        TInstant::TValue GetNearDeadline_() const noexcept {
            TGuard<TAdaptiveLock> lock(NDLock_);
            const TInstant::TValue current = NearDeadline_.load(std::memory_order_acquire);
            return current;
        }

        NNeh::TAutoLockFreeQueue<IJob> JQ_; //jobs (TNewRequest/TNewResponse) drained by Wait() and CleanQueue()
        TAtomicBool Interrupt_; //set by Interrupt(), checked and cleared by Wait()
        TRequestsSupervisors RS_; //requests registered with a finite deadline, ordered by TCompareDeadline
        TAdaptiveLock NDLock_; //held by SetNearDeadline_()/GetNearDeadline_() while they access NearDeadline_
        std::atomic<TInstant::TValue> NearDeadline_; //wake-up time last stored by Wait(); starts at TInstant::Max()
        ::TSystemEvent E_; //signalled by Signal(), waited on in Wait()
        TAtomicBool Shutdown_; //set by Shutdown(); makes ScheduleResponse() clean the queue instead of signalling
    };

    //IMultiClient wrapper that owns a TMultiClient, forwards every call to it and calls
    //TMultiClient::Shutdown() from its destructor.
    class TMultiClientAutoShutdown: public IMultiClient {
    public:
        TMultiClientAutoShutdown()
            : MC_(new TMultiClient())
        {
        }

        //shuts the wrapped client down before the reference to it is released
        ~TMultiClientAutoShutdown() override {
            MC_->Shutdown();
        }

        size_t QueueSize() override {
            const size_t pendingJobs = MC_->QueueSize();
            return pendingJobs;
        }

    private:
        //the overrides below are private in this class and are reached through the IMultiClient interface
        THandleRef Request(const TRequest& req) override {
            THandleRef handle = MC_->Request(req);
            return handle;
        }

        bool Wait(TEvent& ev, TInstant deadline = TInstant::Max()) override {
            const bool gotEvent = MC_->Wait(ev, deadline);
            return gotEvent;
        }

        void Interrupt() override {
            MC_->Interrupt();
        }

        TIntrusivePtr<TMultiClient> MC_;
    };
}

//Factory: creates a multi-client that shuts itself down automatically when destroyed.
TMultiClientPtr NNeh::CreateMultiClient() {
    return TMultiClientPtr(new TMultiClientAutoShutdown());
}