aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/rq.cpp
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/rq.cpp
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/rq.cpp')
-rw-r--r--library/cpp/neh/rq.cpp312
1 files changed, 312 insertions, 0 deletions
diff --git a/library/cpp/neh/rq.cpp b/library/cpp/neh/rq.cpp
new file mode 100644
index 0000000000..b3ae8f470d
--- /dev/null
+++ b/library/cpp/neh/rq.cpp
@@ -0,0 +1,312 @@
+#include "rq.h"
+#include "lfqueue.h"
+
+#include <library/cpp/threading/atomic/bool.h>
+
+#include <util/system/tls.h>
+#include <util/system/pipe.h>
+#include <util/system/event.h>
+#include <util/system/mutex.h>
+#include <util/system/condvar.h>
+#include <util/system/guard.h>
+#include <util/network/socket.h>
+#include <util/generic/deque.h>
+
+using namespace NNeh;
+
+namespace {
+ class TBaseLockFreeRequestQueue: public IRequestQueue {
+ public:
+ void Clear() override {
+ IRequestRef req;
+ while (Q_.Dequeue(&req)) {
+ }
+ }
+
+ protected:
+ NNeh::TAutoLockFreeQueue<IRequest> Q_;
+ };
+
+ class TFdRequestQueue: public TBaseLockFreeRequestQueue {
+ public:
+ inline TFdRequestQueue() {
+ TPipeHandle::Pipe(R_, W_);
+ SetNonBlock(W_);
+ }
+
+ void Schedule(IRequestRef req) override {
+ Q_.Enqueue(req);
+ char ch = 42;
+ W_.Write(&ch, 1);
+ }
+
+ IRequestRef Next() override {
+ IRequestRef ret;
+
+#if 0
+ for (size_t i = 0; i < 20; ++i) {
+ if (Q_.Dequeue(&ret)) {
+ return ret;
+ }
+
+ //asm volatile ("pause;");
+ }
+#endif
+
+ while (!Q_.Dequeue(&ret)) {
+ char ch;
+
+ R_.Read(&ch, 1);
+ }
+
+ return ret;
+ }
+
+ private:
+ TPipeHandle R_;
+ TPipeHandle W_;
+ };
+
+ struct TNehFdEvent {
+ inline TNehFdEvent() {
+ TPipeHandle::Pipe(R, W);
+ SetNonBlock(W);
+ }
+
+ inline void Signal() noexcept {
+ char ch = 21;
+ W.Write(&ch, 1);
+ }
+
+ inline void Wait() noexcept {
+ char buf[128];
+ R.Read(buf, sizeof(buf));
+ }
+
+ TPipeHandle R;
+ TPipeHandle W;
+ };
+
+ template <class TEvent>
+ class TEventRequestQueue: public TBaseLockFreeRequestQueue {
+ public:
+ void Schedule(IRequestRef req) override {
+ Q_.Enqueue(req);
+ E_.Signal();
+ }
+
+ IRequestRef Next() override {
+ IRequestRef ret;
+
+ while (!Q_.Dequeue(&ret)) {
+ E_.Wait();
+ }
+
+ E_.Signal();
+
+ return ret;
+ }
+
+ private:
+ TEvent E_;
+ };
+
+ template <class TEvent>
+ class TLazyEventRequestQueue: public TBaseLockFreeRequestQueue {
+ public:
+ void Schedule(IRequestRef req) override {
+ Q_.Enqueue(req);
+ if (C_.Val()) {
+ E_.Signal();
+ }
+ }
+
+ IRequestRef Next() override {
+ IRequestRef ret;
+
+ C_.Inc();
+ while (!Q_.Dequeue(&ret)) {
+ E_.Wait();
+ }
+ C_.Dec();
+
+ if (Q_.Size() && C_.Val()) {
+ E_.Signal();
+ }
+
+ return ret;
+ }
+
+ private:
+ TEvent E_;
+ TAtomicCounter C_;
+ };
+
+ class TCondVarRequestQueue: public IRequestQueue {
+ public:
+ void Clear() override {
+ TGuard<TMutex> g(M_);
+ Q_.clear();
+ }
+
+ void Schedule(IRequestRef req) override {
+ {
+ TGuard<TMutex> g(M_);
+
+ Q_.push_back(req);
+ }
+
+ C_.Signal();
+ }
+
+ IRequestRef Next() override {
+ TGuard<TMutex> g(M_);
+
+ while (Q_.empty()) {
+ C_.Wait(M_);
+ }
+
+ IRequestRef ret = *Q_.begin();
+ Q_.pop_front();
+
+ return ret;
+ }
+
+ private:
+ TDeque<IRequestRef> Q_;
+ TMutex M_;
+ TCondVar C_;
+ };
+
+ class TBusyRequestQueue: public TBaseLockFreeRequestQueue {
+ public:
+ void Schedule(IRequestRef req) override {
+ Q_.Enqueue(req);
+ }
+
+ IRequestRef Next() override {
+ IRequestRef ret;
+
+ while (!Q_.Dequeue(&ret)) {
+ }
+
+ return ret;
+ }
+ };
+
+ class TSleepRequestQueue: public TBaseLockFreeRequestQueue {
+ public:
+ void Schedule(IRequestRef req) override {
+ Q_.Enqueue(req);
+ }
+
+ IRequestRef Next() override {
+ IRequestRef ret;
+
+ while (!Q_.Dequeue(&ret)) {
+ usleep(1);
+ }
+
+ return ret;
+ }
+ };
+
+ struct TStupidEvent {
+ inline TStupidEvent()
+ : InWait(false)
+ {
+ }
+
+ inline bool Signal() noexcept {
+ const bool ret = InWait;
+ Ev.Signal();
+
+ return ret;
+ }
+
+ inline void Wait() noexcept {
+ InWait = true;
+ Ev.Wait();
+ InWait = false;
+ }
+
+ TAutoEvent Ev;
+ NAtomic::TBool InWait;
+ };
+
+ template <class TEvent>
+ class TLFRequestQueue: public TBaseLockFreeRequestQueue {
+ struct TLocalQueue: public TEvent {
+ };
+
+ public:
+ void Schedule(IRequestRef req) override {
+ Q_.Enqueue(req);
+
+ for (TLocalQueue* lq = 0; FQ_.Dequeue(&lq) && !lq->Signal();) {
+ }
+ }
+
+ IRequestRef Next() override {
+ while (true) {
+ IRequestRef ret;
+
+ if (Q_.Dequeue(&ret)) {
+ return ret;
+ }
+
+ TLocalQueue* lq = LocalQueue();
+
+ FQ_.Enqueue(lq);
+
+ if (Q_.Dequeue(&ret)) {
+ TLocalQueue* besttry;
+
+ if (FQ_.Dequeue(&besttry)) {
+ if (besttry == lq) {
+ //huraay, get rid of spurious wakeup
+ } else {
+ FQ_.Enqueue(besttry);
+ }
+ }
+
+ return ret;
+ }
+
+ lq->Wait();
+ }
+ }
+
+ private:
+ static inline TLocalQueue* LocalQueue() noexcept {
+ Y_POD_STATIC_THREAD(TLocalQueue*)
+ lq((TLocalQueue*)0);
+
+ if (!lq) {
+ Y_STATIC_THREAD(TLocalQueue)
+ slq;
+
+ lq = &(TLocalQueue&)slq;
+ }
+
+ return lq;
+ }
+
+ private:
+ TLockFreeStack<TLocalQueue*> FQ_;
+ };
+}
+
+IRequestQueueRef NNeh::CreateRequestQueue() {
+//return new TCondVarRequestQueue();
+//return new TSleepRequestQueue();
+//return new TBusyRequestQueue();
+//return new TLFRequestQueue<TStupidEvent>();
+#if defined(_freebsd_)
+ return new TFdRequestQueue();
+#endif
+ //return new TFdRequestQueue();
+ return new TLazyEventRequestQueue<TAutoEvent>();
+ //return new TEventRequestQueue<TAutoEvent>();
+ //return new TEventRequestQueue<TNehFdEvent>();
+}