aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-04-25 12:10:07 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-04-25 12:24:28 +0300
commit0d4d48be8da7262d2ce16251e75cac692db6fa10 (patch)
tree6404e9e05b305919b844acbcedf2eebac3ff3b1b
parentbae4fe8e1e7bd4c033bb805afb1af43277b785ef (diff)
downloadydb-0d4d48be8da7262d2ce16251e75cac692db6fa10.tar.gz
YT-20013: Introduce use_delivery_fenced_pipe_writer to improve granularity of job interrupt
yt/core: Added DeliveryFencedWriteOperation and the corresponding writer, which complete only when the written data has been read from the pipe (the read operation itself may fail; we only care about pipe emptiness). yt/server: Added the UseDeliveryFencedPipeWriter option, which forces jobs to use such a writer. As a result, at any given moment the pipe contains at most buffer_row_count rows of data, and the next batch is started only after the previous one has been consumed. This allows for more precise interruptions. 8a1ffb00b64ee0caa8b7926b76ba20120b9abedd
-rw-r--r--yt/yt/core/net/connection.cpp192
-rw-r--r--yt/yt/core/net/connection.h6
-rw-r--r--yt/yt/library/process/pipe.cpp6
-rw-r--r--yt/yt/library/process/pipe.h1
-rw-r--r--yt/yt/library/process/unittests/pipes_ut.cpp51
5 files changed, 228 insertions, 28 deletions
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp
index 525ca28414..29d18fef40 100644
--- a/yt/yt/core/net/connection.cpp
+++ b/yt/yt/core/net/connection.cpp
@@ -14,6 +14,10 @@
#include <errno.h>
+#ifdef _linux_
+ #include <sys/ioctl.h>
+#endif
+
#ifdef _win_
#include <util/network/socket.h>
#include <util/network/pair.h>
@@ -88,6 +92,42 @@ ssize_t WriteToFD(TFileDescriptor fd, const char* buffer, size_t length)
#endif
}
+//! Outcome of probing a named pipe for unread data (see CheckPipeReadStatus).
+enum class EPipeReadStatus
+{
+ // FIONREAD reported zero bytes pending in the pipe.
+ PipeEmpty,
+ // FIONREAD reported a non-zero number of bytes pending in the pipe.
+ PipeNotEmpty,
+ // The probe is unavailable: either the platform is not Linux
+ // or the FIONREAD ioctl failed with EINVAL.
+ NotSupportedError,
+};
+
+//! Reports whether the named pipe at |pipePath| still holds unread data.
+/*!
+ *  On Linux the pipe is opened read-only in non-blocking mode and queried
+ *  with the FIONREAD ioctl; the probing descriptor is always closed before
+ *  returning. On other platforms NotSupportedError is returned right away.
+ *
+ *  Throws if the pipe cannot be opened for probing.
+ *
+ *  When NotSupportedError is returned because of EINVAL, |errno| is left
+ *  set to EINVAL so that the caller may attach the system error.
+ */
+EPipeReadStatus CheckPipeReadStatus(const TString& pipePath)
+{
+#ifdef _linux_
+    int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK;
+    int fd = HandleEintr(::open, pipePath.c_str(), flags);
+    if (fd == -1) {
+        // Do not hand an invalid descriptor to ioctl/close; report the
+        // failure the same way CreateWriteFDForConnection does.
+        THROW_ERROR_EXCEPTION("Failed to open named pipe")
+            << TError::FromSystem()
+            << TErrorAttribute("path", pipePath);
+    }
+
+    int bytesLeft = 0;
+    int ret = ::ioctl(fd, FIONREAD, &bytesLeft);
+    // Remember errno now: closing the descriptor below may overwrite it.
+    int ioctlErrno = errno;
+
+    // Close on every path; previously the EINVAL early return leaked |fd|.
+    SafeClose(fd, /*ignoreBadFD*/ false);
+
+    if (ret == -1 && ioctlErrno == EINVAL) {
+        // Some linux platforms do not support
+        // FIONREAD call. In such cases we
+        // expect EINVAL error.
+        errno = ioctlErrno;
+        return EPipeReadStatus::NotSupportedError;
+    }
+
+    // NB: Any other ioctl failure is not distinguished here, as before;
+    // the decision is made from whatever |bytesLeft| holds (initially zero).
+    return bytesLeft == 0
+        ? EPipeReadStatus::PipeEmpty
+        : EPipeReadStatus::PipeNotEmpty;
+#else
+    Y_UNUSED(pipePath);
+    return EPipeReadStatus::NotSupportedError;
+#endif
+}
+
} // namespace
////////////////////////////////////////////////////////////////////////////////
@@ -286,6 +326,42 @@ private:
////////////////////////////////////////////////////////////////////////////////
+//! A write operation that additionally waits for the pipe to be drained.
+/*!
+ *  Once the underlying TWriteOperation reports a completed write, the pipe
+ *  at |pipePath| is probed with CheckPipeReadStatus. The result is marked
+ *  for retry unless the pipe is reported empty; if the probe is not
+ *  supported, an error is returned instead.
+ */
+class TDeliveryFencedWriteOperation
+    : public TWriteOperation
+{
+public:
+    TDeliveryFencedWriteOperation(const TSharedRef& buffer, const TString& pipePath)
+        : TWriteOperation(buffer)
+        , PipePath_(pipePath)
+    { }
+
+    TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override
+    {
+        auto writeResult = TWriteOperation::PerformIO(fd);
+
+        // Nothing to fence until the plain write has succeeded in full.
+        if (!writeResult.IsOK() || writeResult.Value().Retry) {
+            return writeResult;
+        }
+
+        const auto pipeStatus = CheckPipeReadStatus(PipePath_);
+        if (pipeStatus == EPipeReadStatus::NotSupportedError) {
+            return TError("Delivery fenced write failed: FIONDREAD is not supported on your platform")
+                << TError::FromSystem();
+        }
+
+        // Ask for another round while the pipe is not reported empty.
+        writeResult.Value().Retry = (pipeStatus != EPipeReadStatus::PipeEmpty);
+        return writeResult;
+    }
+
+private:
+    TString PipePath_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
class TWriteVOperation
: public IIOOperation
{
@@ -407,9 +483,10 @@ public:
static TFDConnectionImplPtr Create(
TFileDescriptor fd,
const TString& filePath,
- const IPollerPtr& poller)
+ const IPollerPtr& poller,
+ bool useDeliveryFence)
{
- auto impl = New<TFDConnectionImpl>(fd, filePath, poller);
+ auto impl = New<TFDConnectionImpl>(fd, filePath, poller, useDeliveryFence);
impl->Init();
return impl;
}
@@ -541,12 +618,29 @@ public:
TFuture<void> Write(const TSharedRef& data)
{
+ if (UseDeliveryFence_) {
+ return DoDeliveryFencedWrite(data);
+ }
+
+ return DoWrite(data);
+ }
+
+ TFuture<void> DoWrite(const TSharedRef& data)
+ {
auto write = std::make_unique<TWriteOperation>(data);
auto future = write->ToFuture();
StartIO(&WriteDirection_, std::move(write));
return future;
}
+ //! Starts a TDeliveryFencedWriteOperation for |data| on the write direction
+ //! and returns the future of that operation.
+ TFuture<void> DoDeliveryFencedWrite(const TSharedRef& data)
+ {
+     auto fencedWrite = std::make_unique<TDeliveryFencedWriteOperation>(data, PipePath_);
+     // The future must be taken before ownership of the operation is handed over.
+     auto completion = fencedWrite->ToFuture();
+     StartIO(&WriteDirection_, std::move(fencedWrite));
+     return completion;
+ }
+
TFuture<void> WriteV(const TSharedRefArray& data)
{
auto writeV = std::make_unique<TWriteVOperation>(data);
@@ -682,14 +776,26 @@ private:
TFileDescriptor FD_ = -1;
const IPollerPtr Poller_;
+ // When set to true (via the ctor argument
+ // |useDeliveryFence|), writes are performed
+ // with TDeliveryFencedWriteOperation
+ // instead of TWriteOperation; the future
+ // of such a write is set only after
+ // the data has been read from the pipe.
+ const bool UseDeliveryFence_ = false;
+ const TString PipePath_ = {};
+
TFDConnectionImpl(
TFileDescriptor fd,
const TString& filePath,
- const IPollerPtr& poller)
+ const IPollerPtr& poller,
+ bool useDeliveryFence)
: Name_(Format("File{%v}", filePath))
, FD_(fd)
, Poller_(poller)
+ , UseDeliveryFence_(useDeliveryFence)
+ , PipePath_(filePath)
{ }
TFDConnectionImpl(
@@ -1023,8 +1129,9 @@ public:
TFileDescriptor fd,
const TString& pipePath,
const IPollerPtr& poller,
- TRefCountedPtr pipeHolder = nullptr)
- : Impl_(TFDConnectionImpl::Create(fd, pipePath, poller))
+ TRefCountedPtr pipeHolder = nullptr,
+ bool useDeliveryFence = false)
+ : Impl_(TFDConnectionImpl::Create(fd, pipePath, poller, useDeliveryFence))
, PipeHolder_(std::move(pipeHolder))
{ }
@@ -1148,6 +1255,41 @@ private:
////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+//! Opens the named pipe at |pipePath| for writing and prepares the descriptor
+//! for use by a connection.
+/*!
+ *  If |capacity| is set, it is applied via SafeSetPipeCapacity; the descriptor
+ *  is then passed through SafeMakeNonblocking. Should either step throw, the
+ *  descriptor is closed and the exception is rethrown.
+ *
+ *  Throws if the pipe cannot be opened, and always throws on non-unix platforms.
+ */
+TFileDescriptor CreateWriteFDForConnection(
+    const TString& pipePath,
+    std::optional<int> capacity)
+{
+#ifdef _unix_
+    int openFlags = O_WRONLY | O_CLOEXEC;
+    int pipeFD = HandleEintr(::open, pipePath.c_str(), openFlags);
+    if (pipeFD == -1) {
+        THROW_ERROR_EXCEPTION("Failed to open named pipe")
+            << TError::FromSystem()
+            << TErrorAttribute("path", pipePath);
+    }
+
+    try {
+        if (capacity.has_value()) {
+            SafeSetPipeCapacity(pipeFD, *capacity);
+        }
+        SafeMakeNonblocking(pipeFD);
+    } catch (...) {
+        // Do not leak the descriptor when its configuration fails.
+        SafeClose(pipeFD, false);
+        throw;
+    }
+
+    return pipeFD;
+#else
+    THROW_ERROR_EXCEPTION("Unsupported platform");
+#endif
+}
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(const IPollerPtr& poller)
{
SOCKET fds[2];
@@ -1231,29 +1373,25 @@ IConnectionWriterPtr CreateOutputConnectionFromPath(
const TRefCountedPtr& pipeHolder,
std::optional<int> capacity)
{
-#ifdef _unix_
- int flags = O_WRONLY | O_CLOEXEC;
- int fd = HandleEintr(::open, pipePath.c_str(), flags);
- if (fd == -1) {
- THROW_ERROR_EXCEPTION("Failed to open named pipe")
- << TError::FromSystem()
- << TErrorAttribute("path", pipePath);
- }
-
- try {
- if (capacity) {
- SafeSetPipeCapacity(fd, *capacity);
- }
+ return New<TFDConnection>(
+ CreateWriteFDForConnection(pipePath, capacity),
+ pipePath,
+ poller,
+ pipeHolder);
+}
- SafeMakeNonblocking(fd);
- } catch (...) {
- SafeClose(fd, false);
- throw;
- }
- return New<TFDConnection>(fd, pipePath, poller, pipeHolder);
-#else
- THROW_ERROR_EXCEPTION("Unsupported platform");
-#endif
+//! Opens the pipe at |pipePath| for writing and wraps the descriptor into
+//! a TFDConnection created with |useDeliveryFence| enabled.
+IConnectionWriterPtr CreateDeliveryFencedOutputConnectionFromPath(
+    const TString& pipePath,
+    const NConcurrency::IPollerPtr& poller,
+    const TRefCountedPtr& pipeHolder,
+    std::optional<int> capacity)
+{
+    auto writeFD = CreateWriteFDForConnection(pipePath, capacity);
+    return New<TFDConnection>(writeFD, pipePath, poller, pipeHolder, /*useDeliveryFence*/ true);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h
index a8f27d4b96..610481689f 100644
--- a/yt/yt/core/net/connection.h
+++ b/yt/yt/core/net/connection.h
@@ -114,6 +114,12 @@ IConnectionWriterPtr CreateOutputConnectionFromPath(
const TRefCountedPtr& pipeHolder,
std::optional<int> capacity = {});
+//! Like CreateOutputConnectionFromPath, but the returned writer uses
+//! delivery-fenced writes: a write completes only after the written
+//! data has been read from the pipe.
+IConnectionWriterPtr CreateDeliveryFencedOutputConnectionFromPath(
+ const TString& pipePath,
+ const NConcurrency::IPollerPtr& poller,
+ const TRefCountedPtr& pipeHolder,
+ std::optional<int> capacity = {});
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NNet
diff --git a/yt/yt/library/process/pipe.cpp b/yt/yt/library/process/pipe.cpp
index c8092a5a5a..a2efaac44b 100644
--- a/yt/yt/library/process/pipe.cpp
+++ b/yt/yt/library/process/pipe.cpp
@@ -70,6 +70,12 @@ IConnectionWriterPtr TNamedPipe::CreateAsyncWriter()
return CreateOutputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this), Capacity_);
}
+//! Creates a delivery-fenced writer for this pipe, using the IO dispatcher's poller.
+IConnectionWriterPtr TNamedPipe::CreateDeliveryFencedAsyncWriter()
+{
+    // The writer is opened by path, so the path must be set.
+    YT_VERIFY(!Path_.empty());
+
+    return CreateDeliveryFencedOutputConnectionFromPath(
+        Path_,
+        TIODispatcher::Get()->GetPoller(),
+        MakeStrong(this),
+        Capacity_);
+}
+
TString TNamedPipe::GetPath() const
{
return Path_;
diff --git a/yt/yt/library/process/pipe.h b/yt/yt/library/process/pipe.h
index 337c4e4dad..7e0b518fc7 100644
--- a/yt/yt/library/process/pipe.h
+++ b/yt/yt/library/process/pipe.h
@@ -20,6 +20,7 @@ public:
NNet::IConnectionReaderPtr CreateAsyncReader();
NNet::IConnectionWriterPtr CreateAsyncWriter();
+ //! Like CreateAsyncWriter, but the writer uses delivery-fenced writes.
+ NNet::IConnectionWriterPtr CreateDeliveryFencedAsyncWriter();
TString GetPath() const;
diff --git a/yt/yt/library/process/unittests/pipes_ut.cpp b/yt/yt/library/process/unittests/pipes_ut.cpp
index 4200e38915..af851958a0 100644
--- a/yt/yt/library/process/unittests/pipes_ut.cpp
+++ b/yt/yt/library/process/unittests/pipes_ut.cpp
@@ -21,7 +21,7 @@ using namespace NNet;
#ifndef _win_
-//! NB: You can't set size smaller than that of a page size
+//! NB: You can't set size smaller than that of a page.
constexpr int SmallPipeCapacity = 4096;
TEST(TPipeIOHolder, CanInstantiate)
@@ -176,6 +176,13 @@ protected:
Writer = pipe->CreateAsyncWriter();
}
+ //! Creates the named pipe "./namedpipewcap" and pairs a regular async reader
+ //! with a delivery-fenced async writer on it.
+ void SetUpWithDeliveryFence()
+ {
+ auto pipe = TNamedPipe::Create("./namedpipewcap", 0660);
+ Reader = pipe->CreateAsyncReader();
+ Writer = pipe->CreateDeliveryFencedAsyncWriter();
+ }
+
IConnectionReaderPtr Reader;
IConnectionWriterPtr Writer;
};
@@ -343,6 +350,48 @@ TEST_F(TNamedPipeReadWriteTest, CapacityDontDiscardSurplus)
EXPECT_TRUE(writeFuture.Get().IsOK());
}
+#ifdef _linux_
+
+// Checks that a delivery-fenced write stays pending after a partial read
+// and completes successfully once the whole buffer has been read.
+TEST_F(TNamedPipeReadWriteTest, SyncWriteJustWorks)
+{
+ SetUpWithDeliveryFence();
+
+ TString text("aabbb");
+ auto writeBuffer = TSharedRef::FromString(text);
+ auto writeFuture = Writer->Write(writeBuffer);
+
+ // Read only the first two bytes; "bbb" is still left in the pipe.
+ auto readBuffer = TSharedMutableRef::Allocate(2, {.InitializeStorage = false});
+ auto readResult = Reader->Read(readBuffer).Get();
+ EXPECT_EQ(TString("aa"), TString(readBuffer.Begin(), readResult.Value()));
+
+ // The pipe is not empty yet, so the write must not be reported complete.
+ EXPECT_FALSE(writeFuture.IsSet());
+
+ // Read the remainder with a buffer larger than what is left.
+ readBuffer = TSharedMutableRef::Allocate(10, {.InitializeStorage = false});
+ readResult = Reader->Read(readBuffer).Get();
+ EXPECT_EQ(TString("bbb"), TString(readBuffer.Begin(), readResult.Value()));
+
+ // Future is set only after the entire buffer is read.
+ EXPECT_TRUE(writeFuture.Get().IsOK());
+}
+
+#else
+
+// Compiled only when _linux_ is not defined: checks that a delivery-fenced
+// write fails with the "not supported" error.
+TEST_F(TNamedPipeReadWriteTest, SyncWriteUnsupportedPlatform)
+{
+ SetUpWithDeliveryFence();
+
+ TString text("aabbb");
+ auto writeBuffer = TSharedRef::FromString(text);
+ auto writeFuture = Writer->Write(writeBuffer);
+
+ // Future is set with error because platform is not supported
+ auto error = writeFuture.Get();
+ EXPECT_FALSE(error.IsOK());
+ // NB: The expected text must match the message produced by the write operation byte-for-byte.
+ EXPECT_TRUE(error.GetMessage().Contains("Delivery fenced write failed: FIONDREAD is not supported on your platform"));
+}
+
+#endif
+
////////////////////////////////////////////////////////////////////////////////
class TPipeBigReadWriteTest