diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/udp_http.cpp | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/udp_http.cpp')
-rw-r--r-- | library/cpp/netliba/v6/udp_http.cpp | 1354 |
1 files changed, 1354 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/udp_http.cpp b/library/cpp/netliba/v6/udp_http.cpp new file mode 100644 index 0000000000..9fa0b07818 --- /dev/null +++ b/library/cpp/netliba/v6/udp_http.cpp @@ -0,0 +1,1354 @@ +#include "stdafx.h" +#include "udp_http.h" +#include "udp_client_server.h" +#include "udp_socket.h" +#include "cpu_affinity.h" + +#include <library/cpp/threading/atomic/bool.h> + +#include <util/system/hp_timer.h> +#include <util/thread/lfqueue.h> +#include <util/system/thread.h> +#include <util/system/spinlock.h> +#if !defined(_win_) +#include <signal.h> +#include <pthread.h> +#endif +#include "block_chain.h" +#include <util/system/shmat.h> + +#include <atomic> + +namespace NNetliba { + const float HTTP_TIMEOUT = 15.0f; + const int MIN_SHARED_MEM_PACKET = 1000; + + static ::NAtomic::TBool PanicAttack; + static std::atomic<NHPTimer::STime> LastHeartbeat; + static std::atomic<double> HeartbeatTimeout; + + static int GetPacketSize(TRequest* req) { + if (req && req->Data.Get()) + return req->Data->GetSize(); + return 0; + } + + static bool IsLocalFast(const TUdpAddress& addr) { + if (addr.IsIPv4()) { + return IsLocalIPv4(addr.GetIPv4()); + } else { + return IsLocalIPv6(addr.Network, addr.Interface); + } + } + + bool IsLocal(const TUdpAddress& addr) { + InitLocalIPList(); + return IsLocalFast(addr); + } + + TUdpHttpRequest::~TUdpHttpRequest() { + } + + TUdpHttpResponse::~TUdpHttpResponse() { + } + + class TRequesterUserQueueSizes: public TThrRefBase { + public: + TAtomic ReqCount, RespCount; + TAtomic ReqQueueSize, RespQueueSize; + + TRequesterUserQueueSizes() + : ReqCount(0) + , RespCount(0) + , ReqQueueSize(0) + , RespQueueSize(0) + { + } + }; + + template <class T> + void EraseList(TLockFreeQueue<T*>* data) { + T* ptr = nullptr; + while (data->Dequeue(&ptr)) { + delete ptr; + } + } + + class TRequesterUserQueues: public TThrRefBase { + TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes; + TLockFreeQueue<TUdpHttpRequest*> ReqList; + TLockFreeQueue<TUdpHttpResponse*> ResponseList; + TLockFreeStack<TGUID> CancelList, SendRequestAccList; // any order will do + TMuxEvent AsyncEvent; + + void UpdateAsyncSignalState() { + // not sure about this one. Idea is that AsyncEvent.Reset() is a memory barrier + if (ReqList.IsEmpty() && ResponseList.IsEmpty() && CancelList.IsEmpty() && SendRequestAccList.IsEmpty()) { + AsyncEvent.Reset(); + if (!ReqList.IsEmpty() || !ResponseList.IsEmpty() || !CancelList.IsEmpty() || !SendRequestAccList.IsEmpty()) + AsyncEvent.Signal(); + } + } + ~TRequesterUserQueues() override { + EraseList(&ReqList); + EraseList(&ResponseList); + } + + public: + TRequesterUserQueues(TRequesterUserQueueSizes* queueSizes) + : QueueSizes(queueSizes) + { + } + TUdpHttpRequest* GetRequest(); + TUdpHttpResponse* GetResponse(); + bool GetRequestCancel(TGUID* req) { + bool res = CancelList.Dequeue(req); + UpdateAsyncSignalState(); + return res; + } + bool GetSendRequestAcc(TGUID* req) { + bool res = SendRequestAccList.Dequeue(req); + UpdateAsyncSignalState(); + return res; + } + + void AddRequest(TUdpHttpRequest* res) { + AtomicAdd(QueueSizes->ReqCount, 1); + AtomicAdd(QueueSizes->ReqQueueSize, GetPacketSize(res->DataHolder.Get())); + ReqList.Enqueue(res); + AsyncEvent.Signal(); + } + void AddResponse(TUdpHttpResponse* res) { + AtomicAdd(QueueSizes->RespCount, 1); + AtomicAdd(QueueSizes->RespQueueSize, GetPacketSize(res->DataHolder.Get())); + ResponseList.Enqueue(res); + AsyncEvent.Signal(); + } + void AddCancel(const TGUID& req) { + CancelList.Enqueue(req); + AsyncEvent.Signal(); + } + void AddSendRequestAcc(const TGUID& req) { + SendRequestAccList.Enqueue(req); + AsyncEvent.Signal(); + } + TMuxEvent& GetAsyncEvent() { + return AsyncEvent; + } + void AsyncSignal() { + AsyncEvent.Signal(); + } + }; + + struct TOutRequestState { + enum EState { + S_SENDING, + S_WAITING, + S_WAITING_PING_SENDING, + S_WAITING_PING_SENT, + S_CANCEL_AFTER_SENDING + }; + EState State; + TUdpAddress Address; + double TimePassed; + int PingTransferId; + TIntrusivePtr<TRequesterUserQueues> UserQueues; + + TOutRequestState() + : State(S_SENDING) + , TimePassed(0) + , PingTransferId(-1) + { + } + }; + + struct TInRequestState { + enum EState { + S_WAITING, + S_RESPONSE_SENDING, + S_CANCELED, + }; + EState State; + TUdpAddress Address; + + TInRequestState() + : State(S_WAITING) + { + } + TInRequestState(const TUdpAddress& address) + : State(S_WAITING) + , Address(address) + { + } + }; + + enum EHttpPacket { + PKT_REQUEST, + PKT_PING, + PKT_PING_RESPONSE, + PKT_RESPONSE, + PKT_GETDEBUGINFO, + PKT_LOCAL_REQUEST, + PKT_LOCAL_RESPONSE, + PKT_CANCEL, + }; + + class TUdpHttp: public IRequester { + enum EDir { + DIR_OUT, + DIR_IN + }; + struct TTransferPurpose { + EDir Dir; + TGUID Guid; + TTransferPurpose() + : Dir(DIR_OUT) + { + } + TTransferPurpose(EDir dir, TGUID guid) + : Dir(dir) + , Guid(guid) + { + } + }; + + struct TSendRequest { + TUdpAddress Addr; + TAutoPtr<TRopeDataPacket> Data; + TGUID ReqGuid; + TIntrusivePtr<TWaitResponse> WR; + TIntrusivePtr<TRequesterUserQueues> UserQueues; + ui32 Crc32; + + TSendRequest() + : Crc32(0) + { + } + TSendRequest(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket>* data, const TGUID& reqguid, TWaitResponse* wr, TRequesterUserQueues* userQueues) + : Addr(addr) + , Data(*data) + , ReqGuid(reqguid) + , WR(wr) + , UserQueues(userQueues) + , Crc32(CalcChecksum(Data->GetChain())) + { + } + }; + struct TSendResponse { + TVector<char> Data; + TGUID ReqGuid; + ui32 DataCrc32; + EPacketPriority Priority; + + TSendResponse() + : DataCrc32(0) + , Priority(PP_NORMAL) + { + } + TSendResponse(const TGUID& reqguid, EPacketPriority prior, TVector<char>* data) + : ReqGuid(reqguid) + , DataCrc32(0) + , Priority(prior) + { + if (data && !data->empty()) { + data->swap(Data); + DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize()); + } + } + }; + struct TCancelRequest { + TGUID ReqGuid; + + TCancelRequest() = default; + TCancelRequest(const TGUID& reqguid) + : ReqGuid(reqguid) + { + } + }; + struct TBreakRequest { + TGUID ReqGuid; + + TBreakRequest() = default; + TBreakRequest(const TGUID& reqguid) + : ReqGuid(reqguid) + { + } + }; + + TThread myThread; + bool KeepRunning, AbortTransactions; + TSpinLock cs; + TSystemEvent HasStarted; + + NHPTimer::STime PingsSendT; + + TIntrusivePtr<IUdpHost> Host; + TIntrusivePtr<NNetlibaSocket::ISocket> Socket; + typedef THashMap<TGUID, TOutRequestState, TGUIDHash> TOutRequestHash; + typedef THashMap<TGUID, TInRequestState, TGUIDHash> TInRequestHash; + TOutRequestHash OutRequests; + TInRequestHash InRequests; + + typedef THashMap<int, TTransferPurpose> TTransferHash; + TTransferHash TransferHash; + + typedef THashMap<TGUID, TIntrusivePtr<TWaitResponse>, TGUIDHash> TSyncRequests; + TSyncRequests SyncRequests; + + // hold it here to not construct on every DoSends() + typedef THashSet<TGUID, TGUIDHash> TAnticipateCancels; + TAnticipateCancels AnticipateCancels; + + TLockFreeQueue<TSendRequest*> SendReqList; + TLockFreeQueue<TSendResponse*> SendRespList; + TLockFreeQueue<TCancelRequest> CancelReqList; + TLockFreeQueue<TBreakRequest> BreakReqList; + + TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes; + TIntrusivePtr<TRequesterUserQueues> UserQueues; + + struct TStatsRequest: public TThrRefBase { + enum EReq { + PENDING_SIZE, + DEBUG_INFO, + HAS_IN_REQUEST, + GET_PEER_ADDRESS, + GET_PEER_QUEUE_STATS, + }; + EReq Req; + TRequesterPendingDataStats PendingDataSize; + TString DebugInfo; + TGUID RequestId; + TUdpAddress PeerAddress; + TIntrusivePtr<IPeerQueueStats> QueueStats; + bool RequestFound; + TSystemEvent Complete; + + TStatsRequest(EReq req) + : Req(req) + , RequestFound(false) + { + } + }; + TLockFreeQueue<TIntrusivePtr<TStatsRequest>> StatsReqList; + + bool ReportRequestCancel; + bool ReportSendRequestAcc; + + void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TAutoPtr<TRequest> data, const char* error = nullptr) { + TOutRequestState& s = i->second; + TUdpHttpResponse* res = new TUdpHttpResponse; + res->DataHolder = data; + res->ReqId = i->first; + res->PeerAddress = s.Address; + res->Ok = ok; + if (ok == TUdpHttpResponse::FAILED) + res->Error = error ? error : "request failed"; + else if (ok == TUdpHttpResponse::CANCELED) + res->Error = error ? error : "request cancelled"; + TSyncRequests::iterator k = SyncRequests.find(res->ReqId); + if (k != SyncRequests.end()) { + TIntrusivePtr<TWaitResponse>& wr = k->second; + wr->SetResponse(res); + SyncRequests.erase(k); + } else { + s.UserQueues->AddResponse(res); + } + + OutRequests.erase(i); + } + int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data) { + ui32 crc32 = CalcChecksum(data->GetChain()); + return Host->Send(addr, data.Release(), crc32, nullptr, PP_HIGH); + } + void ProcessIncomingPackets() { + TVector<TGUID, TCustomAllocator<TGUID>> failedRequests; + for (;;) { + TAutoPtr<TRequest> req = Host->GetRequest(); + if (req.Get() == nullptr) { + if (!failedRequests.empty()) { + // we want to handle following sequence of events + // <- send ping + // -> send response over IB + // -> send ping response (no such request) over UDP + // Now if we are lucky enough we can get IB response waiting in the IB receive queue + // at the same time response sender will receive "send complete" from IB + // indeed, IB delivered message (but it was not parsed by ib_cs.cpp yet) + // so after receiving "send response complete" event resposne sender can legally response + // to pings with "no such request" + // but ping responses can be sent over UDP + // So we can run into situation with negative ping response in + // UDP receive queue and response waiting unprocessed in IB receive queue + // to check that there is no response in the IB queue we have to process IB queues + // so we call IBStep() + Host->IBStep(); + req = Host->GetRequest(); + if (req.Get() == nullptr) { + break; + } + } else { + break; + } + } + + TBlockChainIterator reqData(req->Data->GetChain()); + char pktType; + reqData.Read(&pktType, 1); + switch (pktType) { + case PKT_REQUEST: + case PKT_LOCAL_REQUEST: { + //printf("recv PKT_REQUEST or PKT_LOCAL_REQUEST\n"); + TGUID reqId = req->Guid; + TInRequestHash::iterator z = InRequests.find(reqId); + if (z != InRequests.end()) { + // oops, this request already exists! + // might happen if request can be stored in single packet + // and this packet had source IP broken during transmission and managed to pass crc checks + // since we already reported wrong source address for this request to the user + // the best thing we can do is to stop the program to avoid further complications + // but we just report the accident to stderr + fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n", + GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(), + GetAddressAsString(req->Address).c_str()); + } else { + InRequests[reqId] = TInRequestState(req->Address); + + //printf("InReq %s PKT_REQUEST recv ... -> S_WAITING\n", GetGuidAsString(reqId).c_str()); + + TUdpHttpRequest* res = new TUdpHttpRequest; + res->ReqId = reqId; + res->PeerAddress = req->Address; + res->DataHolder = req; + + UserQueues->AddRequest(res); + } + } break; + case PKT_PING: { + //printf("recv PKT_PING\n"); + TGUID guid; + reqData.Read(&guid, sizeof(guid)); + bool ok = InRequests.find(guid) != InRequests.end(); + TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket; + ms->Write((char)PKT_PING_RESPONSE); + ms->Write(guid); + ms->Write(ok); + SendWithHighPriority(req->Address, ms.Release()); + //printf("InReq %s PKT_PING recv Sending PKT_PING_RESPONSE\n", GetGuidAsString(guid).c_str()); + //printf("got PKT_PING, responding %d\n", (int)ok); + } break; + case PKT_PING_RESPONSE: { + //printf("recv PKT_PING_RESPONSE\n"); + TGUID guid; + bool ok; + reqData.Read(&guid, sizeof(guid)); + reqData.Read(&ok, sizeof(ok)); + TOutRequestHash::iterator i = OutRequests.find(guid); + if (i == OutRequests.end()) { + ; //Y_ASSERT(0); // actually possible with some packet orders + } else { + if (!ok) { + // can not delete request at this point + // since we can receive failed ping and response at the same moment + // consider sequence: client sends ping, server sends response + // and replies false to ping as reply is sent + // we can not receive failed ping_response earlier then response itself + // but we can receive them simultaneously + failedRequests.push_back(guid); + //printf("OutReq %s PKT_PING_RESPONSE recv no such query -> failed\n", GetGuidAsString(guid).c_str()); + } else { + TOutRequestState& s = i->second; + switch (s.State) { + case TOutRequestState::S_WAITING_PING_SENDING: { + Y_ASSERT(s.PingTransferId >= 0); + TTransferHash::iterator k = TransferHash.find(s.PingTransferId); + if (k != TransferHash.end()) + TransferHash.erase(k); + else + Y_ASSERT(0); + s.PingTransferId = -1; + s.TimePassed = 0; + s.State = TOutRequestState::S_WAITING; + //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENDING -> S_WAITING\n", GetGuidAsString(guid).c_str()); + } break; + case TOutRequestState::S_WAITING_PING_SENT: + s.TimePassed = 0; + s.State = TOutRequestState::S_WAITING; + //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENT -> S_WAITING\n", GetGuidAsString(guid).c_str()); + break; + default: + Y_ASSERT(0); + break; + } + } + } + } break; + case PKT_RESPONSE: + case PKT_LOCAL_RESPONSE: { + //printf("recv PKT_RESPONSE or PKT_LOCAL_RESPONSE\n"); + TGUID guid; + reqData.Read(&guid, sizeof(guid)); + TOutRequestHash::iterator i = OutRequests.find(guid); + if (i == OutRequests.end()) { + ; //Y_ASSERT(0); // does happen + //printf("OutReq %s PKT_RESPONSE recv for non-existing req\n", GetGuidAsString(guid).c_str()); + } else { + FinishRequest(i, TUdpHttpResponse::OK, req); + //printf("OutReq %s PKT_RESPONSE recv ... -> ok\n", GetGuidAsString(guid).c_str()); + } + } break; + case PKT_CANCEL: { + //printf("recv PKT_CANCEL\n"); + TGUID guid; + reqData.Read(&guid, sizeof(guid)); + TInRequestHash::iterator i = InRequests.find(guid); + if (i == InRequests.end()) { + ; //Y_ASSERT(0); // may happen + //printf("InReq %s PKT_CANCEL recv for non-existing req\n", GetGuidAsString(guid).c_str()); + } else { + TInRequestState& s = i->second; + if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel) + UserQueues->AddCancel(guid); + s.State = TInRequestState::S_CANCELED; + //printf("InReq %s PKT_CANCEL recv\n", GetGuidAsString(guid).c_str()); + } + } break; + case PKT_GETDEBUGINFO: { + //printf("recv PKT_GETDEBUGINFO\n"); + TString dbgInfo = GetDebugInfoLocked(); + TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket; + ms->Write(dbgInfo.c_str(), (int)dbgInfo.size()); + SendWithHighPriority(req->Address, ms); + } break; + default: + Y_ASSERT(0); + } + } + // cleanup failed requests + for (size_t k = 0; k < failedRequests.size(); ++k) { + const TGUID& guid = failedRequests[k]; + TOutRequestHash::iterator i = OutRequests.find(guid); + if (i != OutRequests.end()) + FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: recv no such query"); + } + } + void AnalyzeSendResults() { + TSendResult res; + while (Host->GetSendResult(&res)) { + //printf("Send result received\n"); + TTransferHash::iterator k1 = TransferHash.find(res.TransferId); + if (k1 != TransferHash.end()) { + const TTransferPurpose& tp = k1->second; + switch (tp.Dir) { + case DIR_OUT: { + TOutRequestHash::iterator i = OutRequests.find(tp.Guid); + if (i != OutRequests.end()) { + const TGUID& reqId = i->first; + TOutRequestState& s = i->second; + switch (s.State) { + case TOutRequestState::S_SENDING: + if (!res.Success) { + FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING"); + //printf("OutReq %s AnalyzeSendResults() S_SENDING -> failed\n", GetGuidAsString(reqId).c_str()); + } else { + if (ReportSendRequestAcc) { + if (s.UserQueues.Get()) { + s.UserQueues->AddSendRequestAcc(reqId); + } else { + // waitable request? + TSyncRequests::iterator k2 = SyncRequests.find(reqId); + if (k2 != SyncRequests.end()) { + TIntrusivePtr<TWaitResponse>& wr = k2->second; + wr->SetRequestSent(); + } + } + } + s.State = TOutRequestState::S_WAITING; + //printf("OutReq %s AnalyzeSendResults() S_SENDING -> S_WAITING\n", GetGuidAsString(reqId).c_str()); + s.TimePassed = 0; + } + break; + case TOutRequestState::S_CANCEL_AFTER_SENDING: + DoSendCancel(s.Address, reqId); + FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING"); + break; + case TOutRequestState::S_WAITING: + case TOutRequestState::S_WAITING_PING_SENT: + Y_ASSERT(0); + break; + case TOutRequestState::S_WAITING_PING_SENDING: + Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId); + if (!res.Success) { + FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING"); + //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> failed\n", GetGuidAsString(reqId).c_str()); + } else { + s.PingTransferId = -1; + s.State = TOutRequestState::S_WAITING_PING_SENT; + //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> S_WAITING_PING_SENT\n", GetGuidAsString(reqId).c_str()); + s.TimePassed = 0; + } + break; + default: + Y_ASSERT(0); + break; + } + } + } break; + case DIR_IN: { + TInRequestHash::iterator i = InRequests.find(tp.Guid); + if (i != InRequests.end()) { + Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED); + InRequests.erase(i); + //if (res.Success) + // printf("InReq %s AnalyzeSendResults() ... -> finished\n", GetGuidAsString(tp.Guid).c_str()); + //else + // printf("InReq %s AnalyzeSendResults() ... -> failed response send\n", GetGuidAsString(tp.Guid).c_str()); + } + } break; + default: + Y_ASSERT(0); + break; + } + TransferHash.erase(k1); + } + } + } + void SendPingsIfNeeded() { + NHPTimer::STime tChk = PingsSendT; + float deltaT = (float)NHPTimer::GetTimePassed(&tChk); + if (deltaT < 0.05) { + return; + } + PingsSendT = tChk; + deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3); + + { + for (TOutRequestHash::iterator i = OutRequests.begin(); i != OutRequests.end();) { + TOutRequestHash::iterator curIt = i++; + TOutRequestState& s = curIt->second; + const TGUID& guid = curIt->first; + switch (s.State) { + case TOutRequestState::S_WAITING: + s.TimePassed += deltaT; + if (s.TimePassed > HTTP_TIMEOUT) { + TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket; + ms->Write((char)PKT_PING); + ms->Write(guid); + int transId = SendWithHighPriority(s.Address, ms.Release()); + TransferHash[transId] = TTransferPurpose(DIR_OUT, guid); + s.State = TOutRequestState::S_WAITING_PING_SENDING; + //printf("OutReq %s SendPingsIfNeeded() S_WAITING -> S_WAITING_PING_SENDING\n", GetGuidAsString(guid).c_str()); + s.PingTransferId = transId; + } + break; + case TOutRequestState::S_WAITING_PING_SENT: + s.TimePassed += deltaT; + if (s.TimePassed > HTTP_TIMEOUT) { + //printf("OutReq %s SendPingsIfNeeded() S_WAITING_PING_SENT -> failed\n", GetGuidAsString(guid).c_str()); + FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT"); + } + break; + default: + break; + } + } + } + } + void Step() { + { + TGuard<TSpinLock> lock(cs); + DoSends(); + } + Host->Step(); + for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) { + switch (req->Req) { + case TStatsRequest::PENDING_SIZE: + Host->GetPendingDataSize(&req->PendingDataSize); + break; + case TStatsRequest::DEBUG_INFO: { + TGuard<TSpinLock> lock(cs); + req->DebugInfo = GetDebugInfoLocked(); + } break; + case TStatsRequest::HAS_IN_REQUEST: { + TGuard<TSpinLock> lock(cs); + req->RequestFound = (InRequests.find(req->RequestId) != InRequests.end()); + } break; + case TStatsRequest::GET_PEER_ADDRESS: { + TGuard<TSpinLock> lock(cs); + TInRequestHash::const_iterator i = InRequests.find(req->RequestId); + if (i != InRequests.end()) { + req->PeerAddress = i->second.Address; + } else { + TOutRequestHash::const_iterator o = OutRequests.find(req->RequestId); + if (o != OutRequests.end()) { + req->PeerAddress = o->second.Address; + } else { + req->PeerAddress = TUdpAddress(); + } + } + } break; + case TStatsRequest::GET_PEER_QUEUE_STATS: + req->QueueStats = Host->GetQueueStats(req->PeerAddress); + break; + default: + Y_ASSERT(0); + break; + } + req->Complete.Signal(); + } + { + TGuard<TSpinLock> lock(cs); + DoSends(); + ProcessIncomingPackets(); + AnalyzeSendResults(); + SendPingsIfNeeded(); + } + } + void Wait() { + Host->Wait(0.1f); + } + void DoSendCancel(const TUdpAddress& addr, const TGUID& req) { + TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket; + ms->Write((char)PKT_CANCEL); + ms->Write(req); + SendWithHighPriority(addr, ms); + } + void DoSends() { + { + TBreakRequest rb; + while (BreakReqList.Dequeue(&rb)) { + InRequests.erase(rb.ReqGuid); + } + } + { + // cancelling requests + TCancelRequest rc; + while (CancelReqList.Dequeue(&rc)) { + TOutRequestHash::iterator i = OutRequests.find(rc.ReqGuid); + if (i == OutRequests.end()) { + AnticipateCancels.insert(rc.ReqGuid); + continue; // cancelling non existing request is ok + } + TOutRequestState& s = i->second; + if (s.State == TOutRequestState::S_SENDING) { + // we are in trouble - have not sent request and we already have to cancel it, wait send + s.State = TOutRequestState::S_CANCEL_AFTER_SENDING; + } else { + DoSendCancel(s.Address, rc.ReqGuid); + FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side"); + } + } + } + { + // sending replies + for (TSendResponse* rd = nullptr; SendRespList.Dequeue(&rd); delete rd) { + TInRequestHash::iterator i = InRequests.find(rd->ReqGuid); + if (i == InRequests.end()) { + Y_ASSERT(0); + continue; + } + TInRequestState& s = i->second; + if (s.State == TInRequestState::S_CANCELED) { + // need not send response for the canceled request + InRequests.erase(i); + continue; + } + + Y_ASSERT(s.State == TInRequestState::S_WAITING); + s.State = TInRequestState::S_RESPONSE_SENDING; + //printf("InReq %s SendResponse() ... -> S_RESPONSE_SENDING (pkt %s)\n", GetGuidAsString(reqId).c_str(), GetGuidAsString(lowPktGuid).c_str()); + + TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket; + ui32 crc32 = 0; + int dataSize = rd->Data.ysize(); + if (rd->Data.ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(s.Address)) { + TIntrusivePtr<TSharedMemory> shm = new TSharedMemory; + if (shm->Create(dataSize)) { + ms->Write((char)PKT_LOCAL_RESPONSE); + ms->Write(rd->ReqGuid); + memcpy(shm->GetPtr(), &rd->Data[0], dataSize); + TVector<char> empty; + rd->Data.swap(empty); + ms->AttachSharedData(shm); + crc32 = CalcChecksum(ms->GetChain()); + } + } + if (ms->GetSharedData() == nullptr) { + ms->Write((char)PKT_RESPONSE); + ms->Write(rd->ReqGuid); + + // to offload crc calcs from inner thread, crc of data[] is calced outside and passed in DataCrc32 + // this means that we are calculating crc when shared memory is used + // it is hard to avoid since in SendResponse() we don't know if shared mem will be used (peer address is not available there) + TIncrementalChecksumCalcer csCalcer; + AddChain(&csCalcer, ms->GetChain()); + // here we are replicating the way WriteDestructive serializes data + csCalcer.AddBlock(&dataSize, sizeof(dataSize)); + csCalcer.AddBlockSum(rd->DataCrc32, dataSize); + crc32 = csCalcer.CalcChecksum(); + + ms->WriteDestructive(&rd->Data); + //ui32 chkCrc = CalcChecksum(ms->GetChain()); // can not use since its slow for large responses + //Y_ASSERT(chkCrc == crc32); + } + + int transId = Host->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority); + TransferHash[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid); + } + } + { + // sending requests + for (TSendRequest* rd = nullptr; SendReqList.Dequeue(&rd); delete rd) { + Y_ASSERT(OutRequests.find(rd->ReqGuid) == OutRequests.end()); + + { + TOutRequestState& s = OutRequests[rd->ReqGuid]; + s.State = TOutRequestState::S_SENDING; + s.Address = rd->Addr; + s.UserQueues = rd->UserQueues; + //printf("OutReq %s SendRequest() ... -> S_SENDING\n", GetGuidAsString(guid).c_str()); + } + + if (rd->WR.Get()) + SyncRequests[rd->ReqGuid] = rd->WR; + + if (AnticipateCancels.find(rd->ReqGuid) != AnticipateCancels.end()) { + FinishRequest(OutRequests.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "request canceled before transmitting"); + } else { + TGUID pktGuid = rd->ReqGuid; // request packet id should match request id + int transId = Host->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL); + TransferHash[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid); + } + } + } + if (!AnticipateCancels.empty()) { + AnticipateCancels.clear(); + } + } + + public: + void SendRequestImpl(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId, + TWaitResponse* wr, TRequesterUserQueues* userQueues) { + if (data && data->size() > MAX_PACKET_SIZE) { + Y_VERIFY(0, "data size is too large"); + } + //printf("SendRequest(%s)\n", url.c_str()); + if (wr) + wr->SetReqId(reqId); + + TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket; + if (data && data->ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(addr)) { + int dataSize = data->ysize(); + TIntrusivePtr<TSharedMemory> shm = new TSharedMemory; + if (shm->Create(dataSize)) { + ms->Write((char)PKT_LOCAL_REQUEST); + ms->WriteStroka(url); + memcpy(shm->GetPtr(), &(*data)[0], dataSize); + TVector<char> empty; + data->swap(empty); + ms->AttachSharedData(shm); + } + } + if (ms->GetSharedData() == nullptr) { + ms->Write((char)PKT_REQUEST); + ms->WriteStroka(url); + ms->WriteDestructive(data); + } + + SendReqList.Enqueue(new TSendRequest(addr, &ms, reqId, wr, userQueues)); + Host->CancelWait(); + } + + void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override { + SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get()); + } + void CancelRequest(const TGUID& reqId) override { + CancelReqList.Enqueue(TCancelRequest(reqId)); + Host->CancelWait(); + } + void BreakRequest(const TGUID& reqId) override { + BreakReqList.Enqueue(TBreakRequest(reqId)); + Host->CancelWait(); + } + + void SendResponseImpl(const TGUID& reqId, EPacketPriority prior, TVector<char>* data) // non-virtual, for direct call from TRequestOps + { + if (data && data->size() > MAX_PACKET_SIZE) { + Y_VERIFY(0, "data size is too large"); + } + SendRespList.Enqueue(new TSendResponse(reqId, prior, data)); + Host->CancelWait(); + } + void SendResponse(const TGUID& reqId, TVector<char>* data) override { + SendResponseImpl(reqId, PP_NORMAL, data); + } + void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override { + SendResponseImpl(reqId, PP_LOW, data); + } + TUdpHttpRequest* GetRequest() override { + return UserQueues->GetRequest(); + } + TUdpHttpResponse* GetResponse() override { + return UserQueues->GetResponse(); + } + bool GetRequestCancel(TGUID* req) override { + return UserQueues->GetRequestCancel(req); + } + bool GetSendRequestAcc(TGUID* req) override { + return UserQueues->GetSendRequestAcc(req); + } + TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override { + TIntrusivePtr<TWaitResponse> wr = WaitableRequest(addr, url, data); + wr->Wait(); + return wr->GetResponse(); + } + TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override { + TIntrusivePtr<TWaitResponse> wr = new TWaitResponse; + TGUID reqId; + CreateGuid(&reqId); + SendRequestImpl(addr, url, data, reqId, wr.Get(), nullptr); + return wr; + } + TMuxEvent& GetAsyncEvent() override { + return UserQueues->GetAsyncEvent(); + } + int GetPort() override { + return Socket.Get() ? Socket->GetPort() : 0; + } + void StopNoWait() override { + AbortTransactions = true; + KeepRunning = false; + UserQueues->AsyncSignal(); + // calcel all outgoing requests + TGuard<TSpinLock> lock(cs); + while (!OutRequests.empty()) { + // cancel without informing peer that we are cancelling the request + FinishRequest(OutRequests.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()"); + } + } + void ExecStatsRequest(TIntrusivePtr<TStatsRequest> req) { + StatsReqList.Enqueue(req); + Host->CancelWait(); + req->Complete.Wait(); + } + TUdpAddress GetPeerAddress(const TGUID& reqId) override { + TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_ADDRESS); + req->RequestId = reqId; + ExecStatsRequest(req); + return req->PeerAddress; + } + void GetPendingDataSize(TRequesterPendingDataStats* res) override { + TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::PENDING_SIZE); + ExecStatsRequest(req); + *res = req->PendingDataSize; + } + bool HasRequest(const TGUID& reqId) override { + TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::HAS_IN_REQUEST); + req->RequestId = reqId; + ExecStatsRequest(req); + return req->RequestFound; + } + + private: + void FinishOutstandingTransactions() { + // wait all pending requests, all new requests are canceled + while ((!OutRequests.empty() || !InRequests.empty() || !SendRespList.IsEmpty() || !SendReqList.IsEmpty()) && !PanicAttack) { + while (TUdpHttpRequest* req = GetRequest()) { + TInRequestHash::iterator i = InRequests.find(req->ReqId); + //printf("dropping request(%s) (thread %d)\n", req->Url.c_str(), ThreadId()); + delete req; + if (i == InRequests.end()) { + Y_ASSERT(0); + continue; + } + InRequests.erase(i); + } + Step(); + sleep(0); + } + } + static void* ExecServerThread(void* param) { + BindToSocket(0); + SetHighestThreadPriority(); + TUdpHttp* pThis = (TUdpHttp*)param; + pThis->Host = CreateUdpHost(pThis->Socket); + pThis->HasStarted.Signal(); + if (!pThis->Host) { + pThis->Socket.Drop(); + return nullptr; + } + NHPTimer::GetTime(&pThis->PingsSendT); + while (pThis->KeepRunning && !PanicAttack) { + if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) { + NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire); + double passed = NHPTimer::GetTimePassed(&chk); + if (passed > HeartbeatTimeout.load(std::memory_order_acquire)) { + StopAllNetLibaThreads(); + fprintf(stderr, "%s\tTUdpHttp\tWaiting for %0.2f, time limit %0.2f, commit a suicide!11\n", Now().ToStringUpToSeconds().c_str(), passed, HeartbeatTimeout.load(std::memory_order_acquire)); + fflush(stderr); +#ifndef _win_ + killpg(0, SIGKILL); +#endif + abort(); + break; + } + } + pThis->Step(); + pThis->Wait(); + } + if (!pThis->AbortTransactions && !PanicAttack) + pThis->FinishOutstandingTransactions(); + pThis->Host = nullptr; + return nullptr; + } + ~TUdpHttp() override { + if (myThread.Running()) { + KeepRunning = false; + myThread.Join(); + } + for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) { + req->Complete.Signal(); + } + } + + public: + TUdpHttp() + : myThread(TThread::TParams(ExecServerThread, (void*)this).SetName("nl6_udp_host")) + , KeepRunning(true) + , AbortTransactions(false) + , PingsSendT(0) + , ReportRequestCancel(false) + , ReportSendRequestAcc(false) + { + NHPTimer::GetTime(&PingsSendT); + QueueSizes = new TRequesterUserQueueSizes; + UserQueues = new TRequesterUserQueues(QueueSizes.Get()); + } + bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) { + Y_ASSERT(Host.Get() == nullptr); + Socket = socket; + myThread.Start(); + HasStarted.Wait(); + + if (Host.Get()) { + return true; + } + Socket.Drop(); + return false; + } + TString GetDebugInfoLocked() { + TString res = KeepRunning ? "State: running\n" : "State: stopping\n"; + res += Host->GetDebugInfo(); + + char buf[1000]; + TRequesterUserQueueSizes* qs = QueueSizes.Get(); + sprintf(buf, "\nRequest queue %d (%d bytes)\n", (int)AtomicGet(qs->ReqCount), (int)AtomicGet(qs->ReqQueueSize)); + res += buf; + sprintf(buf, "Response queue %d (%d bytes)\n", (int)AtomicGet(qs->RespCount), (int)AtomicGet(qs->RespQueueSize)); + res += buf; + + const char* outReqStateNames[] = { + "S_SENDING", + "S_WAITING", + "S_WAITING_PING_SENDING", + "S_WAITING_PING_SENT", + "S_CANCEL_AFTER_SENDING"}; + const char* inReqStateNames[] = { + "S_WAITING", + "S_RESPONSE_SENDING", + "S_CANCELED"}; + res += "\nOut requests:\n"; + for (TOutRequestHash::const_iterator i = OutRequests.begin(); i != OutRequests.end(); ++i) { + const TGUID& gg = i->first; + const TOutRequestState& s = i->second; + bool isSync = SyncRequests.find(gg) != SyncRequests.end(); + sprintf(buf, "%s\t%s %s TimePassed: %g %s\n", + GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), outReqStateNames[s.State], + s.TimePassed * 1000, + isSync ? "isSync" : ""); + res += buf; + } + res += "\nIn requests:\n"; + for (TInRequestHash::const_iterator i = InRequests.begin(); i != InRequests.end(); ++i) { + const TGUID& gg = i->first; + const TInRequestState& s = i->second; + sprintf(buf, "%s\t%s %s\n", + GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), inReqStateNames[s.State]); + res += buf; + } + return res; + } + TString GetDebugInfo() override { + TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::DEBUG_INFO); + ExecStatsRequest(req); + return req->DebugInfo; + } + void GetRequestQueueSize(TRequesterQueueStats* res) override { + TRequesterUserQueueSizes* qs = QueueSizes.Get(); + res->ReqCount = (int)AtomicGet(qs->ReqCount); + res->RespCount = (int)AtomicGet(qs->RespCount); + res->ReqQueueSize = (int)AtomicGet(qs->ReqQueueSize); + res->RespQueueSize = (int)AtomicGet(qs->RespQueueSize); + } + TRequesterUserQueueSizes* GetQueueSizes() const { + return QueueSizes.Get(); + } + IRequestOps* CreateSubRequester() override; + void EnableReportRequestCancel() override { + ReportRequestCancel = true; + } + void EnableReportSendRequestAcc() override { + ReportSendRequestAcc = true; + } + TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override { + TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_QUEUE_STATS); + req->PeerAddress = addr; + ExecStatsRequest(req); + return req->QueueStats; + } + }; + + ////////////////////////////////////////////////////////////////////////// + static void ReadShm(TSharedMemory* shm, TVector<char>* data) { + Y_ASSERT(shm); + int dataSize = shm->GetSize(); + data->yresize(dataSize); + memcpy(&(*data)[0], shm->GetPtr(), dataSize); + } + + static void LoadRequestData(TUdpHttpRequest* res) { + if (!res) + return; + { + TBlockChainIterator reqData(res->DataHolder->Data->GetChain()); + char pktType; + reqData.Read(&pktType, 1); + ReadArr(&reqData, &res->Url); + if (pktType == PKT_REQUEST) { + ReadYArr(&reqData, &res->Data); + } else if (pktType == PKT_LOCAL_REQUEST) { + ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data); + } else + Y_ASSERT(0); + if (reqData.HasFailed()) { + Y_ASSERT(0 && "wrong format, memory corruption suspected"); + res->Url = ""; + res->Data.clear(); + } + } + res->DataHolder.Reset(nullptr); + } + + static void LoadResponseData(TUdpHttpResponse* res) { + if (!res || res->DataHolder.Get() == nullptr) + return; + { + TBlockChainIterator reqData(res->DataHolder->Data->GetChain()); + char pktType; + reqData.Read(&pktType, 1); + TGUID guid; + reqData.Read(&guid, sizeof(guid)); + Y_ASSERT(res->ReqId == guid); + if (pktType == PKT_RESPONSE) { + ReadYArr(&reqData, &res->Data); + } else if (pktType == PKT_LOCAL_RESPONSE) { + ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data); + } else + Y_ASSERT(0); + if (reqData.HasFailed()) { + Y_ASSERT(0 && "wrong format, memory corruption suspected"); + res->Ok = TUdpHttpResponse::FAILED; + res->Data.clear(); + res->Error = "wrong response format"; + } + } + res->DataHolder.Reset(nullptr); + } + + ////////////////////////////////////////////////////////////////////////// + // IRequestOps::TWaitResponse + TUdpHttpResponse* IRequestOps::TWaitResponse::GetResponse() { + if (!Response) + return nullptr; + TUdpHttpResponse* res = Response; + Response = nullptr; + LoadResponseData(res); + return res; + } + + void IRequestOps::TWaitResponse::SetResponse(TUdpHttpResponse* r) { + Y_ASSERT(Response == nullptr || r == nullptr); + if (r) + Response = r; + CompleteEvent.Signal(); + } + + ////////////////////////////////////////////////////////////////////////// + // TRequesterUserQueues + TUdpHttpRequest* TRequesterUserQueues::GetRequest() { + TUdpHttpRequest* res = nullptr; + ReqList.Dequeue(&res); + if (res) { + AtomicAdd(QueueSizes->ReqCount, -1); + AtomicAdd(QueueSizes->ReqQueueSize, -GetPacketSize(res->DataHolder.Get())); + } + UpdateAsyncSignalState(); + LoadRequestData(res); + return res; + } + + TUdpHttpResponse* TRequesterUserQueues::GetResponse() { + TUdpHttpResponse* res = nullptr; + ResponseList.Dequeue(&res); + if (res) { + AtomicAdd(QueueSizes->RespCount, -1); + AtomicAdd(QueueSizes->RespQueueSize, -GetPacketSize(res->DataHolder.Get())); + } + UpdateAsyncSignalState(); + LoadResponseData(res); + return res; + } + + ////////////////////////////////////////////////////////////////////////// + class TRequestOps: public IRequestOps { + TIntrusivePtr<TUdpHttp> Requester; + TIntrusivePtr<TRequesterUserQueues> UserQueues; + + public: + TRequestOps(TUdpHttp* req) + : Requester(req) + { + UserQueues = new TRequesterUserQueues(req->GetQueueSizes()); + } + void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override { + Requester->SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get()); + } + void CancelRequest(const TGUID& reqId) override { + Requester->CancelRequest(reqId); + } + void BreakRequest(const TGUID& reqId) override { + Requester->BreakRequest(reqId); + } + + void SendResponse(const TGUID& reqId, TVector<char>* data) override { + Requester->SendResponseImpl(reqId, PP_NORMAL, data); + } + void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override { + Requester->SendResponseImpl(reqId, PP_LOW, data); + } + TUdpHttpRequest* GetRequest() override { + Y_ASSERT(0); + //return UserQueues.GetRequest(); + return nullptr; // all requests are routed to the main requester + } + TUdpHttpResponse* GetResponse() override { + return UserQueues->GetResponse(); + } + bool GetRequestCancel(TGUID*) override { + Y_ASSERT(0); + return false; // all request cancels are routed to the main requester + } + bool GetSendRequestAcc(TGUID* req) override { + return UserQueues->GetSendRequestAcc(req); + } + // sync mode + TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override { + return Requester->Request(addr, url, data); + } + TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override { + return Requester->WaitableRequest(addr, url, data); + } + // + TMuxEvent& GetAsyncEvent() override { + return UserQueues->GetAsyncEvent(); + } + }; + + IRequestOps* TUdpHttp::CreateSubRequester() { + return new TRequestOps(this); + } + + ////////////////////////////////////////////////////////////////////////// + void AbortOnFailedRequest(TUdpHttpResponse* answer) { + if (answer && answer->Ok == TUdpHttpResponse::FAILED) { + fprintf(stderr, "Failed request to host %s\n", GetAddressAsString(answer->PeerAddress).data()); + fprintf(stderr, "Error description: %s\n", answer->Error.data()); + fflush(nullptr); + Y_ASSERT(0); + abort(); + } + } + + TString GetDebugInfo(const TUdpAddress& addr, double timeout) { + NHPTimer::STime start; + NHPTimer::GetTime(&start); + TIntrusivePtr<IUdpHost> host = CreateUdpHost(0); + { + TAutoPtr<TRopeDataPacket> rq = new TRopeDataPacket; + rq->Write((char)PKT_GETDEBUGINFO); + ui32 crc32 = CalcChecksum(rq->GetChain()); + host->Send(addr, rq.Release(), crc32, nullptr, PP_HIGH); + } + for (;;) { + TAutoPtr<TRequest> ptr = host->GetRequest(); + if (ptr.Get()) { + TBlockChainIterator reqData(ptr->Data->GetChain()); + int sz = reqData.GetSize(); + TString res; + res.resize(sz); + reqData.Read(res.begin(), sz); + return res; + } + host->Step(); + host->Wait(0.1f); + + NHPTimer::STime now; + NHPTimer::GetTime(&now); + if (NHPTimer::GetSeconds(now - start) > timeout) { + return TString(); + } + } + } + + void Kill(const TUdpAddress& addr) { + TIntrusivePtr<IUdpHost> host = CreateUdpHost(0); + host->Kill(addr); + } + + void StopAllNetLibaThreads() { + PanicAttack = true; // AAAA!!!! + } + + void SetNetLibaHeartbeatTimeout(double timeoutSec) { + NetLibaHeartbeat(); + HeartbeatTimeout.store(timeoutSec, std::memory_order_release); + } + + void NetLibaHeartbeat() { + NHPTimer::STime now; + NHPTimer::GetTime(&now); + LastHeartbeat.store(now, std::memory_order_release); + } + + IRequester* CreateHttpUdpRequester(int port) { + if (PanicAttack) + return nullptr; + + TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateSocket(); + socket->Open(port); + if (!socket->IsValid()) + return nullptr; + + return CreateHttpUdpRequester(socket); + } + + IRequester* CreateHttpUdpRequester(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) { + if (PanicAttack) + return nullptr; + + TIntrusivePtr<TUdpHttp> res(new TUdpHttp); + if (!res->Start(socket)) + return nullptr; + return res.Release(); + } + +} |