diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-03-27 15:55:51 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-03-27 16:07:07 +0300 |
commit | 77c5e3305c6786b76d694a755614a8e878f4ac33 (patch) | |
tree | 22c89481dad54bb9d001c417b4094482b2f8b7cf /library/cpp/neh | |
parent | 3098b5d7f2fe6c0bbee56a7b57371dec369a9e1a (diff) | |
download | ydb-77c5e3305c6786b76d694a755614a8e878f4ac33.tar.gz |
Intermediate changes
commit_hash:0830a73862cb129490950eb5b3986d9c2db68d84
Diffstat (limited to 'library/cpp/neh')
-rw-r--r-- | library/cpp/neh/tcp2.cpp | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp index 641408973d..fa24635e46 100644 --- a/library/cpp/neh/tcp2.cpp +++ b/library/cpp/neh/tcp2.cpp @@ -621,8 +621,8 @@ namespace { , State_(Init) , BuffSize_(TTcp2Options::InputBufferSize) , Buff_(new char[BuffSize_]) - , NeedCheckReqsQueue_(0) - , NeedCheckCancelsQueue_(0) + , NeedCheckReqsQueue_(false) + , NeedCheckCancelsQueue_(false) , GenReqId_(0) , LastSendedReqId_(0) { @@ -651,7 +651,7 @@ namespace { throw; } - AtomicSet(NeedCheckReqsQueue_, 1); + NeedCheckReqsQueue_.store(true); req->SetConnection(this); TAtomicBase state = AtomicGet(State_); if (Y_LIKELY(state == Connected)) { @@ -682,7 +682,7 @@ namespace { //called from client thread void Cancel(TRequestId id) { Cancels_.Enqueue(id); - AtomicSet(NeedCheckCancelsQueue_, 1); + NeedCheckCancelsQueue_.store(true); if (Y_LIKELY(AtomicGet(State_) == Connected)) { ProcessOutputCancelsQueue(); } @@ -758,7 +758,7 @@ namespace { do { if (asioThread) { - AtomicSet(NeedCheckCancelsQueue_, 0); + NeedCheckCancelsQueue_.store(false); TRequestId reqId; ProcessReqsInFlyQueue(); @@ -776,14 +776,14 @@ namespace { } } } - } else if (AtomicGet(NeedCheckCancelsQueue_)) { + } else if (NeedCheckCancelsQueue_.load()) { AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true)); return; } TRequestId lastReqId = 0; { - AtomicSet(NeedCheckReqsQueue_, 0); + NeedCheckReqsQueue_.store(false); TRequest* reqPtr; while (Reqs_.Dequeue(&reqPtr)) { @@ -824,7 +824,7 @@ namespace { OutputLock_.Release(); - if (!AtomicGet(NeedCheckReqsQueue_) && !AtomicGet(NeedCheckCancelsQueue_)) { + if (!NeedCheckReqsQueue_.load() && !NeedCheckCancelsQueue_.load()) { DBGOUT("TClient::SendMessages(exit2)"); return; } @@ -1031,9 +1031,9 @@ namespace { //output TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads - TAtomic NeedCheckReqsQueue_; + std::atomic<bool> NeedCheckReqsQueue_; TLockFreeQueue<TRequest*> Reqs_; - TAtomic NeedCheckCancelsQueue_; + std::atomic<bool> NeedCheckCancelsQueue_; TLockFreeQueue<TRequestId> Cancels_; TAdaptiveLock GenReqIdLock_; std::atomic<TRequestId> GenReqId_; @@ -1182,7 +1182,7 @@ namespace { , RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr())) , BuffSize_(TTcp2Options::InputBufferSize) , Buff_(new char[BuffSize_]) - , NeedCheckOutputQueue_(0) + , NeedCheckOutputQueue_(false) { DBGOUT("TServer::TConnection()"); } @@ -1383,7 +1383,7 @@ namespace { } void ProcessOutputQueue() { - AtomicSet(NeedCheckOutputQueue_, 1); + NeedCheckOutputQueue_.store(true); if (OutputLock_.TryAcquire()) { SendMessages(false); return; @@ -1396,7 +1396,7 @@ namespace { DBGOUT("TServer::SendMessages(enter)"); try { do { - AtomicUnlock(&NeedCheckOutputQueue_); + NeedCheckOutputQueue_.store(false); TAutoPtr<TOutputData> d; while (OutputData_.Dequeue(&d)) { d->MoveTo(OutputBuffers_); @@ -1415,7 +1415,7 @@ namespace { OutputLock_.Release(); - if (!AtomicGet(NeedCheckOutputQueue_)) { + if (!NeedCheckOutputQueue_.load()) { DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked()); return; } @@ -1491,7 +1491,7 @@ namespace { //output TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads - TAtomic NeedCheckOutputQueue_; + std::atomic<bool> NeedCheckOutputQueue_; NNeh::TAutoLockFreeQueue<TOutputData> OutputData_; TOutputBuffers OutputBuffers_; }; |