aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-06-02 23:33:24 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-06-02 23:33:24 +0300
commit635b01977572199634ab3b322adb33995faa61a6 (patch)
tree89aadd55f0159f99ea0429938f83de2a11f25d7e
parenta4c33bffc209d4f4778dab08227b59b980cbd6f3 (diff)
downloadydb-635b01977572199634ab3b322adb33995faa61a6.tar.gz
Intermediate changes
-rw-r--r--library/cpp/yt/system/handle_eintr-inl.h34
-rw-r--r--library/cpp/yt/system/handle_eintr.h16
-rw-r--r--library/cpp/yt/threading/notification_handle.cpp115
-rw-r--r--library/cpp/yt/threading/notification_handle.h49
4 files changed, 214 insertions, 0 deletions
diff --git a/library/cpp/yt/system/handle_eintr-inl.h b/library/cpp/yt/system/handle_eintr-inl.h
new file mode 100644
index 0000000000..d98c160c6e
--- /dev/null
+++ b/library/cpp/yt/system/handle_eintr-inl.h
@@ -0,0 +1,34 @@
+#ifndef HANDLE_EINTR_INL_H_
+#error "Direct inclusion of this file is not allowed, include handle_eintr.h"
+// For the sake of sane code completion.
+#include "handle_eintr.h"
+#endif
+
+#include <errno.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+#ifndef _win_
+template <class F, class... Args>
+auto HandleEintr(F f, Args&&... args) -> decltype(f(args...))
+{
+ while (true) {
+ auto x = f(args...);
+ if (x != -1 || errno != EINTR) {
+ return x;
+ }
+ }
+}
+#else
+template <class F, class... Args>
+auto HandleEintr(F f, Args&&... args) -> decltype(f(args...))
+{
+ return f(args...);
+}
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/system/handle_eintr.h b/library/cpp/yt/system/handle_eintr.h
new file mode 100644
index 0000000000..4821c21a26
--- /dev/null
+++ b/library/cpp/yt/system/handle_eintr.h
@@ -0,0 +1,16 @@
+#pragma once
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class F, class... Args>
+auto HandleEintr(F f, Args&&... args) -> decltype(f(args...));
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define HANDLE_EINTR_INL_H_
+#include "handle_eintr-inl.h"
+#undef HANDLE_EINTR_INL_H_
diff --git a/library/cpp/yt/threading/notification_handle.cpp b/library/cpp/yt/threading/notification_handle.cpp
new file mode 100644
index 0000000000..0896d8169b
--- /dev/null
+++ b/library/cpp/yt/threading/notification_handle.cpp
@@ -0,0 +1,115 @@
+#include "notification_handle.h"
+
+#include <library/cpp/yt/system/handle_eintr.h>
+
+#include <library/cpp/yt/assert/assert.h>
+
+#ifdef _linux_
+ #include <unistd.h>
+ #include <sys/eventfd.h>
+#endif
+
+#ifdef _darwin_
+ #include <fcntl.h>
+ #include <unistd.h>
+#endif
+
+#ifdef _win_
+ #include <util/network/socket.h>
+#endif
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TNotificationHandle::TNotificationHandle(bool blocking)
+{
+#ifdef _linux_
+ EventFD_ = HandleEintr(
+ eventfd,
+ 0,
+ EFD_CLOEXEC | (blocking ? 0 : EFD_NONBLOCK));
+ YT_VERIFY(EventFD_ >= 0);
+#elif defined(_win_)
+ TPipeHandle::Pipe(Reader_, Writer_, EOpenModeFlag::CloseOnExec);
+ if (!blocking) {
+ SetNonBlock(Reader_);
+ }
+#else
+#ifdef _darwin_
+ YT_VERIFY(HandleEintr(pipe, PipeFDs_) == 0);
+#else
+ YT_VERIFY(HandleEintr(pipe2, PipeFDs_, O_CLOEXEC) == 0);
+#endif
+ if (!blocking) {
+ YT_VERIFY(fcntl(PipeFDs_[0], F_SETFL, O_NONBLOCK) == 0);
+ }
+#endif
+}
+
+TNotificationHandle::~TNotificationHandle()
+{
+#ifdef _linux_
+ YT_VERIFY(HandleEintr(close, EventFD_) == 0);
+#elif !defined(_win_)
+ YT_VERIFY(HandleEintr(close, PipeFDs_[0]) == 0);
+ YT_VERIFY(HandleEintr(close, PipeFDs_[1]) == 0);
+#endif
+}
+
+void TNotificationHandle::Raise()
+{
+#ifdef _linux_
+ uint64_t one = 1;
+ YT_VERIFY(HandleEintr(write, EventFD_, &one, sizeof(one)) == sizeof(one));
+#elif defined(_win_)
+ char c = 'x';
+ YT_VERIFY(Writer_.Write(&c, sizeof(char)) == sizeof(char));
+#else
+ char c = 'x';
+ YT_VERIFY(HandleEintr(write, PipeFDs_[1], &c, sizeof(char)) == sizeof(char));
+#endif
+}
+
+void TNotificationHandle::Clear()
+{
+#ifdef _linux_
+ uint64_t count = 0;
+ auto ret = HandleEintr(read, EventFD_, &count, sizeof(count));
+ // For edge-triggered one could clear multiple events, others get nothing.
+ YT_VERIFY(ret == sizeof(count) || (ret < 0 && errno == EAGAIN));
+#elif defined(_win_)
+ while (true) {
+ char c;
+ auto ret = Reader_.Read(&c, sizeof(c));
+ YT_VERIFY(ret == sizeof(c) || (ret == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK));
+ if (ret == SOCKET_ERROR) {
+ break;
+ }
+ }
+#else
+ while (true) {
+ char c;
+ auto ret = HandleEintr(read, PipeFDs_[0], &c, sizeof(c));
+ YT_VERIFY(ret == sizeof(c) || (ret < 0 && errno == EAGAIN));
+ if (ret < 0) {
+ break;
+ }
+ }
+#endif
+}
+
+int TNotificationHandle::GetFD() const
+{
+#ifdef _linux_
+ return EventFD_;
+#elif defined(_win_)
+ return Reader_;
+#else
+ return PipeFDs_[0];
+#endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
diff --git a/library/cpp/yt/threading/notification_handle.h b/library/cpp/yt/threading/notification_handle.h
new file mode 100644
index 0000000000..9ee4010a43
--- /dev/null
+++ b/library/cpp/yt/threading/notification_handle.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include "public.h"
+
+#include <util/system/platform.h>
+
+#ifdef _win_
+ #include <util/system/pipe.h>
+#endif
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Provides a handle which can be used to wake up a polling thread.
+/*!
+ * Internally implemented via |eventfd| API (for Linux) or pipes (for all other platforms).
+ */
+class TNotificationHandle
+{
+public:
+ explicit TNotificationHandle(bool blocking = false);
+ ~TNotificationHandle();
+
+ //! Called from an arbitrary thread to wake up the polling thread.
+ //! Multiple wakeups are coalesced.
+ void Raise();
+
+ //! Called from the polling thread to clear all outstanding notification.
+ void Clear();
+
+ //! Returns the pollable handle, which becomes readable when #Raise is invoked.
+ int GetFD() const;
+
+private:
+#ifdef _linux_
+ int EventFD_ = -1;
+#elif defined(_win_)
+ TPipeHandle Reader_;
+ TPipeHandle Writer_;
+#else
+ int PipeFDs_[2] = {-1, -1};
+#endif
+
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading