diff options
author | pogorelov <pogorelov@yandex-team.com> | 2025-04-25 14:31:39 +0300 |
---|---|---|
committer | pogorelov <pogorelov@yandex-team.com> | 2025-04-25 15:23:46 +0300 |
commit | b24a00dcb26a76e07659a31d8e81596665547982 (patch) | |
tree | 4c78d7dd86c636a1f2cd1ac5f67c7a1d87068c1f | |
parent | e338fd9998b6fe51e62b62b2602254dff6db4bf3 (diff) | |
download | ydb-b24a00dcb26a76e07659a31d8e81596665547982.tar.gz |
YT-24704: Introduce new delivery fenced connection for pipes
* Changelog entry
Type: feature
Component: controller-agent
Add new delivery fenced connection that works on vanilla linux kernel. It may be used for cpu intensive or gpu jobs to prevent job abort on interruption.
commit_hash:02e0f41ad907dfe1c11defc5439214e31ea12ad5
-rw-r--r-- | yt/yt/core/misc/proc.cpp | 47 | ||||
-rw-r--r-- | yt/yt/core/misc/proc.h | 24 | ||||
-rw-r--r-- | yt/yt/core/net/connection.cpp | 967 | ||||
-rw-r--r-- | yt/yt/core/net/connection.h | 16 | ||||
-rw-r--r-- | yt/yt/core/net/public.h | 16 |
5 files changed, 824 insertions, 246 deletions
diff --git a/yt/yt/core/misc/proc.cpp b/yt/yt/core/misc/proc.cpp index 4f043800fbd..20179f75d2b 100644 --- a/yt/yt/core/misc/proc.cpp +++ b/yt/yt/core/misc/proc.cpp @@ -106,6 +106,53 @@ bool IsSystemError(const TError& error) //////////////////////////////////////////////////////////////////////////////// +TFileDescriptorGuard::TFileDescriptorGuard(TFileDescriptor fd) noexcept + : FD_(fd) +{} + +TFileDescriptorGuard::~TFileDescriptorGuard() +{ + Reset(); +} + +TFileDescriptorGuard::TFileDescriptorGuard(TFileDescriptorGuard&& other) noexcept + : FD_(other.FD_) +{ + other.FD_ = -1; +} + +TFileDescriptorGuard& TFileDescriptorGuard::operator=(TFileDescriptorGuard&& other) noexcept +{ + if (this != &other) { + Reset(); + FD_ = other.FD_; + other.FD_ = -1; + } + return *this; +} + +TFileDescriptor TFileDescriptorGuard::Get() const noexcept +{ + return FD_; +} + +TFileDescriptor TFileDescriptorGuard::Release() noexcept +{ + TFileDescriptor fd = FD_; + FD_ = -1; + return fd; +} + +void TFileDescriptorGuard::Reset() noexcept +{ + if (FD_ != -1) { + YT_VERIFY(TryClose(FD_, false)); + FD_ = -1; + } +} + +//////////////////////////////////////////////////////////////////////////////// + std::optional<int> GetParentPid(int pid) { TFileInput in(Format("/proc/%v/status", pid)); diff --git a/yt/yt/core/misc/proc.h b/yt/yt/core/misc/proc.h index a76fea3c54e..7111ffbdafd 100644 --- a/yt/yt/core/misc/proc.h +++ b/yt/yt/core/misc/proc.h @@ -28,6 +28,30 @@ bool IsSystemError(const TError& error); //////////////////////////////////////////////////////////////////////////////// +class TFileDescriptorGuard +{ +public: + TFileDescriptorGuard(TFileDescriptor fd = -1) noexcept; + + ~TFileDescriptorGuard(); + + TFileDescriptorGuard(const TFileDescriptorGuard&) = delete; + TFileDescriptorGuard& operator = (const TFileDescriptorGuard&) = delete; + + TFileDescriptorGuard(TFileDescriptorGuard&& other) noexcept; + TFileDescriptorGuard& operator = (TFileDescriptorGuard&& other) noexcept; + + TFileDescriptor Get() const noexcept; + + TFileDescriptor Release() noexcept; + void Reset() noexcept; + +private: + TFileDescriptor FD_ = -1; +}; + +//////////////////////////////////////////////////////////////////////////////// + std::vector<int> ListPids(); std::vector<int> GetPidsByUid(int uid = -1); std::vector<int> GetPidsUnderParent(int targetPid); diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index c1d9ae15a8d..5f64072b0c9 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -10,6 +10,8 @@ #include <yt/yt/core/net/socket.h> +#include <library/cpp/yt/memory/non_null_ptr.h> + #include <library/cpp/yt/system/handle_eintr.h> #include <util/network/pollerimpl.h> @@ -18,6 +20,7 @@ #ifdef _linux_ #include <sys/ioctl.h> + #include <sys/signalfd.h> #endif #ifdef _win_ @@ -97,48 +100,205 @@ ssize_t WriteToFD(TFileDescriptor fd, const char* buffer, size_t length) #endif } -TErrorOr<int> CheckPipeBytesLeftToRead(const TString& pipePath) noexcept +TError MakeSystemError(TFormatString<> message) +{ + return TError(message) + << TError::FromSystem(); +} + +[[maybe_unused]] TErrorOr<int> CheckPipeBytesLeftToRead(TFileDescriptor fd) noexcept { #ifdef _linux_ int bytesLeft = 0; - auto makeSystemError = [&] (TFormatString<> message) { - return TError(message) - << TError::FromSystem() - << TErrorAttribute("pipe_path", pipePath); - }; - { - int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK; - int fd = HandleEintr(::open, pipePath.c_str(), flags); - - if (fd == -1) { - return makeSystemError("Failed to open file descriptor"); - } - int ret = ::ioctl(fd, FIONREAD, &bytesLeft); - if (ret == -1) { - return makeSystemError("ioctl failed"); - } - - if (!TryClose(fd, /*ignoreBadFD*/ false)) { - return makeSystemError("Failed to close file descriptor"); + return MakeSystemError("ioctl failed"); } } return bytesLeft; #else - Y_UNUSED(pipePath); + Y_UNUSED(fd); return TError("Unsupported platform"); #endif } +template <class TDerived> +class TWriteConnectionBase + : public virtual IConnectionWriter +{ +public: + TFuture<void> Write(const TSharedRef& data) override + { + return ToDerived()->GetImpl()->Write(data); + } + + TFuture<void> Close() override + { + return ToDerived()->GetImpl()->Close(); + } + + TFuture<void> WriteV(const TSharedRefArray& data) override + { + return ToDerived()->GetImpl()->WriteV(data); + } + + TFuture<void> CloseWrite() override + { + return ToDerived()->GetImpl()->CloseWrite(); + } + + TFuture<void> Abort() override + { + return ToDerived()->GetImpl()->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted")); + } + + int GetHandle() const override + { + return ToDerived()->GetImpl()->GetHandle(); + } + + i64 GetWriteByteCount() const override + { + return ToDerived()->GetImpl()->GetWriteByteCount(); + } + + void SetWriteDeadline(std::optional<TInstant> deadline) override + { + ToDerived()->GetImpl()->SetWriteDeadline(deadline); + } + + TConnectionStatistics GetWriteStatistics() const override + { + return ToDerived()->GetImpl()->GetWriteStatistics(); + } + +protected: + TDerived* ToDerived() + { + return static_cast<TDerived*>(this); + } + + const TDerived* ToDerived() const + { + return static_cast<const TDerived*>(this); + } +}; + +template <class TDerived> +class TReadConnectionBase + : public virtual IConnectionReader +{ +public: + TFuture<size_t> Read(const TSharedMutableRef& data) override + { + return ToDerived()->GetImpl()->Read(data); + } + + TFuture<void> CloseRead() override + { + return ToDerived()->GetImpl()->CloseRead(); + } + + TFuture<void> Abort() override + { + return ToDerived()->GetImpl()->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted")); + } + + int GetHandle() const override + { + return ToDerived()->GetImpl()->GetHandle(); + } + + i64 GetReadByteCount() const override + { + return ToDerived()->GetImpl()->GetReadByteCount(); + } + + void SetReadDeadline(std::optional<TInstant> deadline) override + { + ToDerived()->GetImpl()->SetReadDeadline(deadline); + } + + TConnectionStatistics GetReadStatistics() const override + { + return ToDerived()->GetImpl()->GetReadStatistics(); + } +protected: + TDerived* ToDerived() + { + return static_cast<TDerived*>(this); + } + + const TDerived* ToDerived() const + { + return static_cast<const TDerived*>(this); + } +}; + +template <class TDerived> +class TReadWriteConnectionBase + : public TWriteConnectionBase<TDerived> + , public TReadConnectionBase<TDerived> + , public IConnection +{ + using TWriteConnectionBase<TDerived>::ToDerived; + +public: + TConnectionId GetId() const override + { + return ToDerived()->GetImpl()->GetId(); + } + + const TNetworkAddress& GetLocalAddress() const override + { + return ToDerived()->GetImpl()->GetLocalAddress(); + } + + const TNetworkAddress& GetRemoteAddress() const override + { + return ToDerived()->GetImpl()->GetRemoteAddress(); + } + + bool IsIdle() const override + { + return ToDerived()->GetImpl()->IsIdle(); + } + + bool IsReusable() const override + { + return ToDerived()->GetImpl()->IsReusable(); + } + + bool SetNoDelay() override + { + return ToDerived()->GetImpl()->SetNoDelay(); + } + + bool SetKeepAlive() override + { + return ToDerived()->GetImpl()->SetKeepAlive(); + } + + TFuture<void> Abort() override + { + return ToDerived()->GetImpl()->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted")); + } + + void SubscribePeerDisconnect(TCallback<void()> cb) override + { + return ToDerived()->GetImpl()->SubscribePeerDisconnect(std::move(cb)); + } +}; + } // namespace //////////////////////////////////////////////////////////////////////////////// DECLARE_REFCOUNTED_CLASS(TFDConnectionImpl) +DECLARE_REFCOUNTED_CLASS(TDeliveryFencedWriteConnectionImpl) //////////////////////////////////////////////////////////////////////////////// @@ -182,8 +342,7 @@ public: return TIOResult{.Retry = Position_ == 0, .ByteCount = bytesRead}; } - return TError("Read failed") - << TError::FromSystem(); + return MakeSystemError("Read failed"); } if (size == 0) { break; @@ -243,8 +402,7 @@ public: return TIOResult{.Retry = true, .ByteCount = 0}; } - return TError("Read failed") - << TError::FromSystem(); + return MakeSystemError("Read failed"); } Position_ += size; @@ -297,8 +455,7 @@ public: if (GetLastNetworkError() == EWOULDBLOCK) { return TIOResult{.Retry = true, .ByteCount = bytesWritten}; } - return TError("Write failed") - << TError::FromSystem(); + return MakeSystemError("Write failed"); } YT_VERIFY(size > 0); @@ -323,6 +480,12 @@ public: return ResultPromise_.ToFuture(); } +protected: + bool IsWriteComplete(const TErrorOr<TIOResult>& result) + { + return result.IsOK() && !result.Value().Retry; + } + private: const TSharedRef Buffer_; const TPromise<void> ResultPromise_ = NewPromise<void>(); @@ -332,11 +495,12 @@ private: //////////////////////////////////////////////////////////////////////////////// -class TDeliveryFencedWriteOperation +#ifdef _linux_ +class TDeliveryFencedWriteOperationOld : public TWriteOperation { public: - TDeliveryFencedWriteOperation(TSharedRef buffer, TString pipePath) + TDeliveryFencedWriteOperationOld(TSharedRef buffer, std::string pipePath) : TWriteOperation(std::move(buffer)) , PipePath_(std::move(pipePath)) { } @@ -345,12 +509,19 @@ public: { auto result = TWriteOperation::PerformIO(fd); if (IsWriteComplete(result)) { - auto bytesLeftOrError = CheckPipeBytesLeftToRead(PipePath_); + int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK; + int fd = HandleEintr(::open, PipePath_.data(), flags); + if (fd == -1) { + return MakeSystemError("Failed to open file descriptor"); + } + auto bytesLeftOrError = CheckPipeBytesLeftToRead(fd); + + YT_VERIFY(TryClose(fd, /*ignoreBadFD*/ false)); if (!bytesLeftOrError.IsOK()) { YT_LOG_ERROR(bytesLeftOrError, "Delivery fenced write failed"); - return std::move(bytesLeftOrError).Wrap(); + return bytesLeftOrError; } else { YT_LOG_DEBUG("Delivery fenced write pipe check finished (BytesLeft: %v)", bytesLeftOrError.Value()); } @@ -364,14 +535,91 @@ public: } private: - const TString PipePath_; + const std::string PipePath_; +}; - bool IsWriteComplete(const TErrorOr<TIOResult>& result) +class TDeliveryFencedWriteOperation + : public TWriteOperation +{ +public: + TDeliveryFencedWriteOperation(TSharedRef buffer, TFileDescriptor writeFd, TFileDescriptor readFd) + : TWriteOperation(std::move(buffer)) + , WriteFD_(writeFd) + , ReadFD_(readFd) + { } + + TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override { - return result.IsOK() && !result.Value().Retry; + if (!std::exchange(FirstWritingAttempt_, false)) { + auto errorOrIsSignalConsumed = ConsumeSignalFdEventsAndCheckItContainsOurFD(fd); + + if (!errorOrIsSignalConsumed.IsOK()) { + return errorOrIsSignalConsumed; + } + + if (!errorOrIsSignalConsumed.Value()) { + return TIOResult{ + .Retry = true, + .ByteCount = 0, + }; + } + } + + auto result = TWriteOperation::PerformIO(WriteFD_); + if (IsWriteComplete(result)) { + auto bytesLeftOrError = CheckPipeBytesLeftToRead(ReadFD_); + + if (!bytesLeftOrError.IsOK()) { + YT_LOG_ERROR(bytesLeftOrError, "Delivery fenced write failed"); + return bytesLeftOrError; + } else { + YT_LOG_DEBUG("Delivery fenced write pipe check finished (BytesLeft: %v)", bytesLeftOrError.Value()); + } + + result.Value().Retry = (bytesLeftOrError.Value() != 0); + } else { + YT_LOG_DEBUG("Delivery fenced write to pipe step finished (Result: %v)", result); + } + + return result; + } + +private: + const TFileDescriptor WriteFD_; + const TFileDescriptor ReadFD_; + + bool FirstWritingAttempt_ = true; + + TErrorOr<bool> ConsumeSignalFdEventsAndCheckItContainsOurFD(TFileDescriptor fd) + { + bool signalForWriteFDConsumed = false; + for (; ;) { + struct signalfd_siginfo fdsi; + ssize_t s = HandleEintr(::read, fd, &fdsi, sizeof(fdsi)); + if (s <= 0) { + break; + } + if (s != sizeof(fdsi)) { + return MakeSystemError("Invalid signalfd_siginfo size"); + } + + if (fdsi.ssi_signo != static_cast<uint32_t>(DeliveryFencedWriteSignal)) { + continue; + } + + if (fdsi.ssi_fd != WriteFD_) { + continue; + } + + signalForWriteFDConsumed = true; + } + + return signalForWriteFDConsumed; } }; +#endif // _linux_ + //////////////////////////////////////////////////////////////////////////////// class TWriteVOperation @@ -407,8 +655,7 @@ public: return TIOResult{.Retry = true, .ByteCount = bytesWritten}; } - return TError("Write failed") - << TError::FromSystem(); + return MakeSystemError("Write failed"); } YT_VERIFY(size > 0); @@ -460,8 +707,7 @@ public: { int res = HandleEintr(::shutdown, fd, ShutdownRead_ ? SHUT_RD : SHUT_WR); if (res == -1) { - return TError("Shutdown failed") - << TError::FromSystem(); + return MakeSystemError("Shutdown failed"); } return TIOResult{.Retry = false, .ByteCount = 0}; } @@ -488,17 +734,33 @@ private: //////////////////////////////////////////////////////////////////////////////// +// TODO(pogorelov): Make separate clases for pipe and socket connections. class TFDConnectionImpl : public TPollableBase { + struct TIODirection; public: static TFDConnectionImplPtr Create( TFileDescriptor fd, - TString filePath, IPollerPtr poller, + std::string filePath, + // COMPAT(pogorelov) bool useDeliveryFence) { - auto impl = New<TFDConnectionImpl>(fd, std::move(filePath), std::move(poller), useDeliveryFence); + #ifndef _linux_ + THROW_ERROR_EXCEPTION_IF(useDeliveryFence, "Delivery fenced write is not supported on this platform"); + #endif // _linux_ + auto epollControl = EPollControl::Read | EPollControl::Write | EPollControl::EdgeTriggered; + auto readEpollControl = EPollControl::Read; + auto writeEpollControl = EPollControl::Write; + auto impl = New<TFDConnectionImpl>( + fd, + epollControl, + readEpollControl, + writeEpollControl, + std::move(poller), + std::move(filePath), + useDeliveryFence); impl->Init(); return impl; } @@ -509,8 +771,19 @@ public: const TNetworkAddress& remoteAddress, IPollerPtr poller) { - auto impl = New<TFDConnectionImpl>(fd, localAddress, remoteAddress, std::move(poller)); + auto epollControl = EPollControl::Read | EPollControl::Write | EPollControl::EdgeTriggered | EPollControl::ReadHup; + auto readEpollControl = EPollControl::Read; + auto writeEpollControl = EPollControl::Write; + auto impl = New<TFDConnectionImpl>( + fd, + epollControl, + readEpollControl, + writeEpollControl, + localAddress, + remoteAddress, + std::move(poller)); impl->Init(); + return impl; } @@ -521,8 +794,8 @@ public: void OnEvent(EPollControl control) override { - DoIO(&WriteDirection_, Any(control & EPollControl::Write)); - DoIO(&ReadDirection_, Any(control & EPollControl::Read)); + DoIO(GetPtr(WriteDirection_), Any(control & WriteEpollControl_)); + DoIO(GetPtr(ReadDirection_), Any(control & ReadEpollControl_)); if (Any(control & EPollControl::ReadHup)) { OnPeerDisconnected(); @@ -566,7 +839,7 @@ public: } Poller_->Unarm(FD_, this); - YT_VERIFY(TryClose(FD_, false)); + YT_VERIFY(TryClose(FD_, /*ignoreBadFD*/ false)); FD_ = -1; OnPeerDisconnected(); @@ -580,7 +853,7 @@ public: { auto read = std::make_unique<TReadOperation>(data); auto future = read->ToFuture(); - StartIO(&ReadDirection_, std::move(read)); + StartIO(GetPtr(ReadDirection_), std::move(read)); return future; } @@ -588,7 +861,7 @@ public: { auto receive = std::make_unique<TReceiveFromOperation>(buffer); auto future = receive->ToFuture(); - StartIO(&ReadDirection_, std::move(receive)); + StartIO(GetPtr(ReadDirection_), std::move(receive)); return future; } @@ -604,8 +877,7 @@ public: address.GetSockAddr(), address.GetLength()); if (res == -1) { - THROW_ERROR_EXCEPTION("Write failed") - << TError::FromSystem(); + THROW_ERROR_EXCEPTION(MakeSystemError("Write failed")); } } @@ -623,18 +895,34 @@ public: TFuture<void> Write(const TSharedRef& data) { - if (UseDeliveryFence_) { - return DoDeliveryFencedWrite(data); - } + #ifdef _linux_ + auto writeOperation = UseDeliveryFence_ + ? std::make_unique<TDeliveryFencedWriteOperationOld>(data, PipePath_) + : std::make_unique<TWriteOperation>(data); + #else // _linux_ + auto writeOperation = std::make_unique<TWriteOperation>(data); + YT_VERIFY(!UseDeliveryFence_); + #endif // _linux_ + + auto future = writeOperation->ToFuture(); + + DoWrite(std::move(writeOperation)); + + return future; + } - return DoWrite(data); + // forcefullyConsiderPending used to perform first io without waiting event on epoll. + // See TDeliveryFencedWriteConnection. + void DoWrite(std::unique_ptr<IIOOperation> operation, bool forcefullyConsiderPending = false) + { + StartIO(GetPtr(WriteDirection_), std::move(operation), forcefullyConsiderPending); } TFuture<void> WriteV(const TSharedRefArray& data) { auto writeV = std::make_unique<TWriteVOperation>(data); auto future = writeV->ToFuture(); - StartIO(&WriteDirection_, std::move(writeV)); + StartIO(GetPtr(WriteDirection_), std::move(writeV)); return future; } @@ -661,17 +949,17 @@ public: return IsIdle(); } - TFuture<void> Abort(const TError& error) + TFuture<void> Abort(TError error) { YT_LOG_DEBUG(error, "Aborting connection"); - return AbortIO(error); + return AbortIO(std::move(error)); } TFuture<void> CloseRead() { auto shutdownRead = std::make_unique<TShutdownOperation>(true); auto future = shutdownRead->ToFuture(); - StartIO(&ReadDirection_, std::move(shutdownRead)); + StartIO(GetPtr(ReadDirection_), std::move(shutdownRead)); return future; } @@ -679,7 +967,7 @@ public: { auto shutdownWrite = std::make_unique<TShutdownOperation>(false); auto future = shutdownWrite->ToFuture(); - StartIO(&WriteDirection_, std::move(shutdownWrite)); + StartIO(GetPtr(WriteDirection_), std::move(shutdownWrite)); return future; } @@ -760,35 +1048,30 @@ public: PeerDisconnectedList_.Subscribe(std::move(callback)); } -private: +protected: const TConnectionId Id_ = TConnectionId::Create(); - const TString Endpoint_; + const std::string Endpoint_; const std::string LoggingTag_; const NLogging::TLogger Logger; - const TNetworkAddress LocalAddress_; - const TNetworkAddress RemoteAddress_; TFileDescriptor FD_ = -1; - const IPollerPtr Poller_; - - // If set to true via ctor argument - // |useDeliveryFence| will use - // DeliverFencedWriteOperations - // instead of WriteOperations, - // which future is set only - // after data from pipe has been read. - const bool UseDeliveryFence_ = false; - const TString PipePath_; - + int SynchronousIOCount_ = 0; TFDConnectionImpl( TFileDescriptor fd, - TString filePath, + EPollControl FDEpollControl, + EPollControl readEpollControl, + EPollControl writeEpollControl, IPollerPtr poller, + std::string filePath, + // COMPAT(pogorelov) bool useDeliveryFence) : Endpoint_(Format("File{%v}", filePath)) , LoggingTag_(MakeLoggingTag(Id_, Endpoint_)) , Logger(NetLogger().WithRawTag(LoggingTag_)) , FD_(fd) + , FDEpollControl_(FDEpollControl) + , ReadEpollControl_(readEpollControl) + , WriteEpollControl_(writeEpollControl) , Poller_(std::move(poller)) , UseDeliveryFence_(useDeliveryFence) , PipePath_(std::move(filePath)) @@ -796,15 +1079,21 @@ private: TFDConnectionImpl( TFileDescriptor fd, + EPollControl epollControl, + EPollControl readEpollControl, + EPollControl writeEpollControl, const TNetworkAddress& localAddress, const TNetworkAddress& remoteAddress, IPollerPtr poller) : Endpoint_(Format("FD{%v<->%v}", localAddress, remoteAddress)) , LoggingTag_(MakeLoggingTag(Id_, Endpoint_)) , Logger(NetLogger().WithRawTag(LoggingTag_)) + , FD_(fd) + , FDEpollControl_(epollControl) + , ReadEpollControl_(readEpollControl) + , WriteEpollControl_(writeEpollControl) , LocalAddress_(localAddress) , RemoteAddress_(remoteAddress) - , FD_(fd) , Poller_(std::move(poller)) { } @@ -813,6 +1102,34 @@ private: YT_LOG_DEBUG("Connection destroyed"); } + void Arm(EPollControl additionalFlags = {}) + { + Poller_->Arm(FD_, this, FDEpollControl_ | additionalFlags); + } + + bool TryRegister() + { + return Poller_->TryRegister(MakeStrong(this)); + } + +private: + const EPollControl FDEpollControl_; + const EPollControl ReadEpollControl_; + const EPollControl WriteEpollControl_; + const TNetworkAddress LocalAddress_; + const TNetworkAddress RemoteAddress_; + const IPollerPtr Poller_; + + // COMPAT(pogorelov) + // If set to true via ctor argument + // |useDeliveryFence| will use + // DeliverFencedWriteOperations + // instead of WriteOperations, + // which future is set only + // after data from pipe has been read. + const bool UseDeliveryFence_ = false; + const std::string PipePath_; + DECLARE_NEW_FRIEND() class TSynchronousIOGuard @@ -905,7 +1222,6 @@ private: TIODirection ReadDirection_{EDirection::Read}; TIODirection WriteDirection_{EDirection::Write}; bool ShutdownRequested_ = false; - int SynchronousIOCount_ = 0; TError WriteError_; TError ReadError_; const TPromise<void> ShutdownPromise_ = NewPromise<void>(); @@ -918,36 +1234,21 @@ private: TDelayedExecutorCookie ReadTimeoutCookie_; TDelayedExecutorCookie WriteTimeoutCookie_; - static std::string MakeLoggingTag(TConnectionId id, const TString& endpoint) + static std::string MakeLoggingTag(TConnectionId id, const std::string& endpoint) { - return Format("ConnectionId: %v, Endpoint: %v", + return Format( + "ConnectionId: %v, Endpoint: %v", id, endpoint); } - TError AnnotateError(const TError& error) const + TError AnnotateError(TError error) const { - return error + return std::move(error) << TErrorAttribute("connection_id", Id_) << TErrorAttribute("connection_endpoint", Endpoint_); } - TFuture<void> DoWrite(const TSharedRef& data) - { - auto write = std::make_unique<TWriteOperation>(data); - auto future = write->ToFuture(); - StartIO(&WriteDirection_, std::move(write)); - return future; - } - - TFuture<void> DoDeliveryFencedWrite(const TSharedRef& data) - { - auto syncWrite = std::make_unique<TDeliveryFencedWriteOperation>(data, PipePath_); - auto future = syncWrite->ToFuture(); - StartIO(&WriteDirection_, std::move(syncWrite)); - return future; - } - void Init() { YT_LOG_DEBUG("Connection created"); @@ -955,7 +1256,7 @@ private: AbortFromReadTimeout_ = BIND(&TFDConnectionImpl::AbortFromReadTimeout, MakeWeak(this)); AbortFromWriteTimeout_ = BIND(&TFDConnectionImpl::AbortFromWriteTimeout, MakeWeak(this)); - if (!Poller_->TryRegister(this)) { + if (!TryRegister()) { ReadError_ = WriteError_ = AnnotateError(TError("Cannot register connection pollable")); return; } @@ -963,12 +1264,6 @@ private: Arm(); } - void Arm(EPollControl additionalFlags = {}) - { - auto control = EPollControl::Read | EPollControl::Write | EPollControl::EdgeTriggered | EPollControl::ReadHup; - Poller_->Arm(FD_, this, control | additionalFlags); - } - TError GetCurrentError(EDirection direction) { switch (direction) { @@ -984,7 +1279,7 @@ private: } } - void StartIO(TIODirection* direction, std::unique_ptr<IIOOperation> operation) + void StartIO(TNonNullPtr<TIODirection> direction, std::unique_ptr<IIOOperation> operation, bool forcefullyConsiderPending = false) { TError error; bool needRetry = false; @@ -1001,6 +1296,9 @@ private: YT_VERIFY(!direction->Running); direction->Operation = std::move(operation); direction->StartBusyTimer(); + + direction->Pending |= forcefullyConsiderPending; + // Start operation only if this direction already has pending // event otherwise reading from FIFO before opening by writer // will return EOF immediately. @@ -1009,7 +1307,7 @@ private: } if (!error.IsOK()) { - operation->Abort(error); + operation->Abort(std::move(error)); return; } @@ -1018,7 +1316,7 @@ private: } } - void DoIO(TIODirection* direction, bool event) + void DoIO(TNonNullPtr<TIODirection> direction, bool event) { { auto guard = Guard(Lock_); @@ -1045,7 +1343,7 @@ private: if (result.IsOK()) { direction->BytesTransferred += result.Value().ByteCount; } else { - result = AnnotateError(result); + result = AnnotateError(std::move(result)); } bool needUnregister = false; @@ -1102,7 +1400,7 @@ private: } if (!result.IsOK()) { - operation->Abort(result); + operation->Abort(std::move(result)); } else if (!result.Value().Retry) { operation->SetResult(); } else if (needRetry) { @@ -1114,12 +1412,13 @@ private: } } - TFuture<void> AbortIO(const TError& error) + TFuture<void> AbortIO(TError error) { + auto annotatedError = AnnotateError(std::move(error)); + auto guard = Guard(Lock_); // In case of read errors we have called Unarm and Unregister already. bool needUnarmAndUnregister = ReadError_.IsOK(); - auto annotatedError = AnnotateError(error); if (WriteError_.IsOK()) { WriteError_ = annotatedError; } @@ -1154,184 +1453,359 @@ private: DEFINE_REFCOUNTED_TYPE(TFDConnectionImpl) -//////////////////////////////////////////////////////////////////////////////// - -// The sole purpose of this class is to call Abort on Impl in dtor. -class TFDConnection - : public IConnection +#ifdef _linux_ +class TDeliveryFencedWriteConnectionImpl + : public TFDConnectionImpl { public: - TFDConnection( - TFileDescriptor fd, - TString pipePath, + static TDeliveryFencedWriteConnectionImplPtr Create( IPollerPtr poller, - TRefCountedPtr pipeHolder = nullptr, - bool useDeliveryFence = false) - : Impl_(TFDConnectionImpl::Create(fd, std::move(pipePath), std::move(poller), useDeliveryFence)) - , PipeHolder_(std::move(pipeHolder)) - { } - - TFDConnection( - TFileDescriptor fd, - const TNetworkAddress& localAddress, - const TNetworkAddress& remoteAddress, - IPollerPtr poller) - : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, std::move(poller))) - { } - - ~TFDConnection() + std::string pipePath, + std::optional<int> capacity) { - YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned"))); - } + TFileDescriptorGuard signalFD = CreateSignalFD(); + TDeliveryFencedWriteConnectionImplPtr impl; + impl = New<TDeliveryFencedWriteConnectionImpl>(std::move(poller), std::move(pipePath), signalFD.Get(), capacity); + impl->Init(); - TConnectionId GetId() const override - { - return Impl_->GetId(); + signalFD.Release(); + return impl; } - const TNetworkAddress& GetLocalAddress() const override + TFuture<void> Write(const TSharedRef& data) { - return Impl_->GetLocalAddress(); - } + auto writeOperation = std::make_unique<TDeliveryFencedWriteOperation>(data, WriteFD_, ReadFD_); - const TNetworkAddress& GetRemoteAddress() const override - { - return Impl_->GetRemoteAddress(); - } + auto future = writeOperation->ToFuture(); - int GetHandle() const override - { - return Impl_->GetHandle(); - } + DoWrite(std::move(writeOperation), /*forcefullyConsiderPending*/ true); - TFuture<size_t> Read(const TSharedMutableRef& data) override - { - return Impl_->Read(data); + return future; } - TFuture<void> Write(const TSharedRef& data) override - { - return Impl_->Write(data); - } +private: + const std::string PipePath_; + const std::optional<int> PipeCapacity_; - TFuture<void> WriteV(const TSharedRefArray& data) override - { - return Impl_->WriteV(data); - } + TFileDescriptor WriteFD_ = -1; + TFileDescriptor ReadFD_ = -1; - TFuture<void> Close() override + TDeliveryFencedWriteConnectionImpl( + IPollerPtr poller, + std::string pipePath, + TFileDescriptor signalFD, + std::optional<int> capacity) + : TFDConnectionImpl( + signalFD, + /*FDEpollControl*/ EPollControl::Read | EPollControl::EdgeTriggered, + /*readEpollControll*/ EPollControl::None, + // Yes, we read to write :) + /*writeEpollControl*/ EPollControl::Read, + std::move(poller), + pipePath, + // NB(pogorelov): DeliveryFence is old compat logic, so we turn it off here. + /*useDeliveryFence*/ false) + , PipePath_(std::move(pipePath)) + , PipeCapacity_(capacity) { - return Impl_->Close(); + YT_LOG_DEBUG("Delivery fenced connection created"); } - bool IsIdle() const override + ~TDeliveryFencedWriteConnectionImpl() { - return Impl_->IsIdle(); + YT_LOG_DEBUG("Delivery fenced connection destroyed"); } - bool IsReusable() const override + DECLARE_NEW_FRIEND(); + + static TFileDescriptor CreateSignalFD() { - return Impl_->IsReusable(); + sigset_t mask; + if (sigemptyset(&mask) == -1) { + ThrowError("empty sig set"); + } + if (sigaddset(&mask, DeliveryFencedWriteSignal) == -1) { + ThrowError(Format("add %v RT signal to sig set", DeliveryFencedWriteSignal - SIGRTMIN)); + } + + auto fd = HandleEintr(::signalfd, -1, &mask, SFD_NONBLOCK | SFD_CLOEXEC); + if (fd == -1) { + ThrowError("open signalfd"); + } + + return fd; } - TFuture<void> Abort() override + void Init() { - return Impl_->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted")); + TFileDescriptorGuard readFdGuard = HandleEintr(::open, PipePath_.data(), O_RDONLY | O_CLOEXEC | O_NONBLOCK); + if (readFdGuard.Get() == -1) { + ThrowError("open pipe for reading"); + } + + TFileDescriptorGuard writeFdGuard = HandleEintr(::open, PipePath_.data(), O_WRONLY | O_CLOEXEC); + if (writeFdGuard.Get() == -1) { + ThrowError("open pipe for writing"); + } + + auto flags = fcntl(writeFdGuard.Get(), F_GETFL); + if (flags == -1) { + ThrowError("get pipe writing fd flags"); + } + if (fcntl(writeFdGuard.Get(), F_SETFL, flags | O_NONBLOCK | FASYNC) == -1) { + ThrowError("set pipe writing fd flags"); + } + if (fcntl(writeFdGuard.Get(), F_SETOWN, getpid()) == -1) { + ThrowError("set pipe owner"); + } + if (fcntl(writeFdGuard.Get(), F_SETSIG, DeliveryFencedWriteSignal) == -1) { + ThrowError("set custom pipe signal"); + } + if (PipeCapacity_) { + SafeSetPipeCapacity(writeFdGuard.Get(), *PipeCapacity_); + } + + if (!TryRegister()) { + ThrowError("register connection in poller"); + } + + try { + Arm(); + } catch (const std::exception& ex) { + ThrowError("arm connection", ex); + } catch (...) { + ThrowError("arm connection"); + } + + YT_LOG_DEBUG("Delivery fenced connection initialized"); + + ReadFD_ = readFdGuard.Release(); + WriteFD_ = writeFdGuard.Release(); } - TFuture<void> CloseRead() override + void OnShutdown() final { - return Impl_->CloseRead(); + TFDConnectionImpl::OnShutdown(); + + YT_VERIFY(SynchronousIOCount_ == 0); + + YT_VERIFY(TryClose(WriteFD_, /*ignoreBadFD*/ false)); + YT_VERIFY(TryClose(ReadFD_, /*ignoreBadFD*/ false)); } - TFuture<void> CloseWrite() override + [[noreturn]] static void ThrowError(std::string_view action, TError innerError = TError()) { - return Impl_->CloseWrite(); + auto error = TError("Failed to %v for delivery fenced connection", action); + if (!innerError.IsOK()) { + innerError <<= std::move(innerError); + } else { + error <<= TError::FromSystem(); + } + THROW_ERROR(std::move(error)); } +}; - i64 GetReadByteCount() const override +DEFINE_REFCOUNTED_TYPE(TDeliveryFencedWriteConnectionImpl) + +#endif // _linux_ + +//////////////////////////////////////////////////////////////////////////////// + +// TODO(pogorelov): Make separate clases for pipe and socket connections. +// The sole purpose of this class is to call Abort on Impl in dtor. +// Since object of TFDConnection is created, you should not care about fd. +// But in case of exception you should close fd by yourself. +class TFDConnection + : public TReadWriteConnectionBase<TFDConnection> +{ +public: + TFDConnection( + TFileDescriptor fd, + IPollerPtr poller, + TRefCountedPtr pipeHolder, + std::string pipePath = "", + // COMPAT(pogorelov) + bool useDeliveryFence = false) + : Impl_(TFDConnectionImpl::Create(fd, std::move(poller), std::move(pipePath), useDeliveryFence)) + , PipeHolder_(std::move(pipeHolder)) + { } + + TFDConnection( + TFileDescriptor fd, + const TNetworkAddress& localAddress, + const TNetworkAddress& remoteAddress, + IPollerPtr poller) + : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, std::move(poller))) + { } + + TFDConnection( + TRefCountedPtr pipeHolder, + TFDConnectionImplPtr impl) + : Impl_(std::move(impl)) + , PipeHolder_(std::move(pipeHolder)) + { } + + ~TFDConnection() { - return Impl_->GetReadByteCount(); + YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned"))); } - - i64 GetWriteByteCount() const override + const TFDConnectionImplPtr& GetImpl() const { - return Impl_->GetWriteByteCount(); + return Impl_; } - TConnectionStatistics GetReadStatistics() const override +private: + const TFDConnectionImplPtr Impl_; + const TRefCountedPtr PipeHolder_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +#ifdef _linux_ + +[[noreturn]] void ThrowUnimplemented(std::string_view method) +{ + THROW_ERROR_EXCEPTION("%v is not supported", method); +} + +// NOTE(pogorelov): Using 2 TDeliveryFencedWriteConnection concurrently is not supported. +// Desired behavior: +// Write should complete only after all data has been written to the pipe and fully read by the reader. +// Yes, this is not what pipes were originally designed for. +// However, we now have a large amount of user code relying on our API (which currently only reads from stdin). +// We want to be able to adjust the supplying data portion size without modifying the API, hence this solution. +// +// Implementation: +// To achieve this behavior, we use: +// - signalfd (for signal-based notifications) +// - FASYNC mode (for asynchronous pipe I/O) +// - FIONREAD (to check remaining data in the pipe) +// +// Note about FASYNC Behavior: +// Starting from kernel 5.15, the writer receives a signal (SIGIO) on every read operation from the pipe. +// In earlier kernel versions, this behavior has been changed several times, but it appears to be finalized in 5.15 and later: +// https://web.git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit?id=fe67f4dd8daa252eb9aa7acb61555f3cc3c1ce4c +// +// Initialization Steps: +// 1) Configure the pipe for signals: +// - Set the process as the FIFO owner (F_SETOWN). +// - Enable FASYNC on the write-side file descriptor (F_SETFL) to receive I/O event signals. +// - Use a custom real-time signal (instead of SIGIO) to not occupy SIGIO. +// - We use real-time signal as a reserve for the future +// (when RT signal is used, kernel stores the queue with signal and its metadata such as fd it triggered by). +// 2) Block the custom signal for the entire process (sigprocmask). +// 3) Create a signalfd to read signals via a file descriptor. +// 4) Arm epoll for the signal FD: +// Monitor for EPOLLIN (even though this is the writer side—we’re waiting to read signals). +// +// Write Steps: +// 1) Write data to the pipe: +// Write until all data is sent or EWOULDBLOCK is encountered. +// (Unlike common writer, we don’t immediately sleep on epoll. +// Opening the pipe for reading wakes the writer, but FASYNC does not send SIGIO in this case.) +// 2) Check pipe status with FIONREAD: +// If the pipe is empty, writing is complete. +// 3) Wait for signals via epoll. +// 4) Process signals from signalfd: +// Read all the signals and repeat from Step 1. +class TDeliveryFencedWriteConnection + : public TWriteConnectionBase<TDeliveryFencedWriteConnection> +{ +public: + TDeliveryFencedWriteConnection( + TRefCountedPtr pipeHolder, + IPollerPtr poller, + std::string pipePath, + std::optional<int> capacity) + : Impl_(TDeliveryFencedWriteConnectionImpl::Create(std::move(poller), std::move(pipePath), capacity)) + , PipeHolder_(std::move(pipeHolder)) { - return Impl_->GetReadStatistics(); + YT_VERIFY(!HasActiveConnection.exchange(true)); } - TConnectionStatistics GetWriteStatistics() const override + ~TDeliveryFencedWriteConnection() { - return Impl_->GetWriteStatistics(); + YT_VERIFY(HasActiveConnection.exchange(false)); + YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned"))); } - - void SetReadDeadline(std::optional<TInstant> deadline) override + TFuture<void> WriteV(const TSharedRefArray& /*data*/) final { - Impl_->SetReadDeadline(deadline); + ThrowUnimplemented("WriteV"); } - void SetWriteDeadline(std::optional<TInstant> deadline) override + TFuture<void> CloseWrite() final { - Impl_->SetWriteDeadline(deadline); + ThrowUnimplemented("CloseWrite");; } - bool SetNoDelay() override + void SetWriteDeadline(std::optional<TInstant> /*deadline*/) { - return Impl_->SetNoDelay(); + ThrowUnimplemented("SetWriteDeadline"); } - bool SetKeepAlive() override + TFuture<void> Write(const TSharedRef& data) final { - return Impl_->SetKeepAlive(); + return static_cast<TDeliveryFencedWriteConnectionImpl&>(*Impl_).Write(data); } - void SubscribePeerDisconnect(TCallback<void()> cb) override + const TDeliveryFencedWriteConnectionImplPtr& GetImpl() const { - return Impl_->SubscribePeerDisconnect(std::move(cb)); + return Impl_; } private: - const TFDConnectionImplPtr Impl_; + const TDeliveryFencedWriteConnectionImplPtr Impl_; const TRefCountedPtr PipeHolder_; + + static inline std::atomic<bool> HasActiveConnection = false; }; +#endif // _linux_ + //////////////////////////////////////////////////////////////////////////////// namespace { TFileDescriptor CreateWriteFDForConnection( - const TString& pipePath, + const std::string& pipePath, std::optional<int> capacity, + // COMPAT(pogorelov) bool useDeliveryFence) { #ifdef _unix_ int flags = O_WRONLY | O_CLOEXEC; - int fd = HandleEintr(::open, pipePath.c_str(), flags); - if (fd == -1) { - THROW_ERROR_EXCEPTION("Failed to open named pipe") - << TError::FromSystem() + TFileDescriptorGuard fd = HandleEintr(::open, pipePath.c_str(), flags); + if (fd.Get() == -1) { + THROW_ERROR_EXCEPTION(MakeSystemError("Failed to open named pipe")) << TErrorAttribute("path", pipePath); } try { if (capacity) { - SafeSetPipeCapacity(fd, *capacity); + SafeSetPipeCapacity(fd.Get(), *capacity); } if (useDeliveryFence) { - SafeEnableEmptyPipeEpollEvent(fd); + SafeEnableEmptyPipeEpollEvent(fd.Get()); } - SafeMakeNonblocking(fd); + SafeMakeNonblocking(fd.Get()); + } catch (const std::exception& ex) { + YT_LOG_WARNING( + TError(ex) << TError::FromSystem(), + "Failed to open pipe for writing (UseDeliveryFence: %v, Capacity: %v)", + useDeliveryFence, + capacity); + throw; } catch (...) { - SafeClose(fd, false); + YT_LOG_WARNING( + "Failed to open pipe for writing (MaybeRelevantError: %v, UseDeliveryFence: %v, Capacity: %v)", + TError::FromSystem(), + useDeliveryFence, + capacity); throw; } - return fd; + return fd.Release(); #else THROW_ERROR_EXCEPTION("Unsupported platform"); #endif @@ -1353,31 +1827,27 @@ std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(IPollerPtr poller #endif if (HandleEintr(::socketpair, AF_LOCAL, flags, 0, fds) == -1) { - THROW_ERROR_EXCEPTION("Failed to create socket pair") - << TError::FromSystem(); + THROW_ERROR_EXCEPTION(MakeSystemError("Failed to create socket pair")); } #else if (SocketPair(fds, /*overlapped*/ false, /*cloexec*/ true) == SOCKET_ERROR) { - THROW_ERROR_EXCEPTION("Failed to create socket pair") - << TError::FromSystem(); + THROW_ERROR_EXCEPTION(MakeSystemError("Failed to create socket pair")); } SetNonBlock(fds[0]); SetNonBlock(fds[1]); #endif + TFileDescriptorGuard fd0(fds[0]); + TFileDescriptorGuard fd1(fds[1]); - try { - auto address0 = GetSocketName(fds[0]); - auto address1 = GetSocketName(fds[1]); + auto address0 = GetSocketName(fds[0]); + auto address1 = GetSocketName(fds[1]); - auto first = New<TFDConnection>(fds[0], address0, address1, poller); - auto second = New<TFDConnection>(fds[1], address1, address0, std::move(poller)); - return std::pair(std::move(first), std::move(second)); - } catch (...) { - YT_VERIFY(TryClose(fds[0], false)); - YT_VERIFY(TryClose(fds[1], false)); - throw; - } + auto first = New<TFDConnection>(fds[0], address0, address1, poller); + fd0.Release(); + auto second = New<TFDConnection>(fds[1], address1, address0, std::move(poller)); + fd1.Release(); + return std::pair(std::move(first), std::move(second)); } IConnectionPtr CreateConnectionFromFD( @@ -1391,46 +1861,65 @@ IConnectionPtr CreateConnectionFromFD( IConnectionReaderPtr CreateInputConnectionFromFD( TFileDescriptor fd, - TString pipePath, + const std::string& /*pipePath*/, IPollerPtr poller, const TRefCountedPtr& pipeHolder) { - return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder); + return New<TFDConnection>(fd, std::move(poller), pipeHolder); } IConnectionReaderPtr CreateInputConnectionFromPath( - TString pipePath, + std::string pipePath, IPollerPtr poller, - const TRefCountedPtr& pipeHolder) + TRefCountedPtr pipeHolder) { #ifdef _unix_ int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK; - int fd = HandleEintr(::open, pipePath.c_str(), flags); - if (fd == -1) { - THROW_ERROR_EXCEPTION("Failed to open named pipe") - << TError::FromSystem() + TFileDescriptorGuard fd = HandleEintr(::open, pipePath.c_str(), flags); + if (fd.Get() == -1) { + THROW_ERROR_EXCEPTION(MakeSystemError("Failed to open named pipe")) << TErrorAttribute("path", pipePath); } - return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder); + auto connection = New<TFDConnection>(fd.Get(), std::move(poller), std::move(pipeHolder), std::move(pipePath)); + fd.Release(); + return connection; #else THROW_ERROR_EXCEPTION("Unsupported platform"); #endif } IConnectionWriterPtr CreateOutputConnectionFromPath( - TString pipePath, + std::string pipePath, IPollerPtr poller, - const TRefCountedPtr& pipeHolder, + TRefCountedPtr pipeHolder, std::optional<int> capacity, - bool useDeliveryFence) + EDeliveryFencedMode deliveryFencedMode) { - return New<TFDConnection>( - CreateWriteFDForConnection(pipePath, capacity, useDeliveryFence), - std::move(pipePath), + if (deliveryFencedMode == EDeliveryFencedMode::New) { + #ifdef _linux_ + return New<TDeliveryFencedWriteConnection>( + std::move(pipeHolder), + std::move(poller), + std::move(pipePath), + capacity); + #else // _linux_ + THROW_ERROR_EXCEPTION("Delivery fenced write is not supported on this platform"); + #endif // _linux_ + } + + bool useDeliveryFence = deliveryFencedMode == EDeliveryFencedMode::Old; + + TFileDescriptorGuard fd = CreateWriteFDForConnection(pipePath, capacity, useDeliveryFence); + auto connection = New<TFDConnection>( + fd.Get(), std::move(poller), - pipeHolder, + std::move(pipeHolder), + std::move(pipePath), useDeliveryFence); + fd.Release(); + + return connection; } //////////////////////////////////////////////////////////////////////////////// @@ -1475,16 +1964,18 @@ IPacketConnectionPtr CreatePacketConnection( const TNetworkAddress& at, NConcurrency::IPollerPtr poller) { - auto fd = CreateUdpSocket(); + TFileDescriptorGuard fd = CreateUdpSocket(); try { - SetReuseAddrFlag(fd); - BindSocket(fd, at); + SetReuseAddrFlag(fd.Get()); + BindSocket(fd.Get(), at); } catch (...) { - SafeClose(fd, false); + SafeClose(fd.Get(), false); throw; } - return New<TPacketConnection>(fd, at, std::move(poller)); + auto connection = New<TPacketConnection>(fd.Get(), at, std::move(poller)); + fd.Release(); + return connection; } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h index 3c9998e1d0c..4f0c3e4c238 100644 --- a/yt/yt/core/net/connection.h +++ b/yt/yt/core/net/connection.h @@ -65,8 +65,8 @@ DEFINE_REFCOUNTED_TYPE(IConnectionWriter) //////////////////////////////////////////////////////////////////////////////// struct IConnection - : public IConnectionReader - , public IConnectionWriter + : public virtual IConnectionReader + , public virtual IConnectionWriter { virtual TConnectionId GetId() const = 0; @@ -104,21 +104,21 @@ IConnectionPtr CreateConnectionFromFD( IConnectionReaderPtr CreateInputConnectionFromFD( TFileDescriptor fd, - TString pipePath, + const std::string& pipePath, NConcurrency::IPollerPtr poller, const TRefCountedPtr& pipeHolder); IConnectionReaderPtr CreateInputConnectionFromPath( - TString pipePath, + std::string pipePath, NConcurrency::IPollerPtr poller, - const TRefCountedPtr& pipeHolder); + TRefCountedPtr pipeHolder); IConnectionWriterPtr CreateOutputConnectionFromPath( - TString pipePath, + std::string pipePath, NConcurrency::IPollerPtr poller, - const TRefCountedPtr& pipeHolder, + TRefCountedPtr pipeHolder, std::optional<int> capacity = {}, - bool useDeliveryFence = false); + EDeliveryFencedMode deliveryFencedMode = EDeliveryFencedMode::None); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/public.h b/yt/yt/core/net/public.h index 725dc0c5170..bbca1eccfb4 100644 --- a/yt/yt/core/net/public.h +++ b/yt/yt/core/net/public.h @@ -8,6 +8,10 @@ #include <library/cpp/yt/misc/guid.h> +#ifdef _linux_ + #include <sys/signalfd.h> +#endif + namespace NYT::NNet { //////////////////////////////////////////////////////////////////////////////// @@ -18,6 +22,18 @@ class TIP6Network; using TConnectionId = TGuid; +enum class EDeliveryFencedMode +{ + None, + // COMPAT(pogorelov) + Old, + New, +}; + +#ifdef _linux_ +static inline const auto DeliveryFencedWriteSignal = SIGRTMIN; +#endif // _linux + DECLARE_REFCOUNTED_STRUCT(IConnection) DECLARE_REFCOUNTED_STRUCT(IPacketConnection) DECLARE_REFCOUNTED_STRUCT(IConnectionReader) |