diff options
| author | robot-piglet <[email protected]> | 2025-03-21 11:48:29 +0300 | 
|---|---|---|
| committer | robot-piglet <[email protected]> | 2025-03-21 12:09:16 +0300 | 
| commit | d920411538b9141e0cee8212be3c9ae0e77d1fab (patch) | |
| tree | ddeca883519e2a6476649d73ebaf80bde1a83ce8 /library/cpp/neh | |
| parent | 9ba31c6b407453a9514a2f36ce38cc980041e646 (diff) | |
Intermediate changes
commit_hash:acc6d7b7aee41b420f987775aa566ee89eacb8eb
Diffstat (limited to 'library/cpp/neh')
| -rw-r--r-- | library/cpp/neh/tcp2.cpp | 50 | 
1 files changed, 11 insertions, 39 deletions
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp index 3dad055af1f..641408973d9 100644 --- a/library/cpp/neh/tcp2.cpp +++ b/library/cpp/neh/tcp2.cpp @@ -332,35 +332,6 @@ namespace {              char MemPool_[MemPoolSize_ + MemPoolReserve_];          }; -        //protector for limit usage tcp connection output (and used data) only from one thread at same time -        class TOutputLock { -        public: -            TOutputLock() noexcept -                : Lock_(0) -            { -            } - -            bool TryAquire() noexcept { -                do { -                    if (AtomicTryLock(&Lock_)) { -                        return true; -                    } -                } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable -                return false; -            } - -            void Release() noexcept { -                AtomicUnlock(&Lock_); -            } - -            bool IsFree() const noexcept { -                return !AtomicGet(Lock_); -            } - -        private: -            TAtomic Lock_; -        }; -          class TClient {              class TRequest;              class TConnection; @@ -718,13 +689,13 @@ namespace {                  }                  void ProcessOutputReqsQueue() { -                    if (OutputLock_.TryAquire()) { +                    if (OutputLock_.TryAcquire()) {                          SendMessages(false);                      }                  }                  void ProcessOutputCancelsQueue() { -                    if (OutputLock_.TryAquire()) { +                    if (OutputLock_.TryAcquire()) {                          AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));                          return;                      } @@ -763,7 +734,7 @@ namespace {                              PrepareSocket(AS_.Native());                              AtomicSet(State_, Connected);                              AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2)); -                            if (OutputLock_.TryAquire()) { +                            if (OutputLock_.TryAcquire()) {                                  SendMessages(true);                                  return;                              } @@ -857,7 +828,7 @@ namespace {                              DBGOUT("TClient::SendMessages(exit2)");                              return;                          } -                    } while (OutputLock_.TryAquire()); +                    } while (OutputLock_.TryAcquire());                      DBGOUT("TClient::SendMessages(exit1)");                  } @@ -1058,7 +1029,8 @@ namespace {                  TTcp2Message Msg_;                  //output -                TOutputLock OutputLock_; + +                TSpinLock OutputLock_;  //protect socket/buffers from simultaneous access from few threads                  TAtomic NeedCheckReqsQueue_;                  TLockFreeQueue<TRequest*> Reqs_;                  TAtomic NeedCheckCancelsQueue_; @@ -1412,11 +1384,11 @@ namespace {                  void ProcessOutputQueue() {                      AtomicSet(NeedCheckOutputQueue_, 1); -                    if (OutputLock_.TryAquire()) { +                    if (OutputLock_.TryAcquire()) {                          SendMessages(false);                          return;                      } -                    DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree()); +                    DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)!OutputLock_.IsLocked());                  }                  //must be called only after success aquiring output @@ -1444,10 +1416,10 @@ namespace {                              OutputLock_.Release();                              if (!AtomicGet(NeedCheckOutputQueue_)) { -                                DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree()); +                                DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked());                                  return;                              } -                        } while (OutputLock_.TryAquire()); +                        } while (OutputLock_.TryAcquire());                          DBGOUT("Server::SendMessages(exit1)");                      } catch (...) {                          OnError(); @@ -1518,7 +1490,7 @@ namespace {                  TLockFreeQueue<TRequestId> FinReqs_;                  //output -                TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads +                TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads                  TAtomic NeedCheckOutputQueue_;                  NNeh::TAutoLockFreeQueue<TOutputData> OutputData_;                  TOutputBuffers OutputBuffers_;  | 
