aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/tcp2.cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-03-28 00:51:44 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-03-28 00:51:44 +0000
commit149dc5893b4b90db5ad40d019d2d1ffa6e1f0abd (patch)
tree6e3243c1b457c31839ddc9fa75b5f46bcc1fecb4 /library/cpp/neh/tcp2.cpp
parent4c8dfa633cab20dabf3d11464d986335519bfcfa (diff)
parent71e9df83f284bf42c2f8ea872752bf02e1055555 (diff)
downloadydb-149dc5893b4b90db5ad40d019d2d1ffa6e1f0abd.tar.gz
Merge branch 'rightlib' into merge-libs-250328-0050
Diffstat (limited to 'library/cpp/neh/tcp2.cpp')
-rw-r--r--library/cpp/neh/tcp2.cpp30
1 files changed, 15 insertions, 15 deletions
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp
index 641408973d9..fa24635e465 100644
--- a/library/cpp/neh/tcp2.cpp
+++ b/library/cpp/neh/tcp2.cpp
@@ -621,8 +621,8 @@ namespace {
, State_(Init)
, BuffSize_(TTcp2Options::InputBufferSize)
, Buff_(new char[BuffSize_])
- , NeedCheckReqsQueue_(0)
- , NeedCheckCancelsQueue_(0)
+ , NeedCheckReqsQueue_(false)
+ , NeedCheckCancelsQueue_(false)
, GenReqId_(0)
, LastSendedReqId_(0)
{
@@ -651,7 +651,7 @@ namespace {
throw;
}
- AtomicSet(NeedCheckReqsQueue_, 1);
+ NeedCheckReqsQueue_.store(true);
req->SetConnection(this);
TAtomicBase state = AtomicGet(State_);
if (Y_LIKELY(state == Connected)) {
@@ -682,7 +682,7 @@ namespace {
//called from client thread
void Cancel(TRequestId id) {
Cancels_.Enqueue(id);
- AtomicSet(NeedCheckCancelsQueue_, 1);
+ NeedCheckCancelsQueue_.store(true);
if (Y_LIKELY(AtomicGet(State_) == Connected)) {
ProcessOutputCancelsQueue();
}
@@ -758,7 +758,7 @@ namespace {
do {
if (asioThread) {
- AtomicSet(NeedCheckCancelsQueue_, 0);
+ NeedCheckCancelsQueue_.store(false);
TRequestId reqId;
ProcessReqsInFlyQueue();
@@ -776,14 +776,14 @@ namespace {
}
}
}
- } else if (AtomicGet(NeedCheckCancelsQueue_)) {
+ } else if (NeedCheckCancelsQueue_.load()) {
AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));
return;
}
TRequestId lastReqId = 0;
{
- AtomicSet(NeedCheckReqsQueue_, 0);
+ NeedCheckReqsQueue_.store(false);
TRequest* reqPtr;
while (Reqs_.Dequeue(&reqPtr)) {
@@ -824,7 +824,7 @@ namespace {
OutputLock_.Release();
- if (!AtomicGet(NeedCheckReqsQueue_) && !AtomicGet(NeedCheckCancelsQueue_)) {
+ if (!NeedCheckReqsQueue_.load() && !NeedCheckCancelsQueue_.load()) {
DBGOUT("TClient::SendMessages(exit2)");
return;
}
@@ -1031,9 +1031,9 @@ namespace {
//output
TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
- TAtomic NeedCheckReqsQueue_;
+ std::atomic<bool> NeedCheckReqsQueue_;
TLockFreeQueue<TRequest*> Reqs_;
- TAtomic NeedCheckCancelsQueue_;
+ std::atomic<bool> NeedCheckCancelsQueue_;
TLockFreeQueue<TRequestId> Cancels_;
TAdaptiveLock GenReqIdLock_;
std::atomic<TRequestId> GenReqId_;
@@ -1182,7 +1182,7 @@ namespace {
, RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr()))
, BuffSize_(TTcp2Options::InputBufferSize)
, Buff_(new char[BuffSize_])
- , NeedCheckOutputQueue_(0)
+ , NeedCheckOutputQueue_(false)
{
DBGOUT("TServer::TConnection()");
}
@@ -1383,7 +1383,7 @@ namespace {
}
void ProcessOutputQueue() {
- AtomicSet(NeedCheckOutputQueue_, 1);
+ NeedCheckOutputQueue_.store(true);
if (OutputLock_.TryAcquire()) {
SendMessages(false);
return;
@@ -1396,7 +1396,7 @@ namespace {
DBGOUT("TServer::SendMessages(enter)");
try {
do {
- AtomicUnlock(&NeedCheckOutputQueue_);
+ NeedCheckOutputQueue_.store(false);
TAutoPtr<TOutputData> d;
while (OutputData_.Dequeue(&d)) {
d->MoveTo(OutputBuffers_);
@@ -1415,7 +1415,7 @@ namespace {
OutputLock_.Release();
- if (!AtomicGet(NeedCheckOutputQueue_)) {
+ if (!NeedCheckOutputQueue_.load()) {
DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked());
return;
}
@@ -1491,7 +1491,7 @@ namespace {
//output
TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
- TAtomic NeedCheckOutputQueue_;
+ std::atomic<bool> NeedCheckOutputQueue_;
NNeh::TAutoLockFreeQueue<TOutputData> OutputData_;
TOutputBuffers OutputBuffers_;
};