aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/neh/rq.cpp
blob: b3ae8f470dc264be3a9ff1010810098ee027f9fe (plain) (tree)






















































































































































































































































































































                                                                           
#include "rq.h"
#include "lfqueue.h"

#include <library/cpp/threading/atomic/bool.h>

#include <util/system/tls.h>
#include <util/system/pipe.h>
#include <util/system/event.h>
#include <util/system/mutex.h>
#include <util/system/condvar.h>
#include <util/system/guard.h>
#include <util/network/socket.h>
#include <util/generic/deque.h>

using namespace NNeh;

namespace {
    class TBaseLockFreeRequestQueue: public IRequestQueue {
    public:
        void Clear() override {
            IRequestRef req;
            while (Q_.Dequeue(&req)) {
            }
        }

    protected:
        NNeh::TAutoLockFreeQueue<IRequest> Q_;
    };

    // Request queue whose consumers wait on a pipe: Schedule() writes one
    // byte per request, and Next() reads one byte each time it finds the
    // lock-free queue empty.
    class TFdRequestQueue: public TBaseLockFreeRequestQueue {
    public:
        TFdRequestQueue() {
            TPipeHandle::Pipe(R_, W_);
            // Only the write end is switched to non-blocking mode.
            SetNonBlock(W_);
        }

        // Queues the request first, then writes a single byte to the pipe.
        void Schedule(IRequestRef req) override {
            Q_.Enqueue(req);

            char wakeByte = 42; // Next() never looks at the value it reads
            W_.Write(&wakeByte, 1);
        }

        // Returns the next request; while none is available, reads from the
        // pipe between dequeue attempts.
        IRequestRef Next() override {
            IRequestRef result;

            // Disabled experiment: a bounded number of dequeue attempts before
            // falling back to the pipe.
#if 0
            for (size_t i = 0; i < 20; ++i) {
                if (Q_.Dequeue(&result)) {
                    return result;
                }

                //asm volatile ("pause;");
            }
#endif

            for (;;) {
                if (Q_.Dequeue(&result)) {
                    return result;
                }

                char consumed;
                R_.Read(&consumed, 1);
            }
        }

    private:
        TPipeHandle R_; // end used for Read() in Next()
        TPipeHandle W_; // end used for Write() in Schedule()
    };

    // Pipe-backed event offering the Signal()/Wait() pair that the
    // event-based queue templates in this file call on their TEvent.
    struct TNehFdEvent {
        TNehFdEvent() {
            TPipeHandle::Pipe(R, W);
            SetNonBlock(W);
        }

        // Writes one byte to the pipe.
        void Signal() noexcept {
            char token = 21;
            W.Write(&token, 1);
        }

        // Reads up to 128 bytes from the pipe into a scratch buffer; the
        // data read is discarded.
        void Wait() noexcept {
            char scratch[128];
            R.Read(scratch, sizeof(scratch));
        }

        TPipeHandle R; // end used by Wait()
        TPipeHandle W; // end used by Signal()
    };

    // Lock-free request queue paired with one event object. TEvent has to
    // provide Signal() and Wait().
    template <class TEvent>
    class TEventRequestQueue: public TBaseLockFreeRequestQueue {
    public:
        // Queues the request and signals the event unconditionally.
        void Schedule(IRequestRef req) override {
            Q_.Enqueue(req);
            E_.Signal();
        }

        // Waits on the event between dequeue attempts until a request is
        // obtained, then signals the event once more before returning.
        IRequestRef Next() override {
            IRequestRef result;

            for (;;) {
                if (Q_.Dequeue(&result)) {
                    break;
                }

                E_.Wait();
            }

            // NOTE(review): presumably hands the wake-up on to another
            // consumer in case more requests are pending - confirm.
            E_.Signal();

            return result;
        }

    private:
        TEvent E_;
    };

    // Variant of the event-based queue that signals the event only when the
    // counter C_ is non-zero. C_ is incremented when a thread enters Next()
    // and decremented once that thread has dequeued a request.
    template <class TEvent>
    class TLazyEventRequestQueue: public TBaseLockFreeRequestQueue {
    public:
        // Queues the request; signals the event only if C_ is non-zero.
        void Schedule(IRequestRef req) override {
            Q_.Enqueue(req);

            if (!C_.Val()) {
                return;
            }

            E_.Signal();
        }

        // Returns the next request, waiting on the event while the queue is
        // empty.
        IRequestRef Next() override {
            IRequestRef result;

            C_.Inc();

            for (;;) {
                if (Q_.Dequeue(&result)) {
                    break;
                }

                E_.Wait();
            }

            C_.Dec();

            // Signal again when requests remain queued and C_ is still
            // non-zero after our own decrement.
            if (Q_.Size()) {
                if (C_.Val()) {
                    E_.Signal();
                }
            }

            return result;
        }

    private:
        TEvent E_;
        TAtomicCounter C_; // number of threads between Inc() and Dec() in Next()
    };

    class TCondVarRequestQueue: public IRequestQueue {
    public:
        void Clear() override {
            TGuard<TMutex> g(M_);
            Q_.clear();
        }

        void Schedule(IRequestRef req) override {
            {
                TGuard<TMutex> g(M_);

                Q_.push_back(req);
            }

            C_.Signal();
        }

        IRequestRef Next() override {
            TGuard<TMutex> g(M_);

            while (Q_.empty()) {
                C_.Wait(M_);
            }

            IRequestRef ret = *Q_.begin();
            Q_.pop_front();

            return ret;
        }

    private:
        TDeque<IRequestRef> Q_;
        TMutex M_;
        TCondVar C_;
    };

    // Request queue without any wake-up mechanism: Next() retries the
    // dequeue in a tight loop until it succeeds.
    class TBusyRequestQueue: public TBaseLockFreeRequestQueue {
    public:
        void Schedule(IRequestRef req) override {
            Q_.Enqueue(req);
        }

        IRequestRef Next() override {
            IRequestRef result;

            do {
                // spin: nothing to do between attempts
            } while (!Q_.Dequeue(&result));

            return result;
        }
    };

    // Request queue that polls: Next() calls usleep(1) after every failed
    // dequeue attempt.
    class TSleepRequestQueue: public TBaseLockFreeRequestQueue {
    public:
        void Schedule(IRequestRef req) override {
            Q_.Enqueue(req);
        }

        IRequestRef Next() override {
            IRequestRef result;

            for (;;) {
                if (Q_.Dequeue(&result)) {
                    return result;
                }

                usleep(1);
            }
        }
    };

    // Auto-event wrapper that also records whether a thread is currently
    // inside Wait().
    struct TStupidEvent {
        TStupidEvent()
            : InWait(false)
        {
        }

        // Signals the event and returns the value InWait had just before
        // the signal was issued.
        bool Signal() noexcept {
            const bool wasWaiting = InWait;

            Ev.Signal();

            return wasWaiting;
        }

        // Waits on the event with InWait set to true for the duration of
        // the wait.
        void Wait() noexcept {
            InWait = true;

            Ev.Wait();

            InWait = false;
        }

        TAutoEvent Ev;
        NAtomic::TBool InWait;
    };

    // Request queue in which every consumer thread waits on its own event
    // object. Threads that found the request queue empty register a pointer to
    // their event in FQ_; Schedule() takes pointers from FQ_ and signals them.
    // TEvent must provide Wait() and a Signal() whose result is usable as bool
    // (as TStupidEvent::Signal() is).
    template <class TEvent>
    class TLFRequestQueue: public TBaseLockFreeRequestQueue {
        // Per-thread waiter object; adds nothing to TEvent itself.
        struct TLocalQueue: public TEvent {
        };

    public:
        // Queues the request, then takes registered waiters from FQ_ and signals
        // them one by one, stopping when FQ_ is empty or as soon as a Signal()
        // call returns true.
        void Schedule(IRequestRef req) override {
            Q_.Enqueue(req);

            for (TLocalQueue* lq = 0; FQ_.Dequeue(&lq) && !lq->Signal();) {
            }
        }

        // Returns the next request, waiting on this thread's event while none
        // is available.
        IRequestRef Next() override {
            while (true) {
                IRequestRef ret;

                // First attempt, without registering as a waiter.
                if (Q_.Dequeue(&ret)) {
                    return ret;
                }

                TLocalQueue* lq = LocalQueue();

                // Register this thread's event so that Schedule() can signal it.
                FQ_.Enqueue(lq);

                // Second attempt after registering: picks up a request that was
                // queued between the first attempt and the registration.
                if (Q_.Dequeue(&ret)) {
                    TLocalQueue* besttry;

                    // We are not going to wait, so try to take one entry back
                    // out of FQ_. If it belongs to another thread it is put back;
                    // in that case our own entry stays registered.
                    if (FQ_.Dequeue(&besttry)) {
                        if (besttry == lq) {
                            // hooray: it was our own entry, so Schedule() will not
                            // find it later and signal us spuriously
                        } else {
                            FQ_.Enqueue(besttry);
                        }
                    }

                    return ret;
                }

                // Nothing available: wait for a signal, then start over.
                lq->Wait();
            }
        }

    private:
        // Returns the calling thread's TLocalQueue. The object is declared with
        // Y_STATIC_THREAD and a pointer to it is cached in a
        // Y_POD_STATIC_THREAD variable on first use.
        // NOTE(review): both macros presumably come from util/system/tls.h and
        // give per-thread storage - confirm.
        static inline TLocalQueue* LocalQueue() noexcept {
            Y_POD_STATIC_THREAD(TLocalQueue*)
            lq((TLocalQueue*)0);

            if (!lq) {
                Y_STATIC_THREAD(TLocalQueue)
                slq;

                lq = &(TLocalQueue&)slq;
            }

            return lq;
        }

    private:
        // Registered waiters: pointers to the per-thread TLocalQueue objects.
        TLockFreeStack<TLocalQueue*> FQ_;
    };
}

// Creates the request queue implementation used by this build:
// TFdRequestQueue when _freebsd_ is defined, otherwise
// TLazyEventRequestQueue<TAutoEvent>.
//
// The two returns are mutually exclusive branches of one #if/#else, so
// no build contains an unreachable return statement.
IRequestQueueRef NNeh::CreateRequestQueue() {
// Alternative implementations, kept for experiments:
//return new TCondVarRequestQueue();
//return new TSleepRequestQueue();
//return new TBusyRequestQueue();
//return new TLFRequestQueue<TStupidEvent>();
//return new TFdRequestQueue();
//return new TEventRequestQueue<TAutoEvent>();
//return new TEventRequestQueue<TNehFdEvent>();
#if defined(_freebsd_)
    return new TFdRequestQueue();
#else
    return new TLazyEventRequestQueue<TAutoEvent>();
#endif
}