aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/pipequeue.h
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/pipequeue.h
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/pipequeue.h')
-rw-r--r--library/cpp/neh/pipequeue.h207
1 files changed, 207 insertions, 0 deletions
diff --git a/library/cpp/neh/pipequeue.h b/library/cpp/neh/pipequeue.h
new file mode 100644
index 00000000000..bed8d44bd2c
--- /dev/null
+++ b/library/cpp/neh/pipequeue.h
@@ -0,0 +1,207 @@
+#pragma once
+
+#include "lfqueue.h"
+
+#include <library/cpp/coroutine/engine/impl.h>
+#include <library/cpp/coroutine/engine/network.h>
+#include <library/cpp/deprecated/atomic/atomic.h>
+#include <util/system/pipe.h>
+
+#ifdef _linux_
+#include <sys/eventfd.h>
+#endif
+
+#if defined(_bionic_) && !defined(EFD_SEMAPHORE)
+#define EFD_SEMAPHORE 1
+#endif
+
+namespace NNeh {
+#ifdef _linux_
+ class TSemaphoreEventFd {
+ public:
+ inline TSemaphoreEventFd() {
+ F_ = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
+ if (F_ < 0) {
+ ythrow TFileError() << "failed to create a eventfd";
+ }
+ }
+
+ inline ~TSemaphoreEventFd() {
+ close(F_);
+ }
+
+ inline size_t Acquire(TCont* c) {
+ ui64 ev;
+ return NCoro::ReadI(c, F_, &ev, sizeof ev).Processed();
+ }
+
+ inline void Release() {
+ const static ui64 ev(1);
+ (void)write(F_, &ev, sizeof ev);
+ }
+
+ private:
+ int F_;
+ };
+#endif
+
+ class TSemaphorePipe {
+ public:
+ inline TSemaphorePipe() {
+ TPipeHandle::Pipe(S_[0], S_[1]);
+
+ SetNonBlock(S_[0]);
+ SetNonBlock(S_[1]);
+ }
+
+ inline size_t Acquire(TCont* c) {
+ char ch;
+ return NCoro::ReadI(c, S_[0], &ch, 1).Processed();
+ }
+
+ inline size_t Acquire(TCont* c, char* buff, size_t buflen) {
+ return NCoro::ReadI(c, S_[0], buff, buflen).Processed();
+ }
+
+ inline void Release() {
+ char ch = 13;
+ S_[1].Write(&ch, 1);
+ }
+
+ private:
+ TPipeHandle S_[2];
+ };
+
+ class TPipeQueueBase {
+ public:
+ inline void Enqueue(void* job) {
+ Q_.Enqueue(job);
+ S_.Release();
+ }
+
+ inline void* Dequeue(TCont* c, char* ch, size_t buflen) {
+ void* ret = nullptr;
+
+ while (!Q_.Dequeue(&ret) && S_.Acquire(c, ch, buflen)) {
+ }
+
+ return ret;
+ }
+
+ inline void* Dequeue() noexcept {
+ void* ret = nullptr;
+
+ Q_.Dequeue(&ret);
+
+ return ret;
+ }
+
+ private:
+ TLockFreeQueue<void*> Q_;
+ TSemaphorePipe S_;
+ };
+
+ template <class T, size_t buflen = 1>
+ class TPipeQueue {
+ public:
+ template <class TPtr>
+ inline void EnqueueSafe(TPtr req) {
+ Enqueue(req.Get());
+ req.Release();
+ }
+
+ inline void Enqueue(T* req) {
+ Q_.Enqueue(req);
+ }
+
+ template <class TPtr>
+ inline void DequeueSafe(TCont* c, TPtr& ret) {
+ ret.Reset(Dequeue(c));
+ }
+
+ inline T* Dequeue(TCont* c) {
+ char ch[buflen];
+
+ return (T*)Q_.Dequeue(c, ch, sizeof(ch));
+ }
+
+ protected:
+ TPipeQueueBase Q_;
+ };
+
+ //optimized for avoiding unnecessary usage semaphore + use eventfd on linux
+ template <class T>
+ struct TOneConsumerPipeQueue {
+ inline TOneConsumerPipeQueue()
+ : Signaled_(0)
+ , SkipWait_(0)
+ {
+ }
+
+ inline void Enqueue(T* job) {
+ Q_.Enqueue(job);
+
+ AtomicSet(SkipWait_, 1);
+ if (AtomicCas(&Signaled_, 1, 0)) {
+ S_.Release();
+ }
+ }
+
+ inline T* Dequeue(TCont* c) {
+ T* ret = nullptr;
+
+ while (!Q_.Dequeue(&ret)) {
+ AtomicSet(Signaled_, 0);
+ if (!AtomicCas(&SkipWait_, 0, 1)) {
+ if (!S_.Acquire(c)) {
+ break;
+ }
+ }
+ AtomicSet(Signaled_, 1);
+ }
+
+ return ret;
+ }
+
+ template <class TPtr>
+ inline void EnqueueSafe(TPtr req) {
+ Enqueue(req.Get());
+ Y_UNUSED(req.Release());
+ }
+
+ template <class TPtr>
+ inline void DequeueSafe(TCont* c, TPtr& ret) {
+ ret.Reset(Dequeue(c));
+ }
+
+ protected:
+ TLockFreeQueue<T*> Q_;
+#ifdef _linux_
+ TSemaphoreEventFd S_;
+#else
+ TSemaphorePipe S_;
+#endif
+ TAtomic Signaled_;
+ TAtomic SkipWait_;
+ };
+
+ template <class T, size_t buflen = 1>
+ struct TAutoPipeQueue: public TPipeQueue<T, buflen> {
+ ~TAutoPipeQueue() {
+ while (T* t = (T*)TPipeQueue<T, buflen>::Q_.Dequeue()) {
+ delete t;
+ }
+ }
+ };
+
+ template <class T>
+ struct TAutoOneConsumerPipeQueue: public TOneConsumerPipeQueue<T> {
+ ~TAutoOneConsumerPipeQueue() {
+ T* ret = nullptr;
+
+ while (TOneConsumerPipeQueue<T>::Q_.Dequeue(&ret)) {
+ delete ret;
+ }
+ }
+ };
+}