diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-06-02 23:33:24 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-06-02 23:33:24 +0300 |
commit | 635b01977572199634ab3b322adb33995faa61a6 (patch) | |
tree | 89aadd55f0159f99ea0429938f83de2a11f25d7e | |
parent | a4c33bffc209d4f4778dab08227b59b980cbd6f3 (diff) | |
download | ydb-635b01977572199634ab3b322adb33995faa61a6.tar.gz |
Intermediate changes
-rw-r--r-- | library/cpp/yt/system/handle_eintr-inl.h | 34 | ||||
-rw-r--r-- | library/cpp/yt/system/handle_eintr.h | 16 | ||||
-rw-r--r-- | library/cpp/yt/threading/notification_handle.cpp | 115 | ||||
-rw-r--r-- | library/cpp/yt/threading/notification_handle.h | 49 |
4 files changed, 214 insertions, 0 deletions
diff --git a/library/cpp/yt/system/handle_eintr-inl.h b/library/cpp/yt/system/handle_eintr-inl.h new file mode 100644 index 0000000000..d98c160c6e --- /dev/null +++ b/library/cpp/yt/system/handle_eintr-inl.h @@ -0,0 +1,34 @@ +#ifndef HANDLE_EINTR_INL_H_ +#error "Direct inclusion of this file is not allowed, include handle_eintr.h" +// For the sake of sane code completion. +#include "handle_eintr.h" +#endif + +#include <errno.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +#ifndef _win_ +template <class F, class... Args> +auto HandleEintr(F f, Args&&... args) -> decltype(f(args...)) +{ + while (true) { + auto x = f(args...); + if (x != -1 || errno != EINTR) { + return x; + } + } +} +#else +template <class F, class... Args> +auto HandleEintr(F f, Args&&... args) -> decltype(f(args...)) +{ + return f(args...); +} +#endif + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/system/handle_eintr.h b/library/cpp/yt/system/handle_eintr.h new file mode 100644 index 0000000000..4821c21a26 --- /dev/null +++ b/library/cpp/yt/system/handle_eintr.h @@ -0,0 +1,16 @@ +#pragma once + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class F, class... Args> +auto HandleEintr(F f, Args&&... args) -> decltype(f(args...)); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define HANDLE_EINTR_INL_H_ +#include "handle_eintr-inl.h" +#undef HANDLE_EINTR_INL_H_ diff --git a/library/cpp/yt/threading/notification_handle.cpp b/library/cpp/yt/threading/notification_handle.cpp new file mode 100644 index 0000000000..0896d8169b --- /dev/null +++ b/library/cpp/yt/threading/notification_handle.cpp @@ -0,0 +1,115 @@ +#include "notification_handle.h" + +#include <library/cpp/yt/system/handle_eintr.h> + +#include <library/cpp/yt/assert/assert.h> + +#ifdef _linux_ + #include <unistd.h> + #include <sys/eventfd.h> +#endif + +#ifdef _darwin_ + #include <fcntl.h> + #include <unistd.h> +#endif + +#ifdef _win_ + #include <util/network/socket.h> +#endif + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +TNotificationHandle::TNotificationHandle(bool blocking) +{ +#ifdef _linux_ + EventFD_ = HandleEintr( + eventfd, + 0, + EFD_CLOEXEC | (blocking ? 0 : EFD_NONBLOCK)); + YT_VERIFY(EventFD_ >= 0); +#elif defined(_win_) + TPipeHandle::Pipe(Reader_, Writer_, EOpenModeFlag::CloseOnExec); + if (!blocking) { + SetNonBlock(Reader_); + } +#else +#ifdef _darwin_ + YT_VERIFY(HandleEintr(pipe, PipeFDs_) == 0); +#else + YT_VERIFY(HandleEintr(pipe2, PipeFDs_, O_CLOEXEC) == 0); +#endif + if (!blocking) { + YT_VERIFY(fcntl(PipeFDs_[0], F_SETFL, O_NONBLOCK) == 0); + } +#endif +} + +TNotificationHandle::~TNotificationHandle() +{ +#ifdef _linux_ + YT_VERIFY(HandleEintr(close, EventFD_) == 0); +#elif !defined(_win_) + YT_VERIFY(HandleEintr(close, PipeFDs_[0]) == 0); + YT_VERIFY(HandleEintr(close, PipeFDs_[1]) == 0); +#endif +} + +void TNotificationHandle::Raise() +{ +#ifdef _linux_ + uint64_t one = 1; + YT_VERIFY(HandleEintr(write, EventFD_, &one, sizeof(one)) == sizeof(one)); +#elif defined(_win_) + char c = 'x'; + YT_VERIFY(Writer_.Write(&c, sizeof(char)) == sizeof(char)); +#else + char c = 'x'; + YT_VERIFY(HandleEintr(write, PipeFDs_[1], &c, sizeof(char)) == sizeof(char)); +#endif +} + +void TNotificationHandle::Clear() +{ +#ifdef _linux_ + uint64_t count = 0; + auto ret = HandleEintr(read, EventFD_, &count, sizeof(count)); + // For edge-triggered one could clear multiple events, others get nothing. + YT_VERIFY(ret == sizeof(count) || (ret < 0 && errno == EAGAIN)); +#elif defined(_win_) + while (true) { + char c; + auto ret = Reader_.Read(&c, sizeof(c)); + YT_VERIFY(ret == sizeof(c) || (ret == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)); + if (ret == SOCKET_ERROR) { + break; + } + } +#else + while (true) { + char c; + auto ret = HandleEintr(read, PipeFDs_[0], &c, sizeof(c)); + YT_VERIFY(ret == sizeof(c) || (ret < 0 && errno == EAGAIN)); + if (ret < 0) { + break; + } + } +#endif +} + +int TNotificationHandle::GetFD() const +{ +#ifdef _linux_ + return EventFD_; +#elif defined(_win_) + return Reader_; +#else + return PipeFDs_[0]; +#endif +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading diff --git a/library/cpp/yt/threading/notification_handle.h b/library/cpp/yt/threading/notification_handle.h new file mode 100644 index 0000000000..9ee4010a43 --- /dev/null +++ b/library/cpp/yt/threading/notification_handle.h @@ -0,0 +1,49 @@ +#pragma once + +#include "public.h" + +#include <util/system/platform.h> + +#ifdef _win_ + #include <util/system/pipe.h> +#endif + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +//! Provides a handle which can be used to wake up a polling thread. +/*! + * Internally implemented via |eventfd| API (for Linux) or pipes (for all other platforms). + */ +class TNotificationHandle +{ +public: + explicit TNotificationHandle(bool blocking = false); + ~TNotificationHandle(); + + //! Called from an arbitrary thread to wake up the polling thread. + //! Multiple wakeups are coalesced. + void Raise(); + + //! Called from the polling thread to clear all outstanding notification. + void Clear(); + + //! Returns the pollable handle, which becomes readable when #Raise is invoked. + int GetFD() const; + +private: +#ifdef _linux_ + int EventFD_ = -1; +#elif defined(_win_) + TPipeHandle Reader_; + TPipeHandle Writer_; +#else + int PipeFDs_[2] = {-1, -1}; +#endif + +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading |