diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-03-22 00:51:33 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-03-22 00:51:33 +0000 |
commit | 25754ddf6bf0d9f39deb2b793a946176b6d7c9fb (patch) | |
tree | b81cf85ea8fbef3290618d909971b9ad02dd9e46 /library/cpp/neh/tcp2.cpp | |
parent | c18aa245b684fef9b14c697b38e6c6695e0733f3 (diff) | |
parent | 87f8036d8027790ed03ac34feb2b5f3e141f948c (diff) | |
download | ydb-25754ddf6bf0d9f39deb2b793a946176b6d7c9fb.tar.gz |
Merge branch 'rightlib' into merge-libs-250322-0050
Diffstat (limited to 'library/cpp/neh/tcp2.cpp')
-rw-r--r-- | library/cpp/neh/tcp2.cpp | 50 |
1 files changed, 11 insertions, 39 deletions
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp index 3dad055af1..641408973d 100644 --- a/library/cpp/neh/tcp2.cpp +++ b/library/cpp/neh/tcp2.cpp @@ -332,35 +332,6 @@ namespace { char MemPool_[MemPoolSize_ + MemPoolReserve_]; }; - //protector for limit usage tcp connection output (and used data) only from one thread at same time - class TOutputLock { - public: - TOutputLock() noexcept - : Lock_(0) - { - } - - bool TryAquire() noexcept { - do { - if (AtomicTryLock(&Lock_)) { - return true; - } - } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable - return false; - } - - void Release() noexcept { - AtomicUnlock(&Lock_); - } - - bool IsFree() const noexcept { - return !AtomicGet(Lock_); - } - - private: - TAtomic Lock_; - }; - class TClient { class TRequest; class TConnection; @@ -718,13 +689,13 @@ namespace { } void ProcessOutputReqsQueue() { - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { SendMessages(false); } } void ProcessOutputCancelsQueue() { - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true)); return; } @@ -763,7 +734,7 @@ namespace { PrepareSocket(AS_.Native()); AtomicSet(State_, Connected); AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2)); - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { SendMessages(true); return; } @@ -857,7 +828,7 @@ namespace { DBGOUT("TClient::SendMessages(exit2)"); return; } - } while (OutputLock_.TryAquire()); + } while (OutputLock_.TryAcquire()); DBGOUT("TClient::SendMessages(exit1)"); } @@ -1058,7 +1029,8 @@ namespace { TTcp2Message Msg_; //output - TOutputLock OutputLock_; + + TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads TAtomic NeedCheckReqsQueue_; TLockFreeQueue<TRequest*> Reqs_; TAtomic NeedCheckCancelsQueue_; @@ -1412,11 +1384,11 @@ namespace { void ProcessOutputQueue() { AtomicSet(NeedCheckOutputQueue_, 1); - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { SendMessages(false); return; } - DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree()); + DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)!OutputLock_.IsLocked()); } //must be called only after success aquiring output @@ -1444,10 +1416,10 @@ namespace { OutputLock_.Release(); if (!AtomicGet(NeedCheckOutputQueue_)) { - DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree()); + DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked()); return; } - } while (OutputLock_.TryAquire()); + } while (OutputLock_.TryAcquire()); DBGOUT("Server::SendMessages(exit1)"); } catch (...) { OnError(); @@ -1518,7 +1490,7 @@ namespace { TLockFreeQueue<TRequestId> FinReqs_; //output - TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads + TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads TAtomic NeedCheckOutputQueue_; NNeh::TAutoLockFreeQueue<TOutputData> OutputData_; TOutputBuffers OutputBuffers_; |