summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/library/actors/interconnect/interconnect_stream.cpp2
-rw-r--r--ydb/library/actors/interconnect/interconnect_stream.h2
-rw-r--r--ydb/library/actors/interconnect/interconnect_zc_processor.cpp56
-rw-r--r--ydb/library/actors/interconnect/ut_fat/main.cpp4
4 files changed, 51 insertions, 13 deletions
diff --git a/ydb/library/actors/interconnect/interconnect_stream.cpp b/ydb/library/actors/interconnect/interconnect_stream.cpp
index 6cc128613f9..9a5575bd77f 100644
--- a/ydb/library/actors/interconnect/interconnect_stream.cpp
+++ b/ydb/library/actors/interconnect/interconnect_stream.cpp
@@ -125,6 +125,7 @@ namespace NInterconnect {
return ret;
}
+#if defined(__linux__)
ssize_t
TStreamSocket::RecvErrQueue(struct msghdr* msg) const {
const auto ret = ::recvmsg(Descriptor, msg, MSG_ERRQUEUE);
@@ -133,6 +134,7 @@ namespace NInterconnect {
return ret;
}
+#endif
ssize_t
TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const {
diff --git a/ydb/library/actors/interconnect/interconnect_stream.h b/ydb/library/actors/interconnect/interconnect_stream.h
index f5bbb1c184b..0189896b73e 100644
--- a/ydb/library/actors/interconnect/interconnect_stream.h
+++ b/ydb/library/actors/interconnect/interconnect_stream.h
@@ -60,7 +60,9 @@ namespace NInterconnect {
virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const;
ssize_t SendWithFlags(const void* msg, size_t len, int flags) const;
+#if defined(__linux__)
ssize_t RecvErrQueue(struct msghdr* msg) const;
+#endif
int Connect(const TAddress& addr) const;
int Connect(const NAddr::IRemoteAddr* addr) const;
diff --git a/ydb/library/actors/interconnect/interconnect_zc_processor.cpp b/ydb/library/actors/interconnect/interconnect_zc_processor.cpp
index bf6059f2308..05719d39336 100644
--- a/ydb/library/actors/interconnect/interconnect_zc_processor.cpp
+++ b/ydb/library/actors/interconnect/interconnect_zc_processor.cpp
@@ -150,6 +150,7 @@ size_t AdjustLen(std::span<const TConstIoVec> wbuf, std::span<const TOutgoingStr
}
void TInterconnectZcProcessor::DoProcessNotification(NInterconnect::TStreamSocket& socket) {
+#ifdef YDB_MSG_ZEROCOPY_SUPPORTED
const TProcessErrQueueResult res = DoProcessErrQueue(socket);
std::visit(TOverloaded{
@@ -177,6 +178,9 @@ void TInterconnectZcProcessor::DoProcessNotification(NInterconnect::TStreamSocke
AddErr("Hidden copy during ZC operation");
ZcState = ZC_DISABLED_HIDDEN_COPY;
}
+#else
+ Y_UNUSED(socket);
+#endif
}
void TInterconnectZcProcessor::ApplySocketOption(NInterconnect::TStreamSocket& socket)
@@ -191,6 +195,8 @@ void TInterconnectZcProcessor::ApplySocketOption(NInterconnect::TStreamSocket& s
ResetState();
}
}
+#else
+ Y_UNUSED(socket);
#endif
}
@@ -264,10 +270,6 @@ ssize_t TInterconnectZcProcessor::ProcessSend(std::span<TConstIoVec> wbuf, TStre
return r;
}
-TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled)
- : ZcState(enabled ? ZC_OK : ZC_DISABLED)
-{}
-
TString TInterconnectZcProcessor::GetCurrentStateName() const {
switch (ZcState) {
case ZC_DISABLED:
@@ -293,8 +295,23 @@ TString TInterconnectZcProcessor::ExtractErrText() {
}
}
+void TInterconnectZcProcessor::AddErr(const TString& err) {
+ if (LastErr) {
+ LastErr.reserve(err.size() + 2);
+ LastErr += ", ";
+ LastErr += err;
+ } else {
+ LastErr = err;
+ }
+}
+
///////////////////////////////////////////////////////////////////////////////
+#ifdef YDB_MSG_ZEROCOPY_SUPPORTED
+TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled)
+ : ZcState(enabled ? ZC_OK : ZC_DISABLED)
+{}
+
// Guard part.
// We must guarantee liveness of buffers used for zc
// until enqueued zc operation completed by kernel
@@ -399,20 +416,33 @@ private:
std::list<TEventHolder> Delayed;
};
-void TInterconnectZcProcessor::AddErr(const TString& err) {
- if (LastErr) {
- LastErr.reserve(err.size() + 2);
- LastErr += ", ";
- LastErr += err;
- } else {
- LastErr = err;
- }
+std::unique_ptr<IZcGuard> TInterconnectZcProcessor::GetGuard()
+{
+ return std::make_unique<TGuardRunner>(SendAsZc, Confirmed);
}
+#else
+TInterconnectZcProcessor::TInterconnectZcProcessor(bool)
+ : ZcState(ZC_DISABLED)
+{}
+
+class TDummyGuardRunner : public IZcGuard {
+public:
+ TDummyGuardRunner(ui64 uncompleted, ui64 confirmed)
+ {
+ Y_UNUSED(uncompleted);
+ Y_UNUSED(confirmed);
+ }
+
+ void ExtractToSafeTermination(std::list<TEventHolder>&) noexcept override {}
+ void Terminate(std::unique_ptr<NActors::TEventHolderPool>&&, TIntrusivePtr<NInterconnect::TStreamSocket>, const NActors::TActorContext&) override {}
+};
std::unique_ptr<IZcGuard> TInterconnectZcProcessor::GetGuard()
{
- return std::make_unique<TGuardRunner>(SendAsZc, Confirmed);
+ return std::make_unique<TDummyGuardRunner>(SendAsZc, Confirmed);
}
+#endif
+
}
diff --git a/ydb/library/actors/interconnect/ut_fat/main.cpp b/ydb/library/actors/interconnect/ut_fat/main.cpp
index 36dcc9afd5a..a765a8724e2 100644
--- a/ydb/library/actors/interconnect/ut_fat/main.cpp
+++ b/ydb/library/actors/interconnect/ut_fat/main.cpp
@@ -212,6 +212,10 @@ Y_UNIT_TEST_SUITE(InterconnectZcLocalOp) {
NanoSleep(5ULL * 1000 * 1000 * 1000);
// Zero copy send via loopback causes hidden copy inside linux kernel
+#if defined (__linux__)
UNIT_ASSERT_VALUES_EQUAL("DisabledHiddenCopy", GetZcState(testCluster, 1, 2));
+#else
+ UNIT_ASSERT_VALUES_EQUAL("Disabled", GetZcState(testCluster, 1, 2));
+#endif
}
}