aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/tcp2.cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-03-22 00:51:33 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-03-22 00:51:33 +0000
commit25754ddf6bf0d9f39deb2b793a946176b6d7c9fb (patch)
treeb81cf85ea8fbef3290618d909971b9ad02dd9e46 /library/cpp/neh/tcp2.cpp
parentc18aa245b684fef9b14c697b38e6c6695e0733f3 (diff)
parent87f8036d8027790ed03ac34feb2b5f3e141f948c (diff)
downloadydb-25754ddf6bf0d9f39deb2b793a946176b6d7c9fb.tar.gz
Merge branch 'rightlib' into merge-libs-250322-0050
Diffstat (limited to 'library/cpp/neh/tcp2.cpp')
-rw-r--r--library/cpp/neh/tcp2.cpp50
1 files changed, 11 insertions, 39 deletions
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp
index 3dad055af1..641408973d 100644
--- a/library/cpp/neh/tcp2.cpp
+++ b/library/cpp/neh/tcp2.cpp
@@ -332,35 +332,6 @@ namespace {
char MemPool_[MemPoolSize_ + MemPoolReserve_];
};
- //protector for limit usage tcp connection output (and used data) only from one thread at same time
- class TOutputLock {
- public:
- TOutputLock() noexcept
- : Lock_(0)
- {
- }
-
- bool TryAquire() noexcept {
- do {
- if (AtomicTryLock(&Lock_)) {
- return true;
- }
- } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable
- return false;
- }
-
- void Release() noexcept {
- AtomicUnlock(&Lock_);
- }
-
- bool IsFree() const noexcept {
- return !AtomicGet(Lock_);
- }
-
- private:
- TAtomic Lock_;
- };
-
class TClient {
class TRequest;
class TConnection;
@@ -718,13 +689,13 @@ namespace {
}
void ProcessOutputReqsQueue() {
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(false);
}
}
void ProcessOutputCancelsQueue() {
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));
return;
}
@@ -763,7 +734,7 @@ namespace {
PrepareSocket(AS_.Native());
AtomicSet(State_, Connected);
AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2));
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(true);
return;
}
@@ -857,7 +828,7 @@ namespace {
DBGOUT("TClient::SendMessages(exit2)");
return;
}
- } while (OutputLock_.TryAquire());
+ } while (OutputLock_.TryAcquire());
DBGOUT("TClient::SendMessages(exit1)");
}
@@ -1058,7 +1029,8 @@ namespace {
TTcp2Message Msg_;
//output
- TOutputLock OutputLock_;
+
+ TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
TAtomic NeedCheckReqsQueue_;
TLockFreeQueue<TRequest*> Reqs_;
TAtomic NeedCheckCancelsQueue_;
@@ -1412,11 +1384,11 @@ namespace {
void ProcessOutputQueue() {
AtomicSet(NeedCheckOutputQueue_, 1);
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(false);
return;
}
- DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree());
+ DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)!OutputLock_.IsLocked());
}
//must be called only after success aquiring output
@@ -1444,10 +1416,10 @@ namespace {
OutputLock_.Release();
if (!AtomicGet(NeedCheckOutputQueue_)) {
- DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree());
+ DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked());
return;
}
- } while (OutputLock_.TryAquire());
+ } while (OutputLock_.TryAcquire());
DBGOUT("Server::SendMessages(exit1)");
} catch (...) {
OnError();
@@ -1518,7 +1490,7 @@ namespace {
TLockFreeQueue<TRequestId> FinReqs_;
//output
- TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
+ TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
TAtomic NeedCheckOutputQueue_;
NNeh::TAutoLockFreeQueue<TOutputData> OutputData_;
TOutputBuffers OutputBuffers_;