diff options
| author | Daniil Cherednik <[email protected]> | 2025-04-29 23:15:37 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-04-29 21:15:37 +0000 |
| commit | e02f5d68baec21992891c3050da8495383736756 (patch) | |
| tree | f31117d67545885d79a3ad4d43db6b8f4fb0133f | |
| parent | fa086bc82e23cbcb717a32196a275951b88e8ddd (diff) | |
Fix windows compatibility. (#17872)
Missed #ifdef has been added. MSG_ZEROCOPY related code works only on linux.
4 files changed, 51 insertions, 13 deletions
diff --git a/ydb/library/actors/interconnect/interconnect_stream.cpp b/ydb/library/actors/interconnect/interconnect_stream.cpp index 6cc128613f9..9a5575bd77f 100644 --- a/ydb/library/actors/interconnect/interconnect_stream.cpp +++ b/ydb/library/actors/interconnect/interconnect_stream.cpp @@ -125,6 +125,7 @@ namespace NInterconnect { return ret; } +#if defined(__linux__) ssize_t TStreamSocket::RecvErrQueue(struct msghdr* msg) const { const auto ret = ::recvmsg(Descriptor, msg, MSG_ERRQUEUE); @@ -133,6 +134,7 @@ namespace NInterconnect { return ret; } +#endif ssize_t TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const { diff --git a/ydb/library/actors/interconnect/interconnect_stream.h b/ydb/library/actors/interconnect/interconnect_stream.h index f5bbb1c184b..0189896b73e 100644 --- a/ydb/library/actors/interconnect/interconnect_stream.h +++ b/ydb/library/actors/interconnect/interconnect_stream.h @@ -60,7 +60,9 @@ namespace NInterconnect { virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const; ssize_t SendWithFlags(const void* msg, size_t len, int flags) const; +#if defined(__linux__) ssize_t RecvErrQueue(struct msghdr* msg) const; +#endif int Connect(const TAddress& addr) const; int Connect(const NAddr::IRemoteAddr* addr) const; diff --git a/ydb/library/actors/interconnect/interconnect_zc_processor.cpp b/ydb/library/actors/interconnect/interconnect_zc_processor.cpp index bf6059f2308..05719d39336 100644 --- a/ydb/library/actors/interconnect/interconnect_zc_processor.cpp +++ b/ydb/library/actors/interconnect/interconnect_zc_processor.cpp @@ -150,6 +150,7 @@ size_t AdjustLen(std::span<const TConstIoVec> wbuf, std::span<const TOutgoingStr } void TInterconnectZcProcessor::DoProcessNotification(NInterconnect::TStreamSocket& socket) { +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED const TProcessErrQueueResult res = DoProcessErrQueue(socket); std::visit(TOverloaded{ @@ -177,6 +178,9 @@ void TInterconnectZcProcessor::DoProcessNotification(NInterconnect::TStreamSocke AddErr("Hidden copy during ZC operation"); ZcState = ZC_DISABLED_HIDDEN_COPY; } +#else + Y_UNUSED(socket); +#endif } void TInterconnectZcProcessor::ApplySocketOption(NInterconnect::TStreamSocket& socket) @@ -191,6 +195,8 @@ void TInterconnectZcProcessor::ApplySocketOption(NInterconnect::TStreamSocket& s ResetState(); } } +#else + Y_UNUSED(socket); #endif } @@ -264,10 +270,6 @@ ssize_t TInterconnectZcProcessor::ProcessSend(std::span<TConstIoVec> wbuf, TStre return r; } -TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled) - : ZcState(enabled ? ZC_OK : ZC_DISABLED) -{} - TString TInterconnectZcProcessor::GetCurrentStateName() const { switch (ZcState) { case ZC_DISABLED: @@ -293,8 +295,23 @@ TString TInterconnectZcProcessor::ExtractErrText() { } } +void TInterconnectZcProcessor::AddErr(const TString& err) { + if (LastErr) { + LastErr.reserve(err.size() + 2); + LastErr += ", "; + LastErr += err; + } else { + LastErr = err; + } +} + /////////////////////////////////////////////////////////////////////////////// +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED +TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled) + : ZcState(enabled ? ZC_OK : ZC_DISABLED) +{} + // Guard part. // We must guarantee liveness of buffers used for zc // until enqueued zc operation completed by kernel @@ -399,20 +416,33 @@ private: std::list<TEventHolder> Delayed; }; -void TInterconnectZcProcessor::AddErr(const TString& err) { - if (LastErr) { - LastErr.reserve(err.size() + 2); - LastErr += ", "; - LastErr += err; - } else { - LastErr = err; - } +std::unique_ptr<IZcGuard> TInterconnectZcProcessor::GetGuard() +{ + return std::make_unique<TGuardRunner>(SendAsZc, Confirmed); } +#else +TInterconnectZcProcessor::TInterconnectZcProcessor(bool) + : ZcState(ZC_DISABLED) +{} + +class TDummyGuardRunner : public IZcGuard { +public: + TDummyGuardRunner(ui64 uncompleted, ui64 confirmed) + { + Y_UNUSED(uncompleted); + Y_UNUSED(confirmed); + } + + void ExtractToSafeTermination(std::list<TEventHolder>&) noexcept override {} + void Terminate(std::unique_ptr<NActors::TEventHolderPool>&&, TIntrusivePtr<NInterconnect::TStreamSocket>, const NActors::TActorContext&) override {} +}; std::unique_ptr<IZcGuard> TInterconnectZcProcessor::GetGuard() { - return std::make_unique<TGuardRunner>(SendAsZc, Confirmed); + return std::make_unique<TDummyGuardRunner>(SendAsZc, Confirmed); } +#endif + } diff --git a/ydb/library/actors/interconnect/ut_fat/main.cpp b/ydb/library/actors/interconnect/ut_fat/main.cpp index 36dcc9afd5a..a765a8724e2 100644 --- a/ydb/library/actors/interconnect/ut_fat/main.cpp +++ b/ydb/library/actors/interconnect/ut_fat/main.cpp @@ -212,6 +212,10 @@ Y_UNIT_TEST_SUITE(InterconnectZcLocalOp) { NanoSleep(5ULL * 1000 * 1000 * 1000); // Zero copy send via loopback causes hidden copy inside linux kernel +#if defined (__linux__) UNIT_ASSERT_VALUES_EQUAL("DisabledHiddenCopy", GetZcState(testCluster, 1, 2)); +#else + UNIT_ASSERT_VALUES_EQUAL("Disabled", GetZcState(testCluster, 1, 2)); +#endif } } |
