diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-12-02 23:17:28 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-12-02 23:34:47 +0300 |
commit | 4dae825cad50be3d76fc9fddd839dd3e83dd823f (patch) | |
tree | 0484968bb7e33624fd5faf8fafc07d0019172cb5 | |
parent | 03b88a6e8e5deadc338e43765c7599ceb21aca3b (diff) | |
download | ydb-4dae825cad50be3d76fc9fddd839dd3e83dd823f.tar.gz |
Use F_SET_PIPE_WAKE_WRITER to fix delivery fenced writers
This code works properly only after
commit_hash:c5500a7dd15c2797897767c0a2d62582975a22a3
-rw-r--r-- | yt/yt/core/misc/proc.cpp | 31 | ||||
-rw-r--r-- | yt/yt/core/misc/proc.h | 3 | ||||
-rw-r--r-- | yt/yt/core/net/connection.cpp | 9 | ||||
-rw-r--r-- | yt/yt/library/process/unittests/pipes_ut.cpp | 20 |
4 files changed, 43 insertions, 20 deletions
diff --git a/yt/yt/core/misc/proc.cpp b/yt/yt/core/misc/proc.cpp index 363549b4bf9..e25dd22cc08 100644 --- a/yt/yt/core/misc/proc.cpp +++ b/yt/yt/core/misc/proc.cpp @@ -42,6 +42,7 @@ #include <unistd.h> #endif #ifdef _linux_ + #include <fcntl.h> #include <pty.h> #include <pwd.h> #include <grp.h> @@ -951,6 +952,36 @@ void SafeSetPipeCapacity(int fd, int capacity) } } +bool TryEnableEmptyPipeEpollEvent(TFileDescriptor fd) +{ +// TODO(arkady-e1ppa): To not waste gpu we swallow an error +// resulting in a potentially broken behavior. +// if F_SET_PIPE_WAKE_WRITER is not defined and/or properly +// implemented we should return false. +#if defined(_linux_) && defined(F_SET_PIPE_WAKE_WRITER) + int res = ::fcntl(fd, F_SET_PIPE_WAKE_WRITER, 1); + + // TODO(arkady-e1ppa): Once kernel version is fresh enough + // remove this branch altogether. + if (res == -1) { + return errno == EINVAL; + } + + return res != -1; +#else + Y_UNUSED(fd); + return true; +#endif +} + +void SafeEnableEmptyPipeEpollEvent(TFileDescriptor fd) +{ + if (!TryEnableEmptyPipeEpollEvent(fd)) { + THROW_ERROR_EXCEPTION("Failed to enable empty pipe epoll event for descriptor %v", fd) + << TError::FromSystem(); + } +} + bool TrySetUid(int uid) { #ifdef _linux_ diff --git a/yt/yt/core/misc/proc.h b/yt/yt/core/misc/proc.h index 36d9ce8bc6a..86d8ad42232 100644 --- a/yt/yt/core/misc/proc.h +++ b/yt/yt/core/misc/proc.h @@ -166,6 +166,9 @@ void SafeMakeNonblocking(TFileDescriptor fd); bool TrySetPipeCapacity(TFileDescriptor fd, int capacity); void SafeSetPipeCapacity(TFileDescriptor fd, int capacity); +bool TryEnableEmptyPipeEpollEvent(TFileDescriptor fd); +void SafeEnableEmptyPipeEpollEvent(TFileDescriptor fd); + bool TrySetUid(int uid); void SafeSetUid(int uid); diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index a8435fbf5a0..458d4e6b1a4 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -1305,7 +1305,8 @@ namespace { TFileDescriptor CreateWriteFDForConnection( const TString& pipePath, - std::optional<int> capacity) + std::optional<int> capacity, + bool useDeliveryFence) { #ifdef _unix_ int flags = O_WRONLY | O_CLOEXEC; @@ -1321,6 +1322,10 @@ TFileDescriptor CreateWriteFDForConnection( SafeSetPipeCapacity(fd, *capacity); } + if (useDeliveryFence) { + SafeEnableEmptyPipeEpollEvent(fd); + } + SafeMakeNonblocking(fd); } catch (...) { SafeClose(fd, false); @@ -1421,7 +1426,7 @@ IConnectionWriterPtr CreateOutputConnectionFromPath( bool useDeliveryFence) { return New<TFDConnection>( - CreateWriteFDForConnection(pipePath, capacity), + CreateWriteFDForConnection(pipePath, capacity, useDeliveryFence), std::move(pipePath), std::move(poller), pipeHolder, diff --git a/yt/yt/library/process/unittests/pipes_ut.cpp b/yt/yt/library/process/unittests/pipes_ut.cpp index b41b7fc5957..6d088d3e3ad 100644 --- a/yt/yt/library/process/unittests/pipes_ut.cpp +++ b/yt/yt/library/process/unittests/pipes_ut.cpp @@ -350,9 +350,9 @@ TEST_F(TNamedPipeReadWriteTest, CapacityDontDiscardSurplus) EXPECT_TRUE(writeFuture.Get().IsOK()); } -#ifdef _linux_ +#if defined(_linux_) && defined(F_SET_PIPE_WAKE_WRITER) -TEST_F(TNamedPipeReadWriteTest, SyncWriteJustWorks) +TEST_F(TNamedPipeReadWriteTest, DeliveryFencedWriteJustWorks) { SetUpWithDeliveryFence(); @@ -374,22 +374,6 @@ TEST_F(TNamedPipeReadWriteTest, SyncWriteJustWorks) EXPECT_TRUE(writeFuture.Get().IsOK()); } -#else - -TEST_F(TNamedPipeReadWriteTest, SyncWriteUnsupportedPlatform) -{ - SetUpWithDeliveryFence(); - - TString text("aabbb"); - auto writeBuffer = TSharedRef::FromString(text); - auto writeFuture = Writer->Write(writeBuffer); - - // Future is set with error because platform is not supported - auto error = writeFuture.Get(); - EXPECT_FALSE(error.IsOK()); - EXPECT_TRUE(error.GetMessage().Contains("Delivery fenced write failed: FIONDREAD is not supported on your platform")); -} - #endif //////////////////////////////////////////////////////////////////////////////// |