aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/neh/pipequeue.h
blob: bed8d44bd2c2decbaa1c5ce5ee2a3424c9de5f86 (plain) (tree)













































































































































































































                                                                               
#pragma once

#include "lfqueue.h"

#include <library/cpp/coroutine/engine/impl.h>
#include <library/cpp/coroutine/engine/network.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/pipe.h>

#ifdef _linux_
#include <sys/eventfd.h>
#endif

#if defined(_bionic_) && !defined(EFD_SEMAPHORE)
#define EFD_SEMAPHORE 1
#endif

namespace NNeh {
#ifdef _linux_
    class TSemaphoreEventFd {
    public:
        inline TSemaphoreEventFd() {
            F_ = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
            if (F_ < 0) {
                ythrow TFileError() << "failed to create a eventfd";
            }
        }

        inline ~TSemaphoreEventFd() {
            close(F_);
        }

        inline size_t Acquire(TCont* c) {
            ui64 ev;
            return NCoro::ReadI(c, F_, &ev, sizeof ev).Processed();
        }

        inline void Release() {
            const static ui64 ev(1);
            (void)write(F_, &ev, sizeof ev);
        }

    private:
        int F_;
    };
#endif

    class TSemaphorePipe {
    public:
        inline TSemaphorePipe() {
            TPipeHandle::Pipe(S_[0], S_[1]);

            SetNonBlock(S_[0]);
            SetNonBlock(S_[1]);
        }

        inline size_t Acquire(TCont* c) {
            char ch;
            return NCoro::ReadI(c, S_[0], &ch, 1).Processed();
        }

        inline size_t Acquire(TCont* c, char* buff, size_t buflen) {
            return NCoro::ReadI(c, S_[0], buff, buflen).Processed();
        }

        inline void Release() {
            char ch = 13;
            S_[1].Write(&ch, 1);
        }

    private:
        TPipeHandle S_[2];
    };

    class TPipeQueueBase {
    public:
        inline void Enqueue(void* job) {
            Q_.Enqueue(job);
            S_.Release();
        }

        inline void* Dequeue(TCont* c, char* ch, size_t buflen) {
            void* ret = nullptr;

            while (!Q_.Dequeue(&ret) && S_.Acquire(c, ch, buflen)) {
            }

            return ret;
        }

        inline void* Dequeue() noexcept {
            void* ret = nullptr;

            Q_.Dequeue(&ret);

            return ret;
        }

    private:
        TLockFreeQueue<void*> Q_;
        TSemaphorePipe S_;
    };

    template <class T, size_t buflen = 1>
    class TPipeQueue {
    public:
        template <class TPtr>
        inline void EnqueueSafe(TPtr req) {
            Enqueue(req.Get());
            req.Release();
        }

        inline void Enqueue(T* req) {
            Q_.Enqueue(req);
        }

        template <class TPtr>
        inline void DequeueSafe(TCont* c, TPtr& ret) {
            ret.Reset(Dequeue(c));
        }

        inline T* Dequeue(TCont* c) {
            char ch[buflen];

            return (T*)Q_.Dequeue(c, ch, sizeof(ch));
        }

    protected:
        TPipeQueueBase Q_;
    };

    //optimized for avoiding unnecessary usage semaphore + use eventfd on linux
    template <class T>
    struct TOneConsumerPipeQueue {
        inline TOneConsumerPipeQueue()
            : Signaled_(0)
            , SkipWait_(0)
        {
        }

        inline void Enqueue(T* job) {
            Q_.Enqueue(job);

            AtomicSet(SkipWait_, 1);
            if (AtomicCas(&Signaled_, 1, 0)) {
                S_.Release();
            }
        }

        inline T* Dequeue(TCont* c) {
            T* ret = nullptr;

            while (!Q_.Dequeue(&ret)) {
                AtomicSet(Signaled_, 0);
                if (!AtomicCas(&SkipWait_, 0, 1)) {
                    if (!S_.Acquire(c)) {
                        break;
                    }
                }
                AtomicSet(Signaled_, 1);
            }

            return ret;
        }

        template <class TPtr>
        inline void EnqueueSafe(TPtr req) {
            Enqueue(req.Get());
            Y_UNUSED(req.Release());
        }

        template <class TPtr>
        inline void DequeueSafe(TCont* c, TPtr& ret) {
            ret.Reset(Dequeue(c));
        }

    protected:
        TLockFreeQueue<T*> Q_;
#ifdef _linux_
        TSemaphoreEventFd S_;
#else
        TSemaphorePipe S_;
#endif
        TAtomic Signaled_;
        TAtomic SkipWait_;
    };

    template <class T, size_t buflen = 1>
    struct TAutoPipeQueue: public TPipeQueue<T, buflen> {
        ~TAutoPipeQueue() {
            while (T* t = (T*)TPipeQueue<T, buflen>::Q_.Dequeue()) {
                delete t;
            }
        }
    };

    template <class T>
    struct TAutoOneConsumerPipeQueue: public TOneConsumerPipeQueue<T> {
        ~TAutoOneConsumerPipeQueue() {
            T* ret = nullptr;

            while (TOneConsumerPipeQueue<T>::Q_.Dequeue(&ret)) {
                delete ret;
            }
        }
    };
}