aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-03-21 11:48:29 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-03-21 12:09:16 +0300
commitd920411538b9141e0cee8212be3c9ae0e77d1fab (patch)
treeddeca883519e2a6476649d73ebaf80bde1a83ce8 /library/cpp/neh
parent9ba31c6b407453a9514a2f36ce38cc980041e646 (diff)
downloadydb-d920411538b9141e0cee8212be3c9ae0e77d1fab.tar.gz
Intermediate changes
commit_hash:acc6d7b7aee41b420f987775aa566ee89eacb8eb
Diffstat (limited to 'library/cpp/neh')
-rw-r--r--library/cpp/neh/tcp2.cpp50
1 files changed, 11 insertions, 39 deletions
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp
index 3dad055af1..641408973d 100644
--- a/library/cpp/neh/tcp2.cpp
+++ b/library/cpp/neh/tcp2.cpp
@@ -332,35 +332,6 @@ namespace {
char MemPool_[MemPoolSize_ + MemPoolReserve_];
};
- //protector for limit usage tcp connection output (and used data) only from one thread at same time
- class TOutputLock {
- public:
- TOutputLock() noexcept
- : Lock_(0)
- {
- }
-
- bool TryAquire() noexcept {
- do {
- if (AtomicTryLock(&Lock_)) {
- return true;
- }
- } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable
- return false;
- }
-
- void Release() noexcept {
- AtomicUnlock(&Lock_);
- }
-
- bool IsFree() const noexcept {
- return !AtomicGet(Lock_);
- }
-
- private:
- TAtomic Lock_;
- };
-
class TClient {
class TRequest;
class TConnection;
@@ -718,13 +689,13 @@ namespace {
}
void ProcessOutputReqsQueue() {
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(false);
}
}
void ProcessOutputCancelsQueue() {
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));
return;
}
@@ -763,7 +734,7 @@ namespace {
PrepareSocket(AS_.Native());
AtomicSet(State_, Connected);
AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2));
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(true);
return;
}
@@ -857,7 +828,7 @@ namespace {
DBGOUT("TClient::SendMessages(exit2)");
return;
}
- } while (OutputLock_.TryAquire());
+ } while (OutputLock_.TryAcquire());
DBGOUT("TClient::SendMessages(exit1)");
}
@@ -1058,7 +1029,8 @@ namespace {
TTcp2Message Msg_;
//output
- TOutputLock OutputLock_;
+
+ TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
TAtomic NeedCheckReqsQueue_;
TLockFreeQueue<TRequest*> Reqs_;
TAtomic NeedCheckCancelsQueue_;
@@ -1412,11 +1384,11 @@ namespace {
void ProcessOutputQueue() {
AtomicSet(NeedCheckOutputQueue_, 1);
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(false);
return;
}
- DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree());
+ DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)!OutputLock_.IsLocked());
}
//must be called only after success aquiring output
@@ -1444,10 +1416,10 @@ namespace {
OutputLock_.Release();
if (!AtomicGet(NeedCheckOutputQueue_)) {
- DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree());
+ DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked());
return;
}
- } while (OutputLock_.TryAquire());
+ } while (OutputLock_.TryAcquire());
DBGOUT("Server::SendMessages(exit1)");
} catch (...) {
OnError();
@@ -1518,7 +1490,7 @@ namespace {
TLockFreeQueue<TRequestId> FinReqs_;
//output
- TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
+ TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
TAtomic NeedCheckOutputQueue_;
NNeh::TAutoLockFreeQueue<TOutputData> OutputData_;
TOutputBuffers OutputBuffers_;