aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-03-27 15:55:51 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-03-27 16:07:07 +0300
commit77c5e3305c6786b76d694a755614a8e878f4ac33 (patch)
tree22c89481dad54bb9d001c417b4094482b2f8b7cf /library/cpp/neh
parent3098b5d7f2fe6c0bbee56a7b57371dec369a9e1a (diff)
downloadydb-77c5e3305c6786b76d694a755614a8e878f4ac33.tar.gz
Intermediate changes
commit_hash:0830a73862cb129490950eb5b3986d9c2db68d84
Diffstat (limited to 'library/cpp/neh')
-rw-r--r--library/cpp/neh/tcp2.cpp30
1 files changed, 15 insertions, 15 deletions
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp
index 641408973d..fa24635e46 100644
--- a/library/cpp/neh/tcp2.cpp
+++ b/library/cpp/neh/tcp2.cpp
@@ -621,8 +621,8 @@ namespace {
, State_(Init)
, BuffSize_(TTcp2Options::InputBufferSize)
, Buff_(new char[BuffSize_])
- , NeedCheckReqsQueue_(0)
- , NeedCheckCancelsQueue_(0)
+ , NeedCheckReqsQueue_(false)
+ , NeedCheckCancelsQueue_(false)
, GenReqId_(0)
, LastSendedReqId_(0)
{
@@ -651,7 +651,7 @@ namespace {
throw;
}
- AtomicSet(NeedCheckReqsQueue_, 1);
+ NeedCheckReqsQueue_.store(true);
req->SetConnection(this);
TAtomicBase state = AtomicGet(State_);
if (Y_LIKELY(state == Connected)) {
@@ -682,7 +682,7 @@ namespace {
//called from client thread
void Cancel(TRequestId id) {
Cancels_.Enqueue(id);
- AtomicSet(NeedCheckCancelsQueue_, 1);
+ NeedCheckCancelsQueue_.store(true);
if (Y_LIKELY(AtomicGet(State_) == Connected)) {
ProcessOutputCancelsQueue();
}
@@ -758,7 +758,7 @@ namespace {
do {
if (asioThread) {
- AtomicSet(NeedCheckCancelsQueue_, 0);
+ NeedCheckCancelsQueue_.store(false);
TRequestId reqId;
ProcessReqsInFlyQueue();
@@ -776,14 +776,14 @@ namespace {
}
}
}
- } else if (AtomicGet(NeedCheckCancelsQueue_)) {
+ } else if (NeedCheckCancelsQueue_.load()) {
AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));
return;
}
TRequestId lastReqId = 0;
{
- AtomicSet(NeedCheckReqsQueue_, 0);
+ NeedCheckReqsQueue_.store(false);
TRequest* reqPtr;
while (Reqs_.Dequeue(&reqPtr)) {
@@ -824,7 +824,7 @@ namespace {
OutputLock_.Release();
- if (!AtomicGet(NeedCheckReqsQueue_) && !AtomicGet(NeedCheckCancelsQueue_)) {
+ if (!NeedCheckReqsQueue_.load() && !NeedCheckCancelsQueue_.load()) {
DBGOUT("TClient::SendMessages(exit2)");
return;
}
@@ -1031,9 +1031,9 @@ namespace {
//output
TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
- TAtomic NeedCheckReqsQueue_;
+ std::atomic<bool> NeedCheckReqsQueue_;
TLockFreeQueue<TRequest*> Reqs_;
- TAtomic NeedCheckCancelsQueue_;
+ std::atomic<bool> NeedCheckCancelsQueue_;
TLockFreeQueue<TRequestId> Cancels_;
TAdaptiveLock GenReqIdLock_;
std::atomic<TRequestId> GenReqId_;
@@ -1182,7 +1182,7 @@ namespace {
, RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr()))
, BuffSize_(TTcp2Options::InputBufferSize)
, Buff_(new char[BuffSize_])
- , NeedCheckOutputQueue_(0)
+ , NeedCheckOutputQueue_(false)
{
DBGOUT("TServer::TConnection()");
}
@@ -1383,7 +1383,7 @@ namespace {
}
void ProcessOutputQueue() {
- AtomicSet(NeedCheckOutputQueue_, 1);
+ NeedCheckOutputQueue_.store(true);
if (OutputLock_.TryAcquire()) {
SendMessages(false);
return;
@@ -1396,7 +1396,7 @@ namespace {
DBGOUT("TServer::SendMessages(enter)");
try {
do {
- AtomicUnlock(&NeedCheckOutputQueue_);
+ NeedCheckOutputQueue_.store(false);
TAutoPtr<TOutputData> d;
while (OutputData_.Dequeue(&d)) {
d->MoveTo(OutputBuffers_);
@@ -1415,7 +1415,7 @@ namespace {
OutputLock_.Release();
- if (!AtomicGet(NeedCheckOutputQueue_)) {
+ if (!NeedCheckOutputQueue_.load()) {
DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked());
return;
}
@@ -1491,7 +1491,7 @@ namespace {
//output
TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
- TAtomic NeedCheckOutputQueue_;
+ std::atomic<bool> NeedCheckOutputQueue_;
NNeh::TAutoLockFreeQueue<TOutputData> OutputData_;
TOutputBuffers OutputBuffers_;
};