summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <[email protected]>2026-06-25 17:03:00 +0200
committerGitHub <[email protected]>2026-06-25 18:03:00 +0300
commit931f110afb00cbaf8e998daec79b1438985ca355 (patch)
treea75f16d3ac40c49512e722cca34790650e0c1daf
parent2335250e8a0c286aac3501f289e0643ab286b67a (diff)
IC with io_uring prototype, #44325 (#44424)
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp14
-rw-r--r--ydb/core/protos/config.proto2
-rw-r--r--ydb/library/actors/interconnect/events/events.h9
-rw-r--r--ydb/library/actors/interconnect/interconnect_common.h2
-rw-r--r--ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp266
-rw-r--r--ydb/library/actors/interconnect/interconnect_tcp_session.cpp331
-rw-r--r--ydb/library/actors/interconnect/interconnect_tcp_session.h69
-rw-r--r--ydb/library/actors/interconnect/poller/uring_poller_actor.cpp726
-rw-r--r--ydb/library/actors/interconnect/poller/uring_poller_actor.h106
-rw-r--r--ydb/library/actors/interconnect/poller/ya.make5
-rw-r--r--ydb/library/actors/interconnect/uring_context.cpp238
-rw-r--r--ydb/library/actors/interconnect/uring_context.h178
-rw-r--r--ydb/library/actors/interconnect/uring_recv_buffer_pool.h174
-rw-r--r--ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h16
-rw-r--r--ydb/library/actors/interconnect/ut/lib/node.h5
-rw-r--r--ydb/library/actors/interconnect/ut/uring_ut.cpp499
-rw-r--r--ydb/library/actors/interconnect/ut/ya.make1
-rw-r--r--ydb/library/actors/interconnect/ya.make11
-rw-r--r--ydb/tools/stress_tool/device_test_tool.cpp13
-rw-r--r--ydb/tools/stress_tool/device_test_tool_ddisk_client_server.h22
20 files changed, 2663 insertions, 24 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index fe668e13615..4a0bf6cb8f1 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -259,6 +259,7 @@
#include <ydb/library/actors/interconnect/load.h>
#include <ydb/library/actors/interconnect/poller/poller_actor.h>
#include <ydb/library/actors/interconnect/poller/poller_tcp.h>
+#include <ydb/library/actors/interconnect/poller/uring_poller_actor.h>
#include <ydb/library/actors/interconnect/rdma/cq_actor/cq_actor.h>
#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
#include <ydb/core/retro_tracing_impl/distributed_collector/distributed_retro_collector.h>
@@ -586,6 +587,14 @@ static TInterconnectSettings GetInterconnectSettings(const NKikimrConfig::TInter
result.CollectSubscriptionStackTrace = config.GetCollectSubscriptionStackTrace();
}
+ if (config.HasUseUring()) {
+ result.UseUring = config.GetUseUring();
+ }
+
+ if (config.HasEnableUringSQPOLL()) {
+ result.EnableUringSQPOLL = config.GetEnableUringSQPOLL();
+ }
+
return result;
}
@@ -702,6 +711,11 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(
CreatePollerActor(schedulerConfig.MonCounters), TMailboxType::ReadAsFilled, systemPoolId));
+ if (settings.UseUring && TUringContext::IsSupported()) {
+ setup->LocalServices.emplace_back(MakeUringPollerActorId(), TActorSetupCmd(
+ CreateUringPollerActor(settings.EnableUringSQPOLL), TMailboxType::ReadAsFilled, systemPoolId));
+ }
+
auto destructorQueueSize = std::make_shared<std::atomic<TAtomicBase>>(0);
TIntrusivePtr<TInterconnectProxyCommon> icCommon;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 82520ca7bbd..237597138a6 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -528,6 +528,8 @@ message TInterconnectConfig {
optional bool CollectSubscriptionStackTrace = 59 [default = false];
optional uint32 RdmaPayloadCopySizeThreshold = 60 [default = 65536]; // 0 disables copying RDMA payload to regular memory
optional uint32 MaxRdmaRetryBackoffLevel = 61 [default = 8]; // 5s * 2^8 = 1280s, about 21 minutes by default
+ optional bool UseUring = 62 [default = false];
+ optional bool EnableUringSQPOLL = 63 [default = false]; // only effective when UseUring is set
// ballast is added to IC handshake frames to ensure correctness of jumbo frames transmission over network
optional uint32 HandshakeBallastSize = 14;
diff --git a/ydb/library/actors/interconnect/events/events.h b/ydb/library/actors/interconnect/events/events.h
index 8063a43da7f..285edbe0dd5 100644
--- a/ydb/library/actors/interconnect/events/events.h
+++ b/ydb/library/actors/interconnect/events/events.h
@@ -56,6 +56,15 @@ namespace NActors {
EvSubscribeForConnection,
EvReportConnection,
+ // io_uring transport events
+ EvUringRegister,
+ EvUringRegisterResult,
+ EvUringWriteComplete,
+ EvUringRecvComplete,
+ EvUringSendZcNotif,
+ EvUringUnregister,
+ EvUringRegisterFailed,
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/library/actors/interconnect/interconnect_common.h b/ydb/library/actors/interconnect/interconnect_common.h
index a5ed548f890..4d11de471ba 100644
--- a/ydb/library/actors/interconnect/interconnect_common.h
+++ b/ydb/library/actors/interconnect/interconnect_common.h
@@ -81,6 +81,8 @@ namespace NActors {
// 5s * 2^8 = 1280s, about 21 minutes with the current RDMA retry base delay.
ui32 MaxRdmaRetryBackoffLevel = 8;
bool CollectSubscriptionStackTrace = false;
+ bool UseUring = false;
+ bool EnableUringSQPOLL = false; // only effective when UseUring is set
};
struct TWhiteboardSessionStatus {
diff --git a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp
index 0e8c94cf3b9..b9121ab5cb5 100644
--- a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -5,6 +5,10 @@
#include <ydb/library/actors/core/probes.h>
#include <ydb/library/actors/util/datetime.h>
+#ifdef __linux__
+#include <liburing.h>
+#endif
+
#include <variant>
namespace NActors {
@@ -334,6 +338,221 @@ namespace NActors {
ReceiveData();
}
+ void TInputSessionTCP::Handle(TEvUringRegisterResult::TPtr& ev) {
+ auto* msg = ev->Get();
+ UringContext = std::move(msg->Context);
+ MainRecvBufGroupId = msg->MainRecvBufGroupId;
+ XdcRecvBufGroupId = msg->XdcRecvBufGroupId;
+ StartRecvUring();
+ }
+
+ void TInputSessionTCP::StartRecvUring() {
+ if (!UringContext) {
+ return;
+ }
+ if (Socket && !MainRecvMultishotActive) {
+ if (UringContext->SubmitRecvMultishot((int)*Socket, MainRecvBufGroupId, EUringOpTag::MainRecv)) {
+ UringContext->IncrementPendingRecvs();
+ MainRecvMultishotActive = true;
+ }
+ }
+ UringContext->Flush();
+ }
+
+ void TInputSessionTCP::Handle(TEvUringRecvComplete::TPtr& ev) {
+ auto* msg = ev->Get();
+ EUringOpTag tag = static_cast<EUringOpTag>(msg->UserData & UringOpTagMask);
+
+ if (tag == EUringOpTag::XdcRecv) {
+ // Async XDC readv completion (Caveat 3).
+ UringXdcReadInFlight = false;
+ if (msg->Result > 0) {
+ if (UringXdcReadIsCatch) {
+ ProcessXdcCatchBytesUring(msg->Result);
+ } else {
+ ProcessXdcBytesUring(msg->Result);
+ }
+ LastReceiveTimestamp = TActivationContext::Monotonic();
+ ReceiveData();
+ } else if (msg->Result == 0) {
+ throw TExReestablishConnection{TDisconnectReason::EndOfStream()};
+ } else {
+ int err = -msg->Result;
+ if (err != ECANCELED) {
+ throw TExReestablishConnection{TDisconnectReason::FromErrno(err)};
+ }
+ }
+ return;
+ }
+
+ // Main socket multishot recv completion.
+ if (msg->Result > 0) {
+ const size_t bytesRead = msg->Result;
+ BytesReadFromSocket += bytesRead;
+ ++UringMainRecvCompletions;
+ UringMainRecvBytes += bytesRead;
+ Metrics->AddInputChannelsIncomingTraffic(0, bytesRead);
+ IncomingData.Insert(IncomingData.End(), std::move(msg->Data));
+
+ if (!(msg->Flags & IORING_CQE_F_MORE)) {
+ MainRecvMultishotActive = false;
+ StartRecvUring();
+ }
+
+ LastReceiveTimestamp = TActivationContext::Monotonic();
+ ReceiveData();
+ } else if (msg->Result == 0) {
+ // EOF
+ MainRecvMultishotActive = false;
+ throw TExReestablishConnection{TDisconnectReason::EndOfStream()};
+ } else {
+ int err = -msg->Result;
+ MainRecvMultishotActive = false;
+ if (err == ENOBUFS) {
+ // Provided-buffer ring is momentarily exhausted. The terminal CQE has already
+ // woken the reaper, which recycles released buffers (DrainFreelist). Schedule a
+ // deferred re-arm so we retry once buffers are available rather than busy-looping.
+ TActivationContext::Schedule(TDuration::MicroSeconds(250),
+ new IEventHandle(EvResumeReceiveData, 0, SelfId(), {}, nullptr, 0));
+ } else if (err != ECANCELED) {
+ throw TExReestablishConnection{TDisconnectReason::FromErrno(err)};
+ }
+ }
+ }
+
+ void TInputSessionTCP::DriveXdcUring() {
+ if (!UringContext || !XdcSocket) {
+ return;
+ }
+ // Apply the catch stream as soon as it is fully read (idempotent), then keep a single
+ // XDC readv in flight while there is work to do.
+ ApplyXdcCatchStream();
+ SubmitXdcRecvUring();
+ }
+
+ bool TInputSessionTCP::SubmitXdcRecvUring() {
+ if (!UringContext || !XdcSocket || UringXdcReadInFlight) {
+ return false;
+ }
+
+ // Catch stream first: read into a throwaway buffer that is later scattered per channel.
+ if (XdcCatchStream.BytesPending) {
+ if (!XdcCatchStream.Buffer) {
+ XdcCatchStream.Buffer = TRcBuf::Uninitialized(64 * 1024);
+ }
+ const size_t numBytesToRead = Min<size_t>(XdcCatchStream.BytesPending, XdcCatchStream.Buffer.size());
+ struct iovec iov{XdcCatchStream.Buffer.GetDataMut(), numBytesToRead};
+ if (UringContext->SubmitReadv((int)*XdcSocket, &iov, 1, ++UringXdcReadSeqNo, EUringOpTag::XdcRecv)) {
+ UringXdcReadInFlight = true;
+ UringXdcReadIsCatch = true;
+ UringContext->Flush();
+ return true;
+ }
+ return false;
+ }
+
+ // Normal XDC stream: scatter read directly into the destination spans (zero-copy).
+ if (!XdcCatchStream.Applied || XdcInputQ.empty()) {
+ return false;
+ }
+
+ TStackVec<struct iovec, 64> buffs;
+ size_t size = 0;
+ for (auto& [channel, span] : XdcInputQ) {
+ buffs.push_back(iovec{span.data(), span.size()});
+ size += span.size();
+ if (buffs.size() == 64 || size >= 1024 * 1024) {
+ break;
+ }
+ }
+
+ if (UringContext->SubmitReadv((int)*XdcSocket, buffs.data(), buffs.size(), ++UringXdcReadSeqNo, EUringOpTag::XdcRecv)) {
+ UringXdcReadInFlight = true;
+ UringXdcReadIsCatch = false;
+ UringContext->Flush();
+ return true;
+ }
+ return false;
+ }
+
+ void TInputSessionTCP::ProcessXdcCatchBytesUring(ssize_t recvres) {
+ HandleXdcChecksum({XdcCatchStream.Buffer.data(), static_cast<size_t>(recvres)});
+
+ XdcCatchStream.BytesPending -= recvres;
+ XdcCatchStream.BytesProcessed += recvres;
+ BytesReadFromXdcSocket += recvres;
+
+ // scatter read data into per-channel catch buffers
+ const char *in = XdcCatchStream.Buffer.data();
+ while (recvres) {
+ Y_DEBUG_ABORT_UNLESS(!XdcCatchStream.Markup.empty());
+ auto& [channel, apply, bytes] = XdcCatchStream.Markup.front();
+ size_t bytesInChannel = Min<size_t>(recvres, bytes);
+ bytes -= bytesInChannel;
+ recvres -= bytesInChannel;
+
+ if (apply) {
+ auto& context = GetPerChannelContext(channel);
+ while (bytesInChannel) {
+ const size_t offset = context.XdcCatchBytesRead % context.XdcCatchBuffer.size();
+ TMutableContiguousSpan out = context.XdcCatchBuffer.GetContiguousSpanMut().SubSpan(offset, bytesInChannel);
+ memcpy(out.data(), in, out.size());
+ context.XdcCatchBytesRead += out.size();
+ in += out.size();
+ bytesInChannel -= out.size();
+ }
+ } else {
+ in += bytesInChannel;
+ }
+
+ if (!bytes) {
+ XdcCatchStream.Markup.pop_front();
+ }
+ }
+
+ ApplyXdcCatchStream();
+ }
+
+ void TInputSessionTCP::ProcessXdcBytesUring(ssize_t recvres) {
+ // calculate stream checksums over the destination spans that were just filled
+ {
+ size_t bytesToChecksum = recvres;
+ for (auto& [channel, span] : XdcInputQ) {
+ const size_t n = Min<size_t>(bytesToChecksum, span.size());
+ HandleXdcChecksum({span.data(), n});
+ bytesToChecksum -= n;
+ if (!bytesToChecksum) {
+ break;
+ }
+ }
+ }
+
+ Metrics->AddTotalBytesRead(recvres);
+ BytesReadFromXdcSocket += recvres;
+
+ // cut the XdcInputQ deque
+ for (size_t bytesToCut = recvres; bytesToCut; ) {
+ Y_ABORT_UNLESS(!XdcInputQ.empty());
+ auto& [channel, span] = XdcInputQ.front();
+ size_t n = Min(bytesToCut, span.size());
+ bytesToCut -= n;
+ if (n == span.size()) {
+ XdcInputQ.pop_front();
+ } else {
+ span = span.SubSpan(n, Max<size_t>());
+ Y_ABORT_UNLESS(!bytesToCut);
+ }
+
+ Y_DEBUG_ABORT_UNLESS(n);
+ auto& context = GetPerChannelContext(channel);
+ context.DropFront(nullptr, n);
+ ProcessEvents(context);
+ }
+
+ // drop fully processed inbound packets
+ ProcessInboundPacketQ(recvres, 0);
+ }
+
void TInputSessionTCP::Handle(NInterconnect::NRdma::TEvRdmaReadDone::TPtr& ev) {
if (!ev->Get()->Event->IsSuccess()) {
LOG_ERROR_IC_SESSION("ICRDMA", "Rdma IO failed, err source: %s, code %d",
@@ -390,15 +609,29 @@ namespace NActors {
}
// try to read more data into buffers
- progress |= ReadMore();
- progress |= ReadXdc(&numDataBytes);
+ if (!UringContext) {
+ progress |= ReadMore();
+ progress |= ReadXdc(&numDataBytes);
+ }
if (!progress) { // no progress was made during this iteration
- PreallocateBuffers();
+ if (!UringContext) {
+ PreallocateBuffers();
+ }
break;
}
}
+ if (UringContext) {
+ // Main recv multishot may have stopped (e.g. ENOBUFS or end of a multishot run);
+ // re-arm it now that we have drained the processable data.
+ if (Socket && !MainRecvMultishotActive) {
+ StartRecvUring();
+ }
+ // Submit/keep an XDC readv in flight if there is pending XDC target space.
+ DriveXdcUring();
+ }
+
if (enoughCpu) {
SetEnoughCpu(true);
StarvingInRow = 0;
@@ -1344,10 +1577,37 @@ namespace NActors {
if (now >= LastReceiveTimestamp + DeadPeerTimeout) {
ReceiveData();
if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) {
+ // Diagnostic snapshot: capture exactly why the input session believes the peer is
+ // dead. On the idle-keepalive failure this shows whether ANY recv completion ever
+ // arrived (peer truly silent / recv not armed) and whether the multishot is still
+ // active, plus the ring's submit accounting.
+ LOG_NOTICE_IC_SESSION("ICIS30", "DeadPeer snapshot uring# %d lastRecvAge# %.3fs"
+ " bytesRead# %" PRIu64 " mainRecvCompletions# %" PRIu64 " mainRecvBytes# %" PRIu64
+ " mainMsActive# %d pendingRecvs# %" PRIu32 " submitCalls# %" PRIu64
+ " submitErrors# %" PRIu64 " submitPartials# %" PRIu64 " lastSubmitRet# %d sqeFull# %" PRIu64,
+ (UringContext ? 1 : 0), (now - LastReceiveTimestamp).SecondsFloat(),
+ BytesReadFromSocket, UringMainRecvCompletions, UringMainRecvBytes,
+ (int)MainRecvMultishotActive,
+ (UringContext ? UringContext->GetPendingRecvs() : 0),
+ (UringContext ? UringContext->GetSubmitCalls() : 0),
+ (UringContext ? UringContext->GetSubmitErrors() : 0),
+ (UringContext ? UringContext->GetSubmitPartials() : 0),
+ (UringContext ? UringContext->GetLastSubmitRet() : 0),
+ (UringContext ? UringContext->GetSqeFull() : 0));
// nothing has changed, terminate session
throw TExDestroySession{TDisconnectReason::DeadPeer()};
}
}
+ // Recv-side heartbeat (DEBUG): fires roughly once per DeadPeerTimeout on a healthy idle
+ // session. Paired with the output ICS42 send heartbeat to confirm keepalives are flowing.
+ if (UringContext) {
+ LOG_DEBUG_IC_SESSION("ICIS31", "uring recv hb lastRecvAge# %.3fs bytesRead# %" PRIu64
+ " mainRecvCompletions# %" PRIu64 " mainRecvBytes# %" PRIu64 " mainMsActive# %d"
+ " pendingRecvs# %" PRIu32,
+ (now - LastReceiveTimestamp).SecondsFloat(), BytesReadFromSocket,
+ UringMainRecvCompletions, UringMainRecvBytes, (int)MainRecvMultishotActive,
+ UringContext->GetPendingRecvs());
+ }
Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer);
}
diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp
index 8ea2fe0232b..0f9fa0085ee 100644
--- a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp
+++ b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp
@@ -406,13 +406,21 @@ namespace NActors {
Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), std::move(inputSessionParams), RdmaQp, std::move(cq));
ReceiverId = RegisterWithSameMailbox(actor.Release());
- // register our socket in poller actor
- LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
- const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
- Y_ABORT_UNLESS(success);
- if (XdcSocket) {
- const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId()));
+ // register our socket with the appropriate I/O backend
+ if (Proxy->Common->Settings.UseUring && !Params.Encryption && TUringContext::IsSupported()) {
+ LOG_DEBUG_IC_SESSION("ICS11", "registering socket with UringPollerActor");
+ // Both the main and XDC sockets are driven entirely by io_uring: the input session
+ // arms recv (main multishot, XDC readv) and the output session submits writes/send_zc.
+ // Neither socket is registered with the epoll TPollerActor anymore (Caveat 3).
+ Send(MakeUringPollerActorId(), new TEvUringRegister(Socket, XdcSocket, ReceiverId, SelfId()));
+ } else {
+ LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
+ const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
Y_ABORT_UNLESS(success);
+ if (XdcSocket) {
+ const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId()));
+ Y_ABORT_UNLESS(success);
+ }
}
LostConnectionWatchdog.Disarm();
@@ -553,7 +561,11 @@ namespace NActors {
return;
}
- WriteData();
+ if (UringContext) {
+ WriteDataUring();
+ } else {
+ WriteData();
+ }
if (!Socket) {
return;
}
@@ -564,8 +576,13 @@ namespace NActors {
canProducePackets = NumEventsInQueue && (InflightDataAmount + RdmaInflightDataAmount) < GetTotalInflightAmountOfData() &&
GetUnsentSize() < GetUnsentLimit();
- canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) ||
- (XdcStream && !ReceiveContext->XdcWriteBlocked);
+ if (UringContext) {
+ canWriteData = ((OutgoingStream || OutOfBandStream) && !UringMainWriteInFlight) ||
+ (XdcStream && !UringXdcWriteInFlight);
+ } else {
+ canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) ||
+ (XdcStream && !ReceiveContext->XdcWriteBlocked);
+ }
if (!canProducePackets && !canWriteData) {
SetEnoughCpu(true); // we do not starve
@@ -664,6 +681,31 @@ namespace NActors {
}
void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) {
+ if (UringContext) {
+ if (Socket) {
+ UringContext->SubmitCancelFd((int)*Socket);
+ }
+ if (XdcSocket) {
+ UringContext->SubmitCancelFd((int)*XdcSocket);
+ }
+ UringContext->Flush();
+ UringWritesInFlight.clear();
+ UringMainWriteInFlight = false;
+ UringMainWriteInFlightSince = TMonotonic::Zero();
+ UringXdcWriteInFlight = false;
+ UringZcEnabled = false;
+ XdcZcNotifCum = 0;
+ XdcDropWantedCum = 0;
+ XdcDroppedCum = 0;
+ XdcZcNotifQueue.clear();
+ // Tell the reaper to release this session ring. Without this the reaper keeps its
+ // own TUringContext reference forever, so io_uring_queue_exit never runs and the
+ // ring fd, eventfd and provided-buffer ring leak on every reconnect/handshake race.
+ const int eventFd = UringContext->GetEventFd();
+ UringContext.Reset();
+ Send(MakeUringPollerActorId(), new TEvUringUnregister(eventFd));
+ }
+
if (Socket) {
if (const TString& s = reason.ToString()) {
Proxy->Metrics->IncDisconnectByReason(s);
@@ -864,6 +906,260 @@ namespace NActors {
WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer;
}
+ void TInterconnectSessionTCP::WriteDataUring() {
+ static constexpr size_t maxBytesAtOnce = 256 * 1024;
+
+ auto submitStream = [&](NInterconnect::TOutgoingStream& stream, int socketFd, EUringOpTag tag,
+ size_t maxBytes, bool& inFlight, bool isOutOfBand) {
+ if (inFlight || !stream || UringContext->GetPendingWrites() >= TUringContext::MaxPendingWrites) {
+ return;
+ }
+
+ constexpr ui32 iovLimit = 256;
+ TStackVec<TConstIoVec, iovLimit> wbuffers;
+ stream.ProduceIoVec(wbuffers, iovLimit, maxBytes);
+ if (wbuffers.empty()) {
+ return;
+ }
+
+ ui64 seqNo = ++UringWriteSeqNo;
+ size_t totalBytes = 0;
+ std::vector<struct iovec> iovecs(wbuffers.size());
+ for (size_t i = 0; i < wbuffers.size(); ++i) {
+ iovecs[i].iov_base = const_cast<void*>(static_cast<const void*>(wbuffers[i].Data));
+ iovecs[i].iov_len = wbuffers[i].Size;
+ totalBytes += wbuffers[i].Size;
+ }
+
+ if (UringContext->SubmitWritev(socketFd, iovecs.data(), iovecs.size(), seqNo, tag)) {
+ UringContext->IncrementPendingWrites();
+ inFlight = true;
+ // Key the in-flight map by the SAME masked value the completion carries
+ // (the tag occupies the high byte of user_data), so the completion lookup can
+ // never miss and silently skip clearing the single-in-flight latch.
+ const ui64 key = seqNo & UringOpDataMask;
+ UringWritesInFlight[key] = TUringWriteInFlight{totalBytes, tag == EUringOpTag::XdcWritev, isOutOfBand, std::move(iovecs)};
+ if (tag == EUringOpTag::MainWritev) {
+ ++UringMainWritevSubmitted;
+ UringMainWriteInFlightSince = TActivationContext::Monotonic();
+ }
+ }
+ };
+
+ // Main socket carries two logical streams: the ordinary data stream (OutgoingStream,
+ // retained until peer-confirmed) and the priority out-of-band stream (OutOfBandStream,
+ // carrying confirm/flush packets, dropped as soon as written). Because io_uring writes
+ // on the main socket are single-in-flight, we only ever switch streams at a main-packet
+ // boundary (OutgoingOffset == 0); a partially-sent data packet is always finished first.
+ // This preserves on-wire packet framing while still letting confirms jump the queue.
+ auto submitMain = [&]() {
+ if (UringMainWriteInFlight || UringContext->GetPendingWrites() >= TUringContext::MaxPendingWrites) {
+ // Diagnostic: if the single-in-flight main-write latch has been held far longer
+ // than any keepalive period, the writev completion was never delivered and the
+ // sender has gone silent (peer will declare DeadPeer). Report it (throttled) with
+ // the submit accounting so we can tell a swallowed io_uring_submit error from a
+ // lost completion.
+ if (UringMainWriteInFlight && UringMainWriteInFlightSince != TMonotonic::Zero()) {
+ const TMonotonic now = TActivationContext::Monotonic();
+ if (now - UringMainWriteInFlightSince > TDuration::Seconds(2) &&
+ now - UringMainWriteStuckReported > TDuration::Seconds(2)) {
+ UringMainWriteStuckReported = now;
+ LOG_NOTICE_IC_SESSION("ICS41", "uring main write latch stuck for %.3fs"
+ " submitted# %" PRIu64 " completed# %" PRIu64 " pendingWrites# %" PRIu32
+ " oobSize# %zu submitCalls# %" PRIu64 " submitErrors# %" PRIu64
+ " submitPartials# %" PRIu64 " lastSubmitRet# %d sqeFull# %" PRIu64,
+ (now - UringMainWriteInFlightSince).SecondsFloat(),
+ UringMainWritevSubmitted, UringMainWriteCompleted,
+ UringContext->GetPendingWrites(), OutOfBandStream.CalculateOutgoingSize(),
+ UringContext->GetSubmitCalls(), UringContext->GetSubmitErrors(),
+ UringContext->GetSubmitPartials(), UringContext->GetLastSubmitRet(),
+ UringContext->GetSqeFull());
+ }
+ }
+ return;
+ }
+ if (OutOfBandStream && OutgoingOffset == 0) {
+ submitStream(OutOfBandStream, (int)*Socket, EUringOpTag::MainWritev, maxBytesAtOnce,
+ UringMainWriteInFlight, /*isOutOfBand=*/true);
+ } else if (OutgoingStream) {
+ submitStream(OutgoingStream, (int)*Socket, EUringOpTag::MainWritev, maxBytesAtOnce,
+ UringMainWriteInFlight, /*isOutOfBand=*/false);
+ } else if (OutOfBandStream) {
+ // No partial main packet in progress but OutgoingStream is empty: flush OOB.
+ submitStream(OutOfBandStream, (int)*Socket, EUringOpTag::MainWritev, maxBytesAtOnce,
+ UringMainWriteInFlight, /*isOutOfBand=*/true);
+ }
+ };
+
+ auto submitXdcZc = [&](NInterconnect::TOutgoingStream& stream, int socketFd, size_t maxBytes) {
+ if (UringXdcWriteInFlight || !stream || UringContext->GetPendingWrites() >= TUringContext::MaxPendingWrites) {
+ return;
+ }
+
+ constexpr ui32 iovLimit = 1;
+ TStackVec<TConstIoVec, iovLimit> wbuffers;
+ TStackVec<NInterconnect::TOutgoingStream::TBufController, iovLimit> controllers;
+ stream.ProduceIoVec(wbuffers, iovLimit, maxBytes, &controllers);
+ if (wbuffers.empty()) {
+ return;
+ }
+
+ auto& front = wbuffers.front();
+ ui64 seqNo = ++UringWriteSeqNo;
+
+ if (UringContext->SubmitSendZc(socketFd, front.Data, front.Size, seqNo)) {
+ UringContext->IncrementPendingWrites();
+ UringXdcWriteInFlight = true;
+ UringWritesInFlight[seqNo] = TUringWriteInFlight{front.Size, true, false, {}};
+ }
+ };
+
+ if (OutgoingStream || OutOfBandStream) {
+ submitMain();
+ }
+
+ if (XdcSocket && XdcStream) {
+ if (Proxy->Common->Settings.SocketSendOptimization == ESocketSendOptimization::IC_MSG_ZEROCOPY) {
+ submitXdcZc(XdcStream, (int)*XdcSocket, maxBytesAtOnce);
+ } else {
+ submitStream(XdcStream, (int)*XdcSocket, EUringOpTag::XdcWritev, maxBytesAtOnce,
+ UringXdcWriteInFlight, /*isOutOfBand=*/false);
+ }
+ }
+
+ UringContext->Flush();
+ }
+
+ void TInterconnectSessionTCP::Handle(TEvUringRegisterFailed::TPtr& /*ev*/) {
+ // io_uring setup failed in the poller (ring/buffer-ring allocation). Fall back to the
+ // epoll TPollerActor so this session still has a working I/O backend instead of being
+ // left silently dead. Mirrors the non-uring registration branch in SetNewConnection.
+ if (UringContext || !Socket) {
+ return; // already have a backend, or socket already gone
+ }
+ LOG_NOTICE_IC_SESSION("ICS11", "uring registration failed, falling back to epoll PollerActor");
+ const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
+ Y_ABORT_UNLESS(success);
+ if (XdcSocket) {
+ const bool successXdc = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId()));
+ Y_ABORT_UNLESS(successXdc);
+ }
+ }
+
+ void TInterconnectSessionTCP::Handle(TEvUringRegisterResult::TPtr& ev) {
+ auto* msg = ev->Get();
+ UringContext = std::move(msg->Context);
+ // Zero-copy XDC send gating is active only when this connection actually uses send_zc.
+ UringZcEnabled = XdcSocket &&
+ Proxy->Common->Settings.SocketSendOptimization == ESocketSendOptimization::IC_MSG_ZEROCOPY;
+ XdcZcNotifCum = 0;
+ XdcDropWantedCum = 0;
+ XdcDroppedCum = 0;
+ XdcZcNotifQueue.clear();
+ GenerateTraffic();
+ }
+
+ void TInterconnectSessionTCP::DropFrontXdc(size_t bytes) {
+ if (!UringZcEnabled) {
+ XdcStream.DropFront(bytes);
+ return;
+ }
+ // Defer the physical free until the kernel has released the buffers via NOTIF.
+ XdcDropWantedCum += bytes;
+ FlushXdcZcDrop();
+ }
+
+ void TInterconnectSessionTCP::FlushXdcZcDrop() {
+ const ui64 dropTarget = Min(XdcDropWantedCum, XdcZcNotifCum);
+ if (dropTarget > XdcDroppedCum) {
+ const size_t toDrop = dropTarget - XdcDroppedCum;
+ XdcStream.DropFront(toDrop);
+ XdcDroppedCum += toDrop;
+ }
+ }
+
+ void TInterconnectSessionTCP::Handle(TEvUringWriteComplete::TPtr& ev) {
+ auto* msg = ev->Get();
+ ui64 seqNo = msg->UserData & UringOpDataMask;
+
+ auto it = UringWritesInFlight.find(seqNo);
+ if (it == UringWritesInFlight.end()) {
+ return;
+ }
+
+ auto flight = std::move(it->second);
+ UringWritesInFlight.erase(it);
+
+ if (flight.IsXdc) {
+ UringXdcWriteInFlight = false;
+ } else {
+ UringMainWriteInFlight = false;
+ UringMainWriteInFlightSince = TMonotonic::Zero();
+ ++UringMainWriteCompleted;
+ }
+
+ if (msg->Result > 0) {
+ size_t written = msg->Result;
+ if (flight.IsXdc) {
+ XdcStream.Advance(written);
+ XdcBytesSent += written;
+ XdcOffset += written;
+ if (UringZcEnabled) {
+ // Record this zero-copy send; its buffers stay referenced by the kernel
+ // until the matching NOTIF arrives (FIFO per socket).
+ XdcZcNotifQueue.push_back(written);
+ }
+ } else if (flight.IsOutOfBand) {
+ // Out-of-band (confirm/flush) bytes are never retained: advance the sent cursor
+ // (so UnsentBytes is decremented, exactly as the epoll path does via Write()) and
+ // then drop them immediately. Skipping Advance left UnsentBytes stale, eventually
+ // tripping the OutgoingStream invariant once the queue emptied.
+ OutOfBandStream.Advance(written);
+ OutOfBandStream.DropFront(written);
+ OutOfBandBytesSent += written;
+ BytesWrittenToSocket += written;
+ } else {
+ OutgoingStream.Advance(written);
+ OutgoingOffset += written;
+ BytesWrittenToSocket += written;
+
+ auto sendQueueIt = SendQueue.begin() + OutgoingIndex;
+ for (; OutgoingOffset && sendQueueIt != SendQueue.end() && sendQueueIt->PacketSize <= OutgoingOffset;
+ ++sendQueueIt, ++OutgoingIndex) {
+ OutgoingOffset -= sendQueueIt->PacketSize;
+ }
+ }
+ Proxy->Metrics->AddTotalBytesWritten(written);
+ DropConfirmed(LastConfirmed);
+ GenerateTraffic();
+ } else if (msg->Result < 0) {
+ int err = -msg->Result;
+ if (err != ECANCELED) {
+ LOG_NOTICE_NET(Proxy->PeerNodeId, "uring write error: %s", strerror(err));
+ ReestablishConnectionWithHandshake(TDisconnectReason::FromErrno(err));
+ }
+ } else {
+ LOG_NOTICE_NET(Proxy->PeerNodeId, "uring write: connection closed by peer%s", "");
+ if (!NumEventsInQueue && LastConfirmed == OutputCounter) {
+ Terminate(TDisconnectReason::EndOfStream());
+ } else {
+ ReestablishConnectionWithHandshake(TDisconnectReason::EndOfStream());
+ }
+ }
+ }
+
+ void TInterconnectSessionTCP::Handle(TEvUringSendZcNotif::TPtr& ev) {
+ Y_UNUSED(ev);
+ // The kernel has released the buffers of the oldest outstanding zero-copy send.
+ // Advance the notif-confirmed offset and flush any drop that was waiting on it.
+ if (!UringZcEnabled || XdcZcNotifQueue.empty()) {
+ return;
+ }
+ XdcZcNotifCum += XdcZcNotifQueue.front();
+ XdcZcNotifQueue.pop_front();
+ FlushXdcZcDrop();
+ }
+
ssize_t TInterconnectSessionTCP::HandleWriteResult(ssize_t r, const TString& err) {
if (r > 0) {
return r;
@@ -972,6 +1268,21 @@ namespace NActors {
while (FlushSchedule && now >= FlushSchedule.top()) {
FlushSchedule.pop();
}
+ // Send-side heartbeat (DEBUG): paired with the input session's recv counters and the
+ // reaper ICUR50 stats, this shows on idle whether keepalive writevs keep being submitted
+ // and completed, or whether the single-in-flight latch is wedged. The flush timer fires
+ // roughly every ForceConfirmPeriod on an otherwise idle session, so this is low-rate.
+ if (UringContext) {
+ const double latchHeldS = (UringMainWriteInFlight && UringMainWriteInFlightSince != TMonotonic::Zero())
+ ? (now - UringMainWriteInFlightSince).SecondsFloat() : 0.0;
+ LOG_DEBUG_IC_SESSION("ICS42", "uring send hb submitted# %" PRIu64 " completed# %" PRIu64
+ " mainInFlight# %d latchHeld# %.3fs pendingWrites# %" PRIu32 " oobSize# %zu"
+ " submitErrors# %" PRIu64 " submitPartials# %" PRIu64 " sqeFull# %" PRIu64,
+ UringMainWritevSubmitted, UringMainWriteCompleted, (int)UringMainWriteInFlight,
+ latchHeldS, UringContext->GetPendingWrites(), OutOfBandStream.CalculateOutgoingSize(),
+ UringContext->GetSubmitErrors(), UringContext->GetSubmitPartials(),
+ UringContext->GetSqeFull());
+ }
if (Socket) {
if (now >= ForcePacketTimestamp) {
++ConfirmPacketsForcedByTimeout;
@@ -1133,7 +1444,7 @@ namespace NActors {
const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped;
OutgoingStream.DropFront(bytesDropped);
- XdcStream.DropFront(bytesDroppedFromXdc);
+ DropFrontXdc(bytesDroppedFromXdc);
if (lastDroppedSerial) {
ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
channel.DropConfirmed(*lastDroppedSerial, *Pool);
diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.h b/ydb/library/actors/interconnect/interconnect_tcp_session.h
index b29147a7d29..32f1a81c468 100644
--- a/ydb/library/actors/interconnect/interconnect_tcp_session.h
+++ b/ydb/library/actors/interconnect/interconnect_tcp_session.h
@@ -7,6 +7,7 @@
#include <ydb/library/actors/interconnect/logging/logging.h>
#include <ydb/library/actors/interconnect/poller/poller_tcp.h>
#include <ydb/library/actors/interconnect/poller/poller_actor.h>
+#include <ydb/library/actors/interconnect/poller/uring_poller_actor.h>
#include <ydb/library/actors/interconnect/retro_tracing/spans.h>
#include <ydb/library/actors/protos/services_common.pb.h>
#include <ydb/library/actors/util/datetime.h>
@@ -30,6 +31,7 @@
#include "events_local.h"
#include "interconnect_impl.h"
#include "interconnect_zc_processor.h"
+#include "uring_context.h"
#include "interconnect_channel.h"
#include "watchdog_timer.h"
#include "event_holder_pool.h"
@@ -267,6 +269,8 @@ namespace NActors {
cFunc(EvCheckDeadPeer, HandleCheckDeadPeer)
cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate)
hFunc(NMon::TEvHttpInfoRes, GenerateHttpInfo)
+ hFunc(TEvUringRecvComplete, Handle)
+ hFunc(TEvUringRegisterResult, Handle)
)
private:
@@ -281,6 +285,20 @@ namespace NActors {
TInterconnectProxyCommon::TPtr Common;
const ui32 NodeId;
const TSessionParams Params;
+
+ TUringContext::TPtr UringContext;
+ ui16 MainRecvBufGroupId = 0;
+ ui16 XdcRecvBufGroupId = 0;
+ bool MainRecvMultishotActive = false;
+ bool XdcRecvMultishotActive = false;
+ // Diagnostic recv-path counters (instrumentation for the idle-keepalive DeadPeer hunt).
+ ui64 UringMainRecvCompletions = 0;
+ ui64 UringMainRecvBytes = 0;
+ // XDC receive over io_uring: a single async readv directly into the XdcInputQ
+ // destination spans (or the catch-stream buffer) is kept in flight at a time.
+ ui64 UringXdcReadSeqNo = 0;
+ bool UringXdcReadInFlight = false;
+ bool UringXdcReadIsCatch = false;
NInterconnect::NRdma::TQueuePair::TPtr RdmaQp;
NInterconnect::NRdma::ICq::TPtr RdmaCq;
XXH3_state_t XxhashState;
@@ -306,7 +324,7 @@ namespace NActors {
};
std::deque<TInboundPacket> InboundPacketQ;
std::deque<std::tuple<ui16, TMutableContiguousSpan>> XdcInputQ; // target buffers for the XDC stream with channel reference
- std::deque<std::tuple<ui16, std::optional<ui32>>> XdcChecksumQ; // (size, optional(expectedChecksum)). nullopt if checksums are disabled.
+ std::deque<std::tuple<ui16, std::optional<ui32>>> XdcChecksumQ; // (size, optional(expectedChecksum)). nullopt if checksums are disabled.
ui32 XdcCurrentChecksum = 0;
// catch stream -- used after TCP reconnect to match XDC stream with main packet stream
@@ -339,9 +357,17 @@ namespace NActors {
void Handle(TEvPollerReady::TPtr ev);
void Handle(TEvPollerRegisterResult::TPtr ev);
+ void Handle(TEvUringRecvComplete::TPtr& ev);
+ void Handle(TEvUringRegisterResult::TPtr& ev);
void HandleConfirmUpdate();
void Handle(NInterconnect::NRdma::TEvRdmaReadDone::TPtr& ev);
void ReceiveData();
+ void StartRecvUring();
+ // XDC-over-io_uring receive helpers (Caveat 3).
+ void DriveXdcUring();
+ bool SubmitXdcRecvUring();
+ void ProcessXdcCatchBytesUring(ssize_t recvres);
+ void ProcessXdcBytesUring(ssize_t recvres);
void ProcessHeader();
void ProcessPayload(ui64 *numDataBytes);
void ProcessInboundPacketQ(ui64 numXdcBytesRead, ui64 numRdmaBytesRead);
@@ -541,6 +567,10 @@ namespace NActors {
hFunc(TEvSocketDisconnect, OnDisconnect)
hFunc(TEvTerminate, Handle)
hFunc(TEvProcessPingRequest, Handle)
+ hFunc(TEvUringRegisterResult, Handle)
+ hFunc(TEvUringRegisterFailed, Handle)
+ hFunc(TEvUringWriteComplete, Handle)
+ hFunc(TEvUringSendZcNotif, Handle)
)
UpdateUtilization();
}
@@ -573,7 +603,12 @@ namespace NActors {
void Handle(TEvPollerReady::TPtr& ev);
void Handle(TEvPollerRegisterResult::TPtr ev);
+ void Handle(TEvUringRegisterResult::TPtr& ev);
+ void Handle(TEvUringRegisterFailed::TPtr& ev);
+ void Handle(TEvUringWriteComplete::TPtr& ev);
+ void Handle(TEvUringSendZcNotif::TPtr& ev);
void WriteData();
+ void WriteDataUring();
ssize_t HandleWriteResult(ssize_t r, const TString& err);
ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes);
@@ -686,6 +721,38 @@ namespace NActors {
TPollerToken::TPtr PollerToken;
TPollerToken::TPtr XdcPollerToken;
ui32 SendBufferSize;
+
+ TUringContext::TPtr UringContext;
+ ui64 UringWriteSeqNo = 0;
+ bool UringMainWriteInFlight = false;
+ bool UringXdcWriteInFlight = false;
+ struct TUringWriteInFlight {
+ size_t Bytes;
+ bool IsXdc;
+ bool IsOutOfBand = false;
+ std::vector<struct iovec> Iovecs;
+ };
+ THashMap<ui64, TUringWriteInFlight> UringWritesInFlight;
+
+ // Diagnostic send-path counters (instrumentation for the idle-keepalive DeadPeer hunt).
+ ui64 UringMainWritevSubmitted = 0; // main-socket writevs handed to io_uring
+ ui64 UringMainWriteCompleted = 0; // main-socket writev completions processed
+ // Monotonic timestamp at which UringMainWriteInFlight last went true; TMonotonic::Zero()
+ // when the latch is clear. Used to detect a wedged single-in-flight write latch.
+ TMonotonic UringMainWriteInFlightSince;
+ TMonotonic UringMainWriteStuckReported; // throttles the "latch stuck" NOTICE
+
+ // IORING_OP_SEND_ZC buffer lifetime (Caveat 5). When the XDC stream is sent with
+ // io_uring zero-copy, an XdcStream buffer must not be freed (DropFront) until the
+ // kernel posts the corresponding IORING_CQE_F_NOTIF. We advance the read cursor on
+ // the data CQE (safe) but gate DropFront on the cumulative notif-confirmed offset.
+ bool UringZcEnabled = false;
+ ui64 XdcZcNotifCum = 0; // cumulative XDC bytes whose send_zc NOTIF has arrived
+ ui64 XdcDropWantedCum = 0; // cumulative XDC bytes the peer has confirmed for dropping
+ ui64 XdcDroppedCum = 0; // cumulative XDC bytes physically dropped from XdcStream
+ std::deque<ui64> XdcZcNotifQueue; // FIFO of in-flight zc send sizes awaiting NOTIF
+ void DropFrontXdc(size_t bytes);
+ void FlushXdcZcDrop();
ui64 InflightDataAmount = 0;
ui64 RdmaInflightDataAmount = 0;
diff --git a/ydb/library/actors/interconnect/poller/uring_poller_actor.cpp b/ydb/library/actors/interconnect/poller/uring_poller_actor.cpp
new file mode 100644
index 00000000000..42205824119
--- /dev/null
+++ b/ydb/library/actors/interconnect/poller/uring_poller_actor.cpp
@@ -0,0 +1,726 @@
+#include "uring_poller_actor.h"
+#include "poller_actor.h"
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/actorsystem.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/library/actors/core/log.h>
+#include <ydb/library/actors/protos/services_common.pb.h>
+#include <ydb/library/actors/interconnect/uring_context.h>
+#include <ydb/library/actors/interconnect/uring_recv_buffer_pool.h>
+
+#include <util/system/thread.h>
+#include <util/system/mutex.h>
+#include <util/system/guard.h>
+
+#include <liburing.h>
+#include <sys/eventfd.h>
+#include <poll.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+namespace NActors {
+
+ // Per-session ring state, owned by the reaper thread.
+ struct TUringSessionRing {
+ TUringContext::TPtr Context;
+ TUringRecvBufferPool MainRecvPool;
+ int EventFd = -1;
+ int MainSocketFd = -1;
+ int XdcSocketFd = -1;
+ };
+
+ // Dedicated OS thread that owns the anchor ring and reaps completions from all
+ // session rings. It removes the dependency on the shared epoll TPollerActor:
+ // each session ring signals its eventfd on a CQE; the anchor ring arms a
+ // multishot POLL on every such eventfd, and this thread blocks on the anchor.
+ class TUringReaper : public ISimpleThread {
+ // Sentinel user_data values for anchor-ring control completions. Real session entries
+ // use the eventfd value (a small positive int) in the low 32 bits, so these high-bit
+ // sentinels can never collide with them.
+ //
+ // IMPORTANT: liburing reserves user_data == (__u64)-1 (LIBURING_UDATA_TIMEOUT) for the
+ // internal timeout completion that io_uring_wait_cqe_timeout posts on kernels without
+ // IORING_FEAT_EXT_ARG. ShutdownUserData previously used ~0 too, so every backstop timeout
+ // was misread as a shutdown-fd event: the idle sweep never ran (backstopTicks stayed 0)
+ // and the reaper hot-spun re-arming the shutdown poll. Keep ~0 reserved for liburing.
+ static constexpr ui64 LiburingTimeoutUserData = ~ui64(0); // == LIBURING_UDATA_TIMEOUT
+ static constexpr ui64 ShutdownUserData = ~ui64(0) - 1;
+ static constexpr ui64 WakeUserData = ~ui64(0) - 2;
+ static constexpr ui64 PollRemoveUserData = ~ui64(0) - 3;
+ // Our own periodic backstop timer, armed on the anchor ring (see ArmTimeout). We block in
+ // io_uring_wait_cqe() and let this timeout (plus session eventfd polls and WakeFd) wake us,
+ // instead of io_uring_wait_cqe_timeout() which busy-spun on this kernel (returning without
+ // blocking), pinning a CPU core.
+ static constexpr ui64 TimerUserData = ~ui64(0) - 4;
+
+ public:
+ explicit TUringReaper(TActorSystem* actorSystem, bool enableSqpoll)
+ : ActorSystem(actorSystem)
+ , EnableSqpoll(enableSqpoll)
+ {
+ AnchorRing = std::make_unique<struct io_uring>();
+ struct io_uring_params params = {};
+ // SQPOLL is opt-in (TInterconnectSettings::EnableUringSQPOLL). When off (default),
+ // the reaper submits anchor SQEs explicitly and the session threads submit their own
+ // I/O explicitly. When on, the anchor ring sets SQPOLL here and every session ring
+ // sets SQPOLL + ATTACH_WQ (wq_fd = this anchor ring), so they all share ONE kernel
+ // poll thread rather than spawning one per session.
+ if (EnableSqpoll) {
+ params.flags |= IORING_SETUP_SQPOLL;
+ params.sq_thread_idle = SqThreadIdleMs;
+ }
+ int ret = io_uring_queue_init_params(AnchorQueueDepth, AnchorRing.get(), &params);
+ Y_ABORT_UNLESS(ret >= 0, "io_uring_queue_init_params(anchor) failed: %d", ret);
+ AnchorRingFd = AnchorRing->ring_fd;
+
+ ShutdownFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+ Y_ABORT_UNLESS(ShutdownFd >= 0);
+ WakeFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+ Y_ABORT_UNLESS(WakeFd >= 0);
+
+ with_lock (Mutex) {
+ ArmPoll(ShutdownFd, ShutdownUserData);
+ ArmPoll(WakeFd, WakeUserData);
+ ArmTimeout();
+ io_uring_submit(AnchorRing.get());
+ }
+
+ GraveyardThread = std::thread([this] { GraveyardProc(); });
+ }
+
+ ~TUringReaper() {
+ StopAndJoin();
+ if (AnchorRing) {
+ io_uring_queue_exit(AnchorRing.get());
+ AnchorRing.reset();
+ }
+ if (ShutdownFd >= 0) {
+ close(ShutdownFd);
+ }
+ if (WakeFd >= 0) {
+ close(WakeFd);
+ }
+ }
+
+ int GetAnchorRingFd() const {
+ return AnchorRingFd;
+ }
+
+ bool GetEnableSqpoll() const {
+ return EnableSqpoll;
+ }
+
+ // Aggregate reaper liveness counters (instrumentation for the idle-keepalive DeadPeer
+ // hunt). Read from the TUringPollerActor thread for periodic logging; the reaper thread
+ // is the only writer, so relaxed loads are sufficient.
+ struct TStats {
+ ui64 BackstopTicks;
+ ui64 AnchorWakeups;
+ ui64 TotalCqesReaped;
+ ui64 RecvCqes;
+ ui64 WriteCqes;
+ ui64 ActiveSessions;
+ ui64 EmptyWakes;
+ ui64 LastAnchorUserData;
+ ui64 PollWakeups;
+ };
+ TStats GetStats() const {
+ return TStats{
+ BackstopTicks.load(std::memory_order_relaxed),
+ AnchorWakeups.load(std::memory_order_relaxed),
+ TotalCqesReaped.load(std::memory_order_relaxed),
+ RecvCqes.load(std::memory_order_relaxed),
+ WriteCqes.load(std::memory_order_relaxed),
+ ActiveSessions.load(std::memory_order_relaxed),
+ EmptyWakes.load(std::memory_order_relaxed),
+ LastAnchorUserData.load(std::memory_order_relaxed),
+ PollWakeups.load(std::memory_order_relaxed),
+ };
+ }
+
+ // Called from the actor (TUringPollerActor) thread.
+ void AddSession(std::unique_ptr<TUringSessionRing> session) {
+ int efd = session->EventFd;
+ with_lock (Mutex) {
+ Sessions.push_back(std::move(session));
+ EventFdToSession[efd] = Sessions.back().get();
+ ArmPoll(efd, static_cast<ui64>(static_cast<ui32>(efd)));
+ io_uring_submit(AnchorRing.get());
+ ActiveSessions.store(Sessions.size(), std::memory_order_relaxed);
+ }
+ }
+
+ // Called from the actor (TUringPollerActor) thread on session teardown. The actual
+ // removal/destruction happens on the reaper thread (it owns the rings) to avoid racing
+ // with ReapSession; here we just enqueue the request and wake the reaper.
+ void RemoveSession(int eventFd) {
+ with_lock (RemovalMutex) {
+ PendingRemoval.push_back(eventFd);
+ }
+ const uint64_t one = 1;
+ [[maybe_unused]] ssize_t r = write(WakeFd, &one, sizeof(one));
+ }
+
+ void StopAndJoin() {
+ if (Stop.exchange(true)) {
+ return;
+ }
+ const uint64_t one = 1;
+ [[maybe_unused]] ssize_t r = write(ShutdownFd, &one, sizeof(one));
+ Join();
+ // The reaper thread has exited and will no longer hand work to the graveyard;
+ // stop it and let it drain whatever remains.
+ {
+ std::lock_guard<std::mutex> lk(GraveyardMutex);
+ GraveyardStop = true;
+ }
+ GraveyardCv.notify_one();
+ if (GraveyardThread.joinable()) {
+ GraveyardThread.join();
+ }
+ }
+
+ private:
+ void* ThreadProc() override {
+ TThread::SetCurrentThreadName("IcUringReaper");
+ while (!Stop.load(std::memory_order_acquire)) {
+ struct io_uring_cqe* cqe = nullptr;
+ // Block until the anchor ring wakes us: a session eventfd POLLIN (real work), the
+ // WakeFd (teardown), the ShutdownFd, or our own periodic backstop timer (see
+ // ArmTimeout). The timer doubles as a safety net: even if an eventfd wakeup were
+ // ever lost, the timer fires every BackstopPeriod and we sweep every session ring,
+ // so a session can never be wedged with undelivered completions. A blocking wait
+ // (vs io_uring_wait_cqe_timeout, which busy-spun on this kernel) keeps the reaper
+ // off-CPU while idle.
+ int r = io_uring_wait_cqe(AnchorRing.get(), &cqe);
+ if (r < 0) {
+ if (r == -EINTR) {
+ continue;
+ }
+ break;
+ }
+ bool sawTimeout = false;
+ AnchorWakeups.fetch_add(1, std::memory_order_relaxed);
+
+ std::vector<int> reArm;
+ bool reArmShutdown = false;
+ bool reArmWake = false;
+ bool reArmTimeout = false;
+ unsigned head;
+ unsigned count = 0;
+ ui64 LastSeenUd = 0;
+ io_uring_for_each_cqe(AnchorRing.get(), head, cqe) {
+ ++count;
+ const ui64 ud = io_uring_cqe_get_data64(cqe);
+ LastSeenUd = ud;
+ const bool more = cqe->flags & IORING_CQE_F_MORE;
+ if (ud == LiburingTimeoutUserData) {
+ // liburing's internal timeout completion; not a real event.
+ sawTimeout = true;
+ continue;
+ }
+ if (ud == TimerUserData) {
+ // Our periodic backstop timer fired; sweep below and re-arm it.
+ sawTimeout = true;
+ reArmTimeout = true;
+ continue;
+ }
+ if (ud == ShutdownUserData) {
+ DrainEventFd(ShutdownFd);
+ if (!more) {
+ reArmShutdown = true;
+ }
+ continue;
+ }
+ if (ud == WakeUserData) {
+ DrainEventFd(WakeFd);
+ if (!more) {
+ reArmWake = true;
+ }
+ continue;
+ }
+ if (ud == PollRemoveUserData) {
+ // Completion of a poll_remove we issued while tearing a session down;
+ // nothing to reap, the session ring is already (being) destroyed.
+ continue;
+ }
+ const int efd = static_cast<int>(static_cast<ui32>(ud));
+ PollWakeups.fetch_add(1, std::memory_order_relaxed);
+ DrainEventFd(efd);
+ if (TUringSessionRing* session = Lookup(efd)) {
+ ReapSession(session);
+ }
+ if (!more) {
+ reArm.push_back(efd);
+ }
+ }
+ if (count == 0) {
+ EmptyWakes.fetch_add(1, std::memory_order_relaxed);
+ } else {
+ LastAnchorUserData.store(LastSeenUd, std::memory_order_relaxed);
+ }
+ io_uring_cq_advance(AnchorRing.get(), count);
+
+ // Drop any sessions whose output actor asked to unregister. Done on this thread
+ // (which owns the rings) so io_uring_queue_exit and eventfd close are race-free.
+ ProcessPendingRemovals();
+
+ // Backstop: sweep every session ring directly so a lost eventfd wakeup cannot
+ // starve keepalives. Driven by wall-clock elapsed time (not solely the -ETIME
+ // return) so it fires reliably regardless of how the kernel delivers the timeout,
+ // and is throttled to ~BackstopPeriod so it stays cheap even if the wait returns
+ // early. This is the safety net that makes idle multi-session nodes robust.
+ const auto nowTp = std::chrono::steady_clock::now();
+ if (sawTimeout || (nowTp - LastSweep) >= std::chrono::nanoseconds(BackstopPeriodNs)) {
+ BackstopTicks.fetch_add(1, std::memory_order_relaxed);
+ LastSweep = nowTp;
+ SweepAllSessions();
+ }
+
+ if (!reArm.empty() || reArmShutdown || reArmWake || reArmTimeout) {
+ with_lock (Mutex) {
+ for (int efd : reArm) {
+ // Skip eventfds whose session was removed in ProcessPendingRemovals.
+ if (EventFdToSession.contains(efd)) {
+ ArmPoll(efd, static_cast<ui64>(static_cast<ui32>(efd)));
+ }
+ }
+ if (reArmShutdown) {
+ ArmPoll(ShutdownFd, ShutdownUserData);
+ }
+ if (reArmWake) {
+ ArmPoll(WakeFd, WakeUserData);
+ }
+ if (reArmTimeout) {
+ ArmTimeout();
+ }
+ io_uring_submit(AnchorRing.get());
+ }
+ }
+ }
+ return nullptr;
+ }
+
+ // Reaper-thread side of RemoveSession: cancel the anchor poll for each torn-down
+ // session, detach it from the maps and destroy it (exiting its ring and releasing
+ // its registered socket files / eventfd / buffer ring).
+ void ProcessPendingRemovals() {
+ std::vector<int> toRemove;
+ with_lock (RemovalMutex) {
+ toRemove.swap(PendingRemoval);
+ }
+ if (toRemove.empty()) {
+ return;
+ }
+
+ std::vector<std::unique_ptr<TUringSessionRing>> dying;
+ with_lock (Mutex) {
+ for (int efd : toRemove) {
+ auto it = EventFdToSession.find(efd);
+ if (it == EventFdToSession.end()) {
+ continue; // never added, or already removed
+ }
+ EventFdToSession.erase(it);
+ // Cancel the multishot poll armed on this eventfd before it is closed.
+ PollRemove(static_cast<ui64>(static_cast<ui32>(efd)));
+ for (auto sit = Sessions.begin(); sit != Sessions.end(); ++sit) {
+ if ((*sit)->EventFd == efd) {
+ dying.push_back(std::move(*sit));
+ Sessions.erase(sit);
+ break;
+ }
+ }
+ }
+ io_uring_submit(AnchorRing.get());
+ ActiveSessions.store(Sessions.size(), std::memory_order_relaxed);
+ }
+
+ // Hand the dying rings to the graveyard thread instead of destroying them here.
+ // ~TUringSessionRing frees the buffer ring (while the session Ring is still valid)
+ // and then drops the TUringContext reference, which runs io_uring_queue_exit() —
+ // releasing the registered socket files (so the kernel can finally drop them) and
+ // closing the session eventfd. io_uring_queue_exit() can block, so keeping it off
+ // this thread ensures connection churn never stalls completion processing (and the
+ // idle keepalive backstop) for the still-live sessions.
+ if (!dying.empty()) {
+ {
+ std::lock_guard<std::mutex> lk(GraveyardMutex);
+ for (auto& d : dying) {
+ Graveyard.push_back(std::move(d));
+ }
+ }
+ GraveyardCv.notify_one();
+ }
+ }
+
+ // Dedicated thread that runs the (potentially blocking) destruction of removed session
+ // rings, off the reaper hot path.
+ void GraveyardProc() {
+ TThread::SetCurrentThreadName("IcUringGrave");
+ for (;;) {
+ std::vector<std::unique_ptr<TUringSessionRing>> batch;
+ {
+ std::unique_lock<std::mutex> lk(GraveyardMutex);
+ GraveyardCv.wait(lk, [this] { return GraveyardStop || !Graveyard.empty(); });
+ batch.swap(Graveyard);
+ if (batch.empty() && GraveyardStop) {
+ return;
+ }
+ }
+ batch.clear(); // runs ~TUringSessionRing -> io_uring_queue_exit off the reaper
+ }
+ }
+
+ // Reap every live session ring. Used as the idle backstop. Session removal happens only
+ // on this same reaper thread (ProcessPendingRemovals), so a snapshot taken under the
+ // lock stays valid for the duration of the sweep even if AddSession reallocates the
+ // Sessions vector concurrently.
+ void SweepAllSessions() {
+ std::vector<TUringSessionRing*> snapshot;
+ with_lock (Mutex) {
+ snapshot.reserve(Sessions.size());
+ for (auto& s : Sessions) {
+ snapshot.push_back(s.get());
+ }
+ }
+ for (TUringSessionRing* session : snapshot) {
+ ReapSession(session);
+ }
+ }
+
+ static void DrainEventFd(int fd) {
+ uint64_t val;
+ [[maybe_unused]] ssize_t r = read(fd, &val, sizeof(val));
+ }
+
+ TUringSessionRing* Lookup(int efd) {
+ with_lock (Mutex) {
+ auto it = EventFdToSession.find(efd);
+ return it == EventFdToSession.end() ? nullptr : it->second;
+ }
+ }
+
+ // Must be called with Mutex held. Returns an SQE from the anchor ring, submitting any
+ // already-queued SQEs to make room if the SQ is momentarily full. Returns nullptr only
+ // if the SQ stays full after repeated submits (effectively unreachable with the current
+ // queue depth).
+ struct io_uring_sqe* GetAnchorSqe() {
+ struct io_uring_sqe* sqe = io_uring_get_sqe(AnchorRing.get());
+ for (int attempt = 0; !sqe && attempt < 1024; ++attempt) {
+ io_uring_submit(AnchorRing.get());
+ sqe = io_uring_get_sqe(AnchorRing.get());
+ }
+ return sqe;
+ }
+
+ // Must be called with Mutex held. Arms a ONE-SHOT poll for POLLIN on `fd`. We deliberately
+ // do NOT use multishot here: a multishot poll armed on a ring-registered eventfd was
+ // observed to fire exactly once and then go silent forever (the kernel terminated the
+ // multishot after the first completion without setting a final !F_MORE CQE we could detect
+ // to re-arm), so every session ran solely off the 100ms backstop timer — fine for idle
+ // keepalive but it throttled throughput to ~10 writes/s. A one-shot poll that we re-arm
+ // after every reap is robust: after we drain the eventfd and reap, the next CQE re-signals
+ // the (now readable) eventfd and the freshly armed one-shot poll fires immediately.
+ void ArmPoll(int fd, ui64 userData) {
+ struct io_uring_sqe* sqe = GetAnchorSqe();
+ if (!sqe) {
+ // Graceful degradation instead of aborting the whole process. A missed arm here
+ // only delays reaping for this fd until the next backstop sweep re-arms it.
+ return;
+ }
+ io_uring_prep_poll_add(sqe, fd, POLLIN);
+ io_uring_sqe_set_data64(sqe, userData);
+ }
+
+ // Must be called with Mutex held. Arms a one-shot relative timeout on the anchor ring used
+ // as the idle backstop. Re-armed each time it fires. BackstopTs is a member because
+ // io_uring_prep_timeout keeps a pointer to it until the timeout completes.
+ void ArmTimeout() {
+ struct io_uring_sqe* sqe = GetAnchorSqe();
+ if (!sqe) {
+ return;
+ }
+ BackstopTs.tv_sec = 0;
+ BackstopTs.tv_nsec = BackstopPeriodNs;
+ io_uring_prep_timeout(sqe, &BackstopTs, 0, 0);
+ io_uring_sqe_set_data64(sqe, TimerUserData);
+ }
+
+ // Must be called with Mutex held. Cancels a previously armed multishot poll identified
+ // by the user_data it was armed with.
+ void PollRemove(ui64 targetUserData) {
+ struct io_uring_sqe* sqe = GetAnchorSqe();
+ if (!sqe) {
+ return;
+ }
+ io_uring_prep_poll_remove(sqe, targetUserData);
+ io_uring_sqe_set_data64(sqe, PollRemoveUserData);
+ }
+
+ void SendEv(const TActorId& recipient, IEventBase* ev) {
+ ActorSystem->Send(new IEventHandle(recipient, TActorId(), ev));
+ }
+
+ void ReapSession(TUringSessionRing* session) {
+ struct io_uring* ring = session->Context->GetRing();
+ if (!ring) {
+ return;
+ }
+
+ // Recycle any provided-buffer-ring buffers that consumers have released.
+ session->MainRecvPool.DrainFreelist();
+
+ struct io_uring_cqe* cqes[BatchSize];
+ unsigned count = io_uring_peek_batch_cqe(ring, cqes, BatchSize);
+
+ for (unsigned i = 0; i < count; ++i) {
+ struct io_uring_cqe* cqe = cqes[i];
+ const ui64 userData = io_uring_cqe_get_data64(cqe);
+ const EUringOpTag tag = static_cast<EUringOpTag>(userData & UringOpTagMask);
+
+ switch (tag) {
+ case EUringOpTag::MainWritev:
+ case EUringOpTag::XdcWritev:
+ WriteCqes.fetch_add(1, std::memory_order_relaxed);
+ session->Context->DecrementPendingWrites();
+ SendEv(session->Context->GetWriteActorId(),
+ new TEvUringWriteComplete(cqe->res, userData));
+ break;
+
+ case EUringOpTag::MainRecv: {
+ RecvCqes.fetch_add(1, std::memory_order_relaxed);
+ if (!(cqe->flags & IORING_CQE_F_MORE)) {
+ session->Context->DecrementPendingRecvs();
+ }
+ TRcBuf data;
+ if (cqe->res > 0 && (cqe->flags & IORING_CQE_F_BUFFER)) {
+ ui16 bufIdx = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
+ // Zero-copy: wrap the provided buffer directly; it is recycled
+ // back into the buffer ring when the last TRcBuf reference drops.
+ data = session->MainRecvPool.WrapBuffer(bufIdx, cqe->res);
+ }
+ SendEv(session->Context->GetReadActorId(),
+ new TEvUringRecvComplete(cqe->res, cqe->flags, userData, std::move(data)));
+ break;
+ }
+
+ case EUringOpTag::XdcRecv:
+ RecvCqes.fetch_add(1, std::memory_order_relaxed);
+ // XDC recv reads directly into destination spans (no provided buffer);
+ // forward the byte count to the input session for post-processing.
+ SendEv(session->Context->GetReadActorId(),
+ new TEvUringRecvComplete(cqe->res, cqe->flags, userData, TRcBuf()));
+ break;
+
+ case EUringOpTag::XdcSendZc:
+ if (cqe->flags & IORING_CQE_F_NOTIF) {
+ // Buffer-release notification: gates buffer reuse, but does not
+ // count against pending writes.
+ SendEv(session->Context->GetWriteActorId(),
+ new TEvUringSendZcNotif(userData));
+ } else {
+ // Data CQE: the send operation itself has completed. The F_NOTIF
+ // CQE that follows only signals buffer release.
+ session->Context->DecrementPendingWrites();
+ SendEv(session->Context->GetWriteActorId(),
+ new TEvUringWriteComplete(cqe->res, userData));
+ }
+ break;
+
+ case EUringOpTag::XdcSendNotif:
+ SendEv(session->Context->GetWriteActorId(),
+ new TEvUringSendZcNotif(userData));
+ break;
+
+ case EUringOpTag::CancelAll:
+ break;
+ }
+ }
+
+ if (count > 0) {
+ TotalCqesReaped.fetch_add(count, std::memory_order_relaxed);
+ io_uring_cq_advance(ring, count);
+ }
+ }
+
+ private:
+ static constexpr unsigned AnchorQueueDepth = 4096;
+ static constexpr unsigned BatchSize = 64;
+ // SQPOLL kernel-thread idle window (ms) before it sleeps; only used when SQPOLL is on.
+ static constexpr ui32 SqThreadIdleMs = 2000;
+ // Idle backstop period for the reaper wait. Must stay comfortably below the
+ // interconnect DeadPeerTimeout so a worst-case lost wakeup is recovered long before a
+ // session could be declared dead.
+ static constexpr long long BackstopPeriodNs = 100'000'000; // 100 ms
+
+ TActorSystem* const ActorSystem;
+ const bool EnableSqpoll;
+ std::unique_ptr<struct io_uring> AnchorRing;
+ int AnchorRingFd = -1;
+ int ShutdownFd = -1;
+ int WakeFd = -1; // signalled by RemoveSession to wake the reaper for teardown
+ std::atomic<bool> Stop{false};
+
+ TMutex Mutex; // guards anchor SQ submissions + session maps
+ THashMap<int, TUringSessionRing*> EventFdToSession;
+ std::vector<std::unique_ptr<TUringSessionRing>> Sessions;
+
+ TMutex RemovalMutex; // guards PendingRemoval
+ std::vector<int> PendingRemoval;
+
+ // Off-thread destruction of removed session rings (see GraveyardProc).
+ std::thread GraveyardThread;
+ std::mutex GraveyardMutex;
+ std::condition_variable GraveyardCv;
+ std::vector<std::unique_ptr<TUringSessionRing>> Graveyard;
+ bool GraveyardStop = false;
+
+ // Backing storage for the periodic backstop timeout SQE (see ArmTimeout); io_uring keeps
+ // a pointer to it, so it must outlive the in-flight timeout. Touched only with Mutex held.
+ struct __kernel_timespec BackstopTs{};
+
+ // Last time SweepAllSessions ran; throttles the time-based backstop. Reaper-thread only.
+ std::chrono::steady_clock::time_point LastSweep = std::chrono::steady_clock::now();
+
+ // Liveness counters (see GetStats). Written only by the reaper thread.
+ std::atomic<ui64> BackstopTicks{0}; // idle backstop sweeps performed
+ std::atomic<ui64> AnchorWakeups{0}; // woke on at least one anchor CQE
+ std::atomic<ui64> TotalCqesReaped{0}; // session-ring CQEs consumed across all sessions
+ std::atomic<ui64> RecvCqes{0}; // of which main/XDC recv completions
+ std::atomic<ui64> WriteCqes{0}; // of which writev/send_zc data completions
+ std::atomic<ui64> ActiveSessions{0}; // current live session rings
+ std::atomic<ui64> EmptyWakes{0}; // diag: wait returned but CQ was empty
+ std::atomic<ui64> LastAnchorUserData{0}; // diag: last non-empty anchor CQE user_data
+ std::atomic<ui64> PollWakeups{0}; // diag: session eventfd poll CQEs processed
+ };
+
+ class TUringPollerActor : public TActorBootstrapped<TUringPollerActor> {
+ std::unique_ptr<TUringReaper> Reaper;
+ const bool EnableSqpoll;
+ ui16 NextBufGroupId = 0;
+
+ public:
+ static constexpr TDuration StatsLogPeriod = TDuration::Seconds(5);
+
+ explicit TUringPollerActor(bool enableSqpoll)
+ : EnableSqpoll(enableSqpoll)
+ {}
+
+ void Bootstrap() {
+ Reaper = std::make_unique<TUringReaper>(TActivationContext::ActorSystem(), EnableSqpoll);
+ Reaper->Start();
+ Become(&TUringPollerActor::StateFunc);
+ Schedule(StatsLogPeriod, new TEvents::TEvWakeup());
+ }
+
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvUringRegister::EventType: {
+ auto* x = reinterpret_cast<TEvUringRegister::TPtr*>(&ev);
+ Handle(*x);
+ break;
+ }
+ case TEvUringUnregister::EventType: {
+ auto* x = reinterpret_cast<TEvUringUnregister::TPtr*>(&ev);
+ Handle(*x);
+ break;
+ }
+ case TEvents::TSystem::Wakeup:
+ LogStats();
+ Schedule(StatsLogPeriod, new TEvents::TEvWakeup());
+ break;
+ case TEvents::TSystem::PoisonPill:
+ PassAway();
+ break;
+ default:
+ break;
+ }
+ }
+
+ // Periodic proof-of-life for the reaper thread plus enough counters to tell, after the
+ // fact, whether completions were flowing while sessions were being declared DeadPeer:
+ // if RecvCqes/WriteCqes keep climbing the reaper is healthy and the stall is upstream
+ // (e.g. the sender never submitted), whereas frozen counters with live sessions point at
+ // a reaper-thread stall.
+ void LogStats() {
+ const TUringReaper::TStats s = Reaper->GetStats();
+ LOG_NOTICE(*TlsActivationContext, NActorsServices::INTERCONNECT,
+ "ICUR50 uring reaper stats activeSessions# %" PRIu64 " backstopTicks# %" PRIu64
+ " anchorWakeups# %" PRIu64 " totalCqes# %" PRIu64 " recvCqes# %" PRIu64
+ " writeCqes# %" PRIu64 " emptyWakes# %" PRIu64 " pollWakeups# %" PRIu64
+ " lastUd# 0x%" PRIx64,
+ s.ActiveSessions, s.BackstopTicks, s.AnchorWakeups, s.TotalCqesReaped,
+ s.RecvCqes, s.WriteCqes, s.EmptyWakes, s.PollWakeups, s.LastAnchorUserData);
+ }
+
+ void Handle(TEvUringRegister::TPtr& ev) {
+ auto* msg = ev->Get();
+
+ auto session = std::make_unique<TUringSessionRing>();
+ session->MainSocketFd = msg->Socket ? msg->Socket->GetDescriptor() : -1;
+ session->XdcSocketFd = msg->XdcSocket ? msg->XdcSocket->GetDescriptor() : -1;
+
+ auto context = MakeIntrusive<TUringContext>(msg->WriteActorId, msg->ReadActorId);
+ if (!context->Init(Reaper->GetAnchorRingFd(), EnableSqpoll)) {
+ // Ring creation failed: tell the session to fall back to the epoll TPollerActor
+ // so it is never left without an I/O backend.
+ Send(msg->WriteActorId, new TEvUringRegisterFailed());
+ return;
+ }
+
+ // Register socket fds as fixed files to avoid per-I/O fget/fput. This is a pure
+ // optimization; on failure the context transparently uses raw fds, so it is not
+ // fatal and does not force the epoll fallback.
+ context->RegisterFiles(session->MainSocketFd, session->XdcSocketFd);
+
+ // Provided buffer ring for the main socket recv path. If this fails the recv
+ // multishot would loop forever on -ENOBUFS, so treat it as fatal and fall back.
+ ui16 mainBufGid = NextBufGroupId++;
+ ui16 xdcBufGid = NextBufGroupId++;
+ if (!session->MainRecvPool.Init(context->GetRing(), mainBufGid)) {
+ Send(msg->WriteActorId, new TEvUringRegisterFailed());
+ return;
+ }
+
+ session->Context = context;
+ // CRITICAL: publish the ring's eventfd so the reaper arms its anchor poll on THIS
+ // session's real, unique eventfd. Without it EventFd stayed -1: every session armed a
+ // multishot poll on the invalid fd -1 (POLLNVAL hot-spin) and all sessions collided on
+ // map key -1, so only the most recently registered session was ever reaped. On a
+ // multi-session node every other session got no completions delivered and was declared
+ // DeadPeer at the keepalive timeout (the cluster-wide meltdown); a 2-node loopback test
+ // has a single session and so accidentally worked.
+ session->EventFd = context->GetEventFd();
+
+ Reaper->AddSession(std::move(session));
+
+ Send(msg->WriteActorId, new TEvUringRegisterResult(context, mainBufGid, xdcBufGid));
+ Send(msg->ReadActorId, new TEvUringRegisterResult(context, mainBufGid, xdcBufGid));
+ }
+
+ void Handle(TEvUringUnregister::TPtr& ev) {
+ Reaper->RemoveSession(ev->Get()->EventFd);
+ }
+
+ void PassAway() override {
+ if (Reaper) {
+ Reaper->StopAndJoin();
+ Reaper.reset();
+ }
+ TActorBootstrapped::PassAway();
+ }
+ };
+
+ IActor* CreateUringPollerActor(bool enableSqpoll) {
+ return new TUringPollerActor(enableSqpoll);
+ }
+
+} // namespace NActors
diff --git a/ydb/library/actors/interconnect/poller/uring_poller_actor.h b/ydb/library/actors/interconnect/poller/uring_poller_actor.h
new file mode 100644
index 00000000000..afce8fb9da1
--- /dev/null
+++ b/ydb/library/actors/interconnect/poller/uring_poller_actor.h
@@ -0,0 +1,106 @@
+#pragma once
+
+#include <ydb/library/actors/interconnect/events/events.h>
+#include <ydb/library/actors/interconnect/uring_context.h>
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/util/rc_buf.h>
+
+#include "poller_actor.h"
+
+namespace NActors {
+
+ struct TEvUringRegister : TEventLocal<TEvUringRegister, ui32(ENetwork::EvUringRegister)> {
+ TIntrusivePtr<TSharedDescriptor> Socket;
+ TIntrusivePtr<TSharedDescriptor> XdcSocket;
+ TActorId ReadActorId;
+ TActorId WriteActorId;
+
+ TEvUringRegister(TIntrusivePtr<TSharedDescriptor> socket,
+ TIntrusivePtr<TSharedDescriptor> xdcSocket,
+ TActorId readActorId,
+ TActorId writeActorId)
+ : Socket(std::move(socket))
+ , XdcSocket(std::move(xdcSocket))
+ , ReadActorId(readActorId)
+ , WriteActorId(writeActorId)
+ {}
+ };
+
+ struct TEvUringRegisterResult : TEventLocal<TEvUringRegisterResult, ui32(ENetwork::EvUringRegisterResult)> {
+ TUringContext::TPtr Context;
+ ui16 MainRecvBufGroupId;
+ ui16 XdcRecvBufGroupId;
+
+ TEvUringRegisterResult(TUringContext::TPtr context, ui16 mainBufGid, ui16 xdcBufGid)
+ : Context(std::move(context))
+ , MainRecvBufGroupId(mainBufGid)
+ , XdcRecvBufGroupId(xdcBufGid)
+ {}
+ };
+
+ struct TEvUringWriteComplete : TEventLocal<TEvUringWriteComplete, ui32(ENetwork::EvUringWriteComplete)> {
+ i32 Result;
+ ui64 UserData;
+
+ TEvUringWriteComplete(i32 result, ui64 userData)
+ : Result(result)
+ , UserData(userData)
+ {}
+ };
+
+ struct TEvUringRecvComplete : TEventLocal<TEvUringRecvComplete, ui32(ENetwork::EvUringRecvComplete)> {
+ i32 Result;
+ ui32 Flags;
+ ui64 UserData;
+ TRcBuf Data;
+
+ TEvUringRecvComplete(i32 result, ui32 flags, ui64 userData)
+ : Result(result)
+ , Flags(flags)
+ , UserData(userData)
+ {}
+
+ TEvUringRecvComplete(i32 result, ui32 flags, ui64 userData, TRcBuf data)
+ : Result(result)
+ , Flags(flags)
+ , UserData(userData)
+ , Data(std::move(data))
+ {}
+ };
+
+ struct TEvUringSendZcNotif : TEventLocal<TEvUringSendZcNotif, ui32(ENetwork::EvUringSendZcNotif)> {
+ ui64 UserData;
+
+ explicit TEvUringSendZcNotif(ui64 userData)
+ : UserData(userData)
+ {}
+ };
+
+ // Sent by the output session on teardown so the reaper can release the session ring
+ // (exit the ring, release the registered socket files, free the buffer ring, close the
+ // eventfd). EventFd identifies the session ring in the reaper's maps.
+ struct TEvUringUnregister : TEventLocal<TEvUringUnregister, ui32(ENetwork::EvUringUnregister)> {
+ int EventFd;
+
+ explicit TEvUringUnregister(int eventFd)
+ : EventFd(eventFd)
+ {}
+ };
+
+ // Sent by the poller to the output session when io_uring setup failed: the session must
+ // fall back to registering its sockets with the epoll TPollerActor.
+ struct TEvUringRegisterFailed : TEventLocal<TEvUringRegisterFailed, ui32(ENetwork::EvUringRegisterFailed)> {
+ };
+
+#ifdef __linux__
+ IActor* CreateUringPollerActor(bool enableSqpoll);
+#else
+ inline IActor* CreateUringPollerActor(bool /*enableSqpoll*/) { return nullptr; }
+#endif
+
+ inline TActorId MakeUringPollerActorId() {
+ char x[12] = {'I', 'C', 'U', 'r', 'i', 'n', 'g', 'P', '\xDE', '\xAD', '\xBE', '\xEF'};
+ return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
+ }
+
+} // namespace NActors
diff --git a/ydb/library/actors/interconnect/poller/ya.make b/ydb/library/actors/interconnect/poller/ya.make
index 6cd1f20811d..9322bd96b40 100644
--- a/ydb/library/actors/interconnect/poller/ya.make
+++ b/ydb/library/actors/interconnect/poller/ya.make
@@ -27,6 +27,11 @@ IF (OS_LINUX)
SRCS(
poller_tcp_unit_epoll.cpp
poller_tcp_unit_epoll.h
+ uring_poller_actor.cpp
+ uring_poller_actor.h
+ )
+ PEERDIR(
+ contrib/libs/liburing
)
ENDIF()
diff --git a/ydb/library/actors/interconnect/uring_context.cpp b/ydb/library/actors/interconnect/uring_context.cpp
new file mode 100644
index 00000000000..b14fde1da65
--- /dev/null
+++ b/ydb/library/actors/interconnect/uring_context.cpp
@@ -0,0 +1,238 @@
+#include "uring_context.h"
+
+#include <liburing.h>
+#include <sys/eventfd.h>
+#include <unistd.h>
+
+namespace NActors {
+
+ TUringContext::TUringContext(TActorId writeActorId, TActorId readActorId)
+ : WriteActorId(writeActorId)
+ , ReadActorId(readActorId)
+ {
+ EventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+ Y_ABORT_UNLESS(EventFd >= 0, "eventfd() failed: %s", strerror(errno));
+ }
+
+ TUringContext::~TUringContext() {
+ if (Ring) {
+ io_uring_queue_exit(Ring.get());
+ }
+ if (EventFd >= 0) {
+ close(EventFd);
+ }
+ }
+
+ bool TUringContext::Init(int anchorWqFd, bool enableSqpoll) {
+ Ring = std::make_unique<struct io_uring>();
+
+ struct io_uring_params params = {};
+ // SQPOLL is opt-in (TInterconnectSettings::EnableUringSQPOLL, default off). When off,
+ // submissions are issued explicitly via io_uring_submit() (io_uring_enter) on the
+ // session mailbox thread, which makes the writev complete and signal the eventfd
+ // deterministically with no dependency on a kernel poll thread, and avoids a busy
+ // poll thread. When on, the anchor ring also has SQPOLL and we set SQPOLL + ATTACH_WQ
+ // here (wq_fd = anchor), so all session rings + the anchor share ONE kernel poll thread
+ // (not one per session); liburing's io_uring_submit() wakes an idle SQPOLL thread via
+ // IORING_SQ_NEED_WAKEUP. Note: SQPOLL was originally suspected of causing an idle
+ // keepalive meltdown, but that was traced to unrelated reaper bugs (see uring_poller_actor.cpp).
+ if (enableSqpoll) {
+ params.flags |= IORING_SETUP_SQPOLL;
+ params.sq_thread_idle = SqThreadIdleMs;
+ }
+ if (anchorWqFd >= 0) {
+ // Share the async io-wq backend with the anchor ring so we do not spawn a separate
+ // worker pool per session ring.
+ params.flags |= IORING_SETUP_ATTACH_WQ;
+ params.wq_fd = anchorWqFd;
+ }
+
+ int ret = io_uring_queue_init_params(DefaultQueueDepth, Ring.get(), &params);
+ if (ret < 0) {
+ Ring.reset();
+ return false;
+ }
+
+ ret = io_uring_register_eventfd(Ring.get(), EventFd);
+ if (ret < 0) {
+ io_uring_queue_exit(Ring.get());
+ Ring.reset();
+ return false;
+ }
+
+ return true;
+ }
+
+ int TUringContext::GetRingFd() const {
+ return Ring ? Ring->ring_fd : -1;
+ }
+
+ void TUringContext::RegisterFiles(int mainFd, int xdcFd) {
+ if (!Ring) {
+ return;
+ }
+ // Index 0 = main socket, index 1 = XDC socket (sparse if absent).
+ int fds[2] = {mainFd, xdcFd >= 0 ? xdcFd : -1};
+ int ret = io_uring_register_files(Ring.get(), fds, 2);
+ if (ret < 0) {
+ FixedFiles = false;
+ return;
+ }
+ MainFd = mainFd;
+ XdcFd = xdcFd;
+ FixedFiles = true;
+ }
+
+ bool TUringContext::SubmitWritev(int fd, const struct iovec* iov, unsigned iovcnt, ui64 seqNo, EUringOpTag tag) {
+ if (!Ring) {
+ return false;
+ }
+
+ struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get());
+ if (!sqe) {
+ ++SqeFull;
+ return false;
+ }
+
+ const int fixedIdx = FixedIndexForFd(fd);
+ if (fixedIdx >= 0) {
+ io_uring_prep_writev(sqe, fixedIdx, iov, iovcnt, 0);
+ sqe->flags |= IOSQE_FIXED_FILE;
+ } else {
+ io_uring_prep_writev(sqe, fd, iov, iovcnt, 0);
+ }
+ io_uring_sqe_set_data64(sqe, static_cast<ui64>(tag) | (seqNo & UringOpDataMask));
+ return true;
+ }
+
+ bool TUringContext::SubmitReadv(int fd, const struct iovec* iov, unsigned iovcnt, ui64 seqNo, EUringOpTag tag) {
+ if (!Ring) {
+ return false;
+ }
+
+ struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get());
+ if (!sqe) {
+ ++SqeFull;
+ return false;
+ }
+
+ const int fixedIdx = FixedIndexForFd(fd);
+ if (fixedIdx >= 0) {
+ io_uring_prep_readv(sqe, fixedIdx, iov, iovcnt, 0);
+ sqe->flags |= IOSQE_FIXED_FILE;
+ } else {
+ io_uring_prep_readv(sqe, fd, iov, iovcnt, 0);
+ }
+ io_uring_sqe_set_data64(sqe, static_cast<ui64>(tag) | (seqNo & UringOpDataMask));
+ return true;
+ }
+
+ bool TUringContext::SubmitSendZc(int fd, const void* buf, size_t len, ui64 seqNo) {
+ if (!Ring) {
+ return false;
+ }
+
+ struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get());
+ if (!sqe) {
+ ++SqeFull;
+ return false;
+ }
+
+ const int fixedIdx = FixedIndexForFd(fd);
+ if (fixedIdx >= 0) {
+ io_uring_prep_send_zc(sqe, fixedIdx, buf, len, 0, 0);
+ sqe->flags |= IOSQE_FIXED_FILE;
+ } else {
+ io_uring_prep_send_zc(sqe, fd, buf, len, 0, 0);
+ }
+ io_uring_sqe_set_data64(sqe, static_cast<ui64>(EUringOpTag::XdcSendZc) | (seqNo & UringOpDataMask));
+ return true;
+ }
+
+ bool TUringContext::SubmitRecvMultishot(int fd, ui16 bufGroupId, EUringOpTag tag) {
+ if (!Ring) {
+ return false;
+ }
+
+ struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get());
+ if (!sqe) {
+ ++SqeFull;
+ return false;
+ }
+
+ const int fixedIdx = FixedIndexForFd(fd);
+ if (fixedIdx >= 0) {
+ io_uring_prep_recv_multishot(sqe, fixedIdx, nullptr, 0, 0);
+ sqe->flags |= IOSQE_FIXED_FILE;
+ } else {
+ io_uring_prep_recv_multishot(sqe, fd, nullptr, 0, 0);
+ }
+ sqe->buf_group = bufGroupId;
+ sqe->flags |= IOSQE_BUFFER_SELECT;
+ io_uring_sqe_set_data64(sqe, static_cast<ui64>(tag));
+ return true;
+ }
+
+ bool TUringContext::SubmitCancelFd(int fd) {
+ if (!Ring) {
+ return false;
+ }
+
+ struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get());
+ if (!sqe) {
+ return false;
+ }
+
+ const int fixedIdx = FixedIndexForFd(fd);
+ if (fixedIdx >= 0) {
+ io_uring_prep_cancel_fd(sqe, fixedIdx, IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_FD_FIXED);
+ } else {
+ io_uring_prep_cancel_fd(sqe, fd, IORING_ASYNC_CANCEL_ALL);
+ }
+ io_uring_sqe_set_data64(sqe, static_cast<ui64>(EUringOpTag::CancelAll));
+ return true;
+ }
+
+ void TUringContext::Flush() {
+ if (!Ring) {
+ return;
+ }
+ // Number of SQEs queued but not yet submitted. A healthy submit returns exactly this
+ // many; anything less means the kernel refused part of the batch (e.g. -EBUSY on CQ
+ // overflow), which would silently strand a keepalive writev and starve the peer.
+ const unsigned expected = io_uring_sq_ready(Ring.get());
+ const int ret = io_uring_submit(Ring.get());
+ ++SubmitCalls;
+ LastSubmitRet = ret;
+ if (ret < 0) {
+ ++SubmitErrors;
+ } else if (static_cast<unsigned>(ret) < expected) {
+ ++SubmitPartials;
+ }
+ }
+
+ bool TUringContext::IsSupported() {
+ static const bool supported = [] {
+ // Probe a plain ring (without SQPOLL, which is opt-in and not required for the
+ // capability check) and require the provided-buffer-ring API, which is the hard
+ // dependency of the multishot recv data path. If SQPOLL is enabled but the kernel
+ // refuses the SQPOLL ring at Init time, the session falls back to epoll.
+ struct io_uring ring;
+ struct io_uring_params params = {};
+ int ret = io_uring_queue_init_params(8, &ring, &params);
+ if (ret != 0) {
+ return false;
+ }
+ int probeErr = 0;
+ struct io_uring_buf_ring* br = io_uring_setup_buf_ring(&ring, 1, /*bgid=*/0, 0, &probeErr);
+ const bool ok = (br != nullptr);
+ if (ok) {
+ io_uring_free_buf_ring(&ring, br, 1, /*bgid=*/0);
+ }
+ io_uring_queue_exit(&ring);
+ return ok;
+ }();
+ return supported;
+ }
+
+} // namespace NActors
diff --git a/ydb/library/actors/interconnect/uring_context.h b/ydb/library/actors/interconnect/uring_context.h
new file mode 100644
index 00000000000..0c776214899
--- /dev/null
+++ b/ydb/library/actors/interconnect/uring_context.h
@@ -0,0 +1,178 @@
+#pragma once
+
+#include <ydb/library/actors/core/actorid.h>
+#include <ydb/library/actors/interconnect/poller/poller.h>
+
+#include <util/generic/ptr.h>
+
+#include <atomic>
+#include <cstdint>
+
+#ifdef __linux__
+#include <sys/uio.h>
+
+struct io_uring;
+struct io_uring_buf_ring;
+#endif
+
+namespace NActors {
+
+ enum class EUringOpTag : ui64 {
+ MainWritev = 1ULL << 56,
+ XdcWritev = 2ULL << 56,
+ MainRecv = 3ULL << 56,
+ XdcRecv = 4ULL << 56,
+ XdcSendZc = 5ULL << 56,
+ XdcSendNotif = 6ULL << 56,
+ CancelAll = 7ULL << 56,
+ };
+
+ static constexpr ui64 UringOpTagMask = 0xFF00000000000000ULL;
+ static constexpr ui64 UringOpDataMask = ~UringOpTagMask;
+
+ class TUringContext : public TThrRefBase {
+ public:
+ static constexpr ui32 DefaultQueueDepth = 64;
+ static constexpr ui32 MaxPendingWrites = 32;
+ // SQPOLL kernel-thread idle window (ms) before it sleeps; only used when SQPOLL is on.
+ static constexpr ui32 SqThreadIdleMs = 2000;
+
+#ifdef __linux__
+ TUringContext(TActorId writeActorId, TActorId readActorId);
+ ~TUringContext();
+
+ bool Init(int anchorWqFd, bool enableSqpoll);
+ int GetEventFd() const { return EventFd; }
+ int GetRingFd() const;
+
+ // Register the session socket fds as fixed files (index 0 = main, 1 = XDC) to avoid
+ // per-I/O fget/fput. Submit helpers below transparently use IOSQE_FIXED_FILE afterwards.
+ void RegisterFiles(int mainFd, int xdcFd);
+
+ bool SubmitWritev(int fd, const struct iovec* iov, unsigned iovcnt, ui64 seqNo, EUringOpTag tag);
+ bool SubmitReadv(int fd, const struct iovec* iov, unsigned iovcnt, ui64 seqNo, EUringOpTag tag);
+ bool SubmitSendZc(int fd, const void* buf, size_t len, ui64 seqNo);
+ bool SubmitRecvMultishot(int fd, ui16 bufGroupId, EUringOpTag tag);
+ bool SubmitCancelFd(int fd);
+ void Flush();
+
+ // PendingWrites/PendingRecvs are incremented on the session mailbox thread (submission)
+ // and decremented on the reaper thread (completion), so they must be atomic.
+ ui32 GetPendingWrites() const { return PendingWrites.load(std::memory_order_relaxed); }
+ ui32 GetPendingRecvs() const { return PendingRecvs.load(std::memory_order_relaxed); }
+
+ // Diagnostic submit accounting (instrumentation for the idle-keepalive DeadPeer hunt).
+ // All updated on the session mailbox thread inside Flush()/submit helpers. Plain ui64 is
+ // fine: read for logging on the same thread, or torn-read-tolerant from another thread.
+ ui64 GetSubmitCalls() const { return SubmitCalls; }
+ ui64 GetSubmitErrors() const { return SubmitErrors; }
+ ui64 GetSubmitPartials() const { return SubmitPartials; }
+ i32 GetLastSubmitRet() const { return LastSubmitRet; }
+ ui64 GetSqeFull() const { return SqeFull; }
+ void IncrementPendingWrites() { PendingWrites.fetch_add(1, std::memory_order_relaxed); }
+ void DecrementPendingWrites() {
+ ui32 v = PendingWrites.load(std::memory_order_relaxed);
+ while (v > 0 && !PendingWrites.compare_exchange_weak(v, v - 1, std::memory_order_relaxed)) {}
+ }
+ void IncrementPendingRecvs() { PendingRecvs.fetch_add(1, std::memory_order_relaxed); }
+ void DecrementPendingRecvs() {
+ ui32 v = PendingRecvs.load(std::memory_order_relaxed);
+ while (v > 0 && !PendingRecvs.compare_exchange_weak(v, v - 1, std::memory_order_relaxed)) {}
+ }
+
+ TActorId GetWriteActorId() const { return WriteActorId; }
+ TActorId GetReadActorId() const { return ReadActorId; }
+
+ struct io_uring* GetRing() { return Ring.get(); }
+
+ static bool IsSupported();
+#else
+ TUringContext(TActorId, TActorId) {}
+ ~TUringContext() = default;
+
+ bool Init(int, bool) { return false; }
+ int GetEventFd() const { return -1; }
+ int GetRingFd() const { return -1; }
+
+ void RegisterFiles(int, int) {}
+
+ bool SubmitWritev(int, const void*, unsigned, ui64, EUringOpTag) { return false; }
+ bool SubmitReadv(int, const void*, unsigned, ui64, EUringOpTag) { return false; }
+ bool SubmitSendZc(int, const void*, size_t, ui64) { return false; }
+ bool SubmitRecvMultishot(int, ui16, EUringOpTag) { return false; }
+ bool SubmitCancelFd(int) { return false; }
+ void Flush() {}
+
+ ui32 GetPendingWrites() const { return 0; }
+ ui32 GetPendingRecvs() const { return 0; }
+ ui64 GetSubmitCalls() const { return 0; }
+ ui64 GetSubmitErrors() const { return 0; }
+ ui64 GetSubmitPartials() const { return 0; }
+ i32 GetLastSubmitRet() const { return 0; }
+ ui64 GetSqeFull() const { return 0; }
+ void IncrementPendingWrites() {}
+ void DecrementPendingWrites() {}
+ void IncrementPendingRecvs() {}
+ void DecrementPendingRecvs() {}
+
+ TActorId GetWriteActorId() const { return {}; }
+ TActorId GetReadActorId() const { return {}; }
+
+ void* GetRing() { return nullptr; }
+
+ static bool IsSupported() { return false; }
+#endif
+
+ using TPtr = TIntrusivePtr<TUringContext>;
+
+ private:
+#ifdef __linux__
+ // Translate a raw socket fd to its registered fixed-file index, or -1 if fixed
+ // files are not in use for this fd.
+ int FixedIndexForFd(int fd) const {
+ if (!FixedFiles) {
+ return -1;
+ }
+ if (fd == MainFd) {
+ return 0;
+ }
+ if (fd == XdcFd) {
+ return 1;
+ }
+ return -1;
+ }
+
+ std::unique_ptr<struct io_uring> Ring;
+ int EventFd = -1;
+ TActorId WriteActorId;
+ TActorId ReadActorId;
+ std::atomic<ui32> PendingWrites{0};
+ std::atomic<ui32> PendingRecvs{0};
+ // Diagnostic submit accounting (see getters above). Mutated only on the session mailbox
+ // thread; read elsewhere only for best-effort logging, so no synchronization is needed.
+ ui64 SubmitCalls = 0; // number of io_uring_submit() calls via Flush()
+ ui64 SubmitErrors = 0; // io_uring_submit() returned < 0
+ ui64 SubmitPartials = 0; // io_uring_submit() returned fewer than the queued SQE count
+ i32 LastSubmitRet = 0; // last io_uring_submit() return value
+ ui64 SqeFull = 0; // io_uring_get_sqe() returned nullptr (SQ full) in a submit helper
+ int MainFd = -1;
+ int XdcFd = -1;
+ bool FixedFiles = false;
+#endif
+ };
+
+ class TEventFdWrapper : public TSharedDescriptor {
+ public:
+ TEventFdWrapper(int fd)
+ : Fd(fd)
+ {}
+
+ int GetDescriptor() override {
+ return Fd;
+ }
+
+ private:
+ int Fd;
+ };
+
+} // namespace NActors
diff --git a/ydb/library/actors/interconnect/uring_recv_buffer_pool.h b/ydb/library/actors/interconnect/uring_recv_buffer_pool.h
new file mode 100644
index 00000000000..a1d436e815f
--- /dev/null
+++ b/ydb/library/actors/interconnect/uring_recv_buffer_pool.h
@@ -0,0 +1,174 @@
+#pragma once
+
+#include <ydb/library/actors/util/rc_buf.h>
+
+#include <util/system/mutex.h>
+#include <util/system/guard.h>
+#include <util/generic/vector.h>
+
+#include <liburing.h>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+
+namespace NActors {
+
+ // Provided-buffer ring for io_uring multishot recv with true zero-copy delivery.
+ //
+ // The kernel fills one of the registered buffers and reports its index in the CQE.
+ // Instead of copying the bytes out, we hand the buffer to the input session wrapped
+ // in a TRcBuf backed by TUringRecvBufferChunk. When the last reference to that TRcBuf
+ // is dropped (possibly on an arbitrary worker thread, since the event payload outlives
+ // the input session), the chunk pushes the buffer index onto a thread-safe freelist.
+ // The reaper thread drains that freelist and re-adds the buffers to the kernel ring
+ // (DrainFreelist), keeping all io_uring_buf_ring_* calls on a single thread.
+ class TUringRecvBufferPool {
+ public:
+ static constexpr ui32 NumBuffers = 256;
+ static constexpr ui32 BufferSize = 16384;
+
+ // Shared between the pool (reaper-owned) and any outstanding chunks. Keeps the
+ // backing memory alive until the last TRcBuf referencing it is released, even if
+ // the pool/session is torn down first.
+ struct TCore : public TThrRefBase {
+ char* Memory = nullptr;
+ TMutex FreeLock;
+ TVector<ui16> FreeList; // indices released by consumers, awaiting re-add to the ring
+
+ ~TCore() {
+ free(Memory);
+ }
+
+ char* GetBuffer(ui16 index) const {
+ return Memory + static_cast<size_t>(index) * BufferSize;
+ }
+
+ void Release(ui16 index) {
+ with_lock (FreeLock) {
+ FreeList.push_back(index);
+ }
+ }
+ };
+
+ class TUringRecvBufferChunk : public IContiguousChunk {
+ public:
+ TUringRecvBufferChunk(TIntrusivePtr<TCore> core, ui16 index, ui32 len)
+ : Core(std::move(core))
+ , Index(index)
+ , Len(len)
+ {}
+
+ ~TUringRecvBufferChunk() override {
+ Core->Release(Index);
+ }
+
+ TContiguousSpan GetData() const override {
+ return {Core->GetBuffer(Index), Len};
+ }
+
+ TMutableContiguousSpan UnsafeGetDataMut() override {
+ return {Core->GetBuffer(Index), Len};
+ }
+
+ size_t GetOccupiedMemorySize() const override {
+ return BufferSize;
+ }
+
+ IContiguousChunk::TPtr Clone() override {
+ return MakeIntrusive<TOwnedCopyChunk>(TString(Core->GetBuffer(Index), Len));
+ }
+
+ private:
+ // Independent heap copy used when a private mutable copy is required.
+ class TOwnedCopyChunk : public IContiguousChunk {
+ public:
+ explicit TOwnedCopyChunk(TString data)
+ : Data(std::move(data))
+ {}
+ TContiguousSpan GetData() const override { return {Data.data(), Data.size()}; }
+ TMutableContiguousSpan UnsafeGetDataMut() override { return {Data.Detach(), Data.size()}; }
+ size_t GetOccupiedMemorySize() const override { return Data.capacity(); }
+ IContiguousChunk::TPtr Clone() override { return MakeIntrusive<TOwnedCopyChunk>(Data); }
+ private:
+ TString Data;
+ };
+
+ TIntrusivePtr<TCore> Core;
+ ui16 Index;
+ ui32 Len;
+ };
+
+ TUringRecvBufferPool() = default;
+
+ ~TUringRecvBufferPool() {
+ if (BufRing) {
+ io_uring_free_buf_ring(Ring, BufRing, NumBuffers, BufGroupId);
+ }
+ // Core (and its backing memory) is kept alive by any outstanding chunks.
+ }
+
+ bool Init(struct io_uring* ring, ui16 bufGroupId) {
+ Ring = ring;
+ BufGroupId = bufGroupId;
+
+ Core = MakeIntrusive<TCore>();
+ Core->Memory = static_cast<char*>(aligned_alloc(4096, NumBuffers * BufferSize));
+ if (!Core->Memory) {
+ Core.Reset();
+ return false;
+ }
+ memset(Core->Memory, 0, NumBuffers * BufferSize);
+
+ int ret = 0;
+ BufRing = io_uring_setup_buf_ring(ring, NumBuffers, bufGroupId, 0, &ret);
+ if (!BufRing) {
+ Core.Reset();
+ return false;
+ }
+
+ for (ui32 i = 0; i < NumBuffers; ++i) {
+ io_uring_buf_ring_add(BufRing, Core->GetBuffer(i), BufferSize, i,
+ io_uring_buf_ring_mask(NumBuffers), i);
+ }
+ io_uring_buf_ring_advance(BufRing, NumBuffers);
+
+ return true;
+ }
+
+ ui16 GetBufGroupId() const {
+ return BufGroupId;
+ }
+
+ // Zero-copy: wrap the kernel-filled buffer; recycled when the TRcBuf is released.
+ TRcBuf WrapBuffer(ui16 index, ui32 len) {
+ return TRcBuf(MakeIntrusive<TUringRecvBufferChunk>(Core, index, len));
+ }
+
+ // Called on the reaper thread: return consumer-released buffers to the kernel ring.
+ void DrainFreelist() {
+ if (!BufRing || !Core) {
+ return;
+ }
+ TVector<ui16> local;
+ with_lock (Core->FreeLock) {
+ local.swap(Core->FreeList);
+ }
+ if (local.empty()) {
+ return;
+ }
+ const int mask = io_uring_buf_ring_mask(NumBuffers);
+ for (size_t j = 0; j < local.size(); ++j) {
+ const ui16 idx = local[j];
+ io_uring_buf_ring_add(BufRing, Core->GetBuffer(idx), BufferSize, idx, mask, j);
+ }
+ io_uring_buf_ring_advance(BufRing, local.size());
+ }
+
+ private:
+ struct io_uring* Ring = nullptr;
+ struct io_uring_buf_ring* BufRing = nullptr;
+ TIntrusivePtr<TCore> Core;
+ ui16 BufGroupId = 0;
+ };
+
+} // namespace NActors
diff --git a/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h b/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h
index 5f95b246e76..dfae5616fb0 100644
--- a/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h
+++ b/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h
@@ -24,6 +24,8 @@ public:
USE_TLS = 1 << 1,
RDMA_POLLING_CQ = 1 << 2,
DISABLE_RDMA = 1 << 3,
+ USE_URING = 1 << 4,
+ USE_URING_SQPOLL = 1 << 5, // implies USE_URING + EnableUringSQPOLL
};
using TCheckerFactory = std::function<IActor*(ui32)>;
@@ -80,6 +82,18 @@ public:
}
}
+ auto effectiveSettingsCustomizer = [flags, settingsCustomizer](ui32 nodeId, NActors::TInterconnectSettings& settings) {
+ if (flags & (USE_URING | USE_URING_SQPOLL)) {
+ settings.UseUring = true;
+ }
+ if (flags & USE_URING_SQPOLL) {
+ settings.EnableUringSQPOLL = true;
+ }
+ if (settingsCustomizer) {
+ settingsCustomizer(nodeId, settings);
+ }
+ };
+
for (ui32 i = 1; i <= NumNodes; ++i) {
auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap;
Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig,
@@ -87,7 +101,7 @@ public:
flags & USE_ZC ? ESocketSendOptimization::IC_MSG_ZEROCOPY : ESocketSendOptimization::DISABLED,
flags & USE_TLS, checkerFactory, flags & RDMA_POLLING_CQ ? NInterconnect::NRdma::ECqMode::POLLING : NInterconnect::NRdma::ECqMode::EVENT,
!(flags & DISABLE_RDMA),
- settingsCustomizer,
+ effectiveSettingsCustomizer,
LogBackendFactory));
}
}
diff --git a/ydb/library/actors/interconnect/ut/lib/node.h b/ydb/library/actors/interconnect/ut/lib/node.h
index b5796eb17b9..781e09610b7 100644
--- a/ydb/library/actors/interconnect/ut/lib/node.h
+++ b/ydb/library/actors/interconnect/ut/lib/node.h
@@ -11,6 +11,7 @@
#include <ydb/library/actors/interconnect/interconnect_tcp_server.h>
#include <ydb/library/actors/interconnect/interconnect_tcp_proxy.h>
#include <ydb/library/actors/interconnect/interconnect_proxy_wrapper.h>
+#include <ydb/library/actors/interconnect/poller/uring_poller_actor.h>
#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
#include <ydb/library/actors/interconnect/rdma/cq_actor/cq_actor.h>
@@ -109,6 +110,10 @@ public:
setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(counters),
TMailboxType::ReadAsFilled, 0));
+ if (common->Settings.UseUring && TUringContext::IsSupported()) {
+ setup.LocalServices.emplace_back(MakeUringPollerActorId(), TActorSetupCmd(CreateUringPollerActor(common->Settings.EnableUringSQPOLL),
+ TMailboxType::ReadAsFilled, 0));
+ }
setup.LocalServices.emplace_back(NInterconnect::NRdma::MakeCqActorId(),
TActorSetupCmd(NInterconnect::NRdma::CreateCqActor(-1, 1024, rdmaCqMode, nullptr),
TMailboxType::ReadAsFilled, 0));
diff --git a/ydb/library/actors/interconnect/ut/uring_ut.cpp b/ydb/library/actors/interconnect/ut/uring_ut.cpp
new file mode 100644
index 00000000000..ffc24d1a712
--- /dev/null
+++ b/ydb/library/actors/interconnect/ut/uring_ut.cpp
@@ -0,0 +1,499 @@
+#include <ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h>
+#include <ydb/library/actors/interconnect/uring_context.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <atomic>
+
+using namespace NActors;
+
+Y_UNIT_TEST_SUITE(InterconnectUring) {
+
+ // Every uring data-path test runs in three modes so the SQPOLL path stays covered:
+ // *_NoUring - epoll backend (UseUring off): a control that the test itself is sound
+ // *_Uring - io_uring, explicit submit (UseUring on, SQPOLL off; the default)
+ // *_UringSqpoll - io_uring with the shared SQPOLL kernel poll thread (EnableUringSQPOLL on)
+ // The bodies are extracted into Do<Name>(mode) and emitted via URING_TEST_3MODES.
+
+ // Skip only the uring variants when the kernel lacks io_uring; the no-uring variant always
+ // runs. (If SQPOLL itself is unavailable the session falls back to epoll, so the SQPOLL
+ // variant still passes functionally.)
+ static bool UringUnavailable(TTestICCluster::Flags mode) {
+ const bool wantUring = mode & (TTestICCluster::USE_URING | TTestICCluster::USE_URING_SQPOLL);
+ if (wantUring && !TUringContext::IsSupported()) {
+ Cerr << "io_uring not supported on this kernel, skipping" << Endl;
+ return true;
+ }
+ return false;
+ }
+
+ static TTestICCluster::Flags WithMode(TTestICCluster::Flags base, TTestICCluster::Flags mode) {
+ return static_cast<TTestICCluster::Flags>(base | mode);
+ }
+
+#define URING_TEST_3MODES(Name) \
+ Y_UNIT_TEST(Name##_NoUring) { Do##Name(TTestICCluster::EMPTY); } \
+ Y_UNIT_TEST(Name##_Uring) { Do##Name(TTestICCluster::USE_URING); } \
+ Y_UNIT_TEST(Name##_UringSqpoll) { Do##Name(static_cast<TTestICCluster::Flags>(TTestICCluster::USE_URING | TTestICCluster::USE_URING_SQPOLL)); }
+
+ Y_UNIT_TEST(UringSupported) {
+ // Just verify the probe runs without crashing
+ [[maybe_unused]] bool supported = TUringContext::IsSupported();
+ }
+
+ void DoBasicSendRecv(TTestICCluster::Flags mode) {
+ if (UringUnavailable(mode)) {
+ return;
+ }
+
+ TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode);
+
+ // Simple ping-pong: send an event from node 1 to node 2 and get a response
+ class TPingActor : public TActorBootstrapped<TPingActor> {
+ public:
+ TPingActor(TActorId target, std::atomic<int>& counter)
+ : Target(target)
+ , Counter(counter)
+ {}
+
+ void Bootstrap() {
+ Send(Target, new TEvents::TEvPing);
+ Become(&TPingActor::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ cFunc(TEvents::TEvPong::EventType, HandlePong)
+ )
+
+ void HandlePong() {
+ Counter.fetch_add(1);
+ PassAway();
+ }
+
+ private:
+ TActorId Target;
+ std::atomic<int>& Counter;
+ };
+
+ class TPongActor : public TActorBootstrapped<TPongActor> {
+ public:
+ void Bootstrap() {
+ Become(&TPongActor::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvPing, Handle)
+ )
+
+ void Handle(TEvents::TEvPing::TPtr& ev) {
+ Send(ev->Sender, new TEvents::TEvPong);
+ }
+ };
+
+ std::atomic<int> counter{0};
+
+ TActorId pongId = cluster.RegisterActor(new TPongActor, 2);
+ cluster.RegisterActor(new TPingActor(pongId, counter), 1);
+
+ const TInstant deadline = TInstant::Now() + TDuration::Seconds(10);
+ while (counter.load() == 0 && TInstant::Now() < deadline) {
+ Sleep(TDuration::MilliSeconds(50));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(counter.load(), 1);
+ }
+ URING_TEST_3MODES(BasicSendRecv)
+
+ void DoLargeMessage(TTestICCluster::Flags mode) {
+ if (UringUnavailable(mode)) {
+ return;
+ }
+
+ TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode);
+
+ class TReceiver : public TActorBootstrapped<TReceiver> {
+ public:
+ TReceiver(std::atomic<size_t>& bytesReceived)
+ : BytesReceived(bytesReceived)
+ {}
+
+ void Bootstrap() {
+ Become(&TReceiver::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvBlob, Handle)
+ )
+
+ void Handle(TEvents::TEvBlob::TPtr& ev) {
+ BytesReceived.fetch_add(ev->Get()->Blob.size());
+ }
+
+ private:
+ std::atomic<size_t>& BytesReceived;
+ };
+
+ std::atomic<size_t> bytesReceived{0};
+ const size_t messageSize = 1024 * 1024; // 1 MB
+
+ TActorId receiverId = cluster.RegisterActor(new TReceiver(bytesReceived), 2);
+
+ // Send a large blob from node 1 to node 2
+ TString data(messageSize, 'X');
+ cluster.GetNode(1)->Send(receiverId, new TEvents::TEvBlob(data));
+
+ const TInstant deadline = TInstant::Now() + TDuration::Seconds(30);
+ while (bytesReceived.load() < messageSize && TInstant::Now() < deadline) {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(bytesReceived.load(), messageSize);
+ }
+ URING_TEST_3MODES(LargeMessage)
+
+ void DoMultipleMessages(TTestICCluster::Flags mode) {
+ if (UringUnavailable(mode)) {
+ return;
+ }
+
+ TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode);
+
+ class TCountingReceiver : public TActorBootstrapped<TCountingReceiver> {
+ public:
+ TCountingReceiver(std::atomic<int>& counter)
+ : Counter(counter)
+ {}
+
+ void Bootstrap() {
+ Become(&TCountingReceiver::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ cFunc(TEvents::TEvPing::EventType, HandlePing)
+ )
+
+ void HandlePing() {
+ Counter.fetch_add(1);
+ }
+
+ private:
+ std::atomic<int>& Counter;
+ };
+
+ std::atomic<int> counter{0};
+ const int numMessages = 1000;
+
+ TActorId receiverId = cluster.RegisterActor(new TCountingReceiver(counter), 2);
+
+ for (int i = 0; i < numMessages; ++i) {
+ cluster.GetNode(1)->Send(receiverId, new TEvents::TEvPing);
+ }
+
+ const TInstant deadline = TInstant::Now() + TDuration::Seconds(30);
+ while (counter.load() < numMessages && TInstant::Now() < deadline) {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(counter.load(), numMessages);
+ }
+ URING_TEST_3MODES(MultipleMessages)
+
+ void DoSustainedStream(TTestICCluster::Flags mode) {
+ // Stream a large amount of data through many messages to force the provided-buffer
+ // recv ring to recycle buffers repeatedly (and exercise the ENOBUFS re-arm path).
+ if (UringUnavailable(mode)) {
+ return;
+ }
+
+ TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode);
+
+ class TReceiver : public TActorBootstrapped<TReceiver> {
+ public:
+ TReceiver(std::atomic<size_t>& bytesReceived)
+ : BytesReceived(bytesReceived)
+ {}
+ void Bootstrap() { Become(&TReceiver::StateFunc); }
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvBlob, Handle)
+ )
+ void Handle(TEvents::TEvBlob::TPtr& ev) {
+ BytesReceived.fetch_add(ev->Get()->Blob.size());
+ }
+ private:
+ std::atomic<size_t>& BytesReceived;
+ };
+
+ std::atomic<size_t> bytesReceived{0};
+ const size_t messageSize = 64 * 1024;
+ const int numMessages = 1000;
+ const size_t totalBytes = messageSize * numMessages;
+
+ TActorId receiverId = cluster.RegisterActor(new TReceiver(bytesReceived), 2);
+
+ TString data(messageSize, 'Z');
+ for (int i = 0; i < numMessages; ++i) {
+ cluster.GetNode(1)->Send(receiverId, new TEvents::TEvBlob(data));
+ }
+
+ const TInstant deadline = TInstant::Now() + TDuration::Seconds(60);
+ while (bytesReceived.load() < totalBytes && TInstant::Now() < deadline) {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(bytesReceived.load(), totalBytes);
+ }
+ URING_TEST_3MODES(SustainedStream)
+
+ void DoZeroCopyLargeMessages(TTestICCluster::Flags mode) {
+ // Exercise the io_uring IORING_OP_SEND_ZC path (XDC) together with the NOTIF-gated
+ // buffer lifetime. Content is verified to catch any premature buffer reuse.
+ if (UringUnavailable(mode)) {
+ return;
+ }
+
+ TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr,
+ WithMode(TTestICCluster::USE_ZC, mode));
+
+ class TVerifier : public TActorBootstrapped<TVerifier> {
+ public:
+ TVerifier(std::atomic<int>& ok, std::atomic<int>& bad, char fill)
+ : Ok(ok)
+ , Bad(bad)
+ , Fill(fill)
+ {}
+ void Bootstrap() { Become(&TVerifier::StateFunc); }
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvBlob, Handle)
+ )
+ void Handle(TEvents::TEvBlob::TPtr& ev) {
+ const TString& blob = ev->Get()->Blob;
+ bool good = true;
+ for (char c : blob) {
+ if (c != Fill) {
+ good = false;
+ break;
+ }
+ }
+ (good ? Ok : Bad).fetch_add(1);
+ }
+ private:
+ std::atomic<int>& Ok;
+ std::atomic<int>& Bad;
+ const char Fill;
+ };
+
+ std::atomic<int> ok{0};
+ std::atomic<int> bad{0};
+ const char fill = 'Q';
+ const size_t messageSize = 512 * 1024;
+ const int numMessages = 50;
+
+ TActorId receiverId = cluster.RegisterActor(new TVerifier(ok, bad, fill), 2);
+
+ TString data(messageSize, fill);
+ for (int i = 0; i < numMessages; ++i) {
+ cluster.GetNode(1)->Send(receiverId, new TEvents::TEvBlob(data));
+ }
+
+ const TInstant deadline = TInstant::Now() + TDuration::Seconds(60);
+ while (ok.load() + bad.load() < numMessages && TInstant::Now() < deadline) {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(bad.load(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(ok.load(), numMessages);
+ }
+ URING_TEST_3MODES(ZeroCopyLargeMessages)
+
+ void DoSessionChurnReconnect(TTestICCluster::Flags mode) {
+ // Repeatedly tear the session down (TEvPoisonSession) and verify the connection
+ // re-establishes and delivers every round. Before the reaper learned to release
+ // session rings, each churn leaked an io_uring fd + eventfd + buffer ring and held
+ // the socket open, so after a handful of cycles the node could no longer connect
+ // (fd exhaustion / anchor-SQ abort) — exactly the cluster meltdown this guards.
+ if (UringUnavailable(mode)) {
+ return;
+ }
+
+ TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode);
+
+ class TPongActor : public TActorBootstrapped<TPongActor> {
+ public:
+ void Bootstrap() { Become(&TPongActor::StateFunc); }
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvPing, Handle)
+ )
+ void Handle(TEvents::TEvPing::TPtr& ev) {
+ Send(ev->Sender, new TEvents::TEvPong);
+ }
+ };
+
+ class TPingActor : public TActorBootstrapped<TPingActor> {
+ public:
+ TPingActor(TActorId target, std::atomic<int>& counter)
+ : Target(target)
+ , Counter(counter)
+ {}
+ void Bootstrap() {
+ Send(Target, new TEvents::TEvPing, IEventHandle::FlagTrackDelivery);
+ Become(&TPingActor::StateFunc);
+ }
+ STRICT_STFUNC(StateFunc,
+ cFunc(TEvents::TEvPong::EventType, HandlePong)
+ )
+ void HandlePong() {
+ Counter.fetch_add(1);
+ PassAway();
+ }
+ private:
+ TActorId Target;
+ std::atomic<int>& Counter;
+ };
+
+ TActorId pongId = cluster.RegisterActor(new TPongActor, 2);
+
+ const int rounds = 40;
+ for (int round = 0; round < rounds; ++round) {
+ std::atomic<int> counter{0};
+ cluster.RegisterActor(new TPingActor(pongId, counter), 1);
+
+ const TInstant deadline = TInstant::Now() + TDuration::Seconds(20);
+ while (counter.load() == 0 && TInstant::Now() < deadline) {
+ Sleep(TDuration::MilliSeconds(20));
+ }
+ UNIT_ASSERT_C(counter.load() == 1,
+ "round " << round << ": ping/pong did not complete (reconnect failed?)");
+
+ // Force a session teardown so the next round must reconnect over a fresh ring.
+ cluster.GetNode(1)->Send(cluster.InterconnectProxy(2, 1), new TEvInterconnect::TEvPoisonSession);
+ Sleep(TDuration::MilliSeconds(50));
+ }
+ }
+ URING_TEST_3MODES(SessionChurnReconnect)
+
+ void DoIdleKeepAlive(TTestICCluster::Flags mode) {
+ // Regression test for the io_uring idle-keepalive (DeadPeer) cluster meltdown.
+ //
+ // An idle connection exchanges nothing but periodic keepalive pings (every PingPeriod,
+ // default 3s). The original cluster meltdown was traced to reaper bugs (eventfd never
+ // registered, the anchor poll going silent after one fire) that starved completions on
+ // idle sessions until the peer declared DeadPeer. This guards that an idle session is
+ // kept alive across several ping cycles in every backend mode (epoll, uring, uring+SQPOLL).
+ //
+ // The settings below make this deterministic: PingPeriod (3s) < DeadPeer (6s) so a
+ // healthy connection (one whose pings keep flowing) is never torn down. We subscribe to
+ // the peer (establishing a session but sending no payload) and sit idle across several
+ // ping/DeadPeer cycles, asserting the session is never declared dead.
+ if (UringUnavailable(mode)) {
+ return;
+ }
+
+ TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr,
+ mode, /*checkerFactory=*/{}, /*deadPeerTimeout=*/TDuration::Seconds(6));
+
+ class TConnSubscriber : public TActorBootstrapped<TConnSubscriber> {
+ public:
+ TConnSubscriber(ui32 peerNodeId, std::atomic<int>& connects, std::atomic<int>& disconnects)
+ : PeerNodeId(peerNodeId)
+ , Connects(connects)
+ , Disconnects(disconnects)
+ {}
+ void Bootstrap() {
+ Become(&TConnSubscriber::StateFunc);
+ Send(TActivationContext::InterconnectProxy(PeerNodeId), new TEvents::TEvSubscribe);
+ }
+ STRICT_STFUNC(StateFunc,
+ cFunc(TEvInterconnect::TEvNodeConnected::EventType, HandleConnected)
+ cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected)
+ )
+ void HandleConnected() { Connects.fetch_add(1); }
+ void HandleDisconnected() { Disconnects.fetch_add(1); }
+ private:
+ const ui32 PeerNodeId;
+ std::atomic<int>& Connects;
+ std::atomic<int>& Disconnects;
+ };
+
+ std::atomic<int> connects{0};
+ std::atomic<int> disconnects{0};
+
+ cluster.RegisterActor(new TConnSubscriber(2, connects, disconnects), 1);
+
+ // Wait for the session to come up.
+ const TInstant connectDeadline = TInstant::Now() + TDuration::Seconds(10);
+ while (connects.load() == 0 && TInstant::Now() < connectDeadline) {
+ Sleep(TDuration::MilliSeconds(20));
+ }
+ UNIT_ASSERT_C(connects.load() >= 1, "session never connected");
+
+ // Baseline right before going idle, then stay idle across several ping (3s) and
+ // DeadPeer (6s) cycles. A healthy keepalive keeps the session up the whole time.
+ const int connectsBaseline = connects.load();
+ const int disconnectsBaseline = disconnects.load();
+
+ Sleep(TDuration::Seconds(16));
+
+ UNIT_ASSERT_C(disconnects.load() == disconnectsBaseline,
+ "session was torn down while idle (keepalive stall): disconnects "
+ << disconnectsBaseline << " -> " << disconnects.load());
+ UNIT_ASSERT_C(connects.load() == connectsBaseline,
+ "session reconnected while idle (keepalive stall): connects "
+ << connectsBaseline << " -> " << connects.load());
+ }
+ URING_TEST_3MODES(IdleKeepAlive)
+
+ Y_UNIT_TEST(FallbackToEpollWithEncryption) {
+ // When TLS is enabled, even with UseUring=true, the session should
+ // fall back to the epoll path. This test verifies no crash occurs.
+ if (!TUringContext::IsSupported()) {
+ Cerr << "io_uring not supported on this kernel, skipping" << Endl;
+ return;
+ }
+
+ auto flags = static_cast<TTestICCluster::Flags>(
+ TTestICCluster::USE_URING | TTestICCluster::USE_TLS);
+ TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, flags);
+
+ std::atomic<int> counter{0};
+
+ class TPongActor : public TActorBootstrapped<TPongActor> {
+ public:
+ void Bootstrap() { Become(&TPongActor::StateFunc); }
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvPing, Handle)
+ )
+ void Handle(TEvents::TEvPing::TPtr& ev) {
+ Send(ev->Sender, new TEvents::TEvPong);
+ }
+ };
+
+ class TPingActor : public TActorBootstrapped<TPingActor> {
+ public:
+ TPingActor(TActorId target, std::atomic<int>& c)
+ : Target(target), Counter(c) {}
+ void Bootstrap() {
+ Send(Target, new TEvents::TEvPing);
+ Become(&TPingActor::StateFunc);
+ }
+ STRICT_STFUNC(StateFunc,
+ cFunc(TEvents::TEvPong::EventType, HandlePong)
+ )
+ void HandlePong() {
+ Counter.fetch_add(1);
+ PassAway();
+ }
+ private:
+ TActorId Target;
+ std::atomic<int>& Counter;
+ };
+
+ TActorId pongId = cluster.RegisterActor(new TPongActor, 2);
+ cluster.RegisterActor(new TPingActor(pongId, counter), 1);
+
+ const TInstant deadline = TInstant::Now() + TDuration::Seconds(10);
+ while (counter.load() == 0 && TInstant::Now() < deadline) {
+ Sleep(TDuration::MilliSeconds(50));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(counter.load(), 1);
+ }
+}
diff --git a/ydb/library/actors/interconnect/ut/ya.make b/ydb/library/actors/interconnect/ut/ya.make
index 3d073111acc..2c75a693b06 100644
--- a/ydb/library/actors/interconnect/ut/ya.make
+++ b/ydb/library/actors/interconnect/ut/ya.make
@@ -19,6 +19,7 @@ SRCS(
poller_actor_ut.cpp
dynamic_proxy_ut.cpp
sticking_ut.cpp
+ uring_ut.cpp
)
PEERDIR(
diff --git a/ydb/library/actors/interconnect/ya.make b/ydb/library/actors/interconnect/ya.make
index 6291eeba70f..b1ee9208ca6 100644
--- a/ydb/library/actors/interconnect/ya.make
+++ b/ydb/library/actors/interconnect/ya.make
@@ -58,6 +58,17 @@ SRCS(
watchdog_timer.h
)
+IF (OS_LINUX)
+ SRCS(
+ uring_context.cpp
+ uring_context.h
+ uring_recv_buffer_pool.h
+ )
+ PEERDIR(
+ contrib/libs/liburing
+ )
+ENDIF()
+
PEERDIR(
contrib/libs/libc_compat
contrib/libs/openssl
diff --git a/ydb/tools/stress_tool/device_test_tool.cpp b/ydb/tools/stress_tool/device_test_tool.cpp
index 9b147bd6d50..82da4da11e2 100644
--- a/ydb/tools/stress_tool/device_test_tool.cpp
+++ b/ydb/tools/stress_tool/device_test_tool.cpp
@@ -167,6 +167,9 @@ int main(int argc, char **argv) {
.RequiredArgument("PORT").StoreResult(&icPort).Hidden();
opts.AddLongOption("num-server-devices", "number of devices per server (default 1)")
.RequiredArgument("N").DefaultValue("1").Hidden();
+ bool useUring = false;
+ opts.AddLongOption("use-uring", "use io_uring transport for interconnect instead of epoll (Linux 5.19+)")
+ .StoreTrue(&useUring).NoArgument().Hidden();
{
const size_t kCol = 26;
@@ -190,7 +193,9 @@ int main(int argc, char **argv) {
<< row("ic-port PORT",
"interconnect port for server to listen on (server only)")
<< row("num-server-devices N",
- "number of devices per server (default 1)");
+ "number of devices per server (default 1)")
+ << row("use-uring",
+ "use io_uring transport for interconnect instead of epoll (Linux 5.19+)");
opts.AddSection("DDisk client/server options", ddiskHelp);
}
@@ -293,7 +298,7 @@ int main(int argc, char **argv) {
if (serverMode) {
InstallServerSignalHandler();
auto printer = MakeIntrusive<NKikimr::TResultPrinter>(config.OutputFormat, config.RunCount);
- THolder<NKikimr::TPerfTest> test(new NKikimr::TDDiskServer<>(config, testProto, serverNodeId, clientNodeId, icPort));
+ THolder<NKikimr::TPerfTest> test(new NKikimr::TDDiskServer<>(config, testProto, serverNodeId, clientNodeId, icPort, useUring));
test->SetPrinter(printer);
test->RunTest();
return 0;
@@ -330,7 +335,7 @@ int main(int argc, char **argv) {
overrideDDiskInFlight(testProto, inFlight);
for (ui32 run = 0; run < config.RunCount; ++run) {
THolder<NKikimr::TPerfTest> test(
- new NKikimr::TDDiskClient(config, testProto, clientNodeId, serverPeers, numServerDevices));
+ new NKikimr::TDDiskClient(config, testProto, clientNodeId, serverPeers, numServerDevices, useUring));
test->SetPrinter(printer);
test->RunTest();
}
@@ -338,7 +343,7 @@ int main(int argc, char **argv) {
} else {
for (ui32 run = 0; run < config.RunCount; ++run) {
THolder<NKikimr::TPerfTest> test(
- new NKikimr::TDDiskClient(config, testProto, clientNodeId, serverPeers, numServerDevices));
+ new NKikimr::TDDiskClient(config, testProto, clientNodeId, serverPeers, numServerDevices, useUring));
test->SetPrinter(printer);
test->RunTest();
}
diff --git a/ydb/tools/stress_tool/device_test_tool_ddisk_client_server.h b/ydb/tools/stress_tool/device_test_tool_ddisk_client_server.h
index cc8a063e56c..2de9fe09452 100644
--- a/ydb/tools/stress_tool/device_test_tool_ddisk_client_server.h
+++ b/ydb/tools/stress_tool/device_test_tool_ddisk_client_server.h
@@ -11,6 +11,7 @@
#include <ydb/library/actors/interconnect/interconnect_proxy_wrapper.h>
#include <ydb/library/actors/interconnect/handshake_broker.h>
#include <ydb/library/actors/interconnect/poller/poller_actor.h>
+#include <ydb/library/actors/interconnect/poller/uring_poller_actor.h>
#include <util/system/event.h>
@@ -27,7 +28,7 @@ struct TInterconnectPeer {
};
static TInterconnectProxyCommon::TPtr MakeInterconnectCommon(
- const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, ui32 selfNodeId) {
+ const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, ui32 selfNodeId, bool useUring = false) {
auto common = MakeIntrusive<TInterconnectProxyCommon>();
common->NameserviceId = GetNameserviceActorId();
common->MonCounters = counters->GetSubgroup("nodeId", ToString(selfNodeId));
@@ -41,6 +42,7 @@ static TInterconnectProxyCommon::TPtr MakeInterconnectCommon(
common->Settings.TotalInflightAmountOfData = 256ull * 1024 * 1024;
common->Settings.TCPSocketBufferSize = 8 * 1024 * 1024;
common->Settings.EnableExternalDataChannel = true;
+ common->Settings.UseUring = useUring;
common->OutgoingHandshakeInflightLimit = 3;
return common;
}
@@ -73,6 +75,12 @@ static void SetupInterconnectServices(TActorSystemSetup* setup,
MakePollerActorId(),
TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, InterconnectPoolId));
+ if (common->Settings.UseUring && TUringContext::IsSupported()) {
+ setup->LocalServices.emplace_back(
+ MakeUringPollerActorId(),
+ TActorSetupCmd(CreateUringPollerActor(common->Settings.EnableUringSQPOLL), TMailboxType::ReadAsFilled, InterconnectPoolId));
+ }
+
setup->LocalServices.emplace_back(
MakeHandshakeBrokerOutId(),
TActorSetupCmd(CreateHandshakeBroker(*common->OutgoingHandshakeInflightLimit),
@@ -107,15 +115,17 @@ struct TDDiskServer : public TPDiskTest<ChunkSize> {
ui32 ServerNodeId;
ui32 ClientNodeId;
ui16 Port;
+ bool UseUring;
static constexpr ui32 DDiskSlotId = 1;
TDDiskServer(const TPerfTestConfig& cfg, const NDevicePerfTest::TDDiskTest& testProto,
- ui32 serverNodeId, ui32 clientNodeId, ui16 port)
+ ui32 serverNodeId, ui32 clientNodeId, ui16 port, bool useUring = false)
: TBase(cfg, DefaultPDiskTestProto())
, DDiskTestProto(testProto)
, ServerNodeId(serverNodeId)
, ClientNodeId(clientNodeId)
, Port(port)
+ , UseUring(useUring)
{
// Re-key PDisk actor IDs and the logger to ServerNodeId before Init() runs;
// otherwise local sends from DDisk -> PDisk/logger get routed through
@@ -193,7 +203,7 @@ struct TDDiskServer : public TPDiskTest<ChunkSize> {
// Server only accepts incoming connections, but the interconnect handshake still
// resolves the peer NodeId via the local nameserver, so we must register the
// client with a placeholder address (port 0; server never initiates connection).
- auto common = MakeInterconnectCommon(TBase::Counters, ServerNodeId);
+ auto common = MakeInterconnectCommon(TBase::Counters, ServerNodeId, UseUring);
TVector<TInterconnectPeer> peers = {{ClientNodeId, "::", 0}};
SetupInterconnectServices(TBase::Setup.Get(), common, ServerNodeId,
@@ -245,10 +255,11 @@ struct TDDiskClient : public TPerfTest {
ui32 ClientNodeId;
TVector<TInterconnectPeer> ServerPeers;
ui32 NumDevicesPerServer;
+ bool UseUring;
TDDiskClient(const TPerfTestConfig& cfg, const NDevicePerfTest::TDDiskTest& testProto,
ui32 clientNodeId, const TVector<TInterconnectPeer>& serverPeers,
- ui32 numDevicesPerServer)
+ ui32 numDevicesPerServer, bool useUring = false)
: TPerfTest(cfg)
, Setup(new TActorSystemSetup())
, LogSettings(new NActors::NLog::TSettings(NActors::TActorId(clientNodeId, "logger"),
@@ -261,6 +272,7 @@ struct TDDiskClient : public TPerfTest {
, ClientNodeId(clientNodeId)
, ServerPeers(serverPeers)
, NumDevicesPerServer(numDevicesPerServer)
+ , UseUring(useUring)
{
}
@@ -279,7 +291,7 @@ struct TDDiskClient : public TPerfTest {
Setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(64, 20)));
// Set up interconnect with all server peers
- auto common = MakeInterconnectCommon(Counters, ClientNodeId);
+ auto common = MakeInterconnectCommon(Counters, ClientNodeId, UseUring);
SetupInterconnectServices(Setup.Get(), common, ClientNodeId,
"::", 0, ServerPeers, /*listen=*/false);