diff options
| author | Evgeniy Ivanov <[email protected]> | 2026-06-25 17:03:00 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-06-25 18:03:00 +0300 |
| commit | 931f110afb00cbaf8e998daec79b1438985ca355 (patch) | |
| tree | a75f16d3ac40c49512e722cca34790650e0c1daf | |
| parent | 2335250e8a0c286aac3501f289e0643ab286b67a (diff) | |
IC with io_uring prototype, #44325 (#44424)
20 files changed, 2663 insertions, 24 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index fe668e13615..4a0bf6cb8f1 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -259,6 +259,7 @@ #include <ydb/library/actors/interconnect/load.h> #include <ydb/library/actors/interconnect/poller/poller_actor.h> #include <ydb/library/actors/interconnect/poller/poller_tcp.h> +#include <ydb/library/actors/interconnect/poller/uring_poller_actor.h> #include <ydb/library/actors/interconnect/rdma/cq_actor/cq_actor.h> #include <ydb/library/actors/interconnect/rdma/mem_pool.h> #include <ydb/core/retro_tracing_impl/distributed_collector/distributed_retro_collector.h> @@ -586,6 +587,14 @@ static TInterconnectSettings GetInterconnectSettings(const NKikimrConfig::TInter result.CollectSubscriptionStackTrace = config.GetCollectSubscriptionStackTrace(); } + if (config.HasUseUring()) { + result.UseUring = config.GetUseUring(); + } + + if (config.HasEnableUringSQPOLL()) { + result.EnableUringSQPOLL = config.GetEnableUringSQPOLL(); + } + return result; } @@ -702,6 +711,11 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd( CreatePollerActor(schedulerConfig.MonCounters), TMailboxType::ReadAsFilled, systemPoolId)); + if (settings.UseUring && TUringContext::IsSupported()) { + setup->LocalServices.emplace_back(MakeUringPollerActorId(), TActorSetupCmd( + CreateUringPollerActor(settings.EnableUringSQPOLL), TMailboxType::ReadAsFilled, systemPoolId)); + } + auto destructorQueueSize = std::make_shared<std::atomic<TAtomicBase>>(0); TIntrusivePtr<TInterconnectProxyCommon> icCommon; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 82520ca7bbd..237597138a6 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -528,6 +528,8 @@ 
message TInterconnectConfig { optional bool CollectSubscriptionStackTrace = 59 [default = false]; optional uint32 RdmaPayloadCopySizeThreshold = 60 [default = 65536]; // 0 disables copying RDMA payload to regular memory optional uint32 MaxRdmaRetryBackoffLevel = 61 [default = 8]; // 5s * 2^8 = 1280s, about 21 minutes by default + optional bool UseUring = 62 [default = false]; + optional bool EnableUringSQPOLL = 63 [default = false]; // only effective when UseUring is set // ballast is added to IC handshake frames to ensure correctness of jumbo frames transmission over network optional uint32 HandshakeBallastSize = 14; diff --git a/ydb/library/actors/interconnect/events/events.h b/ydb/library/actors/interconnect/events/events.h index 8063a43da7f..285edbe0dd5 100644 --- a/ydb/library/actors/interconnect/events/events.h +++ b/ydb/library/actors/interconnect/events/events.h @@ -56,6 +56,15 @@ namespace NActors { EvSubscribeForConnection, EvReportConnection, + // io_uring transport events + EvUringRegister, + EvUringRegisterResult, + EvUringWriteComplete, + EvUringRecvComplete, + EvUringSendZcNotif, + EvUringUnregister, + EvUringRegisterFailed, + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/library/actors/interconnect/interconnect_common.h b/ydb/library/actors/interconnect/interconnect_common.h index a5ed548f890..4d11de471ba 100644 --- a/ydb/library/actors/interconnect/interconnect_common.h +++ b/ydb/library/actors/interconnect/interconnect_common.h @@ -81,6 +81,8 @@ namespace NActors { // 5s * 2^8 = 1280s, about 21 minutes with the current RDMA retry base delay. 
ui32 MaxRdmaRetryBackoffLevel = 8; bool CollectSubscriptionStackTrace = false; + bool UseUring = false; + bool EnableUringSQPOLL = false; // only effective when UseUring is set }; struct TWhiteboardSessionStatus { diff --git a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp index 0e8c94cf3b9..b9121ab5cb5 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp @@ -5,6 +5,10 @@ #include <ydb/library/actors/core/probes.h> #include <ydb/library/actors/util/datetime.h> +#ifdef __linux__ +#include <liburing.h> +#endif + #include <variant> namespace NActors { @@ -334,6 +338,221 @@ namespace NActors { ReceiveData(); } + void TInputSessionTCP::Handle(TEvUringRegisterResult::TPtr& ev) { + auto* msg = ev->Get(); + UringContext = std::move(msg->Context); + MainRecvBufGroupId = msg->MainRecvBufGroupId; + XdcRecvBufGroupId = msg->XdcRecvBufGroupId; + StartRecvUring(); + } + + void TInputSessionTCP::StartRecvUring() { + if (!UringContext) { + return; + } + if (Socket && !MainRecvMultishotActive) { + if (UringContext->SubmitRecvMultishot((int)*Socket, MainRecvBufGroupId, EUringOpTag::MainRecv)) { + UringContext->IncrementPendingRecvs(); + MainRecvMultishotActive = true; + } + } + UringContext->Flush(); + } + + void TInputSessionTCP::Handle(TEvUringRecvComplete::TPtr& ev) { + auto* msg = ev->Get(); + EUringOpTag tag = static_cast<EUringOpTag>(msg->UserData & UringOpTagMask); + + if (tag == EUringOpTag::XdcRecv) { + // Async XDC readv completion (Caveat 3). 
+ UringXdcReadInFlight = false; + if (msg->Result > 0) { + if (UringXdcReadIsCatch) { + ProcessXdcCatchBytesUring(msg->Result); + } else { + ProcessXdcBytesUring(msg->Result); + } + LastReceiveTimestamp = TActivationContext::Monotonic(); + ReceiveData(); + } else if (msg->Result == 0) { + throw TExReestablishConnection{TDisconnectReason::EndOfStream()}; + } else { + int err = -msg->Result; + if (err != ECANCELED) { + throw TExReestablishConnection{TDisconnectReason::FromErrno(err)}; + } + } + return; + } + + // Main socket multishot recv completion. + if (msg->Result > 0) { + const size_t bytesRead = msg->Result; + BytesReadFromSocket += bytesRead; + ++UringMainRecvCompletions; + UringMainRecvBytes += bytesRead; + Metrics->AddInputChannelsIncomingTraffic(0, bytesRead); + IncomingData.Insert(IncomingData.End(), std::move(msg->Data)); + + if (!(msg->Flags & IORING_CQE_F_MORE)) { + MainRecvMultishotActive = false; + StartRecvUring(); + } + + LastReceiveTimestamp = TActivationContext::Monotonic(); + ReceiveData(); + } else if (msg->Result == 0) { + // EOF + MainRecvMultishotActive = false; + throw TExReestablishConnection{TDisconnectReason::EndOfStream()}; + } else { + int err = -msg->Result; + MainRecvMultishotActive = false; + if (err == ENOBUFS) { + // Provided-buffer ring is momentarily exhausted. The terminal CQE has already + // woken the reaper, which recycles released buffers (DrainFreelist). Schedule a + // deferred re-arm so we retry once buffers are available rather than busy-looping. + TActivationContext::Schedule(TDuration::MicroSeconds(250), + new IEventHandle(EvResumeReceiveData, 0, SelfId(), {}, nullptr, 0)); + } else if (err != ECANCELED) { + throw TExReestablishConnection{TDisconnectReason::FromErrno(err)}; + } + } + } + + void TInputSessionTCP::DriveXdcUring() { + if (!UringContext || !XdcSocket) { + return; + } + // Apply the catch stream as soon as it is fully read (idempotent), then keep a single + // XDC readv in flight while there is work to do. 
+ ApplyXdcCatchStream(); + SubmitXdcRecvUring(); + } + + bool TInputSessionTCP::SubmitXdcRecvUring() { + if (!UringContext || !XdcSocket || UringXdcReadInFlight) { + return false; + } + + // Catch stream first: read into a throwaway buffer that is later scattered per channel. + if (XdcCatchStream.BytesPending) { + if (!XdcCatchStream.Buffer) { + XdcCatchStream.Buffer = TRcBuf::Uninitialized(64 * 1024); + } + const size_t numBytesToRead = Min<size_t>(XdcCatchStream.BytesPending, XdcCatchStream.Buffer.size()); + struct iovec iov{XdcCatchStream.Buffer.GetDataMut(), numBytesToRead}; + if (UringContext->SubmitReadv((int)*XdcSocket, &iov, 1, ++UringXdcReadSeqNo, EUringOpTag::XdcRecv)) { + UringXdcReadInFlight = true; + UringXdcReadIsCatch = true; + UringContext->Flush(); + return true; + } + return false; + } + + // Normal XDC stream: scatter read directly into the destination spans (zero-copy). + if (!XdcCatchStream.Applied || XdcInputQ.empty()) { + return false; + } + + TStackVec<struct iovec, 64> buffs; + size_t size = 0; + for (auto& [channel, span] : XdcInputQ) { + buffs.push_back(iovec{span.data(), span.size()}); + size += span.size(); + if (buffs.size() == 64 || size >= 1024 * 1024) { + break; + } + } + + if (UringContext->SubmitReadv((int)*XdcSocket, buffs.data(), buffs.size(), ++UringXdcReadSeqNo, EUringOpTag::XdcRecv)) { + UringXdcReadInFlight = true; + UringXdcReadIsCatch = false; + UringContext->Flush(); + return true; + } + return false; + } + + void TInputSessionTCP::ProcessXdcCatchBytesUring(ssize_t recvres) { + HandleXdcChecksum({XdcCatchStream.Buffer.data(), static_cast<size_t>(recvres)}); + + XdcCatchStream.BytesPending -= recvres; + XdcCatchStream.BytesProcessed += recvres; + BytesReadFromXdcSocket += recvres; + + // scatter read data into per-channel catch buffers + const char *in = XdcCatchStream.Buffer.data(); + while (recvres) { + Y_DEBUG_ABORT_UNLESS(!XdcCatchStream.Markup.empty()); + auto& [channel, apply, bytes] = XdcCatchStream.Markup.front(); 
+ size_t bytesInChannel = Min<size_t>(recvres, bytes); + bytes -= bytesInChannel; + recvres -= bytesInChannel; + + if (apply) { + auto& context = GetPerChannelContext(channel); + while (bytesInChannel) { + const size_t offset = context.XdcCatchBytesRead % context.XdcCatchBuffer.size(); + TMutableContiguousSpan out = context.XdcCatchBuffer.GetContiguousSpanMut().SubSpan(offset, bytesInChannel); + memcpy(out.data(), in, out.size()); + context.XdcCatchBytesRead += out.size(); + in += out.size(); + bytesInChannel -= out.size(); + } + } else { + in += bytesInChannel; + } + + if (!bytes) { + XdcCatchStream.Markup.pop_front(); + } + } + + ApplyXdcCatchStream(); + } + + void TInputSessionTCP::ProcessXdcBytesUring(ssize_t recvres) { + // calculate stream checksums over the destination spans that were just filled + { + size_t bytesToChecksum = recvres; + for (auto& [channel, span] : XdcInputQ) { + const size_t n = Min<size_t>(bytesToChecksum, span.size()); + HandleXdcChecksum({span.data(), n}); + bytesToChecksum -= n; + if (!bytesToChecksum) { + break; + } + } + } + + Metrics->AddTotalBytesRead(recvres); + BytesReadFromXdcSocket += recvres; + + // cut the XdcInputQ deque + for (size_t bytesToCut = recvres; bytesToCut; ) { + Y_ABORT_UNLESS(!XdcInputQ.empty()); + auto& [channel, span] = XdcInputQ.front(); + size_t n = Min(bytesToCut, span.size()); + bytesToCut -= n; + if (n == span.size()) { + XdcInputQ.pop_front(); + } else { + span = span.SubSpan(n, Max<size_t>()); + Y_ABORT_UNLESS(!bytesToCut); + } + + Y_DEBUG_ABORT_UNLESS(n); + auto& context = GetPerChannelContext(channel); + context.DropFront(nullptr, n); + ProcessEvents(context); + } + + // drop fully processed inbound packets + ProcessInboundPacketQ(recvres, 0); + } + void TInputSessionTCP::Handle(NInterconnect::NRdma::TEvRdmaReadDone::TPtr& ev) { if (!ev->Get()->Event->IsSuccess()) { LOG_ERROR_IC_SESSION("ICRDMA", "Rdma IO failed, err source: %s, code %d", @@ -390,15 +609,29 @@ namespace NActors { } // try to read more 
data into buffers - progress |= ReadMore(); - progress |= ReadXdc(&numDataBytes); + if (!UringContext) { + progress |= ReadMore(); + progress |= ReadXdc(&numDataBytes); + } if (!progress) { // no progress was made during this iteration - PreallocateBuffers(); + if (!UringContext) { + PreallocateBuffers(); + } break; } } + if (UringContext) { + // Main recv multishot may have stopped (e.g. ENOBUFS or end of a multishot run); + // re-arm it now that we have drained the processable data. + if (Socket && !MainRecvMultishotActive) { + StartRecvUring(); + } + // Submit/keep an XDC readv in flight if there is pending XDC target space. + DriveXdcUring(); + } + if (enoughCpu) { SetEnoughCpu(true); StarvingInRow = 0; @@ -1344,10 +1577,37 @@ namespace NActors { if (now >= LastReceiveTimestamp + DeadPeerTimeout) { ReceiveData(); if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) { + // Diagnostic snapshot: capture exactly why the input session believes the peer is + // dead. On the idle-keepalive failure this shows whether ANY recv completion ever + // arrived (peer truly silent / recv not armed) and whether the multishot is still + // active, plus the ring's submit accounting. + LOG_NOTICE_IC_SESSION("ICIS30", "DeadPeer snapshot uring# %d lastRecvAge# %.3fs" + " bytesRead# %" PRIu64 " mainRecvCompletions# %" PRIu64 " mainRecvBytes# %" PRIu64 + " mainMsActive# %d pendingRecvs# %" PRIu32 " submitCalls# %" PRIu64 + " submitErrors# %" PRIu64 " submitPartials# %" PRIu64 " lastSubmitRet# %d sqeFull# %" PRIu64, + (UringContext ? 1 : 0), (now - LastReceiveTimestamp).SecondsFloat(), + BytesReadFromSocket, UringMainRecvCompletions, UringMainRecvBytes, + (int)MainRecvMultishotActive, + (UringContext ? UringContext->GetPendingRecvs() : 0), + (UringContext ? UringContext->GetSubmitCalls() : 0), + (UringContext ? UringContext->GetSubmitErrors() : 0), + (UringContext ? UringContext->GetSubmitPartials() : 0), + (UringContext ? 
UringContext->GetLastSubmitRet() : 0), + (UringContext ? UringContext->GetSqeFull() : 0)); // nothing has changed, terminate session throw TExDestroySession{TDisconnectReason::DeadPeer()}; } } + // Recv-side heartbeat (DEBUG): fires roughly once per DeadPeerTimeout on a healthy idle + // session. Paired with the output ICS42 send heartbeat to confirm keepalives are flowing. + if (UringContext) { + LOG_DEBUG_IC_SESSION("ICIS31", "uring recv hb lastRecvAge# %.3fs bytesRead# %" PRIu64 + " mainRecvCompletions# %" PRIu64 " mainRecvBytes# %" PRIu64 " mainMsActive# %d" + " pendingRecvs# %" PRIu32, + (now - LastReceiveTimestamp).SecondsFloat(), BytesReadFromSocket, + UringMainRecvCompletions, UringMainRecvBytes, (int)MainRecvMultishotActive, + UringContext->GetPendingRecvs()); + } Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer); } diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp index 8ea2fe0232b..0f9fa0085ee 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp @@ -406,13 +406,21 @@ namespace NActors { Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), std::move(inputSessionParams), RdmaQp, std::move(cq)); ReceiverId = RegisterWithSameMailbox(actor.Release()); - // register our socket in poller actor - LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor"); - const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId())); - Y_ABORT_UNLESS(success); - if (XdcSocket) { - const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId())); + // register our socket with the appropriate I/O backend + if (Proxy->Common->Settings.UseUring && !Params.Encryption && TUringContext::IsSupported()) { + LOG_DEBUG_IC_SESSION("ICS11", "registering socket with UringPollerActor"); + // Both the 
main and XDC sockets are driven entirely by io_uring: the input session + // arms recv (main multishot, XDC readv) and the output session submits writes/send_zc. + // Neither socket is registered with the epoll TPollerActor anymore (Caveat 3). + Send(MakeUringPollerActorId(), new TEvUringRegister(Socket, XdcSocket, ReceiverId, SelfId())); + } else { + LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor"); + const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId())); Y_ABORT_UNLESS(success); + if (XdcSocket) { + const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId())); + Y_ABORT_UNLESS(success); + } } LostConnectionWatchdog.Disarm(); @@ -553,7 +561,11 @@ namespace NActors { return; } - WriteData(); + if (UringContext) { + WriteDataUring(); + } else { + WriteData(); + } if (!Socket) { return; } @@ -564,8 +576,13 @@ namespace NActors { canProducePackets = NumEventsInQueue && (InflightDataAmount + RdmaInflightDataAmount) < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit(); - canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) || - (XdcStream && !ReceiveContext->XdcWriteBlocked); + if (UringContext) { + canWriteData = ((OutgoingStream || OutOfBandStream) && !UringMainWriteInFlight) || + (XdcStream && !UringXdcWriteInFlight); + } else { + canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) || + (XdcStream && !ReceiveContext->XdcWriteBlocked); + } if (!canProducePackets && !canWriteData) { SetEnoughCpu(true); // we do not starve @@ -664,6 +681,31 @@ namespace NActors { } void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) { + if (UringContext) { + if (Socket) { + UringContext->SubmitCancelFd((int)*Socket); + } + if (XdcSocket) { + UringContext->SubmitCancelFd((int)*XdcSocket); + } + UringContext->Flush(); + UringWritesInFlight.clear(); + UringMainWriteInFlight = 
false; + UringMainWriteInFlightSince = TMonotonic::Zero(); + UringXdcWriteInFlight = false; + UringZcEnabled = false; + XdcZcNotifCum = 0; + XdcDropWantedCum = 0; + XdcDroppedCum = 0; + XdcZcNotifQueue.clear(); + // Tell the reaper to release this session ring. Without this the reaper keeps its + // own TUringContext reference forever, so io_uring_queue_exit never runs and the + // ring fd, eventfd and provided-buffer ring leak on every reconnect/handshake race. + const int eventFd = UringContext->GetEventFd(); + UringContext.Reset(); + Send(MakeUringPollerActorId(), new TEvUringUnregister(eventFd)); + } + if (Socket) { if (const TString& s = reason.ToString()) { Proxy->Metrics->IncDisconnectByReason(s); @@ -864,6 +906,260 @@ namespace NActors { WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer; } + void TInterconnectSessionTCP::WriteDataUring() { + static constexpr size_t maxBytesAtOnce = 256 * 1024; + + auto submitStream = [&](NInterconnect::TOutgoingStream& stream, int socketFd, EUringOpTag tag, + size_t maxBytes, bool& inFlight, bool isOutOfBand) { + if (inFlight || !stream || UringContext->GetPendingWrites() >= TUringContext::MaxPendingWrites) { + return; + } + + constexpr ui32 iovLimit = 256; + TStackVec<TConstIoVec, iovLimit> wbuffers; + stream.ProduceIoVec(wbuffers, iovLimit, maxBytes); + if (wbuffers.empty()) { + return; + } + + ui64 seqNo = ++UringWriteSeqNo; + size_t totalBytes = 0; + std::vector<struct iovec> iovecs(wbuffers.size()); + for (size_t i = 0; i < wbuffers.size(); ++i) { + iovecs[i].iov_base = const_cast<void*>(static_cast<const void*>(wbuffers[i].Data)); + iovecs[i].iov_len = wbuffers[i].Size; + totalBytes += wbuffers[i].Size; + } + + if (UringContext->SubmitWritev(socketFd, iovecs.data(), iovecs.size(), seqNo, tag)) { + UringContext->IncrementPendingWrites(); + inFlight = true; + // Key the in-flight map by the SAME masked value the completion carries + // (the tag occupies the high byte of user_data), so the completion lookup can 
+ // never miss and silently skip clearing the single-in-flight latch. + const ui64 key = seqNo & UringOpDataMask; + UringWritesInFlight[key] = TUringWriteInFlight{totalBytes, tag == EUringOpTag::XdcWritev, isOutOfBand, std::move(iovecs)}; + if (tag == EUringOpTag::MainWritev) { + ++UringMainWritevSubmitted; + UringMainWriteInFlightSince = TActivationContext::Monotonic(); + } + } + }; + + // Main socket carries two logical streams: the ordinary data stream (OutgoingStream, + // retained until peer-confirmed) and the priority out-of-band stream (OutOfBandStream, + // carrying confirm/flush packets, dropped as soon as written). Because io_uring writes + // on the main socket are single-in-flight, we only ever switch streams at a main-packet + // boundary (OutgoingOffset == 0); a partially-sent data packet is always finished first. + // This preserves on-wire packet framing while still letting confirms jump the queue. + auto submitMain = [&]() { + if (UringMainWriteInFlight || UringContext->GetPendingWrites() >= TUringContext::MaxPendingWrites) { + // Diagnostic: if the single-in-flight main-write latch has been held far longer + // than any keepalive period, the writev completion was never delivered and the + // sender has gone silent (peer will declare DeadPeer). Report it (throttled) with + // the submit accounting so we can tell a swallowed io_uring_submit error from a + // lost completion. 
+ if (UringMainWriteInFlight && UringMainWriteInFlightSince != TMonotonic::Zero()) { + const TMonotonic now = TActivationContext::Monotonic(); + if (now - UringMainWriteInFlightSince > TDuration::Seconds(2) && + now - UringMainWriteStuckReported > TDuration::Seconds(2)) { + UringMainWriteStuckReported = now; + LOG_NOTICE_IC_SESSION("ICS41", "uring main write latch stuck for %.3fs" + " submitted# %" PRIu64 " completed# %" PRIu64 " pendingWrites# %" PRIu32 + " oobSize# %zu submitCalls# %" PRIu64 " submitErrors# %" PRIu64 + " submitPartials# %" PRIu64 " lastSubmitRet# %d sqeFull# %" PRIu64, + (now - UringMainWriteInFlightSince).SecondsFloat(), + UringMainWritevSubmitted, UringMainWriteCompleted, + UringContext->GetPendingWrites(), OutOfBandStream.CalculateOutgoingSize(), + UringContext->GetSubmitCalls(), UringContext->GetSubmitErrors(), + UringContext->GetSubmitPartials(), UringContext->GetLastSubmitRet(), + UringContext->GetSqeFull()); + } + } + return; + } + if (OutOfBandStream && OutgoingOffset == 0) { + submitStream(OutOfBandStream, (int)*Socket, EUringOpTag::MainWritev, maxBytesAtOnce, + UringMainWriteInFlight, /*isOutOfBand=*/true); + } else if (OutgoingStream) { + submitStream(OutgoingStream, (int)*Socket, EUringOpTag::MainWritev, maxBytesAtOnce, + UringMainWriteInFlight, /*isOutOfBand=*/false); + } else if (OutOfBandStream) { + // No partial main packet in progress but OutgoingStream is empty: flush OOB. 
+ submitStream(OutOfBandStream, (int)*Socket, EUringOpTag::MainWritev, maxBytesAtOnce, + UringMainWriteInFlight, /*isOutOfBand=*/true); + } + }; + + auto submitXdcZc = [&](NInterconnect::TOutgoingStream& stream, int socketFd, size_t maxBytes) { + if (UringXdcWriteInFlight || !stream || UringContext->GetPendingWrites() >= TUringContext::MaxPendingWrites) { + return; + } + + constexpr ui32 iovLimit = 1; + TStackVec<TConstIoVec, iovLimit> wbuffers; + TStackVec<NInterconnect::TOutgoingStream::TBufController, iovLimit> controllers; + stream.ProduceIoVec(wbuffers, iovLimit, maxBytes, &controllers); + if (wbuffers.empty()) { + return; + } + + auto& front = wbuffers.front(); + ui64 seqNo = ++UringWriteSeqNo; + + if (UringContext->SubmitSendZc(socketFd, front.Data, front.Size, seqNo)) { + UringContext->IncrementPendingWrites(); + UringXdcWriteInFlight = true; + UringWritesInFlight[seqNo] = TUringWriteInFlight{front.Size, true, false, {}}; + } + }; + + if (OutgoingStream || OutOfBandStream) { + submitMain(); + } + + if (XdcSocket && XdcStream) { + if (Proxy->Common->Settings.SocketSendOptimization == ESocketSendOptimization::IC_MSG_ZEROCOPY) { + submitXdcZc(XdcStream, (int)*XdcSocket, maxBytesAtOnce); + } else { + submitStream(XdcStream, (int)*XdcSocket, EUringOpTag::XdcWritev, maxBytesAtOnce, + UringXdcWriteInFlight, /*isOutOfBand=*/false); + } + } + + UringContext->Flush(); + } + + void TInterconnectSessionTCP::Handle(TEvUringRegisterFailed::TPtr& /*ev*/) { + // io_uring setup failed in the poller (ring/buffer-ring allocation). Fall back to the + // epoll TPollerActor so this session still has a working I/O backend instead of being + // left silently dead. Mirrors the non-uring registration branch in SetNewConnection. 
+ if (UringContext || !Socket) { + return; // already have a backend, or socket already gone + } + LOG_NOTICE_IC_SESSION("ICS11", "uring registration failed, falling back to epoll PollerActor"); + const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId())); + Y_ABORT_UNLESS(success); + if (XdcSocket) { + const bool successXdc = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId())); + Y_ABORT_UNLESS(successXdc); + } + } + + void TInterconnectSessionTCP::Handle(TEvUringRegisterResult::TPtr& ev) { + auto* msg = ev->Get(); + UringContext = std::move(msg->Context); + // Zero-copy XDC send gating is active only when this connection actually uses send_zc. + UringZcEnabled = XdcSocket && + Proxy->Common->Settings.SocketSendOptimization == ESocketSendOptimization::IC_MSG_ZEROCOPY; + XdcZcNotifCum = 0; + XdcDropWantedCum = 0; + XdcDroppedCum = 0; + XdcZcNotifQueue.clear(); + GenerateTraffic(); + } + + void TInterconnectSessionTCP::DropFrontXdc(size_t bytes) { + if (!UringZcEnabled) { + XdcStream.DropFront(bytes); + return; + } + // Defer the physical free until the kernel has released the buffers via NOTIF. 
+ XdcDropWantedCum += bytes; + FlushXdcZcDrop(); + } + + void TInterconnectSessionTCP::FlushXdcZcDrop() { + const ui64 dropTarget = Min(XdcDropWantedCum, XdcZcNotifCum); + if (dropTarget > XdcDroppedCum) { + const size_t toDrop = dropTarget - XdcDroppedCum; + XdcStream.DropFront(toDrop); + XdcDroppedCum += toDrop; + } + } + + void TInterconnectSessionTCP::Handle(TEvUringWriteComplete::TPtr& ev) { + auto* msg = ev->Get(); + ui64 seqNo = msg->UserData & UringOpDataMask; + + auto it = UringWritesInFlight.find(seqNo); + if (it == UringWritesInFlight.end()) { + return; + } + + auto flight = std::move(it->second); + UringWritesInFlight.erase(it); + + if (flight.IsXdc) { + UringXdcWriteInFlight = false; + } else { + UringMainWriteInFlight = false; + UringMainWriteInFlightSince = TMonotonic::Zero(); + ++UringMainWriteCompleted; + } + + if (msg->Result > 0) { + size_t written = msg->Result; + if (flight.IsXdc) { + XdcStream.Advance(written); + XdcBytesSent += written; + XdcOffset += written; + if (UringZcEnabled) { + // Record this zero-copy send; its buffers stay referenced by the kernel + // until the matching NOTIF arrives (FIFO per socket). + XdcZcNotifQueue.push_back(written); + } + } else if (flight.IsOutOfBand) { + // Out-of-band (confirm/flush) bytes are never retained: advance the sent cursor + // (so UnsentBytes is decremented, exactly as the epoll path does via Write()) and + // then drop them immediately. Skipping Advance left UnsentBytes stale, eventually + // tripping the OutgoingStream invariant once the queue emptied. 
+ OutOfBandStream.Advance(written); + OutOfBandStream.DropFront(written); + OutOfBandBytesSent += written; + BytesWrittenToSocket += written; + } else { + OutgoingStream.Advance(written); + OutgoingOffset += written; + BytesWrittenToSocket += written; + + auto sendQueueIt = SendQueue.begin() + OutgoingIndex; + for (; OutgoingOffset && sendQueueIt != SendQueue.end() && sendQueueIt->PacketSize <= OutgoingOffset; + ++sendQueueIt, ++OutgoingIndex) { + OutgoingOffset -= sendQueueIt->PacketSize; + } + } + Proxy->Metrics->AddTotalBytesWritten(written); + DropConfirmed(LastConfirmed); + GenerateTraffic(); + } else if (msg->Result < 0) { + int err = -msg->Result; + if (err != ECANCELED) { + LOG_NOTICE_NET(Proxy->PeerNodeId, "uring write error: %s", strerror(err)); + ReestablishConnectionWithHandshake(TDisconnectReason::FromErrno(err)); + } + } else { + LOG_NOTICE_NET(Proxy->PeerNodeId, "uring write: connection closed by peer%s", ""); + if (!NumEventsInQueue && LastConfirmed == OutputCounter) { + Terminate(TDisconnectReason::EndOfStream()); + } else { + ReestablishConnectionWithHandshake(TDisconnectReason::EndOfStream()); + } + } + } + + void TInterconnectSessionTCP::Handle(TEvUringSendZcNotif::TPtr& ev) { + Y_UNUSED(ev); + // The kernel has released the buffers of the oldest outstanding zero-copy send. + // Advance the notif-confirmed offset and flush any drop that was waiting on it. 
+ if (!UringZcEnabled || XdcZcNotifQueue.empty()) { + return; + } + XdcZcNotifCum += XdcZcNotifQueue.front(); + XdcZcNotifQueue.pop_front(); + FlushXdcZcDrop(); + } + ssize_t TInterconnectSessionTCP::HandleWriteResult(ssize_t r, const TString& err) { if (r > 0) { return r; @@ -972,6 +1268,21 @@ namespace NActors { while (FlushSchedule && now >= FlushSchedule.top()) { FlushSchedule.pop(); } + // Send-side heartbeat (DEBUG): paired with the input session's recv counters and the + // reaper ICUR50 stats, this shows on idle whether keepalive writevs keep being submitted + // and completed, or whether the single-in-flight latch is wedged. The flush timer fires + // roughly every ForceConfirmPeriod on an otherwise idle session, so this is low-rate. + if (UringContext) { + const double latchHeldS = (UringMainWriteInFlight && UringMainWriteInFlightSince != TMonotonic::Zero()) + ? (now - UringMainWriteInFlightSince).SecondsFloat() : 0.0; + LOG_DEBUG_IC_SESSION("ICS42", "uring send hb submitted# %" PRIu64 " completed# %" PRIu64 + " mainInFlight# %d latchHeld# %.3fs pendingWrites# %" PRIu32 " oobSize# %zu" + " submitErrors# %" PRIu64 " submitPartials# %" PRIu64 " sqeFull# %" PRIu64, + UringMainWritevSubmitted, UringMainWriteCompleted, (int)UringMainWriteInFlight, + latchHeldS, UringContext->GetPendingWrites(), OutOfBandStream.CalculateOutgoingSize(), + UringContext->GetSubmitErrors(), UringContext->GetSubmitPartials(), + UringContext->GetSqeFull()); + } if (Socket) { if (now >= ForcePacketTimestamp) { ++ConfirmPacketsForcedByTimeout; @@ -1133,7 +1444,7 @@ namespace NActors { const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped; OutgoingStream.DropFront(bytesDropped); - XdcStream.DropFront(bytesDroppedFromXdc); + DropFrontXdc(bytesDroppedFromXdc); if (lastDroppedSerial) { ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { channel.DropConfirmed(*lastDroppedSerial, *Pool); diff --git 
a/ydb/library/actors/interconnect/interconnect_tcp_session.h b/ydb/library/actors/interconnect/interconnect_tcp_session.h index b29147a7d29..32f1a81c468 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.h +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.h @@ -7,6 +7,7 @@ #include <ydb/library/actors/interconnect/logging/logging.h> #include <ydb/library/actors/interconnect/poller/poller_tcp.h> #include <ydb/library/actors/interconnect/poller/poller_actor.h> +#include <ydb/library/actors/interconnect/poller/uring_poller_actor.h> #include <ydb/library/actors/interconnect/retro_tracing/spans.h> #include <ydb/library/actors/protos/services_common.pb.h> #include <ydb/library/actors/util/datetime.h> @@ -30,6 +31,7 @@ #include "events_local.h" #include "interconnect_impl.h" #include "interconnect_zc_processor.h" +#include "uring_context.h" #include "interconnect_channel.h" #include "watchdog_timer.h" #include "event_holder_pool.h" @@ -267,6 +269,8 @@ namespace NActors { cFunc(EvCheckDeadPeer, HandleCheckDeadPeer) cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate) hFunc(NMon::TEvHttpInfoRes, GenerateHttpInfo) + hFunc(TEvUringRecvComplete, Handle) + hFunc(TEvUringRegisterResult, Handle) ) private: @@ -281,6 +285,20 @@ namespace NActors { TInterconnectProxyCommon::TPtr Common; const ui32 NodeId; const TSessionParams Params; + + TUringContext::TPtr UringContext; + ui16 MainRecvBufGroupId = 0; + ui16 XdcRecvBufGroupId = 0; + bool MainRecvMultishotActive = false; + bool XdcRecvMultishotActive = false; + // Diagnostic recv-path counters (instrumentation for the idle-keepalive DeadPeer hunt). + ui64 UringMainRecvCompletions = 0; + ui64 UringMainRecvBytes = 0; + // XDC receive over io_uring: a single async readv directly into the XdcInputQ + // destination spans (or the catch-stream buffer) is kept in flight at a time. 
+ ui64 UringXdcReadSeqNo = 0; + bool UringXdcReadInFlight = false; + bool UringXdcReadIsCatch = false; NInterconnect::NRdma::TQueuePair::TPtr RdmaQp; NInterconnect::NRdma::ICq::TPtr RdmaCq; XXH3_state_t XxhashState; @@ -306,7 +324,7 @@ namespace NActors { }; std::deque<TInboundPacket> InboundPacketQ; std::deque<std::tuple<ui16, TMutableContiguousSpan>> XdcInputQ; // target buffers for the XDC stream with channel reference - std::deque<std::tuple<ui16, std::optional<ui32>>> XdcChecksumQ; // (size, optional(expectedChecksum)). nullopt if checksums are disabled. + std::deque<std::tuple<ui16, std::optional<ui32>>> XdcChecksumQ; // (size, optional(expectedChecksum)). nullopt if checksums are disabled. ui32 XdcCurrentChecksum = 0; // catch stream -- used after TCP reconnect to match XDC stream with main packet stream @@ -339,9 +357,17 @@ namespace NActors { void Handle(TEvPollerReady::TPtr ev); void Handle(TEvPollerRegisterResult::TPtr ev); + void Handle(TEvUringRecvComplete::TPtr& ev); + void Handle(TEvUringRegisterResult::TPtr& ev); void HandleConfirmUpdate(); void Handle(NInterconnect::NRdma::TEvRdmaReadDone::TPtr& ev); void ReceiveData(); + void StartRecvUring(); + // XDC-over-io_uring receive helpers (Caveat 3). 
+ void DriveXdcUring(); + bool SubmitXdcRecvUring(); + void ProcessXdcCatchBytesUring(ssize_t recvres); + void ProcessXdcBytesUring(ssize_t recvres); void ProcessHeader(); void ProcessPayload(ui64 *numDataBytes); void ProcessInboundPacketQ(ui64 numXdcBytesRead, ui64 numRdmaBytesRead); @@ -541,6 +567,10 @@ namespace NActors { hFunc(TEvSocketDisconnect, OnDisconnect) hFunc(TEvTerminate, Handle) hFunc(TEvProcessPingRequest, Handle) + hFunc(TEvUringRegisterResult, Handle) + hFunc(TEvUringRegisterFailed, Handle) + hFunc(TEvUringWriteComplete, Handle) + hFunc(TEvUringSendZcNotif, Handle) ) UpdateUtilization(); } @@ -573,7 +603,12 @@ namespace NActors { void Handle(TEvPollerReady::TPtr& ev); void Handle(TEvPollerRegisterResult::TPtr ev); + void Handle(TEvUringRegisterResult::TPtr& ev); + void Handle(TEvUringRegisterFailed::TPtr& ev); + void Handle(TEvUringWriteComplete::TPtr& ev); + void Handle(TEvUringSendZcNotif::TPtr& ev); void WriteData(); + void WriteDataUring(); ssize_t HandleWriteResult(ssize_t r, const TString& err); ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes); @@ -686,6 +721,38 @@ namespace NActors { TPollerToken::TPtr PollerToken; TPollerToken::TPtr XdcPollerToken; ui32 SendBufferSize; + + TUringContext::TPtr UringContext; + ui64 UringWriteSeqNo = 0; + bool UringMainWriteInFlight = false; + bool UringXdcWriteInFlight = false; + struct TUringWriteInFlight { + size_t Bytes; + bool IsXdc; + bool IsOutOfBand = false; + std::vector<struct iovec> Iovecs; + }; + THashMap<ui64, TUringWriteInFlight> UringWritesInFlight; + + // Diagnostic send-path counters (instrumentation for the idle-keepalive DeadPeer hunt). + ui64 UringMainWritevSubmitted = 0; // main-socket writevs handed to io_uring + ui64 UringMainWriteCompleted = 0; // main-socket writev completions processed + // Monotonic timestamp at which UringMainWriteInFlight last went true; TMonotonic::Zero() + // when the latch is clear. 
Used to detect a wedged single-in-flight write latch. + TMonotonic UringMainWriteInFlightSince; + TMonotonic UringMainWriteStuckReported; // throttles the "latch stuck" NOTICE + + // IORING_OP_SEND_ZC buffer lifetime (Caveat 5). When the XDC stream is sent with + // io_uring zero-copy, an XdcStream buffer must not be freed (DropFront) until the + // kernel posts the corresponding IORING_CQE_F_NOTIF. We advance the read cursor on + // the data CQE (safe) but gate DropFront on the cumulative notif-confirmed offset. + bool UringZcEnabled = false; + ui64 XdcZcNotifCum = 0; // cumulative XDC bytes whose send_zc NOTIF has arrived + ui64 XdcDropWantedCum = 0; // cumulative XDC bytes the peer has confirmed for dropping + ui64 XdcDroppedCum = 0; // cumulative XDC bytes physically dropped from XdcStream + std::deque<ui64> XdcZcNotifQueue; // FIFO of in-flight zc send sizes awaiting NOTIF + void DropFrontXdc(size_t bytes); + void FlushXdcZcDrop(); ui64 InflightDataAmount = 0; ui64 RdmaInflightDataAmount = 0; diff --git a/ydb/library/actors/interconnect/poller/uring_poller_actor.cpp b/ydb/library/actors/interconnect/poller/uring_poller_actor.cpp new file mode 100644 index 00000000000..42205824119 --- /dev/null +++ b/ydb/library/actors/interconnect/poller/uring_poller_actor.cpp @@ -0,0 +1,726 @@ +#include "uring_poller_actor.h" +#include "poller_actor.h" + +#include <ydb/library/actors/core/actor_bootstrapped.h> +#include <ydb/library/actors/core/actorsystem.h> +#include <ydb/library/actors/core/events.h> +#include <ydb/library/actors/core/log.h> +#include <ydb/library/actors/protos/services_common.pb.h> +#include <ydb/library/actors/interconnect/uring_context.h> +#include <ydb/library/actors/interconnect/uring_recv_buffer_pool.h> + +#include <util/system/thread.h> +#include <util/system/mutex.h> +#include <util/system/guard.h> + +#include <liburing.h> +#include <sys/eventfd.h> +#include <poll.h> +#include <unistd.h> + +#include <atomic> +#include <chrono> +#include 
<condition_variable> +#include <memory> +#include <mutex> +#include <thread> +#include <vector> + +namespace NActors { + + // Per-session ring state, owned by the reaper thread. + struct TUringSessionRing { + TUringContext::TPtr Context; + TUringRecvBufferPool MainRecvPool; + int EventFd = -1; + int MainSocketFd = -1; + int XdcSocketFd = -1; + }; + + // Dedicated OS thread that owns the anchor ring and reaps completions from all + // session rings. It removes the dependency on the shared epoll TPollerActor: + // each session ring signals its eventfd on a CQE; the anchor ring arms a + // multishot POLL on every such eventfd, and this thread blocks on the anchor. + class TUringReaper : public ISimpleThread { + // Sentinel user_data values for anchor-ring control completions. Real session entries + // use the eventfd value (a small positive int) in the low 32 bits, so these high-bit + // sentinels can never collide with them. + // + // IMPORTANT: liburing reserves user_data == (__u64)-1 (LIBURING_UDATA_TIMEOUT) for the + // internal timeout completion that io_uring_wait_cqe_timeout posts on kernels without + // IORING_FEAT_EXT_ARG. ShutdownUserData previously used ~0 too, so every backstop timeout + // was misread as a shutdown-fd event: the idle sweep never ran (backstopTicks stayed 0) + // and the reaper hot-spun re-arming the shutdown poll. Keep ~0 reserved for liburing. + static constexpr ui64 LiburingTimeoutUserData = ~ui64(0); // == LIBURING_UDATA_TIMEOUT + static constexpr ui64 ShutdownUserData = ~ui64(0) - 1; + static constexpr ui64 WakeUserData = ~ui64(0) - 2; + static constexpr ui64 PollRemoveUserData = ~ui64(0) - 3; + // Our own periodic backstop timer, armed on the anchor ring (see ArmTimeout). We block in + // io_uring_wait_cqe() and let this timeout (plus session eventfd polls and WakeFd) wake us, + // instead of io_uring_wait_cqe_timeout() which busy-spun on this kernel (returning without + // blocking), pinning a CPU core. 
+ static constexpr ui64 TimerUserData = ~ui64(0) - 4; + + public: + explicit TUringReaper(TActorSystem* actorSystem, bool enableSqpoll) + : ActorSystem(actorSystem) + , EnableSqpoll(enableSqpoll) + { + AnchorRing = std::make_unique<struct io_uring>(); + struct io_uring_params params = {}; + // SQPOLL is opt-in (TInterconnectSettings::EnableUringSQPOLL). When off (default), + // the reaper submits anchor SQEs explicitly and the session threads submit their own + // I/O explicitly. When on, the anchor ring sets SQPOLL here and every session ring + // sets SQPOLL + ATTACH_WQ (wq_fd = this anchor ring), so they all share ONE kernel + // poll thread rather than spawning one per session. + if (EnableSqpoll) { + params.flags |= IORING_SETUP_SQPOLL; + params.sq_thread_idle = SqThreadIdleMs; + } + int ret = io_uring_queue_init_params(AnchorQueueDepth, AnchorRing.get(), &params); + Y_ABORT_UNLESS(ret >= 0, "io_uring_queue_init_params(anchor) failed: %d", ret); + AnchorRingFd = AnchorRing->ring_fd; + + ShutdownFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + Y_ABORT_UNLESS(ShutdownFd >= 0); + WakeFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + Y_ABORT_UNLESS(WakeFd >= 0); + + with_lock (Mutex) { + ArmPoll(ShutdownFd, ShutdownUserData); + ArmPoll(WakeFd, WakeUserData); + ArmTimeout(); + io_uring_submit(AnchorRing.get()); + } + + GraveyardThread = std::thread([this] { GraveyardProc(); }); + } + + ~TUringReaper() { + StopAndJoin(); + if (AnchorRing) { + io_uring_queue_exit(AnchorRing.get()); + AnchorRing.reset(); + } + if (ShutdownFd >= 0) { + close(ShutdownFd); + } + if (WakeFd >= 0) { + close(WakeFd); + } + } + + int GetAnchorRingFd() const { + return AnchorRingFd; + } + + bool GetEnableSqpoll() const { + return EnableSqpoll; + } + + // Aggregate reaper liveness counters (instrumentation for the idle-keepalive DeadPeer + // hunt). Read from the TUringPollerActor thread for periodic logging; the reaper thread + // is the only writer, so relaxed loads are sufficient.
+ struct TStats { + ui64 BackstopTicks; + ui64 AnchorWakeups; + ui64 TotalCqesReaped; + ui64 RecvCqes; + ui64 WriteCqes; + ui64 ActiveSessions; + ui64 EmptyWakes; + ui64 LastAnchorUserData; + ui64 PollWakeups; + }; + TStats GetStats() const { + return TStats{ + BackstopTicks.load(std::memory_order_relaxed), + AnchorWakeups.load(std::memory_order_relaxed), + TotalCqesReaped.load(std::memory_order_relaxed), + RecvCqes.load(std::memory_order_relaxed), + WriteCqes.load(std::memory_order_relaxed), + ActiveSessions.load(std::memory_order_relaxed), + EmptyWakes.load(std::memory_order_relaxed), + LastAnchorUserData.load(std::memory_order_relaxed), + PollWakeups.load(std::memory_order_relaxed), + }; + } + + // Called from the actor (TUringPollerActor) thread. + void AddSession(std::unique_ptr<TUringSessionRing> session) { + int efd = session->EventFd; + with_lock (Mutex) { + Sessions.push_back(std::move(session)); + EventFdToSession[efd] = Sessions.back().get(); + ArmPoll(efd, static_cast<ui64>(static_cast<ui32>(efd))); + io_uring_submit(AnchorRing.get()); + ActiveSessions.store(Sessions.size(), std::memory_order_relaxed); + } + } + + // Called from the actor (TUringPollerActor) thread on session teardown. The actual + // removal/destruction happens on the reaper thread (it owns the rings) to avoid racing + // with ReapSession; here we just enqueue the request and wake the reaper. + void RemoveSession(int eventFd) { + with_lock (RemovalMutex) { + PendingRemoval.push_back(eventFd); + } + const uint64_t one = 1; + [[maybe_unused]] ssize_t r = write(WakeFd, &one, sizeof(one)); + } + + void StopAndJoin() { + if (Stop.exchange(true)) { + return; + } + const uint64_t one = 1; + [[maybe_unused]] ssize_t r = write(ShutdownFd, &one, sizeof(one)); + Join(); + // The reaper thread has exited and will no longer hand work to the graveyard; + // stop it and let it drain whatever remains. 
+ { + std::lock_guard<std::mutex> lk(GraveyardMutex); + GraveyardStop = true; + } + GraveyardCv.notify_one(); + if (GraveyardThread.joinable()) { + GraveyardThread.join(); + } + } + + private: + void* ThreadProc() override { + TThread::SetCurrentThreadName("IcUringReaper"); + while (!Stop.load(std::memory_order_acquire)) { + struct io_uring_cqe* cqe = nullptr; + // Block until the anchor ring wakes us: a session eventfd POLLIN (real work), the + // WakeFd (teardown), the ShutdownFd, or our own periodic backstop timer (see + // ArmTimeout). The timer doubles as a safety net: even if an eventfd wakeup were + // ever lost, the timer fires every BackstopPeriod and we sweep every session ring, + // so a session can never be wedged with undelivered completions. A blocking wait + // (vs io_uring_wait_cqe_timeout, which busy-spun on this kernel) keeps the reaper + // off-CPU while idle. + int r = io_uring_wait_cqe(AnchorRing.get(), &cqe); + if (r < 0) { + if (r == -EINTR) { + continue; + } + break; + } + bool sawTimeout = false; + AnchorWakeups.fetch_add(1, std::memory_order_relaxed); + + std::vector<int> reArm; + bool reArmShutdown = false; + bool reArmWake = false; + bool reArmTimeout = false; + unsigned head; + unsigned count = 0; + ui64 LastSeenUd = 0; + io_uring_for_each_cqe(AnchorRing.get(), head, cqe) { + ++count; + const ui64 ud = io_uring_cqe_get_data64(cqe); + LastSeenUd = ud; + const bool more = cqe->flags & IORING_CQE_F_MORE; + if (ud == LiburingTimeoutUserData) { + // liburing's internal timeout completion; not a real event. + sawTimeout = true; + continue; + } + if (ud == TimerUserData) { + // Our periodic backstop timer fired; sweep below and re-arm it. 
+ sawTimeout = true; + reArmTimeout = true; + continue; + } + if (ud == ShutdownUserData) { + DrainEventFd(ShutdownFd); + if (!more) { + reArmShutdown = true; + } + continue; + } + if (ud == WakeUserData) { + DrainEventFd(WakeFd); + if (!more) { + reArmWake = true; + } + continue; + } + if (ud == PollRemoveUserData) { + // Completion of a poll_remove we issued while tearing a session down; + // nothing to reap, the session ring is already (being) destroyed. + continue; + } + const int efd = static_cast<int>(static_cast<ui32>(ud)); + PollWakeups.fetch_add(1, std::memory_order_relaxed); + DrainEventFd(efd); + if (TUringSessionRing* session = Lookup(efd)) { + ReapSession(session); + } + if (!more) { + reArm.push_back(efd); + } + } + if (count == 0) { + EmptyWakes.fetch_add(1, std::memory_order_relaxed); + } else { + LastAnchorUserData.store(LastSeenUd, std::memory_order_relaxed); + } + io_uring_cq_advance(AnchorRing.get(), count); + + // Drop any sessions whose output actor asked to unregister. Done on this thread + // (which owns the rings) so io_uring_queue_exit and eventfd close are race-free. + ProcessPendingRemovals(); + + // Backstop: sweep every session ring directly so a lost eventfd wakeup cannot + // starve keepalives. Driven by wall-clock elapsed time (not solely the -ETIME + // return) so it fires reliably regardless of how the kernel delivers the timeout, + // and is throttled to ~BackstopPeriod so it stays cheap even if the wait returns + // early. This is the safety net that makes idle multi-session nodes robust. + const auto nowTp = std::chrono::steady_clock::now(); + if (sawTimeout || (nowTp - LastSweep) >= std::chrono::nanoseconds(BackstopPeriodNs)) { + BackstopTicks.fetch_add(1, std::memory_order_relaxed); + LastSweep = nowTp; + SweepAllSessions(); + } + + if (!reArm.empty() || reArmShutdown || reArmWake || reArmTimeout) { + with_lock (Mutex) { + for (int efd : reArm) { + // Skip eventfds whose session was removed in ProcessPendingRemovals. 
+ if (EventFdToSession.contains(efd)) { + ArmPoll(efd, static_cast<ui64>(static_cast<ui32>(efd))); + } + } + if (reArmShutdown) { + ArmPoll(ShutdownFd, ShutdownUserData); + } + if (reArmWake) { + ArmPoll(WakeFd, WakeUserData); + } + if (reArmTimeout) { + ArmTimeout(); + } + io_uring_submit(AnchorRing.get()); + } + } + } + return nullptr; + } + + // Reaper-thread side of RemoveSession: cancel the anchor poll for each torn-down + // session, detach it from the maps and destroy it (exiting its ring and releasing + // its registered socket files / eventfd / buffer ring). + void ProcessPendingRemovals() { + std::vector<int> toRemove; + with_lock (RemovalMutex) { + toRemove.swap(PendingRemoval); + } + if (toRemove.empty()) { + return; + } + + std::vector<std::unique_ptr<TUringSessionRing>> dying; + with_lock (Mutex) { + for (int efd : toRemove) { + auto it = EventFdToSession.find(efd); + if (it == EventFdToSession.end()) { + continue; // never added, or already removed + } + EventFdToSession.erase(it); + // Cancel the multishot poll armed on this eventfd before it is closed. + PollRemove(static_cast<ui64>(static_cast<ui32>(efd))); + for (auto sit = Sessions.begin(); sit != Sessions.end(); ++sit) { + if ((*sit)->EventFd == efd) { + dying.push_back(std::move(*sit)); + Sessions.erase(sit); + break; + } + } + } + io_uring_submit(AnchorRing.get()); + ActiveSessions.store(Sessions.size(), std::memory_order_relaxed); + } + + // Hand the dying rings to the graveyard thread instead of destroying them here. + // ~TUringSessionRing frees the buffer ring (while the session Ring is still valid) + // and then drops the TUringContext reference, which runs io_uring_queue_exit() — + // releasing the registered socket files (so the kernel can finally drop them) and + // closing the session eventfd. 
io_uring_queue_exit() can block, so keeping it off + // this thread ensures connection churn never stalls completion processing (and the + // idle keepalive backstop) for the still-live sessions. + if (!dying.empty()) { + { + std::lock_guard<std::mutex> lk(GraveyardMutex); + for (auto& d : dying) { + Graveyard.push_back(std::move(d)); + } + } + GraveyardCv.notify_one(); + } + } + + // Dedicated thread that runs the (potentially blocking) destruction of removed session + // rings, off the reaper hot path. + void GraveyardProc() { + TThread::SetCurrentThreadName("IcUringGrave"); + for (;;) { + std::vector<std::unique_ptr<TUringSessionRing>> batch; + { + std::unique_lock<std::mutex> lk(GraveyardMutex); + GraveyardCv.wait(lk, [this] { return GraveyardStop || !Graveyard.empty(); }); + batch.swap(Graveyard); + if (batch.empty() && GraveyardStop) { + return; + } + } + batch.clear(); // runs ~TUringSessionRing -> io_uring_queue_exit off the reaper + } + } + + // Reap every live session ring. Used as the idle backstop. Session removal happens only + // on this same reaper thread (ProcessPendingRemovals), so a snapshot taken under the + // lock stays valid for the duration of the sweep even if AddSession reallocates the + // Sessions vector concurrently. + void SweepAllSessions() { + std::vector<TUringSessionRing*> snapshot; + with_lock (Mutex) { + snapshot.reserve(Sessions.size()); + for (auto& s : Sessions) { + snapshot.push_back(s.get()); + } + } + for (TUringSessionRing* session : snapshot) { + ReapSession(session); + } + } + + static void DrainEventFd(int fd) { + uint64_t val; + [[maybe_unused]] ssize_t r = read(fd, &val, sizeof(val)); + } + + TUringSessionRing* Lookup(int efd) { + with_lock (Mutex) { + auto it = EventFdToSession.find(efd); + return it == EventFdToSession.end() ? nullptr : it->second; + } + } + + // Must be called with Mutex held. Returns an SQE from the anchor ring, submitting any + // already-queued SQEs to make room if the SQ is momentarily full. 
Returns nullptr only + // if the SQ stays full after repeated submits (effectively unreachable with the current + // queue depth). + struct io_uring_sqe* GetAnchorSqe() { + struct io_uring_sqe* sqe = io_uring_get_sqe(AnchorRing.get()); + for (int attempt = 0; !sqe && attempt < 1024; ++attempt) { + io_uring_submit(AnchorRing.get()); + sqe = io_uring_get_sqe(AnchorRing.get()); + } + return sqe; + } + + // Must be called with Mutex held. Arms a ONE-SHOT poll for POLLIN on `fd`. We deliberately + // do NOT use multishot here: a multishot poll armed on a ring-registered eventfd was + // observed to fire exactly once and then go silent forever (the kernel terminated the + // multishot after the first completion without setting a final !F_MORE CQE we could detect + // to re-arm), so every session ran solely off the 100ms backstop timer — fine for idle + // keepalive but it throttled throughput to ~10 writes/s. A one-shot poll that we re-arm + // after every reap is robust: after we drain the eventfd and reap, the next CQE re-signals + // the (now readable) eventfd and the freshly armed one-shot poll fires immediately. + void ArmPoll(int fd, ui64 userData) { + struct io_uring_sqe* sqe = GetAnchorSqe(); + if (!sqe) { + // Graceful degradation instead of aborting the whole process. A missed arm leaves + // `fd` unpolled (nothing re-arms it later); a session ring is then reaped only by the backstop sweep. + return; + } + io_uring_prep_poll_add(sqe, fd, POLLIN); + io_uring_sqe_set_data64(sqe, userData); + } + + // Must be called with Mutex held. Arms a one-shot relative timeout on the anchor ring used + // as the idle backstop. Re-armed each time it fires. BackstopTs is a member because + // io_uring_prep_timeout keeps a pointer to it until the timeout completes.
+ void ArmTimeout() { + struct io_uring_sqe* sqe = GetAnchorSqe(); + if (!sqe) { + return; + } + BackstopTs.tv_sec = 0; + BackstopTs.tv_nsec = BackstopPeriodNs; + io_uring_prep_timeout(sqe, &BackstopTs, 0, 0); + io_uring_sqe_set_data64(sqe, TimerUserData); + } + + // Must be called with Mutex held. Cancels a previously armed (one-shot) poll identified + // by the user_data it was armed with. + void PollRemove(ui64 targetUserData) { + struct io_uring_sqe* sqe = GetAnchorSqe(); + if (!sqe) { + return; + } + io_uring_prep_poll_remove(sqe, targetUserData); + io_uring_sqe_set_data64(sqe, PollRemoveUserData); + } + + void SendEv(const TActorId& recipient, IEventBase* ev) { + ActorSystem->Send(new IEventHandle(recipient, TActorId(), ev)); + } + + void ReapSession(TUringSessionRing* session) { + struct io_uring* ring = session->Context->GetRing(); + if (!ring) { + return; + } + + // Recycle any provided-buffer-ring buffers that consumers have released. + session->MainRecvPool.DrainFreelist(); + + struct io_uring_cqe* cqes[BatchSize]; + unsigned count = io_uring_peek_batch_cqe(ring, cqes, BatchSize); + + for (unsigned i = 0; i < count; ++i) { + struct io_uring_cqe* cqe = cqes[i]; + const ui64 userData = io_uring_cqe_get_data64(cqe); + const EUringOpTag tag = static_cast<EUringOpTag>(userData & UringOpTagMask); + + switch (tag) { + case EUringOpTag::MainWritev: + case EUringOpTag::XdcWritev: + WriteCqes.fetch_add(1, std::memory_order_relaxed); + session->Context->DecrementPendingWrites(); + SendEv(session->Context->GetWriteActorId(), + new TEvUringWriteComplete(cqe->res, userData)); + break; + + case EUringOpTag::MainRecv: { + RecvCqes.fetch_add(1, std::memory_order_relaxed); + if (!(cqe->flags & IORING_CQE_F_MORE)) { + session->Context->DecrementPendingRecvs(); + } + TRcBuf data; + if (cqe->res > 0 && (cqe->flags & IORING_CQE_F_BUFFER)) { + ui16 bufIdx = cqe->flags >> IORING_CQE_BUFFER_SHIFT; + // Zero-copy: wrap the provided buffer directly; it is recycled + // back into
the buffer ring when the last TRcBuf reference drops. + data = session->MainRecvPool.WrapBuffer(bufIdx, cqe->res); + } + SendEv(session->Context->GetReadActorId(), + new TEvUringRecvComplete(cqe->res, cqe->flags, userData, std::move(data))); + break; + } + + case EUringOpTag::XdcRecv: + RecvCqes.fetch_add(1, std::memory_order_relaxed); + // XDC recv reads directly into destination spans (no provided buffer); + // forward the byte count to the input session for post-processing. + SendEv(session->Context->GetReadActorId(), + new TEvUringRecvComplete(cqe->res, cqe->flags, userData, TRcBuf())); + break; + + case EUringOpTag::XdcSendZc: + if (cqe->flags & IORING_CQE_F_NOTIF) { + // Buffer-release notification: gates buffer reuse, but does not + // count against pending writes. + SendEv(session->Context->GetWriteActorId(), + new TEvUringSendZcNotif(userData)); + } else { + // Data CQE: the send operation itself has completed. The F_NOTIF + // CQE that follows only signals buffer release. + session->Context->DecrementPendingWrites(); + SendEv(session->Context->GetWriteActorId(), + new TEvUringWriteComplete(cqe->res, userData)); + } + break; + + case EUringOpTag::XdcSendNotif: + SendEv(session->Context->GetWriteActorId(), + new TEvUringSendZcNotif(userData)); + break; + + case EUringOpTag::CancelAll: + break; + } + } + + if (count > 0) { + TotalCqesReaped.fetch_add(count, std::memory_order_relaxed); + io_uring_cq_advance(ring, count); + } + } + + private: + static constexpr unsigned AnchorQueueDepth = 4096; + static constexpr unsigned BatchSize = 64; + // SQPOLL kernel-thread idle window (ms) before it sleeps; only used when SQPOLL is on. + static constexpr ui32 SqThreadIdleMs = 2000; + // Idle backstop period for the reaper wait. Must stay comfortably below the + // interconnect DeadPeerTimeout so a worst-case lost wakeup is recovered long before a + // session could be declared dead. 
+ static constexpr long long BackstopPeriodNs = 100'000'000; // 100 ms + + TActorSystem* const ActorSystem; + const bool EnableSqpoll; + std::unique_ptr<struct io_uring> AnchorRing; + int AnchorRingFd = -1; + int ShutdownFd = -1; + int WakeFd = -1; // signalled by RemoveSession to wake the reaper for teardown + std::atomic<bool> Stop{false}; + + TMutex Mutex; // guards anchor SQ submissions + session maps + THashMap<int, TUringSessionRing*> EventFdToSession; + std::vector<std::unique_ptr<TUringSessionRing>> Sessions; + + TMutex RemovalMutex; // guards PendingRemoval + std::vector<int> PendingRemoval; + + // Off-thread destruction of removed session rings (see GraveyardProc). + std::thread GraveyardThread; + std::mutex GraveyardMutex; + std::condition_variable GraveyardCv; + std::vector<std::unique_ptr<TUringSessionRing>> Graveyard; + bool GraveyardStop = false; + + // Backing storage for the periodic backstop timeout SQE (see ArmTimeout); io_uring keeps + // a pointer to it, so it must outlive the in-flight timeout. Touched only with Mutex held. + struct __kernel_timespec BackstopTs{}; + + // Last time SweepAllSessions ran; throttles the time-based backstop. Reaper-thread only. + std::chrono::steady_clock::time_point LastSweep = std::chrono::steady_clock::now(); + + // Liveness counters (see GetStats). Written only by the reaper thread. 
+ std::atomic<ui64> BackstopTicks{0}; // idle backstop sweeps performed + std::atomic<ui64> AnchorWakeups{0}; // woke on at least one anchor CQE + std::atomic<ui64> TotalCqesReaped{0}; // session-ring CQEs consumed across all sessions + std::atomic<ui64> RecvCqes{0}; // of which main/XDC recv completions + std::atomic<ui64> WriteCqes{0}; // of which writev/send_zc data completions + std::atomic<ui64> ActiveSessions{0}; // current live session rings + std::atomic<ui64> EmptyWakes{0}; // diag: wait returned but CQ was empty + std::atomic<ui64> LastAnchorUserData{0}; // diag: last non-empty anchor CQE user_data + std::atomic<ui64> PollWakeups{0}; // diag: session eventfd poll CQEs processed + }; + + class TUringPollerActor : public TActorBootstrapped<TUringPollerActor> { + std::unique_ptr<TUringReaper> Reaper; + const bool EnableSqpoll; + ui16 NextBufGroupId = 0; + + public: + static constexpr TDuration StatsLogPeriod = TDuration::Seconds(5); + + explicit TUringPollerActor(bool enableSqpoll) + : EnableSqpoll(enableSqpoll) + {} + + void Bootstrap() { + Reaper = std::make_unique<TUringReaper>(TActivationContext::ActorSystem(), EnableSqpoll); + Reaper->Start(); + Become(&TUringPollerActor::StateFunc); + Schedule(StatsLogPeriod, new TEvents::TEvWakeup()); + } + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + case TEvUringRegister::EventType: { + auto* x = reinterpret_cast<TEvUringRegister::TPtr*>(&ev); + Handle(*x); + break; + } + case TEvUringUnregister::EventType: { + auto* x = reinterpret_cast<TEvUringUnregister::TPtr*>(&ev); + Handle(*x); + break; + } + case TEvents::TSystem::Wakeup: + LogStats(); + Schedule(StatsLogPeriod, new TEvents::TEvWakeup()); + break; + case TEvents::TSystem::PoisonPill: + PassAway(); + break; + default: + break; + } + } + + // Periodic proof-of-life for the reaper thread plus enough counters to tell, after the + // fact, whether completions were flowing while sessions were being declared DeadPeer: + // if RecvCqes/WriteCqes keep 
climbing the reaper is healthy and the stall is upstream + // (e.g. the sender never submitted), whereas frozen counters with live sessions point at + // a reaper-thread stall. + void LogStats() { + const TUringReaper::TStats s = Reaper->GetStats(); + LOG_NOTICE(*TlsActivationContext, NActorsServices::INTERCONNECT, + "ICUR50 uring reaper stats activeSessions# %" PRIu64 " backstopTicks# %" PRIu64 + " anchorWakeups# %" PRIu64 " totalCqes# %" PRIu64 " recvCqes# %" PRIu64 + " writeCqes# %" PRIu64 " emptyWakes# %" PRIu64 " pollWakeups# %" PRIu64 + " lastUd# 0x%" PRIx64, + s.ActiveSessions, s.BackstopTicks, s.AnchorWakeups, s.TotalCqesReaped, + s.RecvCqes, s.WriteCqes, s.EmptyWakes, s.PollWakeups, s.LastAnchorUserData); + } + + void Handle(TEvUringRegister::TPtr& ev) { + auto* msg = ev->Get(); + + auto session = std::make_unique<TUringSessionRing>(); + session->MainSocketFd = msg->Socket ? msg->Socket->GetDescriptor() : -1; + session->XdcSocketFd = msg->XdcSocket ? msg->XdcSocket->GetDescriptor() : -1; + + auto context = MakeIntrusive<TUringContext>(msg->WriteActorId, msg->ReadActorId); + if (!context->Init(Reaper->GetAnchorRingFd(), EnableSqpoll)) { + // Ring creation failed: tell the session to fall back to the epoll TPollerActor + // so it is never left without an I/O backend. + Send(msg->WriteActorId, new TEvUringRegisterFailed()); + return; + } + + // Register socket fds as fixed files to avoid per-I/O fget/fput. This is a pure + // optimization; on failure the context transparently uses raw fds, so it is not + // fatal and does not force the epoll fallback. + context->RegisterFiles(session->MainSocketFd, session->XdcSocketFd); + + // Provided buffer ring for the main socket recv path. If this fails the recv + // multishot would loop forever on -ENOBUFS, so treat it as fatal and fall back. 
+ ui16 mainBufGid = NextBufGroupId++; + ui16 xdcBufGid = NextBufGroupId++; + if (!session->MainRecvPool.Init(context->GetRing(), mainBufGid)) { + Send(msg->WriteActorId, new TEvUringRegisterFailed()); + return; + } + + session->Context = context; + // CRITICAL: publish the ring's eventfd so the reaper arms its anchor poll on THIS + // session's real, unique eventfd. Without it EventFd stayed -1: every session armed a + // multishot poll on the invalid fd -1 (POLLNVAL hot-spin) and all sessions collided on + // map key -1, so only the most recently registered session was ever reaped. On a + // multi-session node every other session got no completions delivered and was declared + // DeadPeer at the keepalive timeout (the cluster-wide meltdown); a 2-node loopback test + // has a single session and so accidentally worked. + session->EventFd = context->GetEventFd(); + + Reaper->AddSession(std::move(session)); + + Send(msg->WriteActorId, new TEvUringRegisterResult(context, mainBufGid, xdcBufGid)); + Send(msg->ReadActorId, new TEvUringRegisterResult(context, mainBufGid, xdcBufGid)); + } + + void Handle(TEvUringUnregister::TPtr& ev) { + Reaper->RemoveSession(ev->Get()->EventFd); + } + + void PassAway() override { + if (Reaper) { + Reaper->StopAndJoin(); + Reaper.reset(); + } + TActorBootstrapped::PassAway(); + } + }; + + IActor* CreateUringPollerActor(bool enableSqpoll) { + return new TUringPollerActor(enableSqpoll); + } + +} // namespace NActors diff --git a/ydb/library/actors/interconnect/poller/uring_poller_actor.h b/ydb/library/actors/interconnect/poller/uring_poller_actor.h new file mode 100644 index 00000000000..afce8fb9da1 --- /dev/null +++ b/ydb/library/actors/interconnect/poller/uring_poller_actor.h @@ -0,0 +1,106 @@ +#pragma once + +#include <ydb/library/actors/interconnect/events/events.h> +#include <ydb/library/actors/interconnect/uring_context.h> +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/util/rc_buf.h> + +#include 
"poller_actor.h" + +namespace NActors { + + struct TEvUringRegister : TEventLocal<TEvUringRegister, ui32(ENetwork::EvUringRegister)> { + TIntrusivePtr<TSharedDescriptor> Socket; + TIntrusivePtr<TSharedDescriptor> XdcSocket; + TActorId ReadActorId; + TActorId WriteActorId; + + TEvUringRegister(TIntrusivePtr<TSharedDescriptor> socket, + TIntrusivePtr<TSharedDescriptor> xdcSocket, + TActorId readActorId, + TActorId writeActorId) + : Socket(std::move(socket)) + , XdcSocket(std::move(xdcSocket)) + , ReadActorId(readActorId) + , WriteActorId(writeActorId) + {} + }; + + struct TEvUringRegisterResult : TEventLocal<TEvUringRegisterResult, ui32(ENetwork::EvUringRegisterResult)> { + TUringContext::TPtr Context; + ui16 MainRecvBufGroupId; + ui16 XdcRecvBufGroupId; + + TEvUringRegisterResult(TUringContext::TPtr context, ui16 mainBufGid, ui16 xdcBufGid) + : Context(std::move(context)) + , MainRecvBufGroupId(mainBufGid) + , XdcRecvBufGroupId(xdcBufGid) + {} + }; + + struct TEvUringWriteComplete : TEventLocal<TEvUringWriteComplete, ui32(ENetwork::EvUringWriteComplete)> { + i32 Result; + ui64 UserData; + + TEvUringWriteComplete(i32 result, ui64 userData) + : Result(result) + , UserData(userData) + {} + }; + + struct TEvUringRecvComplete : TEventLocal<TEvUringRecvComplete, ui32(ENetwork::EvUringRecvComplete)> { + i32 Result; + ui32 Flags; + ui64 UserData; + TRcBuf Data; + + TEvUringRecvComplete(i32 result, ui32 flags, ui64 userData) + : Result(result) + , Flags(flags) + , UserData(userData) + {} + + TEvUringRecvComplete(i32 result, ui32 flags, ui64 userData, TRcBuf data) + : Result(result) + , Flags(flags) + , UserData(userData) + , Data(std::move(data)) + {} + }; + + struct TEvUringSendZcNotif : TEventLocal<TEvUringSendZcNotif, ui32(ENetwork::EvUringSendZcNotif)> { + ui64 UserData; + + explicit TEvUringSendZcNotif(ui64 userData) + : UserData(userData) + {} + }; + + // Sent by the output session on teardown so the reaper can release the session ring + // (exit the ring, release the 
registered socket files, free the buffer ring, close the + // eventfd). EventFd identifies the session ring in the reaper's maps. + struct TEvUringUnregister : TEventLocal<TEvUringUnregister, ui32(ENetwork::EvUringUnregister)> { + int EventFd; + + explicit TEvUringUnregister(int eventFd) + : EventFd(eventFd) + {} + }; + + // Sent by the poller to the output session when io_uring setup failed: the session must + // fall back to registering its sockets with the epoll TPollerActor. + struct TEvUringRegisterFailed : TEventLocal<TEvUringRegisterFailed, ui32(ENetwork::EvUringRegisterFailed)> { + }; + +#ifdef __linux__ + IActor* CreateUringPollerActor(bool enableSqpoll); +#else + inline IActor* CreateUringPollerActor(bool /*enableSqpoll*/) { return nullptr; } +#endif + + inline TActorId MakeUringPollerActorId() { + char x[12] = {'I', 'C', 'U', 'r', 'i', 'n', 'g', 'P', '\xDE', '\xAD', '\xBE', '\xEF'}; + return TActorId(0, TStringBuf(std::begin(x), std::end(x))); + } + +} // namespace NActors diff --git a/ydb/library/actors/interconnect/poller/ya.make b/ydb/library/actors/interconnect/poller/ya.make index 6cd1f20811d..9322bd96b40 100644 --- a/ydb/library/actors/interconnect/poller/ya.make +++ b/ydb/library/actors/interconnect/poller/ya.make @@ -27,6 +27,11 @@ IF (OS_LINUX) SRCS( poller_tcp_unit_epoll.cpp poller_tcp_unit_epoll.h + uring_poller_actor.cpp + uring_poller_actor.h + ) + PEERDIR( + contrib/libs/liburing ) ENDIF() diff --git a/ydb/library/actors/interconnect/uring_context.cpp b/ydb/library/actors/interconnect/uring_context.cpp new file mode 100644 index 00000000000..b14fde1da65 --- /dev/null +++ b/ydb/library/actors/interconnect/uring_context.cpp @@ -0,0 +1,238 @@ +#include "uring_context.h" + +#include <liburing.h> +#include <sys/eventfd.h> +#include <unistd.h> + +namespace NActors { + + TUringContext::TUringContext(TActorId writeActorId, TActorId readActorId) + : WriteActorId(writeActorId) + , ReadActorId(readActorId) + { + EventFd = eventfd(0, EFD_NONBLOCK | 
EFD_CLOEXEC); + Y_ABORT_UNLESS(EventFd >= 0, "eventfd() failed: %s", strerror(errno)); + } + + TUringContext::~TUringContext() { + if (Ring) { + io_uring_queue_exit(Ring.get()); + } + if (EventFd >= 0) { + close(EventFd); + } + } + + bool TUringContext::Init(int anchorWqFd, bool enableSqpoll) { + Ring = std::make_unique<struct io_uring>(); + + struct io_uring_params params = {}; + // SQPOLL is opt-in (TInterconnectSettings::EnableUringSQPOLL, default off). When off, + // submissions are issued explicitly via io_uring_submit() (io_uring_enter) on the + // session mailbox thread, which makes the writev complete and signal the eventfd + // deterministically with no dependency on a kernel poll thread, and avoids a busy + // poll thread. When on, the anchor ring also has SQPOLL and we set SQPOLL + ATTACH_WQ + // here (wq_fd = anchor), so all session rings + the anchor share ONE kernel poll thread + // (not one per session); liburing's io_uring_submit() wakes an idle SQPOLL thread via + // IORING_SQ_NEED_WAKEUP. Note: SQPOLL was originally suspected of causing an idle + // keepalive meltdown, but that was traced to unrelated reaper bugs (see uring_poller_actor.cpp). + if (enableSqpoll) { + params.flags |= IORING_SETUP_SQPOLL; + params.sq_thread_idle = SqThreadIdleMs; + } + if (anchorWqFd >= 0) { + // Share the async io-wq backend with the anchor ring so we do not spawn a separate + // worker pool per session ring. + params.flags |= IORING_SETUP_ATTACH_WQ; + params.wq_fd = anchorWqFd; + } + + int ret = io_uring_queue_init_params(DefaultQueueDepth, Ring.get(), ¶ms); + if (ret < 0) { + Ring.reset(); + return false; + } + + ret = io_uring_register_eventfd(Ring.get(), EventFd); + if (ret < 0) { + io_uring_queue_exit(Ring.get()); + Ring.reset(); + return false; + } + + return true; + } + + int TUringContext::GetRingFd() const { + return Ring ? 
Ring->ring_fd : -1; + } + + void TUringContext::RegisterFiles(int mainFd, int xdcFd) { + if (!Ring) { + return; + } + // Index 0 = main socket, index 1 = XDC socket (sparse if absent). + int fds[2] = {mainFd, xdcFd >= 0 ? xdcFd : -1}; + int ret = io_uring_register_files(Ring.get(), fds, 2); + if (ret < 0) { + FixedFiles = false; + return; + } + MainFd = mainFd; + XdcFd = xdcFd; + FixedFiles = true; + } + + bool TUringContext::SubmitWritev(int fd, const struct iovec* iov, unsigned iovcnt, ui64 seqNo, EUringOpTag tag) { + if (!Ring) { + return false; + } + + struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get()); + if (!sqe) { + ++SqeFull; + return false; + } + + const int fixedIdx = FixedIndexForFd(fd); + if (fixedIdx >= 0) { + io_uring_prep_writev(sqe, fixedIdx, iov, iovcnt, 0); + sqe->flags |= IOSQE_FIXED_FILE; + } else { + io_uring_prep_writev(sqe, fd, iov, iovcnt, 0); + } + io_uring_sqe_set_data64(sqe, static_cast<ui64>(tag) | (seqNo & UringOpDataMask)); + return true; + } + + bool TUringContext::SubmitReadv(int fd, const struct iovec* iov, unsigned iovcnt, ui64 seqNo, EUringOpTag tag) { + if (!Ring) { + return false; + } + + struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get()); + if (!sqe) { + ++SqeFull; + return false; + } + + const int fixedIdx = FixedIndexForFd(fd); + if (fixedIdx >= 0) { + io_uring_prep_readv(sqe, fixedIdx, iov, iovcnt, 0); + sqe->flags |= IOSQE_FIXED_FILE; + } else { + io_uring_prep_readv(sqe, fd, iov, iovcnt, 0); + } + io_uring_sqe_set_data64(sqe, static_cast<ui64>(tag) | (seqNo & UringOpDataMask)); + return true; + } + + bool TUringContext::SubmitSendZc(int fd, const void* buf, size_t len, ui64 seqNo) { + if (!Ring) { + return false; + } + + struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get()); + if (!sqe) { + ++SqeFull; + return false; + } + + const int fixedIdx = FixedIndexForFd(fd); + if (fixedIdx >= 0) { + io_uring_prep_send_zc(sqe, fixedIdx, buf, len, 0, 0); + sqe->flags |= IOSQE_FIXED_FILE; + } else { + 
io_uring_prep_send_zc(sqe, fd, buf, len, 0, 0); + } + io_uring_sqe_set_data64(sqe, static_cast<ui64>(EUringOpTag::XdcSendZc) | (seqNo & UringOpDataMask)); + return true; + } + + bool TUringContext::SubmitRecvMultishot(int fd, ui16 bufGroupId, EUringOpTag tag) { + if (!Ring) { + return false; + } + + struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get()); + if (!sqe) { + ++SqeFull; + return false; + } + + const int fixedIdx = FixedIndexForFd(fd); + if (fixedIdx >= 0) { + io_uring_prep_recv_multishot(sqe, fixedIdx, nullptr, 0, 0); + sqe->flags |= IOSQE_FIXED_FILE; + } else { + io_uring_prep_recv_multishot(sqe, fd, nullptr, 0, 0); + } + sqe->buf_group = bufGroupId; + sqe->flags |= IOSQE_BUFFER_SELECT; + io_uring_sqe_set_data64(sqe, static_cast<ui64>(tag)); + return true; + } + + bool TUringContext::SubmitCancelFd(int fd) { + if (!Ring) { + return false; + } + + struct io_uring_sqe* sqe = io_uring_get_sqe(Ring.get()); + if (!sqe) { + return false; + } + + const int fixedIdx = FixedIndexForFd(fd); + if (fixedIdx >= 0) { + io_uring_prep_cancel_fd(sqe, fixedIdx, IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_FD_FIXED); + } else { + io_uring_prep_cancel_fd(sqe, fd, IORING_ASYNC_CANCEL_ALL); + } + io_uring_sqe_set_data64(sqe, static_cast<ui64>(EUringOpTag::CancelAll)); + return true; + } + + void TUringContext::Flush() { + if (!Ring) { + return; + } + // Number of SQEs queued but not yet submitted. A healthy submit returns exactly this + // many; anything less means the kernel refused part of the batch (e.g. -EBUSY on CQ + // overflow), which would silently strand a keepalive writev and starve the peer. 
+ const unsigned expected = io_uring_sq_ready(Ring.get()); + const int ret = io_uring_submit(Ring.get()); + ++SubmitCalls; + LastSubmitRet = ret; + if (ret < 0) { + ++SubmitErrors; + } else if (static_cast<unsigned>(ret) < expected) { + ++SubmitPartials; + } + } + + bool TUringContext::IsSupported() { + static const bool supported = [] { + // Probe a plain ring (without SQPOLL, which is opt-in and not required for the + // capability check) and require the provided-buffer-ring API, which is the hard + // dependency of the multishot recv data path. If SQPOLL is enabled but the kernel + // refuses the SQPOLL ring at Init time, the session falls back to epoll. + struct io_uring ring; + struct io_uring_params params = {}; + int ret = io_uring_queue_init_params(8, &ring, ¶ms); + if (ret != 0) { + return false; + } + int probeErr = 0; + struct io_uring_buf_ring* br = io_uring_setup_buf_ring(&ring, 1, /*bgid=*/0, 0, &probeErr); + const bool ok = (br != nullptr); + if (ok) { + io_uring_free_buf_ring(&ring, br, 1, /*bgid=*/0); + } + io_uring_queue_exit(&ring); + return ok; + }(); + return supported; + } + +} // namespace NActors diff --git a/ydb/library/actors/interconnect/uring_context.h b/ydb/library/actors/interconnect/uring_context.h new file mode 100644 index 00000000000..0c776214899 --- /dev/null +++ b/ydb/library/actors/interconnect/uring_context.h @@ -0,0 +1,178 @@ +#pragma once + +#include <ydb/library/actors/core/actorid.h> +#include <ydb/library/actors/interconnect/poller/poller.h> + +#include <util/generic/ptr.h> + +#include <atomic> +#include <cstdint> + +#ifdef __linux__ +#include <sys/uio.h> + +struct io_uring; +struct io_uring_buf_ring; +#endif + +namespace NActors { + + enum class EUringOpTag : ui64 { + MainWritev = 1ULL << 56, + XdcWritev = 2ULL << 56, + MainRecv = 3ULL << 56, + XdcRecv = 4ULL << 56, + XdcSendZc = 5ULL << 56, + XdcSendNotif = 6ULL << 56, + CancelAll = 7ULL << 56, + }; + + static constexpr ui64 UringOpTagMask = 0xFF00000000000000ULL; + 
static constexpr ui64 UringOpDataMask = ~UringOpTagMask; + + class TUringContext : public TThrRefBase { + public: + static constexpr ui32 DefaultQueueDepth = 64; + static constexpr ui32 MaxPendingWrites = 32; + // SQPOLL kernel-thread idle window (ms) before it sleeps; only used when SQPOLL is on. + static constexpr ui32 SqThreadIdleMs = 2000; + +#ifdef __linux__ + TUringContext(TActorId writeActorId, TActorId readActorId); + ~TUringContext(); + + bool Init(int anchorWqFd, bool enableSqpoll); + int GetEventFd() const { return EventFd; } + int GetRingFd() const; + + // Register the session socket fds as fixed files (index 0 = main, 1 = XDC) to avoid + // per-I/O fget/fput. Submit helpers below transparently use IOSQE_FIXED_FILE afterwards. + void RegisterFiles(int mainFd, int xdcFd); + + bool SubmitWritev(int fd, const struct iovec* iov, unsigned iovcnt, ui64 seqNo, EUringOpTag tag); + bool SubmitReadv(int fd, const struct iovec* iov, unsigned iovcnt, ui64 seqNo, EUringOpTag tag); + bool SubmitSendZc(int fd, const void* buf, size_t len, ui64 seqNo); + bool SubmitRecvMultishot(int fd, ui16 bufGroupId, EUringOpTag tag); + bool SubmitCancelFd(int fd); + void Flush(); + + // PendingWrites/PendingRecvs are incremented on the session mailbox thread (submission) + // and decremented on the reaper thread (completion), so they must be atomic. + ui32 GetPendingWrites() const { return PendingWrites.load(std::memory_order_relaxed); } + ui32 GetPendingRecvs() const { return PendingRecvs.load(std::memory_order_relaxed); } + + // Diagnostic submit accounting (instrumentation for the idle-keepalive DeadPeer hunt). + // All updated on the session mailbox thread inside Flush()/submit helpers. Plain ui64 is + // fine: read for logging on the same thread, or torn-read-tolerant from another thread. 
+ ui64 GetSubmitCalls() const { return SubmitCalls; } + ui64 GetSubmitErrors() const { return SubmitErrors; } + ui64 GetSubmitPartials() const { return SubmitPartials; } + i32 GetLastSubmitRet() const { return LastSubmitRet; } + ui64 GetSqeFull() const { return SqeFull; } + void IncrementPendingWrites() { PendingWrites.fetch_add(1, std::memory_order_relaxed); } + void DecrementPendingWrites() { + ui32 v = PendingWrites.load(std::memory_order_relaxed); + while (v > 0 && !PendingWrites.compare_exchange_weak(v, v - 1, std::memory_order_relaxed)) {} + } + void IncrementPendingRecvs() { PendingRecvs.fetch_add(1, std::memory_order_relaxed); } + void DecrementPendingRecvs() { + ui32 v = PendingRecvs.load(std::memory_order_relaxed); + while (v > 0 && !PendingRecvs.compare_exchange_weak(v, v - 1, std::memory_order_relaxed)) {} + } + + TActorId GetWriteActorId() const { return WriteActorId; } + TActorId GetReadActorId() const { return ReadActorId; } + + struct io_uring* GetRing() { return Ring.get(); } + + static bool IsSupported(); +#else + TUringContext(TActorId, TActorId) {} + ~TUringContext() = default; + + bool Init(int, bool) { return false; } + int GetEventFd() const { return -1; } + int GetRingFd() const { return -1; } + + void RegisterFiles(int, int) {} + + bool SubmitWritev(int, const void*, unsigned, ui64, EUringOpTag) { return false; } + bool SubmitReadv(int, const void*, unsigned, ui64, EUringOpTag) { return false; } + bool SubmitSendZc(int, const void*, size_t, ui64) { return false; } + bool SubmitRecvMultishot(int, ui16, EUringOpTag) { return false; } + bool SubmitCancelFd(int) { return false; } + void Flush() {} + + ui32 GetPendingWrites() const { return 0; } + ui32 GetPendingRecvs() const { return 0; } + ui64 GetSubmitCalls() const { return 0; } + ui64 GetSubmitErrors() const { return 0; } + ui64 GetSubmitPartials() const { return 0; } + i32 GetLastSubmitRet() const { return 0; } + ui64 GetSqeFull() const { return 0; } + void IncrementPendingWrites() {} + 
void DecrementPendingWrites() {} + void IncrementPendingRecvs() {} + void DecrementPendingRecvs() {} + + TActorId GetWriteActorId() const { return {}; } + TActorId GetReadActorId() const { return {}; } + + void* GetRing() { return nullptr; } + + static bool IsSupported() { return false; } +#endif + + using TPtr = TIntrusivePtr<TUringContext>; + + private: +#ifdef __linux__ + // Translate a raw socket fd to its registered fixed-file index, or -1 if fixed + // files are not in use for this fd. + int FixedIndexForFd(int fd) const { + if (!FixedFiles) { + return -1; + } + if (fd == MainFd) { + return 0; + } + if (fd == XdcFd) { + return 1; + } + return -1; + } + + std::unique_ptr<struct io_uring> Ring; + int EventFd = -1; + TActorId WriteActorId; + TActorId ReadActorId; + std::atomic<ui32> PendingWrites{0}; + std::atomic<ui32> PendingRecvs{0}; + // Diagnostic submit accounting (see getters above). Mutated only on the session mailbox + // thread; read elsewhere only for best-effort logging, so no synchronization is needed. 
+ ui64 SubmitCalls = 0; // number of io_uring_submit() calls via Flush() + ui64 SubmitErrors = 0; // io_uring_submit() returned < 0 + ui64 SubmitPartials = 0; // io_uring_submit() returned fewer than the queued SQE count + i32 LastSubmitRet = 0; // last io_uring_submit() return value + ui64 SqeFull = 0; // io_uring_get_sqe() returned nullptr (SQ full) in a submit helper + int MainFd = -1; + int XdcFd = -1; + bool FixedFiles = false; +#endif + }; + + class TEventFdWrapper : public TSharedDescriptor { + public: + TEventFdWrapper(int fd) + : Fd(fd) + {} + + int GetDescriptor() override { + return Fd; + } + + private: + int Fd; + }; + +} // namespace NActors diff --git a/ydb/library/actors/interconnect/uring_recv_buffer_pool.h b/ydb/library/actors/interconnect/uring_recv_buffer_pool.h new file mode 100644 index 00000000000..a1d436e815f --- /dev/null +++ b/ydb/library/actors/interconnect/uring_recv_buffer_pool.h @@ -0,0 +1,174 @@ +#pragma once + +#include <ydb/library/actors/util/rc_buf.h> + +#include <util/system/mutex.h> +#include <util/system/guard.h> +#include <util/generic/vector.h> + +#include <liburing.h> +#include <cstdlib> +#include <cstring> +#include <memory> + +namespace NActors { + + // Provided-buffer ring for io_uring multishot recv with true zero-copy delivery. + // + // The kernel fills one of the registered buffers and reports its index in the CQE. + // Instead of copying the bytes out, we hand the buffer to the input session wrapped + // in a TRcBuf backed by TUringRecvBufferChunk. When the last reference to that TRcBuf + // is dropped (possibly on an arbitrary worker thread, since the event payload outlives + // the input session), the chunk pushes the buffer index onto a thread-safe freelist. + // The reaper thread drains that freelist and re-adds the buffers to the kernel ring + // (DrainFreelist), keeping all io_uring_buf_ring_* calls on a single thread. 
+ class TUringRecvBufferPool { + public: + static constexpr ui32 NumBuffers = 256; + static constexpr ui32 BufferSize = 16384; + + // Shared between the pool (reaper-owned) and any outstanding chunks. Keeps the + // backing memory alive until the last TRcBuf referencing it is released, even if + // the pool/session is torn down first. + struct TCore : public TThrRefBase { + char* Memory = nullptr; + TMutex FreeLock; + TVector<ui16> FreeList; // indices released by consumers, awaiting re-add to the ring + + ~TCore() { + free(Memory); + } + + char* GetBuffer(ui16 index) const { + return Memory + static_cast<size_t>(index) * BufferSize; + } + + void Release(ui16 index) { + with_lock (FreeLock) { + FreeList.push_back(index); + } + } + }; + + class TUringRecvBufferChunk : public IContiguousChunk { + public: + TUringRecvBufferChunk(TIntrusivePtr<TCore> core, ui16 index, ui32 len) + : Core(std::move(core)) + , Index(index) + , Len(len) + {} + + ~TUringRecvBufferChunk() override { + Core->Release(Index); + } + + TContiguousSpan GetData() const override { + return {Core->GetBuffer(Index), Len}; + } + + TMutableContiguousSpan UnsafeGetDataMut() override { + return {Core->GetBuffer(Index), Len}; + } + + size_t GetOccupiedMemorySize() const override { + return BufferSize; + } + + IContiguousChunk::TPtr Clone() override { + return MakeIntrusive<TOwnedCopyChunk>(TString(Core->GetBuffer(Index), Len)); + } + + private: + // Independent heap copy used when a private mutable copy is required. 
+ class TOwnedCopyChunk : public IContiguousChunk { + public: + explicit TOwnedCopyChunk(TString data) + : Data(std::move(data)) + {} + TContiguousSpan GetData() const override { return {Data.data(), Data.size()}; } + TMutableContiguousSpan UnsafeGetDataMut() override { return {Data.Detach(), Data.size()}; } + size_t GetOccupiedMemorySize() const override { return Data.capacity(); } + IContiguousChunk::TPtr Clone() override { return MakeIntrusive<TOwnedCopyChunk>(Data); } + private: + TString Data; + }; + + TIntrusivePtr<TCore> Core; + ui16 Index; + ui32 Len; + }; + + TUringRecvBufferPool() = default; + + ~TUringRecvBufferPool() { + if (BufRing) { + io_uring_free_buf_ring(Ring, BufRing, NumBuffers, BufGroupId); + } + // Core (and its backing memory) is kept alive by any outstanding chunks. + } + + bool Init(struct io_uring* ring, ui16 bufGroupId) { + Ring = ring; + BufGroupId = bufGroupId; + + Core = MakeIntrusive<TCore>(); + Core->Memory = static_cast<char*>(aligned_alloc(4096, NumBuffers * BufferSize)); + if (!Core->Memory) { + Core.Reset(); + return false; + } + memset(Core->Memory, 0, NumBuffers * BufferSize); + + int ret = 0; + BufRing = io_uring_setup_buf_ring(ring, NumBuffers, bufGroupId, 0, &ret); + if (!BufRing) { + Core.Reset(); + return false; + } + + for (ui32 i = 0; i < NumBuffers; ++i) { + io_uring_buf_ring_add(BufRing, Core->GetBuffer(i), BufferSize, i, + io_uring_buf_ring_mask(NumBuffers), i); + } + io_uring_buf_ring_advance(BufRing, NumBuffers); + + return true; + } + + ui16 GetBufGroupId() const { + return BufGroupId; + } + + // Zero-copy: wrap the kernel-filled buffer; recycled when the TRcBuf is released. + TRcBuf WrapBuffer(ui16 index, ui32 len) { + return TRcBuf(MakeIntrusive<TUringRecvBufferChunk>(Core, index, len)); + } + + // Called on the reaper thread: return consumer-released buffers to the kernel ring. 
+ void DrainFreelist() { + if (!BufRing || !Core) { + return; + } + TVector<ui16> local; + with_lock (Core->FreeLock) { + local.swap(Core->FreeList); + } + if (local.empty()) { + return; + } + const int mask = io_uring_buf_ring_mask(NumBuffers); + for (size_t j = 0; j < local.size(); ++j) { + const ui16 idx = local[j]; + io_uring_buf_ring_add(BufRing, Core->GetBuffer(idx), BufferSize, idx, mask, j); + } + io_uring_buf_ring_advance(BufRing, local.size()); + } + + private: + struct io_uring* Ring = nullptr; + struct io_uring_buf_ring* BufRing = nullptr; + TIntrusivePtr<TCore> Core; + ui16 BufGroupId = 0; + }; + +} // namespace NActors diff --git a/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h b/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h index 5f95b246e76..dfae5616fb0 100644 --- a/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h @@ -24,6 +24,8 @@ public: USE_TLS = 1 << 1, RDMA_POLLING_CQ = 1 << 2, DISABLE_RDMA = 1 << 3, + USE_URING = 1 << 4, + USE_URING_SQPOLL = 1 << 5, // implies USE_URING + EnableUringSQPOLL }; using TCheckerFactory = std::function<IActor*(ui32)>; @@ -80,6 +82,18 @@ public: } } + auto effectiveSettingsCustomizer = [flags, settingsCustomizer](ui32 nodeId, NActors::TInterconnectSettings& settings) { + if (flags & (USE_URING | USE_URING_SQPOLL)) { + settings.UseUring = true; + } + if (flags & USE_URING_SQPOLL) { + settings.EnableUringSQPOLL = true; + } + if (settingsCustomizer) { + settingsCustomizer(nodeId, settings); + } + }; + for (ui32 i = 1; i <= NumNodes; ++i) { auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap; Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig, @@ -87,7 +101,7 @@ public: flags & USE_ZC ? ESocketSendOptimization::IC_MSG_ZEROCOPY : ESocketSendOptimization::DISABLED, flags & USE_TLS, checkerFactory, flags & RDMA_POLLING_CQ ? 
NInterconnect::NRdma::ECqMode::POLLING : NInterconnect::NRdma::ECqMode::EVENT, !(flags & DISABLE_RDMA), - settingsCustomizer, + effectiveSettingsCustomizer, LogBackendFactory)); } } diff --git a/ydb/library/actors/interconnect/ut/lib/node.h b/ydb/library/actors/interconnect/ut/lib/node.h index b5796eb17b9..781e09610b7 100644 --- a/ydb/library/actors/interconnect/ut/lib/node.h +++ b/ydb/library/actors/interconnect/ut/lib/node.h @@ -11,6 +11,7 @@ #include <ydb/library/actors/interconnect/interconnect_tcp_server.h> #include <ydb/library/actors/interconnect/interconnect_tcp_proxy.h> #include <ydb/library/actors/interconnect/interconnect_proxy_wrapper.h> +#include <ydb/library/actors/interconnect/poller/uring_poller_actor.h> #include <ydb/library/actors/interconnect/rdma/mem_pool.h> #include <ydb/library/actors/interconnect/rdma/cq_actor/cq_actor.h> @@ -109,6 +110,10 @@ public: setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(counters), TMailboxType::ReadAsFilled, 0)); + if (common->Settings.UseUring && TUringContext::IsSupported()) { + setup.LocalServices.emplace_back(MakeUringPollerActorId(), TActorSetupCmd(CreateUringPollerActor(common->Settings.EnableUringSQPOLL), + TMailboxType::ReadAsFilled, 0)); + } setup.LocalServices.emplace_back(NInterconnect::NRdma::MakeCqActorId(), TActorSetupCmd(NInterconnect::NRdma::CreateCqActor(-1, 1024, rdmaCqMode, nullptr), TMailboxType::ReadAsFilled, 0)); diff --git a/ydb/library/actors/interconnect/ut/uring_ut.cpp b/ydb/library/actors/interconnect/ut/uring_ut.cpp new file mode 100644 index 00000000000..ffc24d1a712 --- /dev/null +++ b/ydb/library/actors/interconnect/ut/uring_ut.cpp @@ -0,0 +1,499 @@ +#include <ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h> +#include <ydb/library/actors/interconnect/uring_context.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <atomic> + +using namespace NActors; + +Y_UNIT_TEST_SUITE(InterconnectUring) { + + // Every uring data-path 
test runs in three modes so the SQPOLL path stays covered: + // *_NoUring - epoll backend (UseUring off): a control that the test itself is sound + // *_Uring - io_uring, explicit submit (UseUring on, SQPOLL off; the default) + // *_UringSqpoll - io_uring with the shared SQPOLL kernel poll thread (EnableUringSQPOLL on) + // The bodies are extracted into Do<Name>(mode) and emitted via URING_TEST_3MODES. + + // Skip only the uring variants when the kernel lacks io_uring; the no-uring variant always + // runs. (If SQPOLL itself is unavailable the session falls back to epoll, so the SQPOLL + // variant still passes functionally.) + static bool UringUnavailable(TTestICCluster::Flags mode) { + const bool wantUring = mode & (TTestICCluster::USE_URING | TTestICCluster::USE_URING_SQPOLL); + if (wantUring && !TUringContext::IsSupported()) { + Cerr << "io_uring not supported on this kernel, skipping" << Endl; + return true; + } + return false; + } + + static TTestICCluster::Flags WithMode(TTestICCluster::Flags base, TTestICCluster::Flags mode) { + return static_cast<TTestICCluster::Flags>(base | mode); + } + +#define URING_TEST_3MODES(Name) \ + Y_UNIT_TEST(Name##_NoUring) { Do##Name(TTestICCluster::EMPTY); } \ + Y_UNIT_TEST(Name##_Uring) { Do##Name(TTestICCluster::USE_URING); } \ + Y_UNIT_TEST(Name##_UringSqpoll) { Do##Name(static_cast<TTestICCluster::Flags>(TTestICCluster::USE_URING | TTestICCluster::USE_URING_SQPOLL)); } + + Y_UNIT_TEST(UringSupported) { + // Just verify the probe runs without crashing + [[maybe_unused]] bool supported = TUringContext::IsSupported(); + } + + void DoBasicSendRecv(TTestICCluster::Flags mode) { + if (UringUnavailable(mode)) { + return; + } + + TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode); + + // Simple ping-pong: send an event from node 1 to node 2 and get a response + class TPingActor : public TActorBootstrapped<TPingActor> { + public: + TPingActor(TActorId target, std::atomic<int>& counter) + : Target(target) 
+ , Counter(counter) + {} + + void Bootstrap() { + Send(Target, new TEvents::TEvPing); + Become(&TPingActor::StateFunc); + } + + STRICT_STFUNC(StateFunc, + cFunc(TEvents::TEvPong::EventType, HandlePong) + ) + + void HandlePong() { + Counter.fetch_add(1); + PassAway(); + } + + private: + TActorId Target; + std::atomic<int>& Counter; + }; + + class TPongActor : public TActorBootstrapped<TPongActor> { + public: + void Bootstrap() { + Become(&TPongActor::StateFunc); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvPing, Handle) + ) + + void Handle(TEvents::TEvPing::TPtr& ev) { + Send(ev->Sender, new TEvents::TEvPong); + } + }; + + std::atomic<int> counter{0}; + + TActorId pongId = cluster.RegisterActor(new TPongActor, 2); + cluster.RegisterActor(new TPingActor(pongId, counter), 1); + + const TInstant deadline = TInstant::Now() + TDuration::Seconds(10); + while (counter.load() == 0 && TInstant::Now() < deadline) { + Sleep(TDuration::MilliSeconds(50)); + } + + UNIT_ASSERT_VALUES_EQUAL(counter.load(), 1); + } + URING_TEST_3MODES(BasicSendRecv) + + void DoLargeMessage(TTestICCluster::Flags mode) { + if (UringUnavailable(mode)) { + return; + } + + TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode); + + class TReceiver : public TActorBootstrapped<TReceiver> { + public: + TReceiver(std::atomic<size_t>& bytesReceived) + : BytesReceived(bytesReceived) + {} + + void Bootstrap() { + Become(&TReceiver::StateFunc); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvBlob, Handle) + ) + + void Handle(TEvents::TEvBlob::TPtr& ev) { + BytesReceived.fetch_add(ev->Get()->Blob.size()); + } + + private: + std::atomic<size_t>& BytesReceived; + }; + + std::atomic<size_t> bytesReceived{0}; + const size_t messageSize = 1024 * 1024; // 1 MB + + TActorId receiverId = cluster.RegisterActor(new TReceiver(bytesReceived), 2); + + // Send a large blob from node 1 to node 2 + TString data(messageSize, 'X'); + cluster.GetNode(1)->Send(receiverId, new 
TEvents::TEvBlob(data)); + + const TInstant deadline = TInstant::Now() + TDuration::Seconds(30); + while (bytesReceived.load() < messageSize && TInstant::Now() < deadline) { + Sleep(TDuration::MilliSeconds(100)); + } + + UNIT_ASSERT_VALUES_EQUAL(bytesReceived.load(), messageSize); + } + URING_TEST_3MODES(LargeMessage) + + void DoMultipleMessages(TTestICCluster::Flags mode) { + if (UringUnavailable(mode)) { + return; + } + + TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode); + + class TCountingReceiver : public TActorBootstrapped<TCountingReceiver> { + public: + TCountingReceiver(std::atomic<int>& counter) + : Counter(counter) + {} + + void Bootstrap() { + Become(&TCountingReceiver::StateFunc); + } + + STRICT_STFUNC(StateFunc, + cFunc(TEvents::TEvPing::EventType, HandlePing) + ) + + void HandlePing() { + Counter.fetch_add(1); + } + + private: + std::atomic<int>& Counter; + }; + + std::atomic<int> counter{0}; + const int numMessages = 1000; + + TActorId receiverId = cluster.RegisterActor(new TCountingReceiver(counter), 2); + + for (int i = 0; i < numMessages; ++i) { + cluster.GetNode(1)->Send(receiverId, new TEvents::TEvPing); + } + + const TInstant deadline = TInstant::Now() + TDuration::Seconds(30); + while (counter.load() < numMessages && TInstant::Now() < deadline) { + Sleep(TDuration::MilliSeconds(100)); + } + + UNIT_ASSERT_VALUES_EQUAL(counter.load(), numMessages); + } + URING_TEST_3MODES(MultipleMessages) + + void DoSustainedStream(TTestICCluster::Flags mode) { + // Stream a large amount of data through many messages to force the provided-buffer + // recv ring to recycle buffers repeatedly (and exercise the ENOBUFS re-arm path). 
+ if (UringUnavailable(mode)) { + return; + } + + TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode); + + class TReceiver : public TActorBootstrapped<TReceiver> { + public: + TReceiver(std::atomic<size_t>& bytesReceived) + : BytesReceived(bytesReceived) + {} + void Bootstrap() { Become(&TReceiver::StateFunc); } + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvBlob, Handle) + ) + void Handle(TEvents::TEvBlob::TPtr& ev) { + BytesReceived.fetch_add(ev->Get()->Blob.size()); + } + private: + std::atomic<size_t>& BytesReceived; + }; + + std::atomic<size_t> bytesReceived{0}; + const size_t messageSize = 64 * 1024; + const int numMessages = 1000; + const size_t totalBytes = messageSize * numMessages; + + TActorId receiverId = cluster.RegisterActor(new TReceiver(bytesReceived), 2); + + TString data(messageSize, 'Z'); + for (int i = 0; i < numMessages; ++i) { + cluster.GetNode(1)->Send(receiverId, new TEvents::TEvBlob(data)); + } + + const TInstant deadline = TInstant::Now() + TDuration::Seconds(60); + while (bytesReceived.load() < totalBytes && TInstant::Now() < deadline) { + Sleep(TDuration::MilliSeconds(100)); + } + + UNIT_ASSERT_VALUES_EQUAL(bytesReceived.load(), totalBytes); + } + URING_TEST_3MODES(SustainedStream) + + void DoZeroCopyLargeMessages(TTestICCluster::Flags mode) { + // Exercise the io_uring IORING_OP_SEND_ZC path (XDC) together with the NOTIF-gated + // buffer lifetime. Content is verified to catch any premature buffer reuse. 
+ if (UringUnavailable(mode)) { + return; + } + + TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, + WithMode(TTestICCluster::USE_ZC, mode)); + + class TVerifier : public TActorBootstrapped<TVerifier> { + public: + TVerifier(std::atomic<int>& ok, std::atomic<int>& bad, char fill) + : Ok(ok) + , Bad(bad) + , Fill(fill) + {} + void Bootstrap() { Become(&TVerifier::StateFunc); } + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvBlob, Handle) + ) + void Handle(TEvents::TEvBlob::TPtr& ev) { + const TString& blob = ev->Get()->Blob; + bool good = true; + for (char c : blob) { + if (c != Fill) { + good = false; + break; + } + } + (good ? Ok : Bad).fetch_add(1); + } + private: + std::atomic<int>& Ok; + std::atomic<int>& Bad; + const char Fill; + }; + + std::atomic<int> ok{0}; + std::atomic<int> bad{0}; + const char fill = 'Q'; + const size_t messageSize = 512 * 1024; + const int numMessages = 50; + + TActorId receiverId = cluster.RegisterActor(new TVerifier(ok, bad, fill), 2); + + TString data(messageSize, fill); + for (int i = 0; i < numMessages; ++i) { + cluster.GetNode(1)->Send(receiverId, new TEvents::TEvBlob(data)); + } + + const TInstant deadline = TInstant::Now() + TDuration::Seconds(60); + while (ok.load() + bad.load() < numMessages && TInstant::Now() < deadline) { + Sleep(TDuration::MilliSeconds(100)); + } + + UNIT_ASSERT_VALUES_EQUAL(bad.load(), 0); + UNIT_ASSERT_VALUES_EQUAL(ok.load(), numMessages); + } + URING_TEST_3MODES(ZeroCopyLargeMessages) + + void DoSessionChurnReconnect(TTestICCluster::Flags mode) { + // Repeatedly tear the session down (TEvPoisonSession) and verify the connection + // re-establishes and delivers every round. Before the reaper learned to release + // session rings, each churn leaked an io_uring fd + eventfd + buffer ring and held + // the socket open, so after a handful of cycles the node could no longer connect + // (fd exhaustion / anchor-SQ abort) — exactly the cluster meltdown this guards. 
+ if (UringUnavailable(mode)) { + return; + } + + TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, mode); + + class TPongActor : public TActorBootstrapped<TPongActor> { + public: + void Bootstrap() { Become(&TPongActor::StateFunc); } + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvPing, Handle) + ) + void Handle(TEvents::TEvPing::TPtr& ev) { + Send(ev->Sender, new TEvents::TEvPong); + } + }; + + class TPingActor : public TActorBootstrapped<TPingActor> { + public: + TPingActor(TActorId target, std::atomic<int>& counter) + : Target(target) + , Counter(counter) + {} + void Bootstrap() { + Send(Target, new TEvents::TEvPing, IEventHandle::FlagTrackDelivery); + Become(&TPingActor::StateFunc); + } + STRICT_STFUNC(StateFunc, + cFunc(TEvents::TEvPong::EventType, HandlePong) + ) + void HandlePong() { + Counter.fetch_add(1); + PassAway(); + } + private: + TActorId Target; + std::atomic<int>& Counter; + }; + + TActorId pongId = cluster.RegisterActor(new TPongActor, 2); + + const int rounds = 40; + for (int round = 0; round < rounds; ++round) { + std::atomic<int> counter{0}; + cluster.RegisterActor(new TPingActor(pongId, counter), 1); + + const TInstant deadline = TInstant::Now() + TDuration::Seconds(20); + while (counter.load() == 0 && TInstant::Now() < deadline) { + Sleep(TDuration::MilliSeconds(20)); + } + UNIT_ASSERT_C(counter.load() == 1, + "round " << round << ": ping/pong did not complete (reconnect failed?)"); + + // Force a session teardown so the next round must reconnect over a fresh ring. + cluster.GetNode(1)->Send(cluster.InterconnectProxy(2, 1), new TEvInterconnect::TEvPoisonSession); + Sleep(TDuration::MilliSeconds(50)); + } + } + URING_TEST_3MODES(SessionChurnReconnect) + + void DoIdleKeepAlive(TTestICCluster::Flags mode) { + // Regression test for the io_uring idle-keepalive (DeadPeer) cluster meltdown. + // + // An idle connection exchanges nothing but periodic keepalive pings (every PingPeriod, + // default 3s). 
The original cluster meltdown was traced to reaper bugs (eventfd never + // registered, the anchor poll going silent after one fire) that starved completions on + // idle sessions until the peer declared DeadPeer. This guards that an idle session is + // kept alive across several ping cycles in every backend mode (epoll, uring, uring+SQPOLL). + // + // The settings below make this deterministic: PingPeriod (3s) < DeadPeer (6s) so a + // healthy connection (one whose pings keep flowing) is never torn down. We subscribe to + // the peer (establishing a session but sending no payload) and sit idle across several + // ping/DeadPeer cycles, asserting the session is never declared dead. + if (UringUnavailable(mode)) { + return; + } + + TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, + mode, /*checkerFactory=*/{}, /*deadPeerTimeout=*/TDuration::Seconds(6)); + + class TConnSubscriber : public TActorBootstrapped<TConnSubscriber> { + public: + TConnSubscriber(ui32 peerNodeId, std::atomic<int>& connects, std::atomic<int>& disconnects) + : PeerNodeId(peerNodeId) + , Connects(connects) + , Disconnects(disconnects) + {} + void Bootstrap() { + Become(&TConnSubscriber::StateFunc); + Send(TActivationContext::InterconnectProxy(PeerNodeId), new TEvents::TEvSubscribe); + } + STRICT_STFUNC(StateFunc, + cFunc(TEvInterconnect::TEvNodeConnected::EventType, HandleConnected) + cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected) + ) + void HandleConnected() { Connects.fetch_add(1); } + void HandleDisconnected() { Disconnects.fetch_add(1); } + private: + const ui32 PeerNodeId; + std::atomic<int>& Connects; + std::atomic<int>& Disconnects; + }; + + std::atomic<int> connects{0}; + std::atomic<int> disconnects{0}; + + cluster.RegisterActor(new TConnSubscriber(2, connects, disconnects), 1); + + // Wait for the session to come up. 
+ const TInstant connectDeadline = TInstant::Now() + TDuration::Seconds(10); + while (connects.load() == 0 && TInstant::Now() < connectDeadline) { + Sleep(TDuration::MilliSeconds(20)); + } + UNIT_ASSERT_C(connects.load() >= 1, "session never connected"); + + // Baseline right before going idle, then stay idle across several ping (3s) and + // DeadPeer (6s) cycles. A healthy keepalive keeps the session up the whole time. + const int connectsBaseline = connects.load(); + const int disconnectsBaseline = disconnects.load(); + + Sleep(TDuration::Seconds(16)); + + UNIT_ASSERT_C(disconnects.load() == disconnectsBaseline, + "session was torn down while idle (keepalive stall): disconnects " + << disconnectsBaseline << " -> " << disconnects.load()); + UNIT_ASSERT_C(connects.load() == connectsBaseline, + "session reconnected while idle (keepalive stall): connects " + << connectsBaseline << " -> " << connects.load()); + } + URING_TEST_3MODES(IdleKeepAlive) + + Y_UNIT_TEST(FallbackToEpollWithEncryption) { + // When TLS is enabled, even with UseUring=true, the session should + // fall back to the epoll path. This test verifies no crash occurs. 
+ if (!TUringContext::IsSupported()) { + Cerr << "io_uring not supported on this kernel, skipping" << Endl; + return; + } + + auto flags = static_cast<TTestICCluster::Flags>( + TTestICCluster::USE_URING | TTestICCluster::USE_TLS); + TTestICCluster cluster(2, NActors::TChannelsConfig(), nullptr, nullptr, flags); + + std::atomic<int> counter{0}; + + class TPongActor : public TActorBootstrapped<TPongActor> { + public: + void Bootstrap() { Become(&TPongActor::StateFunc); } + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvPing, Handle) + ) + void Handle(TEvents::TEvPing::TPtr& ev) { + Send(ev->Sender, new TEvents::TEvPong); + } + }; + + class TPingActor : public TActorBootstrapped<TPingActor> { + public: + TPingActor(TActorId target, std::atomic<int>& c) + : Target(target), Counter(c) {} + void Bootstrap() { + Send(Target, new TEvents::TEvPing); + Become(&TPingActor::StateFunc); + } + STRICT_STFUNC(StateFunc, + cFunc(TEvents::TEvPong::EventType, HandlePong) + ) + void HandlePong() { + Counter.fetch_add(1); + PassAway(); + } + private: + TActorId Target; + std::atomic<int>& Counter; + }; + + TActorId pongId = cluster.RegisterActor(new TPongActor, 2); + cluster.RegisterActor(new TPingActor(pongId, counter), 1); + + const TInstant deadline = TInstant::Now() + TDuration::Seconds(10); + while (counter.load() == 0 && TInstant::Now() < deadline) { + Sleep(TDuration::MilliSeconds(50)); + } + + UNIT_ASSERT_VALUES_EQUAL(counter.load(), 1); + } +} diff --git a/ydb/library/actors/interconnect/ut/ya.make b/ydb/library/actors/interconnect/ut/ya.make index 3d073111acc..2c75a693b06 100644 --- a/ydb/library/actors/interconnect/ut/ya.make +++ b/ydb/library/actors/interconnect/ut/ya.make @@ -19,6 +19,7 @@ SRCS( poller_actor_ut.cpp dynamic_proxy_ut.cpp sticking_ut.cpp + uring_ut.cpp ) PEERDIR( diff --git a/ydb/library/actors/interconnect/ya.make b/ydb/library/actors/interconnect/ya.make index 6291eeba70f..b1ee9208ca6 100644 --- a/ydb/library/actors/interconnect/ya.make +++ 
b/ydb/library/actors/interconnect/ya.make @@ -58,6 +58,17 @@ SRCS( watchdog_timer.h ) +IF (OS_LINUX) + SRCS( + uring_context.cpp + uring_context.h + uring_recv_buffer_pool.h + ) + PEERDIR( + contrib/libs/liburing + ) +ENDIF() + PEERDIR( contrib/libs/libc_compat contrib/libs/openssl diff --git a/ydb/tools/stress_tool/device_test_tool.cpp b/ydb/tools/stress_tool/device_test_tool.cpp index 9b147bd6d50..82da4da11e2 100644 --- a/ydb/tools/stress_tool/device_test_tool.cpp +++ b/ydb/tools/stress_tool/device_test_tool.cpp @@ -167,6 +167,9 @@ int main(int argc, char **argv) { .RequiredArgument("PORT").StoreResult(&icPort).Hidden(); opts.AddLongOption("num-server-devices", "number of devices per server (default 1)") .RequiredArgument("N").DefaultValue("1").Hidden(); + bool useUring = false; + opts.AddLongOption("use-uring", "use io_uring transport for interconnect instead of epoll (Linux 5.19+)") + .StoreTrue(&useUring).NoArgument().Hidden(); { const size_t kCol = 26; @@ -190,7 +193,9 @@ int main(int argc, char **argv) { << row("ic-port PORT", "interconnect port for server to listen on (server only)") << row("num-server-devices N", - "number of devices per server (default 1)"); + "number of devices per server (default 1)") + << row("use-uring", + "use io_uring transport for interconnect instead of epoll (Linux 5.19+)"); opts.AddSection("DDisk client/server options", ddiskHelp); } @@ -293,7 +298,7 @@ int main(int argc, char **argv) { if (serverMode) { InstallServerSignalHandler(); auto printer = MakeIntrusive<NKikimr::TResultPrinter>(config.OutputFormat, config.RunCount); - THolder<NKikimr::TPerfTest> test(new NKikimr::TDDiskServer<>(config, testProto, serverNodeId, clientNodeId, icPort)); + THolder<NKikimr::TPerfTest> test(new NKikimr::TDDiskServer<>(config, testProto, serverNodeId, clientNodeId, icPort, useUring)); test->SetPrinter(printer); test->RunTest(); return 0; @@ -330,7 +335,7 @@ int main(int argc, char **argv) { overrideDDiskInFlight(testProto, inFlight); for (ui32 
run = 0; run < config.RunCount; ++run) { THolder<NKikimr::TPerfTest> test( - new NKikimr::TDDiskClient(config, testProto, clientNodeId, serverPeers, numServerDevices)); + new NKikimr::TDDiskClient(config, testProto, clientNodeId, serverPeers, numServerDevices, useUring)); test->SetPrinter(printer); test->RunTest(); } @@ -338,7 +343,7 @@ int main(int argc, char **argv) { } else { for (ui32 run = 0; run < config.RunCount; ++run) { THolder<NKikimr::TPerfTest> test( - new NKikimr::TDDiskClient(config, testProto, clientNodeId, serverPeers, numServerDevices)); + new NKikimr::TDDiskClient(config, testProto, clientNodeId, serverPeers, numServerDevices, useUring)); test->SetPrinter(printer); test->RunTest(); } diff --git a/ydb/tools/stress_tool/device_test_tool_ddisk_client_server.h b/ydb/tools/stress_tool/device_test_tool_ddisk_client_server.h index cc8a063e56c..2de9fe09452 100644 --- a/ydb/tools/stress_tool/device_test_tool_ddisk_client_server.h +++ b/ydb/tools/stress_tool/device_test_tool_ddisk_client_server.h @@ -11,6 +11,7 @@ #include <ydb/library/actors/interconnect/interconnect_proxy_wrapper.h> #include <ydb/library/actors/interconnect/handshake_broker.h> #include <ydb/library/actors/interconnect/poller/poller_actor.h> +#include <ydb/library/actors/interconnect/poller/uring_poller_actor.h> #include <util/system/event.h> @@ -27,7 +28,7 @@ struct TInterconnectPeer { }; static TInterconnectProxyCommon::TPtr MakeInterconnectCommon( - const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, ui32 selfNodeId) { + const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, ui32 selfNodeId, bool useUring = false) { auto common = MakeIntrusive<TInterconnectProxyCommon>(); common->NameserviceId = GetNameserviceActorId(); common->MonCounters = counters->GetSubgroup("nodeId", ToString(selfNodeId)); @@ -41,6 +42,7 @@ static TInterconnectProxyCommon::TPtr MakeInterconnectCommon( common->Settings.TotalInflightAmountOfData = 256ull * 1024 * 1024; 
common->Settings.TCPSocketBufferSize = 8 * 1024 * 1024; common->Settings.EnableExternalDataChannel = true; + common->Settings.UseUring = useUring; common->OutgoingHandshakeInflightLimit = 3; return common; } @@ -73,6 +75,12 @@ static void SetupInterconnectServices(TActorSystemSetup* setup, MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, InterconnectPoolId)); + if (common->Settings.UseUring && TUringContext::IsSupported()) { + setup->LocalServices.emplace_back( + MakeUringPollerActorId(), + TActorSetupCmd(CreateUringPollerActor(common->Settings.EnableUringSQPOLL), TMailboxType::ReadAsFilled, InterconnectPoolId)); + } + setup->LocalServices.emplace_back( MakeHandshakeBrokerOutId(), TActorSetupCmd(CreateHandshakeBroker(*common->OutgoingHandshakeInflightLimit), @@ -107,15 +115,17 @@ struct TDDiskServer : public TPDiskTest<ChunkSize> { ui32 ServerNodeId; ui32 ClientNodeId; ui16 Port; + bool UseUring; static constexpr ui32 DDiskSlotId = 1; TDDiskServer(const TPerfTestConfig& cfg, const NDevicePerfTest::TDDiskTest& testProto, - ui32 serverNodeId, ui32 clientNodeId, ui16 port) + ui32 serverNodeId, ui32 clientNodeId, ui16 port, bool useUring = false) : TBase(cfg, DefaultPDiskTestProto()) , DDiskTestProto(testProto) , ServerNodeId(serverNodeId) , ClientNodeId(clientNodeId) , Port(port) + , UseUring(useUring) { // Re-key PDisk actor IDs and the logger to ServerNodeId before Init() runs; // otherwise local sends from DDisk -> PDisk/logger get routed through @@ -193,7 +203,7 @@ struct TDDiskServer : public TPDiskTest<ChunkSize> { // Server only accepts incoming connections, but the interconnect handshake still // resolves the peer NodeId via the local nameserver, so we must register the // client with a placeholder address (port 0; server never initiates connection). 
- auto common = MakeInterconnectCommon(TBase::Counters, ServerNodeId); + auto common = MakeInterconnectCommon(TBase::Counters, ServerNodeId, UseUring); TVector<TInterconnectPeer> peers = {{ClientNodeId, "::", 0}}; SetupInterconnectServices(TBase::Setup.Get(), common, ServerNodeId, @@ -245,10 +255,11 @@ struct TDDiskClient : public TPerfTest { ui32 ClientNodeId; TVector<TInterconnectPeer> ServerPeers; ui32 NumDevicesPerServer; + bool UseUring; TDDiskClient(const TPerfTestConfig& cfg, const NDevicePerfTest::TDDiskTest& testProto, ui32 clientNodeId, const TVector<TInterconnectPeer>& serverPeers, - ui32 numDevicesPerServer) + ui32 numDevicesPerServer, bool useUring = false) : TPerfTest(cfg) , Setup(new TActorSystemSetup()) , LogSettings(new NActors::NLog::TSettings(NActors::TActorId(clientNodeId, "logger"), @@ -261,6 +272,7 @@ struct TDDiskClient : public TPerfTest { , ClientNodeId(clientNodeId) , ServerPeers(serverPeers) , NumDevicesPerServer(numDevicesPerServer) + , UseUring(useUring) { } @@ -279,7 +291,7 @@ struct TDDiskClient : public TPerfTest { Setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(64, 20))); // Set up interconnect with all server peers - auto common = MakeInterconnectCommon(Counters, ClientNodeId); + auto common = MakeInterconnectCommon(Counters, ClientNodeId, UseUring); SetupInterconnectServices(Setup.Get(), common, ClientNodeId, "::", 0, ServerPeers, /*listen=*/false); |
