aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpogorelov <pogorelov@yandex-team.com>2025-04-25 14:31:39 +0300
committerpogorelov <pogorelov@yandex-team.com>2025-04-25 15:23:46 +0300
commitb24a00dcb26a76e07659a31d8e81596665547982 (patch)
tree4c78d7dd86c636a1f2cd1ac5f67c7a1d87068c1f
parente338fd9998b6fe51e62b62b2602254dff6db4bf3 (diff)
downloadydb-b24a00dcb26a76e07659a31d8e81596665547982.tar.gz
YT-24704: Introduce new delivery fenced connection for pipes
* Changelog entry Type: feature Component: controller-agent Add new delivery fenced connection that works on vanilla linux kernel. It may be used for cpu intensive or gpu jobs to prevent job abort on interruption. commit_hash:02e0f41ad907dfe1c11defc5439214e31ea12ad5
-rw-r--r--yt/yt/core/misc/proc.cpp47
-rw-r--r--yt/yt/core/misc/proc.h24
-rw-r--r--yt/yt/core/net/connection.cpp967
-rw-r--r--yt/yt/core/net/connection.h16
-rw-r--r--yt/yt/core/net/public.h16
5 files changed, 824 insertions, 246 deletions
diff --git a/yt/yt/core/misc/proc.cpp b/yt/yt/core/misc/proc.cpp
index 4f043800fbd..20179f75d2b 100644
--- a/yt/yt/core/misc/proc.cpp
+++ b/yt/yt/core/misc/proc.cpp
@@ -106,6 +106,53 @@ bool IsSystemError(const TError& error)
////////////////////////////////////////////////////////////////////////////////
+TFileDescriptorGuard::TFileDescriptorGuard(TFileDescriptor fd) noexcept
+ : FD_(fd)
+{}
+
+TFileDescriptorGuard::~TFileDescriptorGuard()
+{
+ Reset();
+}
+
+TFileDescriptorGuard::TFileDescriptorGuard(TFileDescriptorGuard&& other) noexcept
+ : FD_(other.FD_)
+{
+ other.FD_ = -1;
+}
+
+TFileDescriptorGuard& TFileDescriptorGuard::operator=(TFileDescriptorGuard&& other) noexcept
+{
+ if (this != &other) {
+ Reset();
+ FD_ = other.FD_;
+ other.FD_ = -1;
+ }
+ return *this;
+}
+
+TFileDescriptor TFileDescriptorGuard::Get() const noexcept
+{
+ return FD_;
+}
+
+TFileDescriptor TFileDescriptorGuard::Release() noexcept
+{
+ TFileDescriptor fd = FD_;
+ FD_ = -1;
+ return fd;
+}
+
+void TFileDescriptorGuard::Reset() noexcept
+{
+ if (FD_ != -1) {
+ YT_VERIFY(TryClose(FD_, false));
+ FD_ = -1;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
std::optional<int> GetParentPid(int pid)
{
TFileInput in(Format("/proc/%v/status", pid));
diff --git a/yt/yt/core/misc/proc.h b/yt/yt/core/misc/proc.h
index a76fea3c54e..7111ffbdafd 100644
--- a/yt/yt/core/misc/proc.h
+++ b/yt/yt/core/misc/proc.h
@@ -28,6 +28,30 @@ bool IsSystemError(const TError& error);
////////////////////////////////////////////////////////////////////////////////
+class TFileDescriptorGuard
+{
+public:
+ TFileDescriptorGuard(TFileDescriptor fd = -1) noexcept;
+
+ ~TFileDescriptorGuard();
+
+ TFileDescriptorGuard(const TFileDescriptorGuard&) = delete;
+ TFileDescriptorGuard& operator = (const TFileDescriptorGuard&) = delete;
+
+ TFileDescriptorGuard(TFileDescriptorGuard&& other) noexcept;
+ TFileDescriptorGuard& operator = (TFileDescriptorGuard&& other) noexcept;
+
+ TFileDescriptor Get() const noexcept;
+
+ TFileDescriptor Release() noexcept;
+ void Reset() noexcept;
+
+private:
+ TFileDescriptor FD_ = -1;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
std::vector<int> ListPids();
std::vector<int> GetPidsByUid(int uid = -1);
std::vector<int> GetPidsUnderParent(int targetPid);
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp
index c1d9ae15a8d..5f64072b0c9 100644
--- a/yt/yt/core/net/connection.cpp
+++ b/yt/yt/core/net/connection.cpp
@@ -10,6 +10,8 @@
#include <yt/yt/core/net/socket.h>
+#include <library/cpp/yt/memory/non_null_ptr.h>
+
#include <library/cpp/yt/system/handle_eintr.h>
#include <util/network/pollerimpl.h>
@@ -18,6 +20,7 @@
#ifdef _linux_
#include <sys/ioctl.h>
+ #include <sys/signalfd.h>
#endif
#ifdef _win_
@@ -97,48 +100,205 @@ ssize_t WriteToFD(TFileDescriptor fd, const char* buffer, size_t length)
#endif
}
-TErrorOr<int> CheckPipeBytesLeftToRead(const TString& pipePath) noexcept
+TError MakeSystemError(TFormatString<> message)
+{
+ return TError(message)
+ << TError::FromSystem();
+}
+
+[[maybe_unused]] TErrorOr<int> CheckPipeBytesLeftToRead(TFileDescriptor fd) noexcept
{
#ifdef _linux_
int bytesLeft = 0;
- auto makeSystemError = [&] (TFormatString<> message) {
- return TError(message)
- << TError::FromSystem()
- << TErrorAttribute("pipe_path", pipePath);
- };
-
{
- int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK;
- int fd = HandleEintr(::open, pipePath.c_str(), flags);
-
- if (fd == -1) {
- return makeSystemError("Failed to open file descriptor");
- }
-
int ret = ::ioctl(fd, FIONREAD, &bytesLeft);
-
if (ret == -1) {
- return makeSystemError("ioctl failed");
- }
-
- if (!TryClose(fd, /*ignoreBadFD*/ false)) {
- return makeSystemError("Failed to close file descriptor");
+ return MakeSystemError("ioctl failed");
}
}
return bytesLeft;
#else
- Y_UNUSED(pipePath);
+ Y_UNUSED(fd);
return TError("Unsupported platform");
#endif
}
+template <class TDerived>
+class TWriteConnectionBase
+ : public virtual IConnectionWriter
+{
+public:
+ TFuture<void> Write(const TSharedRef& data) override
+ {
+ return ToDerived()->GetImpl()->Write(data);
+ }
+
+ TFuture<void> Close() override
+ {
+ return ToDerived()->GetImpl()->Close();
+ }
+
+ TFuture<void> WriteV(const TSharedRefArray& data) override
+ {
+ return ToDerived()->GetImpl()->WriteV(data);
+ }
+
+ TFuture<void> CloseWrite() override
+ {
+ return ToDerived()->GetImpl()->CloseWrite();
+ }
+
+ TFuture<void> Abort() override
+ {
+ return ToDerived()->GetImpl()->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted"));
+ }
+
+ int GetHandle() const override
+ {
+ return ToDerived()->GetImpl()->GetHandle();
+ }
+
+ i64 GetWriteByteCount() const override
+ {
+ return ToDerived()->GetImpl()->GetWriteByteCount();
+ }
+
+ void SetWriteDeadline(std::optional<TInstant> deadline) override
+ {
+ ToDerived()->GetImpl()->SetWriteDeadline(deadline);
+ }
+
+ TConnectionStatistics GetWriteStatistics() const override
+ {
+ return ToDerived()->GetImpl()->GetWriteStatistics();
+ }
+
+protected:
+ TDerived* ToDerived()
+ {
+ return static_cast<TDerived*>(this);
+ }
+
+ const TDerived* ToDerived() const
+ {
+ return static_cast<const TDerived*>(this);
+ }
+};
+
+template <class TDerived>
+class TReadConnectionBase
+ : public virtual IConnectionReader
+{
+public:
+ TFuture<size_t> Read(const TSharedMutableRef& data) override
+ {
+ return ToDerived()->GetImpl()->Read(data);
+ }
+
+ TFuture<void> CloseRead() override
+ {
+ return ToDerived()->GetImpl()->CloseRead();
+ }
+
+ TFuture<void> Abort() override
+ {
+ return ToDerived()->GetImpl()->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted"));
+ }
+
+ int GetHandle() const override
+ {
+ return ToDerived()->GetImpl()->GetHandle();
+ }
+
+ i64 GetReadByteCount() const override
+ {
+ return ToDerived()->GetImpl()->GetReadByteCount();
+ }
+
+ void SetReadDeadline(std::optional<TInstant> deadline) override
+ {
+ ToDerived()->GetImpl()->SetReadDeadline(deadline);
+ }
+
+ TConnectionStatistics GetReadStatistics() const override
+ {
+ return ToDerived()->GetImpl()->GetReadStatistics();
+ }
+protected:
+ TDerived* ToDerived()
+ {
+ return static_cast<TDerived*>(this);
+ }
+
+ const TDerived* ToDerived() const
+ {
+ return static_cast<const TDerived*>(this);
+ }
+};
+
+template <class TDerived>
+class TReadWriteConnectionBase
+ : public TWriteConnectionBase<TDerived>
+ , public TReadConnectionBase<TDerived>
+ , public IConnection
+{
+ using TWriteConnectionBase<TDerived>::ToDerived;
+
+public:
+ TConnectionId GetId() const override
+ {
+ return ToDerived()->GetImpl()->GetId();
+ }
+
+ const TNetworkAddress& GetLocalAddress() const override
+ {
+ return ToDerived()->GetImpl()->GetLocalAddress();
+ }
+
+ const TNetworkAddress& GetRemoteAddress() const override
+ {
+ return ToDerived()->GetImpl()->GetRemoteAddress();
+ }
+
+ bool IsIdle() const override
+ {
+ return ToDerived()->GetImpl()->IsIdle();
+ }
+
+ bool IsReusable() const override
+ {
+ return ToDerived()->GetImpl()->IsReusable();
+ }
+
+ bool SetNoDelay() override
+ {
+ return ToDerived()->GetImpl()->SetNoDelay();
+ }
+
+ bool SetKeepAlive() override
+ {
+ return ToDerived()->GetImpl()->SetKeepAlive();
+ }
+
+ TFuture<void> Abort() override
+ {
+ return ToDerived()->GetImpl()->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted"));
+ }
+
+ void SubscribePeerDisconnect(TCallback<void()> cb) override
+ {
+ return ToDerived()->GetImpl()->SubscribePeerDisconnect(std::move(cb));
+ }
+};
+
} // namespace
////////////////////////////////////////////////////////////////////////////////
DECLARE_REFCOUNTED_CLASS(TFDConnectionImpl)
+DECLARE_REFCOUNTED_CLASS(TDeliveryFencedWriteConnectionImpl)
////////////////////////////////////////////////////////////////////////////////
@@ -182,8 +342,7 @@ public:
return TIOResult{.Retry = Position_ == 0, .ByteCount = bytesRead};
}
- return TError("Read failed")
- << TError::FromSystem();
+ return MakeSystemError("Read failed");
}
if (size == 0) {
break;
@@ -243,8 +402,7 @@ public:
return TIOResult{.Retry = true, .ByteCount = 0};
}
- return TError("Read failed")
- << TError::FromSystem();
+ return MakeSystemError("Read failed");
}
Position_ += size;
@@ -297,8 +455,7 @@ public:
if (GetLastNetworkError() == EWOULDBLOCK) {
return TIOResult{.Retry = true, .ByteCount = bytesWritten};
}
- return TError("Write failed")
- << TError::FromSystem();
+ return MakeSystemError("Write failed");
}
YT_VERIFY(size > 0);
@@ -323,6 +480,12 @@ public:
return ResultPromise_.ToFuture();
}
+protected:
+ bool IsWriteComplete(const TErrorOr<TIOResult>& result)
+ {
+ return result.IsOK() && !result.Value().Retry;
+ }
+
private:
const TSharedRef Buffer_;
const TPromise<void> ResultPromise_ = NewPromise<void>();
@@ -332,11 +495,12 @@ private:
////////////////////////////////////////////////////////////////////////////////
-class TDeliveryFencedWriteOperation
+#ifdef _linux_
+class TDeliveryFencedWriteOperationOld
: public TWriteOperation
{
public:
- TDeliveryFencedWriteOperation(TSharedRef buffer, TString pipePath)
+ TDeliveryFencedWriteOperationOld(TSharedRef buffer, std::string pipePath)
: TWriteOperation(std::move(buffer))
, PipePath_(std::move(pipePath))
{ }
@@ -345,12 +509,19 @@ public:
{
auto result = TWriteOperation::PerformIO(fd);
if (IsWriteComplete(result)) {
- auto bytesLeftOrError = CheckPipeBytesLeftToRead(PipePath_);
+ int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK;
+ int fd = HandleEintr(::open, PipePath_.data(), flags);
+ if (fd == -1) {
+ return MakeSystemError("Failed to open file descriptor");
+ }
+ auto bytesLeftOrError = CheckPipeBytesLeftToRead(fd);
+
+ YT_VERIFY(TryClose(fd, /*ignoreBadFD*/ false));
if (!bytesLeftOrError.IsOK()) {
YT_LOG_ERROR(bytesLeftOrError, "Delivery fenced write failed");
- return std::move(bytesLeftOrError).Wrap();
+ return bytesLeftOrError;
} else {
YT_LOG_DEBUG("Delivery fenced write pipe check finished (BytesLeft: %v)", bytesLeftOrError.Value());
}
@@ -364,14 +535,91 @@ public:
}
private:
- const TString PipePath_;
+ const std::string PipePath_;
+};
- bool IsWriteComplete(const TErrorOr<TIOResult>& result)
+class TDeliveryFencedWriteOperation
+ : public TWriteOperation
+{
+public:
+ TDeliveryFencedWriteOperation(TSharedRef buffer, TFileDescriptor writeFd, TFileDescriptor readFd)
+ : TWriteOperation(std::move(buffer))
+ , WriteFD_(writeFd)
+ , ReadFD_(readFd)
+ { }
+
+ TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override
{
- return result.IsOK() && !result.Value().Retry;
+ if (!std::exchange(FirstWritingAttempt_, false)) {
+ auto errorOrIsSignalConsumed = ConsumeSignalFdEventsAndCheckItContainsOurFD(fd);
+
+ if (!errorOrIsSignalConsumed.IsOK()) {
+ return errorOrIsSignalConsumed;
+ }
+
+ if (!errorOrIsSignalConsumed.Value()) {
+ return TIOResult{
+ .Retry = true,
+ .ByteCount = 0,
+ };
+ }
+ }
+
+ auto result = TWriteOperation::PerformIO(WriteFD_);
+ if (IsWriteComplete(result)) {
+ auto bytesLeftOrError = CheckPipeBytesLeftToRead(ReadFD_);
+
+ if (!bytesLeftOrError.IsOK()) {
+ YT_LOG_ERROR(bytesLeftOrError, "Delivery fenced write failed");
+ return bytesLeftOrError;
+ } else {
+ YT_LOG_DEBUG("Delivery fenced write pipe check finished (BytesLeft: %v)", bytesLeftOrError.Value());
+ }
+
+ result.Value().Retry = (bytesLeftOrError.Value() != 0);
+ } else {
+ YT_LOG_DEBUG("Delivery fenced write to pipe step finished (Result: %v)", result);
+ }
+
+ return result;
+ }
+
+private:
+ const TFileDescriptor WriteFD_;
+ const TFileDescriptor ReadFD_;
+
+ bool FirstWritingAttempt_ = true;
+
+ TErrorOr<bool> ConsumeSignalFdEventsAndCheckItContainsOurFD(TFileDescriptor fd)
+ {
+ bool signalForWriteFDConsumed = false;
+ for (; ;) {
+ struct signalfd_siginfo fdsi;
+ ssize_t s = HandleEintr(::read, fd, &fdsi, sizeof(fdsi));
+ if (s <= 0) {
+ break;
+ }
+ if (s != sizeof(fdsi)) {
+ return MakeSystemError("Invalid signalfd_siginfo size");
+ }
+
+ if (fdsi.ssi_signo != static_cast<uint32_t>(DeliveryFencedWriteSignal)) {
+ continue;
+ }
+
+ if (fdsi.ssi_fd != WriteFD_) {
+ continue;
+ }
+
+ signalForWriteFDConsumed = true;
+ }
+
+ return signalForWriteFDConsumed;
}
};
+#endif // _linux_
+
////////////////////////////////////////////////////////////////////////////////
class TWriteVOperation
@@ -407,8 +655,7 @@ public:
return TIOResult{.Retry = true, .ByteCount = bytesWritten};
}
- return TError("Write failed")
- << TError::FromSystem();
+ return MakeSystemError("Write failed");
}
YT_VERIFY(size > 0);
@@ -460,8 +707,7 @@ public:
{
int res = HandleEintr(::shutdown, fd, ShutdownRead_ ? SHUT_RD : SHUT_WR);
if (res == -1) {
- return TError("Shutdown failed")
- << TError::FromSystem();
+ return MakeSystemError("Shutdown failed");
}
return TIOResult{.Retry = false, .ByteCount = 0};
}
@@ -488,17 +734,33 @@ private:
////////////////////////////////////////////////////////////////////////////////
+// TODO(pogorelov): Make separate clases for pipe and socket connections.
class TFDConnectionImpl
: public TPollableBase
{
+ struct TIODirection;
public:
static TFDConnectionImplPtr Create(
TFileDescriptor fd,
- TString filePath,
IPollerPtr poller,
+ std::string filePath,
+ // COMPAT(pogorelov)
bool useDeliveryFence)
{
- auto impl = New<TFDConnectionImpl>(fd, std::move(filePath), std::move(poller), useDeliveryFence);
+ #ifndef _linux_
+ THROW_ERROR_EXCEPTION_IF(useDeliveryFence, "Delivery fenced write is not supported on this platform");
+ #endif // _linux_
+ auto epollControl = EPollControl::Read | EPollControl::Write | EPollControl::EdgeTriggered;
+ auto readEpollControl = EPollControl::Read;
+ auto writeEpollControl = EPollControl::Write;
+ auto impl = New<TFDConnectionImpl>(
+ fd,
+ epollControl,
+ readEpollControl,
+ writeEpollControl,
+ std::move(poller),
+ std::move(filePath),
+ useDeliveryFence);
impl->Init();
return impl;
}
@@ -509,8 +771,19 @@ public:
const TNetworkAddress& remoteAddress,
IPollerPtr poller)
{
- auto impl = New<TFDConnectionImpl>(fd, localAddress, remoteAddress, std::move(poller));
+ auto epollControl = EPollControl::Read | EPollControl::Write | EPollControl::EdgeTriggered | EPollControl::ReadHup;
+ auto readEpollControl = EPollControl::Read;
+ auto writeEpollControl = EPollControl::Write;
+ auto impl = New<TFDConnectionImpl>(
+ fd,
+ epollControl,
+ readEpollControl,
+ writeEpollControl,
+ localAddress,
+ remoteAddress,
+ std::move(poller));
impl->Init();
+
return impl;
}
@@ -521,8 +794,8 @@ public:
void OnEvent(EPollControl control) override
{
- DoIO(&WriteDirection_, Any(control & EPollControl::Write));
- DoIO(&ReadDirection_, Any(control & EPollControl::Read));
+ DoIO(GetPtr(WriteDirection_), Any(control & WriteEpollControl_));
+ DoIO(GetPtr(ReadDirection_), Any(control & ReadEpollControl_));
if (Any(control & EPollControl::ReadHup)) {
OnPeerDisconnected();
@@ -566,7 +839,7 @@ public:
}
Poller_->Unarm(FD_, this);
- YT_VERIFY(TryClose(FD_, false));
+ YT_VERIFY(TryClose(FD_, /*ignoreBadFD*/ false));
FD_ = -1;
OnPeerDisconnected();
@@ -580,7 +853,7 @@ public:
{
auto read = std::make_unique<TReadOperation>(data);
auto future = read->ToFuture();
- StartIO(&ReadDirection_, std::move(read));
+ StartIO(GetPtr(ReadDirection_), std::move(read));
return future;
}
@@ -588,7 +861,7 @@ public:
{
auto receive = std::make_unique<TReceiveFromOperation>(buffer);
auto future = receive->ToFuture();
- StartIO(&ReadDirection_, std::move(receive));
+ StartIO(GetPtr(ReadDirection_), std::move(receive));
return future;
}
@@ -604,8 +877,7 @@ public:
address.GetSockAddr(),
address.GetLength());
if (res == -1) {
- THROW_ERROR_EXCEPTION("Write failed")
- << TError::FromSystem();
+ THROW_ERROR_EXCEPTION(MakeSystemError("Write failed"));
}
}
@@ -623,18 +895,34 @@ public:
TFuture<void> Write(const TSharedRef& data)
{
- if (UseDeliveryFence_) {
- return DoDeliveryFencedWrite(data);
- }
+ #ifdef _linux_
+ auto writeOperation = UseDeliveryFence_
+ ? std::make_unique<TDeliveryFencedWriteOperationOld>(data, PipePath_)
+ : std::make_unique<TWriteOperation>(data);
+ #else // _linux_
+ auto writeOperation = std::make_unique<TWriteOperation>(data);
+ YT_VERIFY(!UseDeliveryFence_);
+ #endif // _linux_
+
+ auto future = writeOperation->ToFuture();
+
+ DoWrite(std::move(writeOperation));
+
+ return future;
+ }
- return DoWrite(data);
+ // forcefullyConsiderPending used to perform first io without waiting event on epoll.
+ // See TDeliveryFencedWriteConnection.
+ void DoWrite(std::unique_ptr<IIOOperation> operation, bool forcefullyConsiderPending = false)
+ {
+ StartIO(GetPtr(WriteDirection_), std::move(operation), forcefullyConsiderPending);
}
TFuture<void> WriteV(const TSharedRefArray& data)
{
auto writeV = std::make_unique<TWriteVOperation>(data);
auto future = writeV->ToFuture();
- StartIO(&WriteDirection_, std::move(writeV));
+ StartIO(GetPtr(WriteDirection_), std::move(writeV));
return future;
}
@@ -661,17 +949,17 @@ public:
return IsIdle();
}
- TFuture<void> Abort(const TError& error)
+ TFuture<void> Abort(TError error)
{
YT_LOG_DEBUG(error, "Aborting connection");
- return AbortIO(error);
+ return AbortIO(std::move(error));
}
TFuture<void> CloseRead()
{
auto shutdownRead = std::make_unique<TShutdownOperation>(true);
auto future = shutdownRead->ToFuture();
- StartIO(&ReadDirection_, std::move(shutdownRead));
+ StartIO(GetPtr(ReadDirection_), std::move(shutdownRead));
return future;
}
@@ -679,7 +967,7 @@ public:
{
auto shutdownWrite = std::make_unique<TShutdownOperation>(false);
auto future = shutdownWrite->ToFuture();
- StartIO(&WriteDirection_, std::move(shutdownWrite));
+ StartIO(GetPtr(WriteDirection_), std::move(shutdownWrite));
return future;
}
@@ -760,35 +1048,30 @@ public:
PeerDisconnectedList_.Subscribe(std::move(callback));
}
-private:
+protected:
const TConnectionId Id_ = TConnectionId::Create();
- const TString Endpoint_;
+ const std::string Endpoint_;
const std::string LoggingTag_;
const NLogging::TLogger Logger;
- const TNetworkAddress LocalAddress_;
- const TNetworkAddress RemoteAddress_;
TFileDescriptor FD_ = -1;
- const IPollerPtr Poller_;
-
- // If set to true via ctor argument
- // |useDeliveryFence| will use
- // DeliverFencedWriteOperations
- // instead of WriteOperations,
- // which future is set only
- // after data from pipe has been read.
- const bool UseDeliveryFence_ = false;
- const TString PipePath_;
-
+ int SynchronousIOCount_ = 0;
TFDConnectionImpl(
TFileDescriptor fd,
- TString filePath,
+ EPollControl FDEpollControl,
+ EPollControl readEpollControl,
+ EPollControl writeEpollControl,
IPollerPtr poller,
+ std::string filePath,
+ // COMPAT(pogorelov)
bool useDeliveryFence)
: Endpoint_(Format("File{%v}", filePath))
, LoggingTag_(MakeLoggingTag(Id_, Endpoint_))
, Logger(NetLogger().WithRawTag(LoggingTag_))
, FD_(fd)
+ , FDEpollControl_(FDEpollControl)
+ , ReadEpollControl_(readEpollControl)
+ , WriteEpollControl_(writeEpollControl)
, Poller_(std::move(poller))
, UseDeliveryFence_(useDeliveryFence)
, PipePath_(std::move(filePath))
@@ -796,15 +1079,21 @@ private:
TFDConnectionImpl(
TFileDescriptor fd,
+ EPollControl epollControl,
+ EPollControl readEpollControl,
+ EPollControl writeEpollControl,
const TNetworkAddress& localAddress,
const TNetworkAddress& remoteAddress,
IPollerPtr poller)
: Endpoint_(Format("FD{%v<->%v}", localAddress, remoteAddress))
, LoggingTag_(MakeLoggingTag(Id_, Endpoint_))
, Logger(NetLogger().WithRawTag(LoggingTag_))
+ , FD_(fd)
+ , FDEpollControl_(epollControl)
+ , ReadEpollControl_(readEpollControl)
+ , WriteEpollControl_(writeEpollControl)
, LocalAddress_(localAddress)
, RemoteAddress_(remoteAddress)
- , FD_(fd)
, Poller_(std::move(poller))
{ }
@@ -813,6 +1102,34 @@ private:
YT_LOG_DEBUG("Connection destroyed");
}
+ void Arm(EPollControl additionalFlags = {})
+ {
+ Poller_->Arm(FD_, this, FDEpollControl_ | additionalFlags);
+ }
+
+ bool TryRegister()
+ {
+ return Poller_->TryRegister(MakeStrong(this));
+ }
+
+private:
+ const EPollControl FDEpollControl_;
+ const EPollControl ReadEpollControl_;
+ const EPollControl WriteEpollControl_;
+ const TNetworkAddress LocalAddress_;
+ const TNetworkAddress RemoteAddress_;
+ const IPollerPtr Poller_;
+
+ // COMPAT(pogorelov)
+ // If set to true via ctor argument
+ // |useDeliveryFence| will use
+ // DeliverFencedWriteOperations
+ // instead of WriteOperations,
+ // which future is set only
+ // after data from pipe has been read.
+ const bool UseDeliveryFence_ = false;
+ const std::string PipePath_;
+
DECLARE_NEW_FRIEND()
class TSynchronousIOGuard
@@ -905,7 +1222,6 @@ private:
TIODirection ReadDirection_{EDirection::Read};
TIODirection WriteDirection_{EDirection::Write};
bool ShutdownRequested_ = false;
- int SynchronousIOCount_ = 0;
TError WriteError_;
TError ReadError_;
const TPromise<void> ShutdownPromise_ = NewPromise<void>();
@@ -918,36 +1234,21 @@ private:
TDelayedExecutorCookie ReadTimeoutCookie_;
TDelayedExecutorCookie WriteTimeoutCookie_;
- static std::string MakeLoggingTag(TConnectionId id, const TString& endpoint)
+ static std::string MakeLoggingTag(TConnectionId id, const std::string& endpoint)
{
- return Format("ConnectionId: %v, Endpoint: %v",
+ return Format(
+ "ConnectionId: %v, Endpoint: %v",
id,
endpoint);
}
- TError AnnotateError(const TError& error) const
+ TError AnnotateError(TError error) const
{
- return error
+ return std::move(error)
<< TErrorAttribute("connection_id", Id_)
<< TErrorAttribute("connection_endpoint", Endpoint_);
}
- TFuture<void> DoWrite(const TSharedRef& data)
- {
- auto write = std::make_unique<TWriteOperation>(data);
- auto future = write->ToFuture();
- StartIO(&WriteDirection_, std::move(write));
- return future;
- }
-
- TFuture<void> DoDeliveryFencedWrite(const TSharedRef& data)
- {
- auto syncWrite = std::make_unique<TDeliveryFencedWriteOperation>(data, PipePath_);
- auto future = syncWrite->ToFuture();
- StartIO(&WriteDirection_, std::move(syncWrite));
- return future;
- }
-
void Init()
{
YT_LOG_DEBUG("Connection created");
@@ -955,7 +1256,7 @@ private:
AbortFromReadTimeout_ = BIND(&TFDConnectionImpl::AbortFromReadTimeout, MakeWeak(this));
AbortFromWriteTimeout_ = BIND(&TFDConnectionImpl::AbortFromWriteTimeout, MakeWeak(this));
- if (!Poller_->TryRegister(this)) {
+ if (!TryRegister()) {
ReadError_ = WriteError_ = AnnotateError(TError("Cannot register connection pollable"));
return;
}
@@ -963,12 +1264,6 @@ private:
Arm();
}
- void Arm(EPollControl additionalFlags = {})
- {
- auto control = EPollControl::Read | EPollControl::Write | EPollControl::EdgeTriggered | EPollControl::ReadHup;
- Poller_->Arm(FD_, this, control | additionalFlags);
- }
-
TError GetCurrentError(EDirection direction)
{
switch (direction) {
@@ -984,7 +1279,7 @@ private:
}
}
- void StartIO(TIODirection* direction, std::unique_ptr<IIOOperation> operation)
+ void StartIO(TNonNullPtr<TIODirection> direction, std::unique_ptr<IIOOperation> operation, bool forcefullyConsiderPending = false)
{
TError error;
bool needRetry = false;
@@ -1001,6 +1296,9 @@ private:
YT_VERIFY(!direction->Running);
direction->Operation = std::move(operation);
direction->StartBusyTimer();
+
+ direction->Pending |= forcefullyConsiderPending;
+
// Start operation only if this direction already has pending
// event otherwise reading from FIFO before opening by writer
// will return EOF immediately.
@@ -1009,7 +1307,7 @@ private:
}
if (!error.IsOK()) {
- operation->Abort(error);
+ operation->Abort(std::move(error));
return;
}
@@ -1018,7 +1316,7 @@ private:
}
}
- void DoIO(TIODirection* direction, bool event)
+ void DoIO(TNonNullPtr<TIODirection> direction, bool event)
{
{
auto guard = Guard(Lock_);
@@ -1045,7 +1343,7 @@ private:
if (result.IsOK()) {
direction->BytesTransferred += result.Value().ByteCount;
} else {
- result = AnnotateError(result);
+ result = AnnotateError(std::move(result));
}
bool needUnregister = false;
@@ -1102,7 +1400,7 @@ private:
}
if (!result.IsOK()) {
- operation->Abort(result);
+ operation->Abort(std::move(result));
} else if (!result.Value().Retry) {
operation->SetResult();
} else if (needRetry) {
@@ -1114,12 +1412,13 @@ private:
}
}
- TFuture<void> AbortIO(const TError& error)
+ TFuture<void> AbortIO(TError error)
{
+ auto annotatedError = AnnotateError(std::move(error));
+
auto guard = Guard(Lock_);
// In case of read errors we have called Unarm and Unregister already.
bool needUnarmAndUnregister = ReadError_.IsOK();
- auto annotatedError = AnnotateError(error);
if (WriteError_.IsOK()) {
WriteError_ = annotatedError;
}
@@ -1154,184 +1453,359 @@ private:
DEFINE_REFCOUNTED_TYPE(TFDConnectionImpl)
-////////////////////////////////////////////////////////////////////////////////
-
-// The sole purpose of this class is to call Abort on Impl in dtor.
-class TFDConnection
- : public IConnection
+#ifdef _linux_
+class TDeliveryFencedWriteConnectionImpl
+ : public TFDConnectionImpl
{
public:
- TFDConnection(
- TFileDescriptor fd,
- TString pipePath,
+ static TDeliveryFencedWriteConnectionImplPtr Create(
IPollerPtr poller,
- TRefCountedPtr pipeHolder = nullptr,
- bool useDeliveryFence = false)
- : Impl_(TFDConnectionImpl::Create(fd, std::move(pipePath), std::move(poller), useDeliveryFence))
- , PipeHolder_(std::move(pipeHolder))
- { }
-
- TFDConnection(
- TFileDescriptor fd,
- const TNetworkAddress& localAddress,
- const TNetworkAddress& remoteAddress,
- IPollerPtr poller)
- : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, std::move(poller)))
- { }
-
- ~TFDConnection()
+ std::string pipePath,
+ std::optional<int> capacity)
{
- YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned")));
- }
+ TFileDescriptorGuard signalFD = CreateSignalFD();
+ TDeliveryFencedWriteConnectionImplPtr impl;
+ impl = New<TDeliveryFencedWriteConnectionImpl>(std::move(poller), std::move(pipePath), signalFD.Get(), capacity);
+ impl->Init();
- TConnectionId GetId() const override
- {
- return Impl_->GetId();
+ signalFD.Release();
+ return impl;
}
- const TNetworkAddress& GetLocalAddress() const override
+ TFuture<void> Write(const TSharedRef& data)
{
- return Impl_->GetLocalAddress();
- }
+ auto writeOperation = std::make_unique<TDeliveryFencedWriteOperation>(data, WriteFD_, ReadFD_);
- const TNetworkAddress& GetRemoteAddress() const override
- {
- return Impl_->GetRemoteAddress();
- }
+ auto future = writeOperation->ToFuture();
- int GetHandle() const override
- {
- return Impl_->GetHandle();
- }
+ DoWrite(std::move(writeOperation), /*forcefullyConsiderPending*/ true);
- TFuture<size_t> Read(const TSharedMutableRef& data) override
- {
- return Impl_->Read(data);
+ return future;
}
- TFuture<void> Write(const TSharedRef& data) override
- {
- return Impl_->Write(data);
- }
+private:
+ const std::string PipePath_;
+ const std::optional<int> PipeCapacity_;
- TFuture<void> WriteV(const TSharedRefArray& data) override
- {
- return Impl_->WriteV(data);
- }
+ TFileDescriptor WriteFD_ = -1;
+ TFileDescriptor ReadFD_ = -1;
- TFuture<void> Close() override
+ TDeliveryFencedWriteConnectionImpl(
+ IPollerPtr poller,
+ std::string pipePath,
+ TFileDescriptor signalFD,
+ std::optional<int> capacity)
+ : TFDConnectionImpl(
+ signalFD,
+ /*FDEpollControl*/ EPollControl::Read | EPollControl::EdgeTriggered,
+ /*readEpollControll*/ EPollControl::None,
+ // Yes, we read to write :)
+ /*writeEpollControl*/ EPollControl::Read,
+ std::move(poller),
+ pipePath,
+ // NB(pogorelov): DeliveryFence is old compat logic, so we turn it off here.
+ /*useDeliveryFence*/ false)
+ , PipePath_(std::move(pipePath))
+ , PipeCapacity_(capacity)
{
- return Impl_->Close();
+ YT_LOG_DEBUG("Delivery fenced connection created");
}
- bool IsIdle() const override
+ ~TDeliveryFencedWriteConnectionImpl()
{
- return Impl_->IsIdle();
+ YT_LOG_DEBUG("Delivery fenced connection destroyed");
}
- bool IsReusable() const override
+ DECLARE_NEW_FRIEND();
+
+ static TFileDescriptor CreateSignalFD()
{
- return Impl_->IsReusable();
+ sigset_t mask;
+ if (sigemptyset(&mask) == -1) {
+ ThrowError("empty sig set");
+ }
+ if (sigaddset(&mask, DeliveryFencedWriteSignal) == -1) {
+ ThrowError(Format("add %v RT signal to sig set", DeliveryFencedWriteSignal - SIGRTMIN));
+ }
+
+ auto fd = HandleEintr(::signalfd, -1, &mask, SFD_NONBLOCK | SFD_CLOEXEC);
+ if (fd == -1) {
+ ThrowError("open signalfd");
+ }
+
+ return fd;
}
- TFuture<void> Abort() override
+ void Init()
{
- return Impl_->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted"));
+ TFileDescriptorGuard readFdGuard = HandleEintr(::open, PipePath_.data(), O_RDONLY | O_CLOEXEC | O_NONBLOCK);
+ if (readFdGuard.Get() == -1) {
+ ThrowError("open pipe for reading");
+ }
+
+ TFileDescriptorGuard writeFdGuard = HandleEintr(::open, PipePath_.data(), O_WRONLY | O_CLOEXEC);
+ if (writeFdGuard.Get() == -1) {
+ ThrowError("open pipe for writing");
+ }
+
+ auto flags = fcntl(writeFdGuard.Get(), F_GETFL);
+ if (flags == -1) {
+ ThrowError("get pipe writing fd flags");
+ }
+ if (fcntl(writeFdGuard.Get(), F_SETFL, flags | O_NONBLOCK | FASYNC) == -1) {
+ ThrowError("set pipe writing fd flags");
+ }
+ if (fcntl(writeFdGuard.Get(), F_SETOWN, getpid()) == -1) {
+ ThrowError("set pipe owner");
+ }
+ if (fcntl(writeFdGuard.Get(), F_SETSIG, DeliveryFencedWriteSignal) == -1) {
+ ThrowError("set custom pipe signal");
+ }
+ if (PipeCapacity_) {
+ SafeSetPipeCapacity(writeFdGuard.Get(), *PipeCapacity_);
+ }
+
+ if (!TryRegister()) {
+ ThrowError("register connection in poller");
+ }
+
+ try {
+ Arm();
+ } catch (const std::exception& ex) {
+ ThrowError("arm connection", ex);
+ } catch (...) {
+ ThrowError("arm connection");
+ }
+
+ YT_LOG_DEBUG("Delivery fenced connection initialized");
+
+ ReadFD_ = readFdGuard.Release();
+ WriteFD_ = writeFdGuard.Release();
}
- TFuture<void> CloseRead() override
+ void OnShutdown() final
{
- return Impl_->CloseRead();
+ TFDConnectionImpl::OnShutdown();
+
+ YT_VERIFY(SynchronousIOCount_ == 0);
+
+ YT_VERIFY(TryClose(WriteFD_, /*ignoreBadFD*/ false));
+ YT_VERIFY(TryClose(ReadFD_, /*ignoreBadFD*/ false));
}
- TFuture<void> CloseWrite() override
+ [[noreturn]] static void ThrowError(std::string_view action, TError innerError = TError())
{
- return Impl_->CloseWrite();
+ auto error = TError("Failed to %v for delivery fenced connection", action);
+ if (!innerError.IsOK()) {
+ innerError <<= std::move(innerError);
+ } else {
+ error <<= TError::FromSystem();
+ }
+ THROW_ERROR(std::move(error));
}
+};
- i64 GetReadByteCount() const override
+DEFINE_REFCOUNTED_TYPE(TDeliveryFencedWriteConnectionImpl)
+
+#endif // _linux_
+
+////////////////////////////////////////////////////////////////////////////////
+
+// TODO(pogorelov): Make separate clases for pipe and socket connections.
+// The sole purpose of this class is to call Abort on Impl in dtor.
+// Since object of TFDConnection is created, you should not care about fd.
+// But in case of exception you should close fd by yourself.
+class TFDConnection
+ : public TReadWriteConnectionBase<TFDConnection>
+{
+public:
+ TFDConnection(
+ TFileDescriptor fd,
+ IPollerPtr poller,
+ TRefCountedPtr pipeHolder,
+ std::string pipePath = "",
+ // COMPAT(pogorelov)
+ bool useDeliveryFence = false)
+ : Impl_(TFDConnectionImpl::Create(fd, std::move(poller), std::move(pipePath), useDeliveryFence))
+ , PipeHolder_(std::move(pipeHolder))
+ { }
+
+ TFDConnection(
+ TFileDescriptor fd,
+ const TNetworkAddress& localAddress,
+ const TNetworkAddress& remoteAddress,
+ IPollerPtr poller)
+ : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, std::move(poller)))
+ { }
+
+ TFDConnection(
+ TRefCountedPtr pipeHolder,
+ TFDConnectionImplPtr impl)
+ : Impl_(std::move(impl))
+ , PipeHolder_(std::move(pipeHolder))
+ { }
+
+ ~TFDConnection()
{
- return Impl_->GetReadByteCount();
+ YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned")));
}
-
- i64 GetWriteByteCount() const override
+ const TFDConnectionImplPtr& GetImpl() const
{
- return Impl_->GetWriteByteCount();
+ return Impl_;
}
- TConnectionStatistics GetReadStatistics() const override
+private:
+ const TFDConnectionImplPtr Impl_;
+ const TRefCountedPtr PipeHolder_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+#ifdef _linux_
+
+[[noreturn]] void ThrowUnimplemented(std::string_view method)
+{
+ THROW_ERROR_EXCEPTION("%v is not supported", method);
+}
+
+// NOTE(pogorelov): Using 2 TDeliveryFencedWriteConnection concurrently is not supported.
+// Desired behavior:
+// Write should complete only after all data has been written to the pipe and fully read by the reader.
+// Yes, this is not what pipes were originally designed for.
+// However, we now have a large amount of user code relying on our API (which currently only reads from stdin).
+// We want to be able to adjust the supplying data portion size without modifying the API, hence this solution.
+//
+// Implementation:
+// To achieve this behavior, we use:
+// - signalfd (for signal-based notifications)
+// - FASYNC mode (for asynchronous pipe I/O)
+// - FIONREAD (to check remaining data in the pipe)
+//
+// Note about FASYNC Behavior:
+// Starting from kernel 5.15, the writer receives a signal (SIGIO) on every read operation from the pipe.
+// In earlier kernel versions, this behavior has been changed several times, but it appears to be finalized in 5.15 and later:
+// https://web.git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit?id=fe67f4dd8daa252eb9aa7acb61555f3cc3c1ce4c
+//
+// Initialization Steps:
+// 1) Configure the pipe for signals:
+// - Set the process as the FIFO owner (F_SETOWN).
+// - Enable FASYNC on the write-side file descriptor (F_SETFL) to receive I/O event signals.
+// - Use a custom real-time signal (instead of SIGIO) to not occupy SIGIO.
+// - We use real-time signal as a reserve for the future
+// (when RT signal is used, kernel stores the queue with signal and its metadata such as fd it triggered by).
+// 2) Block the custom signal for the entire process (sigprocmask).
+// 3) Create a signalfd to read signals via a file descriptor.
+// 4) Arm epoll for the signal FD:
+// Monitor for EPOLLIN (even though this is the writer side—we’re waiting to read signals).
+//
+// Write Steps:
+// 1) Write data to the pipe:
+// Write until all data is sent or EWOULDBLOCK is encountered.
+// (Unlike common writer, we don’t immediately sleep on epoll.
+// Opening the pipe for reading wakes the writer, but FASYNC does not send SIGIO in this case.)
+// 2) Check pipe status with FIONREAD:
+// If the pipe is empty, writing is complete.
+// 3) Wait for signals via epoll.
+// 4) Process signals from signalfd:
+// Read all the signals and repeat from Step 1.
+class TDeliveryFencedWriteConnection
+ : public TWriteConnectionBase<TDeliveryFencedWriteConnection>
+{
+public:
+ TDeliveryFencedWriteConnection(
+ TRefCountedPtr pipeHolder,
+ IPollerPtr poller,
+ std::string pipePath,
+ std::optional<int> capacity)
+ : Impl_(TDeliveryFencedWriteConnectionImpl::Create(std::move(poller), std::move(pipePath), capacity))
+ , PipeHolder_(std::move(pipeHolder))
{
- return Impl_->GetReadStatistics();
+ YT_VERIFY(!HasActiveConnection.exchange(true));
}
- TConnectionStatistics GetWriteStatistics() const override
+ ~TDeliveryFencedWriteConnection()
{
- return Impl_->GetWriteStatistics();
+ YT_VERIFY(HasActiveConnection.exchange(false));
+ YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned")));
}
-
- void SetReadDeadline(std::optional<TInstant> deadline) override
+ TFuture<void> WriteV(const TSharedRefArray& /*data*/) final
{
- Impl_->SetReadDeadline(deadline);
+ ThrowUnimplemented("WriteV");
}
- void SetWriteDeadline(std::optional<TInstant> deadline) override
+ TFuture<void> CloseWrite() final
{
- Impl_->SetWriteDeadline(deadline);
+ ThrowUnimplemented("CloseWrite");;
}
- bool SetNoDelay() override
+ void SetWriteDeadline(std::optional<TInstant> /*deadline*/)
{
- return Impl_->SetNoDelay();
+ ThrowUnimplemented("SetWriteDeadline");
}
- bool SetKeepAlive() override
+ TFuture<void> Write(const TSharedRef& data) final
{
- return Impl_->SetKeepAlive();
+ return static_cast<TDeliveryFencedWriteConnectionImpl&>(*Impl_).Write(data);
}
- void SubscribePeerDisconnect(TCallback<void()> cb) override
+ const TDeliveryFencedWriteConnectionImplPtr& GetImpl() const
{
- return Impl_->SubscribePeerDisconnect(std::move(cb));
+ return Impl_;
}
private:
- const TFDConnectionImplPtr Impl_;
+ const TDeliveryFencedWriteConnectionImplPtr Impl_;
const TRefCountedPtr PipeHolder_;
+
+ static inline std::atomic<bool> HasActiveConnection = false;
};
+#endif // _linux_
+
////////////////////////////////////////////////////////////////////////////////
namespace {
TFileDescriptor CreateWriteFDForConnection(
- const TString& pipePath,
+ const std::string& pipePath,
std::optional<int> capacity,
+ // COMPAT(pogorelov)
bool useDeliveryFence)
{
#ifdef _unix_
int flags = O_WRONLY | O_CLOEXEC;
- int fd = HandleEintr(::open, pipePath.c_str(), flags);
- if (fd == -1) {
- THROW_ERROR_EXCEPTION("Failed to open named pipe")
- << TError::FromSystem()
+ TFileDescriptorGuard fd = HandleEintr(::open, pipePath.c_str(), flags);
+ if (fd.Get() == -1) {
+ THROW_ERROR_EXCEPTION(MakeSystemError("Failed to open named pipe"))
<< TErrorAttribute("path", pipePath);
}
try {
if (capacity) {
- SafeSetPipeCapacity(fd, *capacity);
+ SafeSetPipeCapacity(fd.Get(), *capacity);
}
if (useDeliveryFence) {
- SafeEnableEmptyPipeEpollEvent(fd);
+ SafeEnableEmptyPipeEpollEvent(fd.Get());
}
- SafeMakeNonblocking(fd);
+ SafeMakeNonblocking(fd.Get());
+ } catch (const std::exception& ex) {
+ YT_LOG_WARNING(
+ TError(ex) << TError::FromSystem(),
+ "Failed to open pipe for writing (UseDeliveryFence: %v, Capacity: %v)",
+ useDeliveryFence,
+ capacity);
+ throw;
} catch (...) {
- SafeClose(fd, false);
+ YT_LOG_WARNING(
+ "Failed to open pipe for writing (MaybeRelevantError: %v, UseDeliveryFence: %v, Capacity: %v)",
+ TError::FromSystem(),
+ useDeliveryFence,
+ capacity);
throw;
}
- return fd;
+ return fd.Release();
#else
THROW_ERROR_EXCEPTION("Unsupported platform");
#endif
@@ -1353,31 +1827,27 @@ std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(IPollerPtr poller
#endif
if (HandleEintr(::socketpair, AF_LOCAL, flags, 0, fds) == -1) {
- THROW_ERROR_EXCEPTION("Failed to create socket pair")
- << TError::FromSystem();
+ THROW_ERROR_EXCEPTION(MakeSystemError("Failed to create socket pair"));
}
#else
if (SocketPair(fds, /*overlapped*/ false, /*cloexec*/ true) == SOCKET_ERROR) {
- THROW_ERROR_EXCEPTION("Failed to create socket pair")
- << TError::FromSystem();
+ THROW_ERROR_EXCEPTION(MakeSystemError("Failed to create socket pair"));
}
SetNonBlock(fds[0]);
SetNonBlock(fds[1]);
#endif
+ TFileDescriptorGuard fd0(fds[0]);
+ TFileDescriptorGuard fd1(fds[1]);
- try {
- auto address0 = GetSocketName(fds[0]);
- auto address1 = GetSocketName(fds[1]);
+ auto address0 = GetSocketName(fds[0]);
+ auto address1 = GetSocketName(fds[1]);
- auto first = New<TFDConnection>(fds[0], address0, address1, poller);
- auto second = New<TFDConnection>(fds[1], address1, address0, std::move(poller));
- return std::pair(std::move(first), std::move(second));
- } catch (...) {
- YT_VERIFY(TryClose(fds[0], false));
- YT_VERIFY(TryClose(fds[1], false));
- throw;
- }
+ auto first = New<TFDConnection>(fds[0], address0, address1, poller);
+ fd0.Release();
+ auto second = New<TFDConnection>(fds[1], address1, address0, std::move(poller));
+ fd1.Release();
+ return std::pair(std::move(first), std::move(second));
}
IConnectionPtr CreateConnectionFromFD(
@@ -1391,46 +1861,65 @@ IConnectionPtr CreateConnectionFromFD(
IConnectionReaderPtr CreateInputConnectionFromFD(
TFileDescriptor fd,
- TString pipePath,
+ const std::string& /*pipePath*/,
IPollerPtr poller,
const TRefCountedPtr& pipeHolder)
{
- return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder);
+ return New<TFDConnection>(fd, std::move(poller), pipeHolder);
}
IConnectionReaderPtr CreateInputConnectionFromPath(
- TString pipePath,
+ std::string pipePath,
IPollerPtr poller,
- const TRefCountedPtr& pipeHolder)
+ TRefCountedPtr pipeHolder)
{
#ifdef _unix_
int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK;
- int fd = HandleEintr(::open, pipePath.c_str(), flags);
- if (fd == -1) {
- THROW_ERROR_EXCEPTION("Failed to open named pipe")
- << TError::FromSystem()
+ TFileDescriptorGuard fd = HandleEintr(::open, pipePath.c_str(), flags);
+ if (fd.Get() == -1) {
+ THROW_ERROR_EXCEPTION(MakeSystemError("Failed to open named pipe"))
<< TErrorAttribute("path", pipePath);
}
- return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder);
+ auto connection = New<TFDConnection>(fd.Get(), std::move(poller), std::move(pipeHolder), std::move(pipePath));
+ fd.Release();
+ return connection;
#else
THROW_ERROR_EXCEPTION("Unsupported platform");
#endif
}
IConnectionWriterPtr CreateOutputConnectionFromPath(
- TString pipePath,
+ std::string pipePath,
IPollerPtr poller,
- const TRefCountedPtr& pipeHolder,
+ TRefCountedPtr pipeHolder,
std::optional<int> capacity,
- bool useDeliveryFence)
+ EDeliveryFencedMode deliveryFencedMode)
{
- return New<TFDConnection>(
- CreateWriteFDForConnection(pipePath, capacity, useDeliveryFence),
- std::move(pipePath),
+ if (deliveryFencedMode == EDeliveryFencedMode::New) {
+ #ifdef _linux_
+ return New<TDeliveryFencedWriteConnection>(
+ std::move(pipeHolder),
+ std::move(poller),
+ std::move(pipePath),
+ capacity);
+ #else // _linux_
+ THROW_ERROR_EXCEPTION("Delivery fenced write is not supported on this platform");
+ #endif // _linux_
+ }
+
+ bool useDeliveryFence = deliveryFencedMode == EDeliveryFencedMode::Old;
+
+ TFileDescriptorGuard fd = CreateWriteFDForConnection(pipePath, capacity, useDeliveryFence);
+ auto connection = New<TFDConnection>(
+ fd.Get(),
std::move(poller),
- pipeHolder,
+ std::move(pipeHolder),
+ std::move(pipePath),
useDeliveryFence);
+ fd.Release();
+
+ return connection;
}
////////////////////////////////////////////////////////////////////////////////
@@ -1475,16 +1964,18 @@ IPacketConnectionPtr CreatePacketConnection(
const TNetworkAddress& at,
NConcurrency::IPollerPtr poller)
{
- auto fd = CreateUdpSocket();
+ TFileDescriptorGuard fd = CreateUdpSocket();
try {
- SetReuseAddrFlag(fd);
- BindSocket(fd, at);
+ SetReuseAddrFlag(fd.Get());
+ BindSocket(fd.Get(), at);
} catch (...) {
- SafeClose(fd, false);
+ SafeClose(fd.Get(), false);
throw;
}
- return New<TPacketConnection>(fd, at, std::move(poller));
+ auto connection = New<TPacketConnection>(fd.Get(), at, std::move(poller));
+ fd.Release();
+ return connection;
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h
index 3c9998e1d0c..4f0c3e4c238 100644
--- a/yt/yt/core/net/connection.h
+++ b/yt/yt/core/net/connection.h
@@ -65,8 +65,8 @@ DEFINE_REFCOUNTED_TYPE(IConnectionWriter)
////////////////////////////////////////////////////////////////////////////////
struct IConnection
- : public IConnectionReader
- , public IConnectionWriter
+ : public virtual IConnectionReader
+ , public virtual IConnectionWriter
{
virtual TConnectionId GetId() const = 0;
@@ -104,21 +104,21 @@ IConnectionPtr CreateConnectionFromFD(
IConnectionReaderPtr CreateInputConnectionFromFD(
TFileDescriptor fd,
- TString pipePath,
+ const std::string& pipePath,
NConcurrency::IPollerPtr poller,
const TRefCountedPtr& pipeHolder);
IConnectionReaderPtr CreateInputConnectionFromPath(
- TString pipePath,
+ std::string pipePath,
NConcurrency::IPollerPtr poller,
- const TRefCountedPtr& pipeHolder);
+ TRefCountedPtr pipeHolder);
IConnectionWriterPtr CreateOutputConnectionFromPath(
- TString pipePath,
+ std::string pipePath,
NConcurrency::IPollerPtr poller,
- const TRefCountedPtr& pipeHolder,
+ TRefCountedPtr pipeHolder,
std::optional<int> capacity = {},
- bool useDeliveryFence = false);
+ EDeliveryFencedMode deliveryFencedMode = EDeliveryFencedMode::None);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/net/public.h b/yt/yt/core/net/public.h
index 725dc0c5170..bbca1eccfb4 100644
--- a/yt/yt/core/net/public.h
+++ b/yt/yt/core/net/public.h
@@ -8,6 +8,10 @@
#include <library/cpp/yt/misc/guid.h>
+#ifdef _linux_
+ #include <sys/signalfd.h>
+#endif
+
namespace NYT::NNet {
////////////////////////////////////////////////////////////////////////////////
@@ -18,6 +22,18 @@ class TIP6Network;
using TConnectionId = TGuid;
+enum class EDeliveryFencedMode
+{
+ None,
+ // COMPAT(pogorelov)
+ Old,
+ New,
+};
+
+#ifdef _linux_
+static inline const auto DeliveryFencedWriteSignal = SIGRTMIN;
+#endif // _linux
+
DECLARE_REFCOUNTED_STRUCT(IConnection)
DECLARE_REFCOUNTED_STRUCT(IPacketConnection)
DECLARE_REFCOUNTED_STRUCT(IConnectionReader)