diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-03-21 11:48:29 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-03-21 12:09:16 +0300 |
commit | d920411538b9141e0cee8212be3c9ae0e77d1fab (patch) | |
tree | ddeca883519e2a6476649d73ebaf80bde1a83ce8 /library/cpp/neh | |
parent | 9ba31c6b407453a9514a2f36ce38cc980041e646 (diff) | |
download | ydb-d920411538b9141e0cee8212be3c9ae0e77d1fab.tar.gz |
Intermediate changes
commit_hash:acc6d7b7aee41b420f987775aa566ee89eacb8eb
Diffstat (limited to 'library/cpp/neh')
-rw-r--r-- | library/cpp/neh/tcp2.cpp | 50 |
1 files changed, 11 insertions, 39 deletions
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp index 3dad055af1..641408973d 100644 --- a/library/cpp/neh/tcp2.cpp +++ b/library/cpp/neh/tcp2.cpp @@ -332,35 +332,6 @@ namespace { char MemPool_[MemPoolSize_ + MemPoolReserve_]; }; - //protector for limit usage tcp connection output (and used data) only from one thread at same time - class TOutputLock { - public: - TOutputLock() noexcept - : Lock_(0) - { - } - - bool TryAquire() noexcept { - do { - if (AtomicTryLock(&Lock_)) { - return true; - } - } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable - return false; - } - - void Release() noexcept { - AtomicUnlock(&Lock_); - } - - bool IsFree() const noexcept { - return !AtomicGet(Lock_); - } - - private: - TAtomic Lock_; - }; - class TClient { class TRequest; class TConnection; @@ -718,13 +689,13 @@ namespace { } void ProcessOutputReqsQueue() { - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { SendMessages(false); } } void ProcessOutputCancelsQueue() { - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true)); return; } @@ -763,7 +734,7 @@ namespace { PrepareSocket(AS_.Native()); AtomicSet(State_, Connected); AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2)); - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { SendMessages(true); return; } @@ -857,7 +828,7 @@ namespace { DBGOUT("TClient::SendMessages(exit2)"); return; } - } while (OutputLock_.TryAquire()); + } while (OutputLock_.TryAcquire()); DBGOUT("TClient::SendMessages(exit1)"); } @@ -1058,7 +1029,8 @@ namespace { TTcp2Message Msg_; //output - TOutputLock OutputLock_; + + TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads TAtomic NeedCheckReqsQueue_; TLockFreeQueue<TRequest*> Reqs_; TAtomic NeedCheckCancelsQueue_; @@ -1412,11 +1384,11 @@ namespace { void ProcessOutputQueue() { AtomicSet(NeedCheckOutputQueue_, 1); - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { SendMessages(false); return; } - DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree()); + DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)!OutputLock_.IsLocked()); } //must be called only after success aquiring output @@ -1444,10 +1416,10 @@ namespace { OutputLock_.Release(); if (!AtomicGet(NeedCheckOutputQueue_)) { - DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree()); + DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked()); return; } - } while (OutputLock_.TryAquire()); + } while (OutputLock_.TryAcquire()); DBGOUT("Server::SendMessages(exit1)"); } catch (...) { OnError(); @@ -1518,7 +1490,7 @@ namespace { TLockFreeQueue<TRequestId> FinReqs_; //output - TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads + TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads TAtomic NeedCheckOutputQueue_; NNeh::TAutoLockFreeQueue<TOutputData> OutputData_; TOutputBuffers OutputBuffers_; |