aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-12-02 23:17:28 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-12-02 23:34:47 +0300
commit4dae825cad50be3d76fc9fddd839dd3e83dd823f (patch)
tree0484968bb7e33624fd5faf8fafc07d0019172cb5
parent03b88a6e8e5deadc338e43765c7599ceb21aca3b (diff)
downloadydb-4dae825cad50be3d76fc9fddd839dd3e83dd823f.tar.gz
Use F_SET_PIPE_WAKE_WRITER to fix delivery fenced writers
This code works properly only after commit_hash:c5500a7dd15c2797897767c0a2d62582975a22a3
-rw-r--r--yt/yt/core/misc/proc.cpp31
-rw-r--r--yt/yt/core/misc/proc.h3
-rw-r--r--yt/yt/core/net/connection.cpp9
-rw-r--r--yt/yt/library/process/unittests/pipes_ut.cpp20
4 files changed, 43 insertions, 20 deletions
diff --git a/yt/yt/core/misc/proc.cpp b/yt/yt/core/misc/proc.cpp
index 363549b4bf9..e25dd22cc08 100644
--- a/yt/yt/core/misc/proc.cpp
+++ b/yt/yt/core/misc/proc.cpp
@@ -42,6 +42,7 @@
#include <unistd.h>
#endif
#ifdef _linux_
+ #include <fcntl.h>
#include <pty.h>
#include <pwd.h>
#include <grp.h>
@@ -951,6 +952,36 @@ void SafeSetPipeCapacity(int fd, int capacity)
}
}
+bool TryEnableEmptyPipeEpollEvent(TFileDescriptor fd)
+{
+// TODO(arkady-e1ppa): To not waste gpu we swallow an error
+// resulting in a potentially broken behavior.
+// if F_SET_PIPE_WAKE_WRITER is not defined and/or properly
+// implemented we should return false.
+#if defined(_linux_) && defined(F_SET_PIPE_WAKE_WRITER)
+ int res = ::fcntl(fd, F_SET_PIPE_WAKE_WRITER, 1);
+
+ // TODO(arkady-e1ppa): Once kernel version is fresh enough
+ // remove this branch altogether.
+ if (res == -1) {
+ return errno == EINVAL;
+ }
+
+ return res != -1;
+#else
+ Y_UNUSED(fd);
+ return true;
+#endif
+}
+
+void SafeEnableEmptyPipeEpollEvent(TFileDescriptor fd)
+{
+ if (!TryEnableEmptyPipeEpollEvent(fd)) {
+ THROW_ERROR_EXCEPTION("Failed to enable empty pipe epoll event for descriptor %v", fd)
+ << TError::FromSystem();
+ }
+}
+
bool TrySetUid(int uid)
{
#ifdef _linux_
diff --git a/yt/yt/core/misc/proc.h b/yt/yt/core/misc/proc.h
index 36d9ce8bc6a..86d8ad42232 100644
--- a/yt/yt/core/misc/proc.h
+++ b/yt/yt/core/misc/proc.h
@@ -166,6 +166,9 @@ void SafeMakeNonblocking(TFileDescriptor fd);
bool TrySetPipeCapacity(TFileDescriptor fd, int capacity);
void SafeSetPipeCapacity(TFileDescriptor fd, int capacity);
+bool TryEnableEmptyPipeEpollEvent(TFileDescriptor fd);
+void SafeEnableEmptyPipeEpollEvent(TFileDescriptor fd);
+
bool TrySetUid(int uid);
void SafeSetUid(int uid);
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp
index a8435fbf5a0..458d4e6b1a4 100644
--- a/yt/yt/core/net/connection.cpp
+++ b/yt/yt/core/net/connection.cpp
@@ -1305,7 +1305,8 @@ namespace {
TFileDescriptor CreateWriteFDForConnection(
const TString& pipePath,
- std::optional<int> capacity)
+ std::optional<int> capacity,
+ bool useDeliveryFence)
{
#ifdef _unix_
int flags = O_WRONLY | O_CLOEXEC;
@@ -1321,6 +1322,10 @@ TFileDescriptor CreateWriteFDForConnection(
SafeSetPipeCapacity(fd, *capacity);
}
+ if (useDeliveryFence) {
+ SafeEnableEmptyPipeEpollEvent(fd);
+ }
+
SafeMakeNonblocking(fd);
} catch (...) {
SafeClose(fd, false);
@@ -1421,7 +1426,7 @@ IConnectionWriterPtr CreateOutputConnectionFromPath(
bool useDeliveryFence)
{
return New<TFDConnection>(
- CreateWriteFDForConnection(pipePath, capacity),
+ CreateWriteFDForConnection(pipePath, capacity, useDeliveryFence),
std::move(pipePath),
std::move(poller),
pipeHolder,
diff --git a/yt/yt/library/process/unittests/pipes_ut.cpp b/yt/yt/library/process/unittests/pipes_ut.cpp
index b41b7fc5957..6d088d3e3ad 100644
--- a/yt/yt/library/process/unittests/pipes_ut.cpp
+++ b/yt/yt/library/process/unittests/pipes_ut.cpp
@@ -350,9 +350,9 @@ TEST_F(TNamedPipeReadWriteTest, CapacityDontDiscardSurplus)
EXPECT_TRUE(writeFuture.Get().IsOK());
}
-#ifdef _linux_
+#if defined(_linux_) && defined(F_SET_PIPE_WAKE_WRITER)
-TEST_F(TNamedPipeReadWriteTest, SyncWriteJustWorks)
+TEST_F(TNamedPipeReadWriteTest, DeliveryFencedWriteJustWorks)
{
SetUpWithDeliveryFence();
@@ -374,22 +374,6 @@ TEST_F(TNamedPipeReadWriteTest, SyncWriteJustWorks)
EXPECT_TRUE(writeFuture.Get().IsOK());
}
-#else
-
-TEST_F(TNamedPipeReadWriteTest, SyncWriteUnsupportedPlatform)
-{
- SetUpWithDeliveryFence();
-
- TString text("aabbb");
- auto writeBuffer = TSharedRef::FromString(text);
- auto writeFuture = Writer->Write(writeBuffer);
-
- // Future is set with error because platform is not supported
- auto error = writeFuture.Get();
- EXPECT_FALSE(error.IsOK());
- EXPECT_TRUE(error.GetMessage().Contains("Delivery fenced write failed: FIONDREAD is not supported on your platform"));
-}
-
#endif
////////////////////////////////////////////////////////////////////////////////