aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-03-22 00:51:33 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-03-22 00:51:33 +0000
commit25754ddf6bf0d9f39deb2b793a946176b6d7c9fb (patch)
treeb81cf85ea8fbef3290618d909971b9ad02dd9e46 /library/cpp
parentc18aa245b684fef9b14c697b38e6c6695e0733f3 (diff)
parent87f8036d8027790ed03ac34feb2b5f3e141f948c (diff)
downloadydb-25754ddf6bf0d9f39deb2b793a946176b6d7c9fb.tar.gz
Merge branch 'rightlib' into merge-libs-250322-0050
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/neh/tcp2.cpp50
-rw-r--r--library/cpp/netliba/v6/ib_low.cpp2
2 files changed, 12 insertions, 40 deletions
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp
index 3dad055af1..641408973d 100644
--- a/library/cpp/neh/tcp2.cpp
+++ b/library/cpp/neh/tcp2.cpp
@@ -332,35 +332,6 @@ namespace {
char MemPool_[MemPoolSize_ + MemPoolReserve_];
};
- //protector for limit usage tcp connection output (and used data) only from one thread at same time
- class TOutputLock {
- public:
- TOutputLock() noexcept
- : Lock_(0)
- {
- }
-
- bool TryAquire() noexcept {
- do {
- if (AtomicTryLock(&Lock_)) {
- return true;
- }
- } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable
- return false;
- }
-
- void Release() noexcept {
- AtomicUnlock(&Lock_);
- }
-
- bool IsFree() const noexcept {
- return !AtomicGet(Lock_);
- }
-
- private:
- TAtomic Lock_;
- };
-
class TClient {
class TRequest;
class TConnection;
@@ -718,13 +689,13 @@ namespace {
}
void ProcessOutputReqsQueue() {
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(false);
}
}
void ProcessOutputCancelsQueue() {
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));
return;
}
@@ -763,7 +734,7 @@ namespace {
PrepareSocket(AS_.Native());
AtomicSet(State_, Connected);
AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2));
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(true);
return;
}
@@ -857,7 +828,7 @@ namespace {
DBGOUT("TClient::SendMessages(exit2)");
return;
}
- } while (OutputLock_.TryAquire());
+ } while (OutputLock_.TryAcquire());
DBGOUT("TClient::SendMessages(exit1)");
}
@@ -1058,7 +1029,8 @@ namespace {
TTcp2Message Msg_;
//output
- TOutputLock OutputLock_;
+
+ TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
TAtomic NeedCheckReqsQueue_;
TLockFreeQueue<TRequest*> Reqs_;
TAtomic NeedCheckCancelsQueue_;
@@ -1412,11 +1384,11 @@ namespace {
void ProcessOutputQueue() {
AtomicSet(NeedCheckOutputQueue_, 1);
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(false);
return;
}
- DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree());
+ DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)!OutputLock_.IsLocked());
}
//must be called only after success aquiring output
@@ -1444,10 +1416,10 @@ namespace {
OutputLock_.Release();
if (!AtomicGet(NeedCheckOutputQueue_)) {
- DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree());
+ DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked());
return;
}
- } while (OutputLock_.TryAquire());
+ } while (OutputLock_.TryAcquire());
DBGOUT("Server::SendMessages(exit1)");
} catch (...) {
OnError();
@@ -1518,7 +1490,7 @@ namespace {
TLockFreeQueue<TRequestId> FinReqs_;
//output
- TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
+ TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
TAtomic NeedCheckOutputQueue_;
NNeh::TAutoLockFreeQueue<TOutputData> OutputData_;
TOutputBuffers OutputBuffers_;
diff --git a/library/cpp/netliba/v6/ib_low.cpp b/library/cpp/netliba/v6/ib_low.cpp
index 99d77d593f..fca97353d2 100644
--- a/library/cpp/netliba/v6/ib_low.cpp
+++ b/library/cpp/netliba/v6/ib_low.cpp
@@ -33,7 +33,7 @@ namespace NNetliba {
TIntrusivePtr<TIBContext> ctx;
TIntrusivePtr<TIBPort> resPort;
- int numDevices;
+ int numDevices{0};
ibv_device** deviceList = ibv_get_device_list(&numDevices);
//for (int i = 0; i < numDevices; ++i) {
// ibv_device *dev = deviceList[i];