aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba
diff options
context:
space:
mode:
authorilnurkh <ilnurkh@yandex-team.com>2023-10-09 23:39:40 +0300
committerilnurkh <ilnurkh@yandex-team.com>2023-10-09 23:57:14 +0300
commite601ca03f859335d57ecff2e5aa6af234b6052ed (patch)
treede519a847e58a1b3993fcbfe05ff44cc946a3e24 /library/cpp/netliba
parentbbf2b6878af3854815a2c0ecb07a687071787639 (diff)
downloadydb-e601ca03f859335d57ecff2e5aa6af234b6052ed.tar.gz
Y_VERIFY->Y_ABORT_UNLESS at ^l
https://clubs.at.yandex-team.ru/arcadia/29404
Diffstat (limited to 'library/cpp/netliba')
-rw-r--r--library/cpp/netliba/socket/socket.cpp54
-rw-r--r--library/cpp/netliba/v6/ib_buffers.h6
-rw-r--r--library/cpp/netliba/v6/ib_collective.cpp30
-rw-r--r--library/cpp/netliba/v6/ib_cs.cpp18
-rw-r--r--library/cpp/netliba/v6/ib_low.h4
-rw-r--r--library/cpp/netliba/v6/udp_client_server.cpp6
-rw-r--r--library/cpp/netliba/v6/udp_http.cpp4
7 files changed, 61 insertions, 61 deletions
diff --git a/library/cpp/netliba/socket/socket.cpp b/library/cpp/netliba/socket/socket.cpp
index c10236229b..ca8a64d25c 100644
--- a/library/cpp/netliba/socket/socket.cpp
+++ b/library/cpp/netliba/socket/socket.cpp
@@ -188,24 +188,24 @@ namespace NNetlibaSocket {
}
{
int flag = 0;
- Y_VERIFY(SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&flag, sizeof(flag)) == 0, "IPV6_V6ONLY failed");
+ Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&flag, sizeof(flag)) == 0, "IPV6_V6ONLY failed");
}
{
int flag = 1;
- Y_VERIFY(SetSockOpt(SOL_SOCKET, SO_REUSEADDR, (const char*)&flag, sizeof(flag)) == 0, "SO_REUSEADDR failed");
+ Y_ABORT_UNLESS(SetSockOpt(SOL_SOCKET, SO_REUSEADDR, (const char*)&flag, sizeof(flag)) == 0, "SO_REUSEADDR failed");
}
#if defined(_win_)
unsigned long dummy = 1;
ioctlsocket(S, FIONBIO, &dummy);
#else
- Y_VERIFY(fcntl(S, F_SETFL, O_NONBLOCK) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
- Y_VERIFY(fcntl(S, F_SETFD, FD_CLOEXEC) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
+ Y_ABORT_UNLESS(fcntl(S, F_SETFL, O_NONBLOCK) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
+ Y_ABORT_UNLESS(fcntl(S, F_SETFD, FD_CLOEXEC) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
{
int flag = 1;
#ifndef IPV6_RECVPKTINFO /* Darwin platforms require this */
- Y_VERIFY(SetSockOpt(IPPROTO_IPV6, IPV6_PKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_PKTINFO failed");
+ Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IPV6, IPV6_PKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_PKTINFO failed");
#else
- Y_VERIFY(SetSockOpt(IPPROTO_IPV6, IPV6_RECVPKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_RECVPKTINFO failed");
+ Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IPV6, IPV6_RECVPKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_RECVPKTINFO failed");
#endif
}
#endif
@@ -280,17 +280,17 @@ namespace NNetlibaSocket {
#if defined(_win_)
DWORD flag = 0;
socklen_t sz = sizeof(flag);
- Y_VERIFY(GetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (char*)&flag, &sz) == 0, "");
+ Y_ABORT_UNLESS(GetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (char*)&flag, &sz) == 0, "");
return flag;
#elif defined(_linux_)
int flag = 0;
socklen_t sz = sizeof(flag);
- Y_VERIFY(GetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (char*)&flag, &sz) == 0, "");
+ Y_ABORT_UNLESS(GetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (char*)&flag, &sz) == 0, "");
return flag == IPV6_PMTUDISC_DO;
#elif !defined(_darwin_)
int flag = 0;
socklen_t sz = sizeof(flag);
- Y_VERIFY(GetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (char*)&flag, &sz) == 0, "");
+ Y_ABORT_UNLESS(GetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (char*)&flag, &sz) == 0, "");
return flag;
#endif
return false;
@@ -346,10 +346,10 @@ namespace NNetlibaSocket {
int TAbstractSocket::SendMMsg(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags) {
Y_ASSERT(IsValid());
- Y_VERIFY(SendMMsgFunc, "sendmmsg is not supported!");
+ Y_ABORT_UNLESS(SendMMsgFunc, "sendmmsg is not supported!");
TReadGuard rg(Mutex);
static bool checked = 0;
- Y_VERIFY(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
+ Y_ABORT_UNLESS(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
return SendMMsgFunc(S, msgvec, vlen, flags);
}
@@ -357,23 +357,23 @@ namespace NNetlibaSocket {
Y_ASSERT(IsValid());
#ifdef _win32_
static bool checked = 0;
- Y_VERIFY(hdr->msg_iov->iov_len == 1, "Scatter/gather is currenly not supported on Windows");
+ Y_ABORT_UNLESS(hdr->msg_iov->iov_len == 1, "Scatter/gather is currenly not supported on Windows");
if (hdr->Tos || frag == FF_DONT_FRAG) {
TWriteGuard wg(Mutex);
if (frag == FF_DONT_FRAG) {
ForbidFragmentation();
} else {
- Y_VERIFY(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
+ Y_ABORT_UNLESS(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
}
int originalTos;
if (hdr->Tos) {
socklen_t sz = sizeof(originalTos);
- Y_VERIFY(GetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, &sz) == 0, "");
- Y_VERIFY(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&hdr->Tos, sizeof(hdr->Tos)) == 0, "");
+ Y_ABORT_UNLESS(GetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, &sz) == 0, "");
+ Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&hdr->Tos, sizeof(hdr->Tos)) == 0, "");
}
const ssize_t rv = sendto(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, hdr->msg_namelen);
if (hdr->Tos) {
- Y_VERIFY(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, sizeof(originalTos)) == 0, "");
+ Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, sizeof(originalTos)) == 0, "");
}
if (frag == FF_DONT_FRAG) {
EnableFragmentation();
@@ -381,7 +381,7 @@ namespace NNetlibaSocket {
return rv;
}
TReadGuard rg(Mutex);
- Y_VERIFY(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
+ Y_ABORT_UNLESS(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
return sendto(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, hdr->msg_namelen);
#else
if (frag == FF_DONT_FRAG) {
@@ -395,7 +395,7 @@ namespace NNetlibaSocket {
TReadGuard rg(Mutex);
#ifndef _darwin_
static bool checked = 0;
- Y_VERIFY(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
+ Y_ABORT_UNLESS(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
#endif
return sendmsg(S, hdr, flags);
#endif
@@ -501,13 +501,13 @@ namespace NNetlibaSocket {
void TAbstractSocket::CloseImpl() {
if (IsValid()) {
Poller.Unwait(S);
- Y_VERIFY(closesocket(S) == 0, "closesocket failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
+ Y_ABORT_UNLESS(closesocket(S) == 0, "closesocket failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
}
S = INVALID_SOCKET;
}
void TAbstractSocket::WaitImpl(float timeoutSec) const {
- Y_VERIFY(IsValid(), "something went wrong");
+ Y_ABORT_UNLESS(IsValid(), "something went wrong");
Poller.WaitT(TDuration::Seconds(timeoutSec));
}
@@ -526,7 +526,7 @@ namespace NNetlibaSocket {
Y_ASSERT(IsValid());
#ifdef _win32_
- Y_VERIFY(hdr->msg_iov->iov_len == 1, "Scatter/gather is currenly not supported on Windows");
+ Y_ABORT_UNLESS(hdr->msg_iov->iov_len == 1, "Scatter/gather is currenly not supported on Windows");
return recvfrom(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, &hdr->msg_namelen);
#else
return recvmsg(S, hdr, flags);
@@ -559,7 +559,7 @@ namespace NNetlibaSocket {
// thread-safe
int TAbstractSocket::RecvMMsgImpl(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags, timespec* timeout) {
Y_ASSERT(IsValid());
- Y_VERIFY(RecvMMsgFunc, "recvmmsg is not supported!");
+ Y_ABORT_UNLESS(RecvMMsgFunc, "recvmmsg is not supported!");
return RecvMMsgFunc(S, msgvec, vlen, flags, timeout);
}
@@ -648,7 +648,7 @@ namespace NNetlibaSocket {
ssize_t RecvMsg(TMsgHdr* hdr, int flags) override {
Y_UNUSED(hdr);
Y_UNUSED(flags);
- Y_VERIFY(false, "Use TBasicSocket for RecvMsg call! TRecvMMsgSocket implementation must use memcpy which is suboptimal and thus forbidden!");
+ Y_ABORT_UNLESS(false, "Use TBasicSocket for RecvMsg call! TRecvMMsgSocket implementation must use memcpy which is suboptimal and thus forbidden!");
}
TUdpRecvPacket* Recv(sockaddr_in6* addr, sockaddr_in6* dstAddr, int netlibaVersion) override;
};
@@ -842,7 +842,7 @@ public:
AtomicSwap(&NumThreadsToDie, (int)RecvThreads.size());
CancelWaitImpl();
- Y_VERIFY(AllThreadsAreDead.WaitT(TDuration::Seconds(30)), "TMTRecvSocket destruction failed");
+ Y_ABORT_UNLESS(AllThreadsAreDead.WaitT(TDuration::Seconds(30)), "TMTRecvSocket destruction failed");
CloseImpl();
}
@@ -869,7 +869,7 @@ public:
}
bool IsRecvMsgSupported() const { return false; }
- ssize_t RecvMsg(TMsgHdr* hdr, int flags) { Y_VERIFY(false, "Use TBasicSocket for RecvMsg call! TMTRecvSocket implementation must use memcpy which is suboptimal and thus forbidden!"); }
+ ssize_t RecvMsg(TMsgHdr* hdr, int flags) { Y_ABORT_UNLESS(false, "Use TBasicSocket for RecvMsg call! TMTRecvSocket implementation must use memcpy which is suboptimal and thus forbidden!"); }
};
*/
@@ -939,7 +939,7 @@ public:
ssize_t RecvMsg(TMsgHdr* hdr, int flags) override {
Y_UNUSED(hdr);
Y_UNUSED(flags);
- Y_VERIFY(false, "Use TBasicSocket for RecvMsg call! TDualStackSocket implementation must use memcpy which is suboptimal and thus forbidden!");
+ Y_ABORT_UNLESS(false, "Use TBasicSocket for RecvMsg call! TDualStackSocket implementation must use memcpy which is suboptimal and thus forbidden!");
}
TUdpRecvPacket* Recv(sockaddr_in6* addr, sockaddr_in6* dstAddr, int netlibaVersion) override;
@@ -989,7 +989,7 @@ public:
AtomicSwap(&ShouldDie, 1);
CancelWaitImpl();
- Y_VERIFY(DieEvent.WaitT(TDuration::Seconds(30)), "TDualStackSocket::Close failed");
+ Y_ABORT_UNLESS(DieEvent.WaitT(TDuration::Seconds(30)), "TDualStackSocket::Close failed");
TBase::Close();
}
diff --git a/library/cpp/netliba/v6/ib_buffers.h b/library/cpp/netliba/v6/ib_buffers.h
index 96d83ff654..8847250137 100644
--- a/library/cpp/netliba/v6/ib_buffers.h
+++ b/library/cpp/netliba/v6/ib_buffers.h
@@ -42,7 +42,7 @@ namespace NNetliba {
void AddBlock() {
if (FirstFreeBlock == Blocks.size()) {
- Y_VERIFY(0, "run out of buffers");
+ Y_ABORT_UNLESS(0, "run out of buffers");
}
Blocks[FirstFreeBlock].Alloc(IBCtx);
size_t start = (FirstFreeBlock == 0) ? 1 : FirstFreeBlock * BLOCK_SIZE;
@@ -95,7 +95,7 @@ namespace NNetliba {
}
int PostSend(TPtrArg<TRCQueuePair> qp, const void* data, size_t len) {
if (len > SMALL_PKT_SIZE) {
- Y_VERIFY(0, "buffer overrun");
+ Y_ABORT_UNLESS(0, "buffer overrun");
}
if (len <= MAX_INLINE_DATA_SIZE) {
qp->PostSend(nullptr, 0, data, len);
@@ -112,7 +112,7 @@ namespace NNetliba {
void PostSend(TPtrArg<TUDQueuePair> qp, TPtrArg<TAddressHandle> ah, int remoteQPN, int remoteQKey,
const void* data, size_t len) {
if (len > SMALL_PKT_SIZE - 40) {
- Y_VERIFY(0, "buffer overrun");
+ Y_ABORT_UNLESS(0, "buffer overrun");
}
ui64 id = AllocBuf();
TSingleBlock& blk = Blocks[id >> BLOCK_SIZE_LN];
diff --git a/library/cpp/netliba/v6/ib_collective.cpp b/library/cpp/netliba/v6/ib_collective.cpp
index 87ea166366..af5dd6d284 100644
--- a/library/cpp/netliba/v6/ib_collective.cpp
+++ b/library/cpp/netliba/v6/ib_collective.cpp
@@ -73,7 +73,7 @@ namespace NNetliba {
Ops.resize(colSize);
}
void Transfer(int srcRank, int dstRank, int sl, int rangeBeg, int rangeFin, int id) {
- Y_VERIFY(id < 64, "recv mask overflow");
+ Y_ABORT_UNLESS(id < 64, "recv mask overflow");
Ops[srcRank].OutList.push_back(TMergeRecord::TTransfer(dstRank, sl, rangeBeg, rangeFin, id));
Ops[dstRank].InList.push_back(TMergeRecord::TInTransfer(srcRank, sl));
Ops[dstRank].RecvMask |= ui64(1) << id;
@@ -188,7 +188,7 @@ namespace NNetliba {
for (int k = 1; k < groupSize; ++k) {
int h1 = myGroup[k - 1];
int h2 = myGroup[k];
- Y_VERIFY(hostCoverage[h1].Fin == hostCoverage[h2].Beg, "Invalid host order in CreateGroupMerge()");
+ Y_ABORT_UNLESS(hostCoverage[h1].Fin == hostCoverage[h2].Beg, "Invalid host order in CreateGroupMerge()");
}
switch (mode) {
@@ -303,12 +303,12 @@ namespace NNetliba {
if (cur == prev + 1) {
isIncrement = false;
} else {
- Y_VERIFY(cur == 0, "ib_hosts, wrapped to non-zero");
- Y_VERIFY(prev == gcount[groupType] - 1, "ib_hosts, structure is irregular");
+ Y_ABORT_UNLESS(cur == 0, "ib_hosts, wrapped to non-zero");
+ Y_ABORT_UNLESS(prev == gcount[groupType] - 1, "ib_hosts, structure is irregular");
isIncrement = true;
}
} else {
- Y_VERIFY(prev == cur, "ib_hosts, structure is irregular");
+ Y_ABORT_UNLESS(prev == cur, "ib_hosts, structure is irregular");
}
}
}
@@ -333,7 +333,7 @@ namespace NNetliba {
if (newIter == 0) {
newIter = nn;
} else {
- Y_VERIFY(newIter == nn, "groups should be of same size");
+ Y_ABORT_UNLESS(newIter == nn, "groups should be of same size");
}
}
baseIter = newIter;
@@ -429,13 +429,13 @@ namespace NNetliba {
while ((recvMask & iter.RecvMask) != iter.RecvMask) {
int rv = CQ->Poll(&wc, 1);
if (rv > 0) {
- Y_VERIFY(wc.status == IBV_WC_SUCCESS, "AllGather::Sync fail, status %d", (int)wc.status);
+ Y_ABORT_UNLESS(wc.status == IBV_WC_SUCCESS, "AllGather::Sync fail, status %d", (int)wc.status);
if (wc.opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
//printf("Got %d\n", wc.imm_data);
++recvDebt;
ui64 newBit = ui64(1) << wc.imm_data;
if (recvMask & newBit) {
- Y_VERIFY((FutureRecvMask & newBit) == 0, "data from 2 Sync() ahead is impossible");
+ Y_ABORT_UNLESS((FutureRecvMask & newBit) == 0, "data from 2 Sync() ahead is impossible");
FutureRecvMask |= newBit;
} else {
recvMask |= newBit;
@@ -604,7 +604,7 @@ namespace NNetliba {
}
bool Resize(const TVector<size_t>& szPerRank) override {
- Y_VERIFY(szPerRank.ysize() == ColSize, "Invalid size array");
+ Y_ABORT_UNLESS(szPerRank.ysize() == ColSize, "Invalid size array");
TVector<size_t> offsets;
offsets.push_back(0);
@@ -732,7 +732,7 @@ namespace NNetliba {
ibv_wc wc;
int rv = CQ->Poll(&wc, 1);
if (rv > 0) {
- Y_VERIFY(wc.status == IBV_WC_SUCCESS, "WaitForMsg() fail, status %d", (int)wc.status);
+ Y_ABORT_UNLESS(wc.status == IBV_WC_SUCCESS, "WaitForMsg() fail, status %d", (int)wc.status);
if (wc.opcode & IBV_WC_RECV) {
BP->RequestPostRecv();
if (tbl->NeedQPN(wc.qp_num)) {
@@ -753,7 +753,7 @@ namespace NNetliba {
}
bool ProcessSendCompletion(const ibv_wc& wc) {
- Y_VERIFY(wc.status == IBV_WC_SUCCESS, "WaitForMsg() fail, status %d", (int)wc.status);
+ Y_ABORT_UNLESS(wc.status == IBV_WC_SUCCESS, "WaitForMsg() fail, status %d", (int)wc.status);
if (wc.opcode & IBV_WC_RECV) {
BP->RequestPostRecv();
Pending.push_back(TPendingMessage(wc.qp_num, wc.wr_id));
@@ -806,7 +806,7 @@ namespace NNetliba {
for (;;) {
int rv = CQ->Poll(&wc, 1);
if (rv > 0) {
- Y_VERIFY(wc.status == IBV_WC_SUCCESS, "WaitForMsg() fail, status %d", (int)wc.status);
+ Y_ABORT_UNLESS(wc.status == IBV_WC_SUCCESS, "WaitForMsg() fail, status %d", (int)wc.status);
if (wc.opcode & IBV_WC_RECV) {
BP->RequestPostRecv();
if ((int)wc.qp_num == qpn) {
@@ -957,7 +957,7 @@ namespace NNetliba {
IAllGather* CreateAllGather(const TVector<size_t>& szPerRank) override {
const TMergePlan& plan = MergePlan;
- Y_VERIFY(szPerRank.ysize() == ColSize, "Invalid size array");
+ Y_ABORT_UNLESS(szPerRank.ysize() == ColSize, "Invalid size array");
size_t totalSize = 0;
for (int i = 0; i < szPerRank.ysize(); ++i) {
@@ -1004,7 +1004,7 @@ namespace NNetliba {
iter.RecvMask = rr.RecvMask;
}
bool rv = res->Resize(szPerRank);
- Y_VERIFY(rv, "oops");
+ Y_ABORT_UNLESS(rv, "oops");
return res;
}
@@ -1087,7 +1087,7 @@ namespace NNetliba {
res->ReadyOffsetMult = currentDataOffset;
bool rv = res->Resize(dataSize);
- Y_VERIFY(rv, "oops");
+ Y_ABORT_UNLESS(rv, "oops");
return res;
}
diff --git a/library/cpp/netliba/v6/ib_cs.cpp b/library/cpp/netliba/v6/ib_cs.cpp
index 1b904e3358..93fc4ef67e 100644
--- a/library/cpp/netliba/v6/ib_cs.cpp
+++ b/library/cpp/netliba/v6/ib_cs.cpp
@@ -200,7 +200,7 @@ namespace NNetliba {
return z;
}
}
- Y_VERIFY(0, "no send by guid");
+ Y_ABORT_UNLESS(0, "no send by guid");
return SendQueue.begin();
}
TDeque<TQueuedSend>::iterator GetSend(TIBMsgHandle msgHandle) {
@@ -209,7 +209,7 @@ namespace NNetliba {
return z;
}
}
- Y_VERIFY(0, "no send by handle");
+ Y_ABORT_UNLESS(0, "no send by handle");
return SendQueue.begin();
}
TDeque<TQueuedRecv>::iterator GetRecv(const TGUID& packetGuid) {
@@ -218,7 +218,7 @@ namespace NNetliba {
return z;
}
}
- Y_VERIFY(0, "no recv by guid");
+ Y_ABORT_UNLESS(0, "no recv by guid");
return RecvQueue.begin();
}
void PostRDMA(TQueuedSend& qs) {
@@ -354,7 +354,7 @@ namespace NNetliba {
//printf("Remove peer %p from hash (QPN %d)\n", peer.Get(), peer->QP->GetQPN());
TPeerChannelHash::iterator z = Channels.find(peer->QP->GetQPN());
if (z == Channels.end()) {
- Y_VERIFY(0, "peer failed for unregistered peer");
+ Y_ABORT_UNLESS(0, "peer failed for unregistered peer");
}
Channels.erase(z);
}
@@ -447,7 +447,7 @@ namespace NNetliba {
TDeque<TQueuedRecv>::iterator z = peer->GetRecv(cmd.PacketGuid);
TQueuedRecv& qr = *z;
#ifdef _DEBUG
- Y_VERIFY(MurmurHash<ui64>(qr.Data->GetData(), qr.Data->GetSize()) == cmd.DataHash || cmd.DataHash == 0, "RDMA data hash mismatch");
+ Y_ABORT_UNLESS(MurmurHash<ui64>(qr.Data->GetData(), qr.Data->GetSize()) == cmd.DataHash || cmd.DataHash == 0, "RDMA data hash mismatch");
#endif
TRequest* req = new TRequest;
req->Address = peer->PeerAddress;
@@ -525,12 +525,12 @@ namespace NNetliba {
BP.FreeBuf(wc->wr_id);
}
} else {
- Y_VERIFY(0, "got completion without outstanding messages");
+ Y_ABORT_UNLESS(0, "got completion without outstanding messages");
}
} else {
//printf("Got completion for non existing qpn %d, bufId %d (status %d)\n", wc->qp_num, (int)wc->wr_id, (int)wc->status);
if (wc->status == IBV_WC_SUCCESS) {
- Y_VERIFY(0, "only errors should go unmatched");
+ Y_ABORT_UNLESS(0, "only errors should go unmatched");
}
// no need to free buf since it has to be freed in PeerFailed()
}
@@ -617,7 +617,7 @@ namespace NNetliba {
// received msg
if ((int)wc.qp_num == WelcomeQPN) {
if (wc.status != IBV_WC_SUCCESS) {
- Y_VERIFY(0, "ud recv op completed with error %d\n", (int)wc.status);
+ Y_ABORT_UNLESS(0, "ud recv op completed with error %d\n", (int)wc.status);
}
Y_ASSERT(wc.opcode == IBV_WC_RECV | IBV_WC_SEND);
ParseWelcomePacket(&wc);
@@ -645,7 +645,7 @@ namespace NNetliba {
}
TDeque<TQueuedSend>::iterator z = peer->GetSend(msgHandle);
if (z == peer->SendQueue.end()) {
- Y_VERIFY(0, "peer %p, copy completed, msg %d not found?\n", peer.Get(), (int)msgHandle);
+ Y_ABORT_UNLESS(0, "peer %p, copy completed, msg %d not found?\n", peer.Get(), (int)msgHandle);
continue;
}
TQueuedSend& qs = *z;
diff --git a/library/cpp/netliba/v6/ib_low.h b/library/cpp/netliba/v6/ib_low.h
index e3afd9df2e..42e11ed941 100644
--- a/library/cpp/netliba/v6/ib_low.h
+++ b/library/cpp/netliba/v6/ib_low.h
@@ -13,7 +13,7 @@ namespace NNetliba {
int rv = (x); \
if (rv != 0) { \
fprintf(stderr, "check_z failed, errno = %d\n", errno); \
- Y_VERIFY(0, "check_z"); \
+ Y_ABORT_UNLESS(0, "check_z"); \
} \
}
@@ -167,7 +167,7 @@ namespace NNetliba {
//};
int rv = ibv_poll_cq(CQ, bufSize, res);
if (rv < 0) {
- Y_VERIFY(0, "ibv_poll_cq failed");
+ Y_ABORT_UNLESS(0, "ibv_poll_cq failed");
}
if (rv > 0) {
//printf("Completed wr\n");
diff --git a/library/cpp/netliba/v6/udp_client_server.cpp b/library/cpp/netliba/v6/udp_client_server.cpp
index 3eaf6e5e96..e7cde45242 100644
--- a/library/cpp/netliba/v6/udp_client_server.cpp
+++ b/library/cpp/netliba/v6/udp_client_server.cpp
@@ -438,14 +438,14 @@ namespace NNetliba {
if (IB->GetSendResult(&sr)) {
TIBtoTransferKeyHash::iterator z = IBKeyToTransferKey.find(sr.Handle);
if (z == IBKeyToTransferKey.end()) {
- Y_VERIFY(0, "unknown handle returned from IB");
+ Y_ABORT_UNLESS(0, "unknown handle returned from IB");
}
TTransferKey transferKey = z->second;
IBKeyToTransferKey.erase(z);
TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
if (i == SendQueue.end()) {
- Y_VERIFY(0, "IBKeyToTransferKey refers nonexisting xfer");
+ Y_ABORT_UNLESS(0, "IBKeyToTransferKey refers nonexisting xfer");
}
if (sr.Success) {
TUdpOutTransfer& xfer = i->second;
@@ -1034,7 +1034,7 @@ namespace NNetliba {
case ACK_RESEND_NOSHMEM: {
// abort execution here
// failed to open shmem on recv side, need to transmit data without using shmem
- Y_VERIFY(0, "not implemented yet");
+ Y_ABORT_UNLESS(0, "not implemented yet");
break;
}
case PING: {
diff --git a/library/cpp/netliba/v6/udp_http.cpp b/library/cpp/netliba/v6/udp_http.cpp
index 9fa0b07818..f318554a76 100644
--- a/library/cpp/netliba/v6/udp_http.cpp
+++ b/library/cpp/netliba/v6/udp_http.cpp
@@ -829,7 +829,7 @@ namespace NNetliba {
void SendRequestImpl(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId,
TWaitResponse* wr, TRequesterUserQueues* userQueues) {
if (data && data->size() > MAX_PACKET_SIZE) {
- Y_VERIFY(0, "data size is too large");
+ Y_ABORT_UNLESS(0, "data size is too large");
}
//printf("SendRequest(%s)\n", url.c_str());
if (wr)
@@ -873,7 +873,7 @@ namespace NNetliba {
void SendResponseImpl(const TGUID& reqId, EPacketPriority prior, TVector<char>* data) // non-virtual, for direct call from TRequestOps
{
if (data && data->size() > MAX_PACKET_SIZE) {
- Y_VERIFY(0, "data size is too large");
+ Y_ABORT_UNLESS(0, "data size is too large");
}
SendRespList.Enqueue(new TSendResponse(reqId, prior, data));
Host->CancelWait();