diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-04-25 12:10:07 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-04-25 12:24:28 +0300 |
commit | 0d4d48be8da7262d2ce16251e75cac692db6fa10 (patch) | |
tree | 6404e9e05b305919b844acbcedf2eebac3ff3b1b | |
parent | bae4fe8e1e7bd4c033bb805afb1af43277b785ef (diff) | |
download | ydb-0d4d48be8da7262d2ce16251e75cac692db6fa10.tar.gz |
YT-20013: Introduce use_delivery_fenced_pipe_writer to improve granularity of job interrupt
yt/core: Added DeliveryFencedWriteOperation and corresponding Writer which complete their action only when written data has been read from the pipe (read operation itself may fail, but we only care about pipe emptiness)
yt/server: Added UseDeliveryFencedPipeWriter option which forces jobs to use such writer. This makes it so that at any given moment pipe can contain only up to buffer_row_count rows of data and the next batch will be started only after the previous one is consumed. This would allow for a more precise interruptions
8a1ffb00b64ee0caa8b7926b76ba20120b9abedd
-rw-r--r-- | yt/yt/core/net/connection.cpp | 192 | ||||
-rw-r--r-- | yt/yt/core/net/connection.h | 6 | ||||
-rw-r--r-- | yt/yt/library/process/pipe.cpp | 6 | ||||
-rw-r--r-- | yt/yt/library/process/pipe.h | 1 | ||||
-rw-r--r-- | yt/yt/library/process/unittests/pipes_ut.cpp | 51 |
5 files changed, 228 insertions, 28 deletions
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index 525ca28414..29d18fef40 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -14,6 +14,10 @@ #include <errno.h> +#ifdef _linux_ + #include <sys/ioctl.h> +#endif + #ifdef _win_ #include <util/network/socket.h> #include <util/network/pair.h> @@ -88,6 +92,42 @@ ssize_t WriteToFD(TFileDescriptor fd, const char* buffer, size_t length) #endif } +enum class EPipeReadStatus +{ + PipeEmpty, + PipeNotEmpty, + NotSupportedError, +}; + +EPipeReadStatus CheckPipeReadStatus(const TString& pipePath) +{ +#ifdef _linux_ + int bytesLeft = 0; + + { + int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK; + int fd = HandleEintr(::open, pipePath.c_str(), flags); + + int ret = ::ioctl(fd, FIONREAD, &bytesLeft); + if (ret == -1 && errno == EINVAL) { + // Some linux platforms do not support + // FIONREAD call. In such cases we + // expect EINVAL error. + return EPipeReadStatus::NotSupportedError; + } + + SafeClose(fd, /*ignoreBadFD*/ false); + } + + return bytesLeft == 0 + ? EPipeReadStatus::PipeEmpty + : EPipeReadStatus::PipeNotEmpty; +#else + Y_UNUSED(pipePath); + return EPipeReadStatus::NotSupportedError; +#endif +} + } // namespace //////////////////////////////////////////////////////////////////////////////// @@ -286,6 +326,42 @@ private: //////////////////////////////////////////////////////////////////////////////// +class TDeliveryFencedWriteOperation + : public TWriteOperation +{ +public: + TDeliveryFencedWriteOperation(const TSharedRef& buffer, const TString& pipePath) + : TWriteOperation(buffer) + , PipePath_(pipePath) + { } + + TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override + { + auto result = TWriteOperation::PerformIO(fd); + if (IsWriteComplete(result)) { + auto pipeReadStatus = CheckPipeReadStatus(PipePath_); + if (pipeReadStatus == EPipeReadStatus::NotSupportedError) { + return TError("Delivery fenced write failed: FIONDREAD is not supported on your platform") + << TError::FromSystem(); + } + + result.Value().Retry = (pipeReadStatus != EPipeReadStatus::PipeEmpty); + } + + return result; + } + +private: + TString PipePath_; + + bool IsWriteComplete(const TErrorOr<TIOResult>& result) + { + return result.IsOK() && !result.Value().Retry; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + class TWriteVOperation : public IIOOperation { @@ -407,9 +483,10 @@ public: static TFDConnectionImplPtr Create( TFileDescriptor fd, const TString& filePath, - const IPollerPtr& poller) + const IPollerPtr& poller, + bool useDeliveryFence) { - auto impl = New<TFDConnectionImpl>(fd, filePath, poller); + auto impl = New<TFDConnectionImpl>(fd, filePath, poller, useDeliveryFence); impl->Init(); return impl; } @@ -541,12 +618,29 @@ public: TFuture<void> Write(const TSharedRef& data) { + if (UseDeliveryFence_) { + return DoDeliveryFencedWrite(data); + } + + return DoWrite(data); + } + + TFuture<void> DoWrite(const TSharedRef& data) + { auto write = std::make_unique<TWriteOperation>(data); auto future = write->ToFuture(); StartIO(&WriteDirection_, std::move(write)); return future; } + TFuture<void> DoDeliveryFencedWrite(const TSharedRef& data) + { + auto syncWrite = std::make_unique<TDeliveryFencedWriteOperation>(data, PipePath_); + auto future = syncWrite->ToFuture(); + StartIO(&WriteDirection_, std::move(syncWrite)); + return future; + } + TFuture<void> WriteV(const TSharedRefArray& data) { auto writeV = std::make_unique<TWriteVOperation>(data); @@ -682,14 +776,26 @@ private: TFileDescriptor FD_ = -1; const IPollerPtr Poller_; + // If set to true via ctor argument + // |useDeliveryFence| will use + // DeliverFencedWriteOperations + // instead of WriteOperations, + // which future is set only + // after data from pipe has been read. + const bool UseDeliveryFence_ = false; + const TString PipePath_ = {}; + TFDConnectionImpl( TFileDescriptor fd, const TString& filePath, - const IPollerPtr& poller) + const IPollerPtr& poller, + bool useDeliveryFence) : Name_(Format("File{%v}", filePath)) , FD_(fd) , Poller_(poller) + , UseDeliveryFence_(useDeliveryFence) + , PipePath_(filePath) { } TFDConnectionImpl( @@ -1023,8 +1129,9 @@ public: TFileDescriptor fd, const TString& pipePath, const IPollerPtr& poller, - TRefCountedPtr pipeHolder = nullptr) - : Impl_(TFDConnectionImpl::Create(fd, pipePath, poller)) + TRefCountedPtr pipeHolder = nullptr, + bool useDeliveryFence = false) + : Impl_(TFDConnectionImpl::Create(fd, pipePath, poller, useDeliveryFence)) , PipeHolder_(std::move(pipeHolder)) { } @@ -1148,6 +1255,41 @@ private: //////////////////////////////////////////////////////////////////////////////// +namespace { + +TFileDescriptor CreateWriteFDForConnection( + const TString& pipePath, + std::optional<int> capacity) +{ +#ifdef _unix_ + int flags = O_WRONLY | O_CLOEXEC; + int fd = HandleEintr(::open, pipePath.c_str(), flags); + if (fd == -1) { + THROW_ERROR_EXCEPTION("Failed to open named pipe") + << TError::FromSystem() + << TErrorAttribute("path", pipePath); + } + + try { + if (capacity) { + SafeSetPipeCapacity(fd, *capacity); + } + + SafeMakeNonblocking(fd); + } catch (...) { + SafeClose(fd, false); + throw; + } + return fd; +#else + THROW_ERROR_EXCEPTION("Unsupported platform"); +#endif +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(const IPollerPtr& poller) { SOCKET fds[2]; @@ -1231,29 +1373,25 @@ IConnectionWriterPtr CreateOutputConnectionFromPath( const TRefCountedPtr& pipeHolder, std::optional<int> capacity) { -#ifdef _unix_ - int flags = O_WRONLY | O_CLOEXEC; - int fd = HandleEintr(::open, pipePath.c_str(), flags); - if (fd == -1) { - THROW_ERROR_EXCEPTION("Failed to open named pipe") - << TError::FromSystem() - << TErrorAttribute("path", pipePath); - } - - try { - if (capacity) { - SafeSetPipeCapacity(fd, *capacity); - } + return New<TFDConnection>( + CreateWriteFDForConnection(pipePath, capacity), + pipePath, + poller, + pipeHolder); +} - SafeMakeNonblocking(fd); - } catch (...) { - SafeClose(fd, false); - throw; - } - return New<TFDConnection>(fd, pipePath, poller, pipeHolder); -#else - THROW_ERROR_EXCEPTION("Unsupported platform"); -#endif +IConnectionWriterPtr CreateDeliveryFencedOutputConnectionFromPath( + const TString& pipePath, + const NConcurrency::IPollerPtr& poller, + const TRefCountedPtr& pipeHolder, + std::optional<int> capacity) +{ + return New<TFDConnection>( + CreateWriteFDForConnection(pipePath, capacity), + pipePath, + poller, + pipeHolder, + /*useDeliveryFence*/ true); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h index a8f27d4b96..610481689f 100644 --- a/yt/yt/core/net/connection.h +++ b/yt/yt/core/net/connection.h @@ -114,6 +114,12 @@ IConnectionWriterPtr CreateOutputConnectionFromPath( const TRefCountedPtr& pipeHolder, std::optional<int> capacity = {}); +IConnectionWriterPtr CreateDeliveryFencedOutputConnectionFromPath( + const TString& pipePath, + const NConcurrency::IPollerPtr& poller, + const TRefCountedPtr& pipeHolder, + std::optional<int> capacity = {}); + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NNet diff --git a/yt/yt/library/process/pipe.cpp b/yt/yt/library/process/pipe.cpp index c8092a5a5a..a2efaac44b 100644 --- a/yt/yt/library/process/pipe.cpp +++ b/yt/yt/library/process/pipe.cpp @@ -70,6 +70,12 @@ IConnectionWriterPtr TNamedPipe::CreateAsyncWriter() return CreateOutputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this), Capacity_); } +IConnectionWriterPtr TNamedPipe::CreateDeliveryFencedAsyncWriter() +{ + YT_VERIFY(!Path_.empty()); + return CreateDeliveryFencedOutputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this), Capacity_); +} + TString TNamedPipe::GetPath() const { return Path_; diff --git a/yt/yt/library/process/pipe.h b/yt/yt/library/process/pipe.h index 337c4e4dad..7e0b518fc7 100644 --- a/yt/yt/library/process/pipe.h +++ b/yt/yt/library/process/pipe.h @@ -20,6 +20,7 @@ public: NNet::IConnectionReaderPtr CreateAsyncReader(); NNet::IConnectionWriterPtr CreateAsyncWriter(); + NNet::IConnectionWriterPtr CreateDeliveryFencedAsyncWriter(); TString GetPath() const; diff --git a/yt/yt/library/process/unittests/pipes_ut.cpp b/yt/yt/library/process/unittests/pipes_ut.cpp index 4200e38915..af851958a0 100644 --- a/yt/yt/library/process/unittests/pipes_ut.cpp +++ b/yt/yt/library/process/unittests/pipes_ut.cpp @@ -21,7 +21,7 @@ using namespace NNet; #ifndef _win_ -//! NB: You can't set size smaller than that of a page size +//! NB: You can't set size smaller than that of a page. constexpr int SmallPipeCapacity = 4096; TEST(TPipeIOHolder, CanInstantiate) @@ -176,6 +176,13 @@ protected: Writer = pipe->CreateAsyncWriter(); } + void SetUpWithDeliveryFence() + { + auto pipe = TNamedPipe::Create("./namedpipewcap", 0660); + Reader = pipe->CreateAsyncReader(); + Writer = pipe->CreateDeliveryFencedAsyncWriter(); + } + IConnectionReaderPtr Reader; IConnectionWriterPtr Writer; }; @@ -343,6 +350,48 @@ TEST_F(TNamedPipeReadWriteTest, CapacityDontDiscardSurplus) EXPECT_TRUE(writeFuture.Get().IsOK()); } +#ifdef _linux_ + +TEST_F(TNamedPipeReadWriteTest, SyncWriteJustWorks) +{ + SetUpWithDeliveryFence(); + + TString text("aabbb"); + auto writeBuffer = TSharedRef::FromString(text); + auto writeFuture = Writer->Write(writeBuffer); + + auto readBuffer = TSharedMutableRef::Allocate(2, {.InitializeStorage = false}); + auto readResult = Reader->Read(readBuffer).Get(); + EXPECT_EQ(TString("aa"), TString(readBuffer.Begin(), readResult.Value())); + + EXPECT_FALSE(writeFuture.IsSet()); + + readBuffer = TSharedMutableRef::Allocate(10, {.InitializeStorage = false}); + readResult = Reader->Read(readBuffer).Get(); + EXPECT_EQ(TString("bbb"), TString(readBuffer.Begin(), readResult.Value())); + + // Future is set only after the entire buffer is read. + EXPECT_TRUE(writeFuture.Get().IsOK()); +} + +#else + +TEST_F(TNamedPipeReadWriteTest, SyncWriteUnsupportedPlatform) +{ + SetUpWithDeliveryFence(); + + TString text("aabbb"); + auto writeBuffer = TSharedRef::FromString(text); + auto writeFuture = Writer->Write(writeBuffer); + + // Future is set with error because platform is not supported + auto error = writeFuture.Get(); + EXPECT_FALSE(error.IsOK()); + EXPECT_TRUE(error.GetMessage().Contains("Delivery fenced write failed: FIONDREAD is not supported on your platform")); +} + +#endif + //////////////////////////////////////////////////////////////////////////////// class TPipeBigReadWriteTest |