#include "netliba_udp_http.h"
#include "utils.h"
#include <library/cpp/netliba/v6/cpu_affinity.h>
#include <library/cpp/netliba/v6/stdafx.h>
#include <library/cpp/netliba/v6/udp_client_server.h>
#include <library/cpp/netliba/v6/udp_socket.h>
#include <library/cpp/netliba/v6/block_chain.h> // depend on another headers
#include <util/system/hp_timer.h>
#include <util/system/shmat.h>
#include <util/system/spinlock.h>
#include <util/system/thread.h>
#include <util/system/types.h>
#include <util/system/yassert.h>
#include <util/thread/lfqueue.h>
#include <atomic>
#if !defined(_win_)
#include <signal.h>
#include <pthread.h>
#endif
using namespace NNetliba;
namespace {
const float HTTP_TIMEOUT = 15.0f;
const size_t MIN_SHARED_MEM_PACKET = 1000;
const size_t MAX_PACKET_SIZE = 0x70000000;
NNeh::TAtomicBool PanicAttack;
std::atomic<NHPTimer::STime> LastHeartbeat;
std::atomic<double> HeartbeatTimeout;
bool IsLocal(const TUdpAddress& addr) {
return addr.IsIPv4() ? IsLocalIPv4(addr.GetIPv4()) : IsLocalIPv6(addr.Network, addr.Interface);
}
void StopAllNetLibaThreads() {
PanicAttack = true; // AAAA!!!!
}
void ReadShm(TSharedMemory* shm, TVector<char>* data) {
Y_ASSERT(shm);
int dataSize = shm->GetSize();
data->yresize(dataSize);
memcpy(&(*data)[0], shm->GetPtr(), dataSize);
}
void ReadShm(TSharedMemory* shm, TString* data) {
Y_ASSERT(shm);
size_t dataSize = shm->GetSize();
data->ReserveAndResize(dataSize);
memcpy(data->begin(), shm->GetPtr(), dataSize);
}
template <class T>
void EraseList(TLockFreeQueue<T*>* data) {
T* ptr = 0;
while (data->Dequeue(&ptr)) {
delete ptr;
}
}
enum EHttpPacket {
PKT_REQUEST,
PKT_PING,
PKT_PING_RESPONSE,
PKT_RESPONSE,
PKT_LOCAL_REQUEST,
PKT_LOCAL_RESPONSE,
PKT_CANCEL,
};
}
namespace NNehNetliba {
TUdpHttpMessage::TUdpHttpMessage(const TGUID& reqId, const TUdpAddress& peerAddr)
: ReqId(reqId)
, PeerAddress(peerAddr)
{
}
TUdpHttpRequest::TUdpHttpRequest(TAutoPtr<TRequest>& dataHolder, const TGUID& reqId, const TUdpAddress& peerAddr)
: TUdpHttpMessage(reqId, peerAddr)
{
TBlockChainIterator reqData(dataHolder->Data->GetChain());
char pktType;
reqData.Read(&pktType, 1);
ReadArr(&reqData, &Url);
if (pktType == PKT_REQUEST) {
ReadYArr(&reqData, &Data);
} else if (pktType == PKT_LOCAL_REQUEST) {
ReadShm(dataHolder->Data->GetSharedData(), &Data);
} else {
Y_ASSERT(0);
}
if (reqData.HasFailed()) {
Y_ASSERT(0 && "wrong format, memory corruption suspected");
Url = "";
Data.clear();
}
}
TUdpHttpResponse::TUdpHttpResponse(TAutoPtr<TRequest>& dataHolder, const TGUID& reqId, const TUdpAddress& peerAddr, EResult result, const char* error)
: TUdpHttpMessage(reqId, peerAddr)
, Ok(result)
{
if (result == TUdpHttpResponse::FAILED) {
Error = error ? error : "request failed";
} else if (result == TUdpHttpResponse::CANCELED) {
Error = error ? error : "request cancelled";
} else {
TBlockChainIterator reqData(dataHolder->Data->GetChain());
if (Y_UNLIKELY(reqData.HasFailed())) {
Y_ASSERT(0 && "wrong format, memory corruption suspected");
Ok = TUdpHttpResponse::FAILED;
Data.clear();
Error = "wrong response format";
} else {
char pktType;
reqData.Read(&pktType, 1);
TGUID guid;
reqData.Read(&guid, sizeof(guid));
Y_ASSERT(ReqId == guid);
if (pktType == PKT_RESPONSE) {
ReadArr<TString>(&reqData, &Data);
} else if (pktType == PKT_LOCAL_RESPONSE) {
ReadShm(dataHolder->Data->GetSharedData(), &Data);
} else {
Y_ASSERT(0);
}
}
}
}
class TUdpHttp: public IRequester {
enum EDir {
DIR_OUT,
DIR_IN
};
struct TInRequestState {
enum EState {
S_WAITING,
S_RESPONSE_SENDING,
S_CANCELED,
};
TInRequestState()
: State(S_WAITING)
{
}
TInRequestState(const TUdpAddress& address)
: State(S_WAITING)
, Address(address)
{
}
EState State;
TUdpAddress Address;
};
struct TOutRequestState {
enum EState {
S_SENDING,
S_WAITING,
S_WAITING_PING_SENDING,
S_WAITING_PING_SENT,
S_CANCEL_AFTER_SENDING
};
TOutRequestState()
: State(S_SENDING)
, TimePassed(0)
, PingTransferId(-1)
{
}
EState State;
TUdpAddress Address;
double TimePassed;
int PingTransferId;
IEventsCollectorRef EventsCollector;
};
struct TTransferPurpose {
EDir Dir;
TGUID Guid;
TTransferPurpose()
: Dir(DIR_OUT)
{
}
TTransferPurpose(EDir dir, TGUID guid)
: Dir(dir)
, Guid(guid)
{
}
};
struct TSendRequest {
TSendRequest() = default;
TSendRequest(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket>* data, const TGUID& reqGuid, const IEventsCollectorRef& eventsCollector)
: Addr(addr)
, Data(*data)
, ReqGuid(reqGuid)
, EventsCollector(eventsCollector)
, Crc32(CalcChecksum(Data->GetChain()))
{
}
TUdpAddress Addr;
TAutoPtr<TRopeDataPacket> Data;
TGUID ReqGuid;
IEventsCollectorRef EventsCollector;
ui32 Crc32;
};
struct TSendResponse {
TSendResponse() = default;
TSendResponse(const TGUID& reqGuid, EPacketPriority prior, TVector<char>* data)
: ReqGuid(reqGuid)
, DataCrc32(0)
, Priority(prior)
{
if (data && !data->empty()) {
data->swap(Data);
DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize());
}
}
TVector<char> Data;
TGUID ReqGuid;
ui32 DataCrc32;
EPacketPriority Priority;
};
typedef THashMap<TGUID, TOutRequestState, TGUIDHash> TOutRequestHash;
typedef THashMap<TGUID, TInRequestState, TGUIDHash> TInRequestHash;
public:
TUdpHttp(const IEventsCollectorRef& eventsCollector)
: MyThread_(ExecServerThread, (void*)this)
, AbortTransactions_(false)
, Port_(0)
, EventCollector_(eventsCollector)
, ReportRequestCancel_(false)
, ReporRequestAck_(false)
, PhysicalCpu_(-1)
{
}
~TUdpHttp() override {
if (MyThread_.Running()) {
AtomicSet(KeepRunning_, 0);
MyThread_.Join();
}
}
bool Start(int port, int physicalCpu) {
Y_ASSERT(Host_.Get() == nullptr);
Port_ = port;
PhysicalCpu_ = physicalCpu;
MyThread_.Start();
HasStarted_.Wait();
return Host_.Get() != nullptr;
}
void EnableReportRequestCancel() override {
ReportRequestCancel_ = true;
}
void EnableReportRequestAck() override {
ReporRequestAck_ = true;
}
void SendRequest(const TUdpAddress& addr, const TString& url, const TString& data, const TGUID& reqId) override {
Y_VERIFY(
data.size() < MAX_PACKET_SIZE,
"data size is too large; data.size()=%" PRISZT ", MAX_PACKET_SIZE=%" PRISZT,
data.size(), MAX_PACKET_SIZE);
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
if (data.size() > MIN_SHARED_MEM_PACKET && IsLocal(addr)) {
TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
if (shm->Create(data.size())) {
ms->Write((char)PKT_LOCAL_REQUEST);
ms->WriteStroka(url);
memcpy(shm->GetPtr(), data.begin(), data.size());
ms->AttachSharedData(shm);
}
}
if (ms->GetSharedData() == nullptr) {
ms->Write((char)PKT_REQUEST);
ms->WriteStroka(url);
struct TStrokaStorage: public TThrRefBase, public TString {
TStrokaStorage(const TString& s)
: TString(s)
{
}
};
TStrokaStorage* ss = new TStrokaStorage(data);
ms->Write((int)ss->size());
ms->AddBlock(ss, ss->begin(), ss->size());
}
SendReqList_.Enqueue(new TSendRequest(addr, &ms, reqId, EventCollector_));
Host_->CancelWait();
}
void CancelRequest(const TGUID& reqId) override {
CancelReqList_.Enqueue(reqId);
Host_->CancelWait();
}
void SendResponse(const TGUID& reqId, TVector<char>* data) override {
if (data && data->size() > MAX_PACKET_SIZE) {
Y_FAIL(
"data size is too large; data->size()=%" PRISZT ", MAX_PACKET_SIZE=%" PRISZT,
data->size(), MAX_PACKET_SIZE);
}
SendRespList_.Enqueue(new TSendResponse(reqId, PP_NORMAL, data));
Host_->CancelWait();
}
void StopNoWait() override {
AbortTransactions_ = true;
AtomicSet(KeepRunning_, 0);
// calcel all outgoing requests
TGuard<TSpinLock> lock(Spn_);
while (!OutRequests_.empty()) {
// cancel without informing peer that we are cancelling the request
FinishRequest(OutRequests_.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()");
}
}
private:
void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TRequestPtr data, const char* error = nullptr) {
TOutRequestState& s = i->second;
s.EventsCollector->AddResponse(new TUdpHttpResponse(data, i->first, s.Address, ok, error));
OutRequests_.erase(i);
}
int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data) {
ui32 crc32 = CalcChecksum(data->GetChain());
return Host_->Send(addr, data.Release(), crc32, nullptr, PP_HIGH);
}
void ProcessIncomingPackets() {
TVector<TGUID> failedRequests;
for (;;) {
TAutoPtr<TRequest> req = Host_->GetRequest();
if (req.Get() == nullptr)
break;
TBlockChainIterator reqData(req->Data->GetChain());
char pktType;
reqData.Read(&pktType, 1);
switch (pktType) {
case PKT_REQUEST:
case PKT_LOCAL_REQUEST: {
TGUID reqId = req->Guid;
TInRequestHash::iterator z = InRequests_.find(reqId);
if (z != InRequests_.end()) {
// oops, this request already exists!
// might happen if request can be stored in single packet
// and this packet had source IP broken during transmission and managed to pass crc checks
// since we already reported wrong source address for this request to the user
// the best thing we can do is to stop the program to avoid further complications
// but we just report the accident to stderr
fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n",
GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(),
GetAddressAsString(req->Address).c_str());
} else {
InRequests_[reqId] = TInRequestState(req->Address);
EventCollector_->AddRequest(new TUdpHttpRequest(req, reqId, req->Address));
}
} break;
case PKT_PING: {
TGUID guid;
reqData.Read(&guid, sizeof(guid));
bool ok = InRequests_.find(guid) != InRequests_.end();
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
ms->Write((char)PKT_PING_RESPONSE);
ms->Write(guid);
ms->Write(ok);
SendWithHighPriority(req->Address, ms.Release());
} break;
case PKT_PING_RESPONSE: {
TGUID guid;
bool ok;
reqData.Read(&guid, sizeof(guid));
reqData.Read(&ok, sizeof(ok));
TOutRequestHash::iterator i = OutRequests_.find(guid);
if (i == OutRequests_.end())
; //Y_ASSERT(0); // actually possible with some packet orders
else {
if (!ok) {
// can not delete request at this point
// since we can receive failed ping and response at the same moment
// consider sequence: client sends ping, server sends response
// and replies false to ping as reply is sent
// we can not receive failed ping_response earlier then response itself
// but we can receive them simultaneously
failedRequests.push_back(guid);
} else {
TOutRequestState& s = i->second;
switch (s.State) {
case TOutRequestState::S_WAITING_PING_SENDING: {
Y_ASSERT(s.PingTransferId >= 0);
TTransferHash::iterator k = TransferHash_.find(s.PingTransferId);
if (k != TransferHash_.end())
TransferHash_.erase(k);
else
Y_ASSERT(0);
s.PingTransferId = -1;
s.TimePassed = 0;
s.State = TOutRequestState::S_WAITING;
} break;
case TOutRequestState::S_WAITING_PING_SENT:
s.TimePassed = 0;
s.State = TOutRequestState::S_WAITING;
break;
default:
Y_ASSERT(0);
break;
}
}
}
} break;
case PKT_RESPONSE:
case PKT_LOCAL_RESPONSE: {
TGUID guid;
reqData.Read(&guid, sizeof(guid));
TOutRequestHash::iterator i = OutRequests_.find(guid);
if (i == OutRequests_.end()) {
; //Y_ASSERT(0); // does happen
} else {
FinishRequest(i, TUdpHttpResponse::OK, req);
}
} break;
case PKT_CANCEL: {
TGUID guid;
reqData.Read(&guid, sizeof(guid));
TInRequestHash::iterator i = InRequests_.find(guid);
if (i == InRequests_.end()) {
; //Y_ASSERT(0); // may happen
} else {
TInRequestState& s = i->second;
if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel_)
EventCollector_->AddCancel(guid);
s.State = TInRequestState::S_CANCELED;
}
} break;
default:
Y_ASSERT(0);
}
}
// cleanup failed requests
for (size_t k = 0; k < failedRequests.size(); ++k) {
const TGUID& guid = failedRequests[k];
TOutRequestHash::iterator i = OutRequests_.find(guid);
if (i != OutRequests_.end())
FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "failed udp ping");
}
}
void AnalyzeSendResults() {
TSendResult res;
while (Host_->GetSendResult(&res)) {
TTransferHash::iterator k = TransferHash_.find(res.TransferId);
if (k != TransferHash_.end()) {
const TTransferPurpose& tp = k->second;
switch (tp.Dir) {
case DIR_OUT: {
TOutRequestHash::iterator i = OutRequests_.find(tp.Guid);
if (i != OutRequests_.end()) {
const TGUID& reqId = i->first;
TOutRequestState& s = i->second;
switch (s.State) {
case TOutRequestState::S_SENDING:
if (!res.Success) {
FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING");
} else {
if (ReporRequestAck_ && !!s.EventsCollector) {
s.EventsCollector->AddRequestAck(reqId);
}
s.State = TOutRequestState::S_WAITING;
s.TimePassed = 0;
}
break;
case TOutRequestState::S_CANCEL_AFTER_SENDING:
DoSendCancel(s.Address, reqId);
FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING");
break;
case TOutRequestState::S_WAITING:
case TOutRequestState::S_WAITING_PING_SENT:
Y_ASSERT(0);
break;
case TOutRequestState::S_WAITING_PING_SENDING:
Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId);
if (!res.Success) {
FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING");
} else {
s.PingTransferId = -1;
s.State = TOutRequestState::S_WAITING_PING_SENT;
s.TimePassed = 0;
}
break;
default:
Y_ASSERT(0);
break;
}
}
} break;
case DIR_IN: {
TInRequestHash::iterator i = InRequests_.find(tp.Guid);
if (i != InRequests_.end()) {
Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED);
InRequests_.erase(i);
}
} break;
default:
Y_ASSERT(0);
break;
}
TransferHash_.erase(k);
}
}
}
void SendPingsIfNeeded() {
NHPTimer::STime tChk = PingsSendT_;
float deltaT = (float)NHPTimer::GetTimePassed(&tChk);
if (deltaT < 0.05) {
return;
}
PingsSendT_ = tChk;
deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3);
{
for (TOutRequestHash::iterator i = OutRequests_.begin(); i != OutRequests_.end();) {
TOutRequestHash::iterator curIt = i++;
TOutRequestState& s = curIt->second;
const TGUID& guid = curIt->first;
switch (s.State) {
case TOutRequestState::S_WAITING:
s.TimePassed += deltaT;
if (s.TimePassed > HTTP_TIMEOUT) {
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
ms->Write((char)PKT_PING);
ms->Write(guid);
int transId = SendWithHighPriority(s.Address, ms.Release());
TransferHash_[transId] = TTransferPurpose(DIR_OUT, guid);
s.State = TOutRequestState::S_WAITING_PING_SENDING;
s.PingTransferId = transId;
}
break;
case TOutRequestState::S_WAITING_PING_SENT:
s.TimePassed += deltaT;
if (s.TimePassed > HTTP_TIMEOUT) {
FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT");
}
break;
default:
break;
}
}
}
}
void Step() {
{
TGuard<TSpinLock> lock(Spn_);
DoSends();
}
Host_->Step();
{
TGuard<TSpinLock> lock(Spn_);
DoSends();
ProcessIncomingPackets();
AnalyzeSendResults();
SendPingsIfNeeded();
}
}
void Wait() {
Host_->Wait(0.1f);
}
void DoSendCancel(const TUdpAddress& addr, const TGUID& req) {
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
ms->Write((char)PKT_CANCEL);
ms->Write(req);
SendWithHighPriority(addr, ms);
}
void DoSends() {
{
// cancelling requests
TGUID reqGuid;
while (CancelReqList_.Dequeue(&reqGuid)) {
TOutRequestHash::iterator i = OutRequests_.find(reqGuid);
if (i == OutRequests_.end()) {
AnticipateCancels_.insert(reqGuid);
continue; // cancelling non existing request is ok
}
TOutRequestState& s = i->second;
if (s.State == TOutRequestState::S_SENDING) {
// we are in trouble - have not sent request and we already have to cancel it, wait send
s.State = TOutRequestState::S_CANCEL_AFTER_SENDING;
s.EventsCollector->AddCancel(i->first);
} else {
DoSendCancel(s.Address, reqGuid);
FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side");
}
}
}
{
// sending replies
for (TSendResponse* rd = nullptr; SendRespList_.Dequeue(&rd); delete rd) {
TInRequestHash::iterator i = InRequests_.find(rd->ReqGuid);
if (i == InRequests_.end()) {
Y_ASSERT(0);
continue;
}
TInRequestState& s = i->second;
if (s.State == TInRequestState::S_CANCELED) {
// need not send response for the canceled request
InRequests_.erase(i);
continue;
}
Y_ASSERT(s.State == TInRequestState::S_WAITING);
s.State = TInRequestState::S_RESPONSE_SENDING;
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
ui32 crc32 = 0;
int dataSize = rd->Data.ysize();
if (rd->Data.size() > MIN_SHARED_MEM_PACKET && IsLocal(s.Address)) {
TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
if (shm->Create(dataSize)) {
ms->Write((char)PKT_LOCAL_RESPONSE);
ms->Write(rd->ReqGuid);
memcpy(shm->GetPtr(), &rd->Data[0], dataSize);
TVector<char> empty;
rd->Data.swap(empty);
ms->AttachSharedData(shm);
crc32 = CalcChecksum(ms->GetChain());
}
}
if (ms->GetSharedData() == nullptr) {
ms->Write((char)PKT_RESPONSE);
ms->Write(rd->ReqGuid);
// to offload crc calcs from inner thread, crc of data[] is calced outside and passed in DataCrc32
// this means that we are calculating crc when shared memory is used
// it is hard to avoid since in SendResponse() we don't know if shared mem will be used
// (peer address is not available there)
TIncrementalChecksumCalcer csCalcer;
AddChain(&csCalcer, ms->GetChain());
// here we are replicating the way WriteDestructive serializes data
csCalcer.AddBlock(&dataSize, sizeof(dataSize));
csCalcer.AddBlockSum(rd->DataCrc32, dataSize);
crc32 = csCalcer.CalcChecksum();
ms->WriteDestructive(&rd->Data);
//ui32 chkCrc = CalcChecksum(ms->GetChain()); // can not use since its slow for large responses
//Y_ASSERT(chkCrc == crc32);
}
int transId = Host_->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority);
TransferHash_[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid);
}
}
{
// sending requests
for (TSendRequest* rd = nullptr; SendReqList_.Dequeue(&rd); delete rd) {
Y_ASSERT(OutRequests_.find(rd->ReqGuid) == OutRequests_.end());
{
TOutRequestState& s = OutRequests_[rd->ReqGuid];
s.State = TOutRequestState::S_SENDING;
s.Address = rd->Addr;
s.EventsCollector = rd->EventsCollector;
}
if (AnticipateCancels_.find(rd->ReqGuid) != AnticipateCancels_.end()) {
FinishRequest(OutRequests_.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "Canceled (before transmit)");
} else {
TGUID pktGuid = rd->ReqGuid; // request packet id should match request id
int transId = Host_->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL);
TransferHash_[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid);
}
}
}
if (!AnticipateCancels_.empty()) {
AnticipateCancels_.clear();
}
}
void FinishOutstandingTransactions() {
// wait all pending requests, all new requests are canceled
while ((!OutRequests_.empty() || !InRequests_.empty() || !SendRespList_.IsEmpty() || !SendReqList_.IsEmpty()) && !PanicAttack) {
Step();
sleep(0);
}
}
static void* ExecServerThread(void* param) {
TUdpHttp* pThis = (TUdpHttp*)param;
if (pThis->GetPhysicalCpu() >= 0) {
BindToSocket(pThis->GetPhysicalCpu());
}
SetHighestThreadPriority();
TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateSocket();
socket->Open(pThis->Port_);
if (socket->IsValid()) {
pThis->Port_ = socket->GetPort();
pThis->Host_ = CreateUdpHost(socket);
} else {
pThis->Host_ = nullptr;
}
pThis->HasStarted_.Signal();
if (!pThis->Host_)
return nullptr;
NHPTimer::GetTime(&pThis->PingsSendT_);
while (AtomicGet(pThis->KeepRunning_) && !PanicAttack) {
if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) {
NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire);
if (NHPTimer::GetTimePassed(&chk) > HeartbeatTimeout.load(std::memory_order_acquire)) {
StopAllNetLibaThreads();
#ifndef _win_
killpg(0, SIGKILL);
#endif
abort();
break;
}
}
pThis->Step();
pThis->Wait();
}
if (!pThis->AbortTransactions_ && !PanicAttack) {
pThis->FinishOutstandingTransactions();
}
pThis->Host_ = nullptr;
return nullptr;
}
int GetPhysicalCpu() const noexcept {
return PhysicalCpu_;
}
private:
TThread MyThread_;
TAtomic KeepRunning_ = 1;
bool AbortTransactions_;
TSpinLock Spn_;
TSystemEvent HasStarted_;
NHPTimer::STime PingsSendT_;
TIntrusivePtr<IUdpHost> Host_;
int Port_;
TOutRequestHash OutRequests_;
TInRequestHash InRequests_;
typedef THashMap<int, TTransferPurpose> TTransferHash;
TTransferHash TransferHash_;
// hold it here to not construct on every DoSends()
typedef THashSet<TGUID, TGUIDHash> TAnticipateCancels;
TAnticipateCancels AnticipateCancels_;
TLockFreeQueue<TSendRequest*> SendReqList_;
TLockFreeQueue<TSendResponse*> SendRespList_;
TLockFreeQueue<TGUID> CancelReqList_;
TIntrusivePtr<IEventsCollector> EventCollector_;
bool ReportRequestCancel_;
bool ReporRequestAck_;
int PhysicalCpu_;
};
IRequesterRef CreateHttpUdpRequester(int port, const IEventsCollectorRef& ec, int physicalCpu) {
TUdpHttp* udpHttp = new TUdpHttp(ec);
IRequesterRef res(udpHttp);
if (!udpHttp->Start(port, physicalCpu)) {
if (port) {
ythrow yexception() << "netliba can't bind port=" << port;
} else {
ythrow yexception() << "netliba can't bind random port";
}
}
return res;
}
}