aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba/v6/udp_http.cpp
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/udp_http.cpp
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/udp_http.cpp')
-rw-r--r--library/cpp/netliba/v6/udp_http.cpp1354
1 files changed, 1354 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/udp_http.cpp b/library/cpp/netliba/v6/udp_http.cpp
new file mode 100644
index 0000000000..9fa0b07818
--- /dev/null
+++ b/library/cpp/netliba/v6/udp_http.cpp
@@ -0,0 +1,1354 @@
+#include "stdafx.h"
+#include "udp_http.h"
+#include "udp_client_server.h"
+#include "udp_socket.h"
+#include "cpu_affinity.h"
+
+#include <library/cpp/threading/atomic/bool.h>
+
+#include <util/system/hp_timer.h>
+#include <util/thread/lfqueue.h>
+#include <util/system/thread.h>
+#include <util/system/spinlock.h>
+#if !defined(_win_)
+#include <signal.h>
+#include <pthread.h>
+#endif
+#include "block_chain.h"
+#include <util/system/shmat.h>
+
+#include <atomic>
+
+namespace NNetliba {
+ const float HTTP_TIMEOUT = 15.0f;
+ const int MIN_SHARED_MEM_PACKET = 1000;
+
+ static ::NAtomic::TBool PanicAttack;
+ static std::atomic<NHPTimer::STime> LastHeartbeat;
+ static std::atomic<double> HeartbeatTimeout;
+
+ static int GetPacketSize(TRequest* req) {
+ if (req && req->Data.Get())
+ return req->Data->GetSize();
+ return 0;
+ }
+
+ static bool IsLocalFast(const TUdpAddress& addr) {
+ if (addr.IsIPv4()) {
+ return IsLocalIPv4(addr.GetIPv4());
+ } else {
+ return IsLocalIPv6(addr.Network, addr.Interface);
+ }
+ }
+
+ bool IsLocal(const TUdpAddress& addr) {
+ InitLocalIPList();
+ return IsLocalFast(addr);
+ }
+
+ TUdpHttpRequest::~TUdpHttpRequest() {
+ }
+
+ TUdpHttpResponse::~TUdpHttpResponse() {
+ }
+
+ class TRequesterUserQueueSizes: public TThrRefBase {
+ public:
+ TAtomic ReqCount, RespCount;
+ TAtomic ReqQueueSize, RespQueueSize;
+
+ TRequesterUserQueueSizes()
+ : ReqCount(0)
+ , RespCount(0)
+ , ReqQueueSize(0)
+ , RespQueueSize(0)
+ {
+ }
+ };
+
+ template <class T>
+ void EraseList(TLockFreeQueue<T*>* data) {
+ T* ptr = nullptr;
+ while (data->Dequeue(&ptr)) {
+ delete ptr;
+ }
+ }
+
+ class TRequesterUserQueues: public TThrRefBase {
+ TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes;
+ TLockFreeQueue<TUdpHttpRequest*> ReqList;
+ TLockFreeQueue<TUdpHttpResponse*> ResponseList;
+ TLockFreeStack<TGUID> CancelList, SendRequestAccList; // any order will do
+ TMuxEvent AsyncEvent;
+
+ void UpdateAsyncSignalState() {
+ // not sure about this one. Idea is that AsyncEvent.Reset() is a memory barrier
+ if (ReqList.IsEmpty() && ResponseList.IsEmpty() && CancelList.IsEmpty() && SendRequestAccList.IsEmpty()) {
+ AsyncEvent.Reset();
+ if (!ReqList.IsEmpty() || !ResponseList.IsEmpty() || !CancelList.IsEmpty() || !SendRequestAccList.IsEmpty())
+ AsyncEvent.Signal();
+ }
+ }
+ ~TRequesterUserQueues() override {
+ EraseList(&ReqList);
+ EraseList(&ResponseList);
+ }
+
+ public:
+ TRequesterUserQueues(TRequesterUserQueueSizes* queueSizes)
+ : QueueSizes(queueSizes)
+ {
+ }
+ TUdpHttpRequest* GetRequest();
+ TUdpHttpResponse* GetResponse();
+ bool GetRequestCancel(TGUID* req) {
+ bool res = CancelList.Dequeue(req);
+ UpdateAsyncSignalState();
+ return res;
+ }
+ bool GetSendRequestAcc(TGUID* req) {
+ bool res = SendRequestAccList.Dequeue(req);
+ UpdateAsyncSignalState();
+ return res;
+ }
+
+ void AddRequest(TUdpHttpRequest* res) {
+ AtomicAdd(QueueSizes->ReqCount, 1);
+ AtomicAdd(QueueSizes->ReqQueueSize, GetPacketSize(res->DataHolder.Get()));
+ ReqList.Enqueue(res);
+ AsyncEvent.Signal();
+ }
+ void AddResponse(TUdpHttpResponse* res) {
+ AtomicAdd(QueueSizes->RespCount, 1);
+ AtomicAdd(QueueSizes->RespQueueSize, GetPacketSize(res->DataHolder.Get()));
+ ResponseList.Enqueue(res);
+ AsyncEvent.Signal();
+ }
+ void AddCancel(const TGUID& req) {
+ CancelList.Enqueue(req);
+ AsyncEvent.Signal();
+ }
+ void AddSendRequestAcc(const TGUID& req) {
+ SendRequestAccList.Enqueue(req);
+ AsyncEvent.Signal();
+ }
+ TMuxEvent& GetAsyncEvent() {
+ return AsyncEvent;
+ }
+ void AsyncSignal() {
+ AsyncEvent.Signal();
+ }
+ };
+
+ struct TOutRequestState {
+ enum EState {
+ S_SENDING,
+ S_WAITING,
+ S_WAITING_PING_SENDING,
+ S_WAITING_PING_SENT,
+ S_CANCEL_AFTER_SENDING
+ };
+ EState State;
+ TUdpAddress Address;
+ double TimePassed;
+ int PingTransferId;
+ TIntrusivePtr<TRequesterUserQueues> UserQueues;
+
+ TOutRequestState()
+ : State(S_SENDING)
+ , TimePassed(0)
+ , PingTransferId(-1)
+ {
+ }
+ };
+
+ struct TInRequestState {
+ enum EState {
+ S_WAITING,
+ S_RESPONSE_SENDING,
+ S_CANCELED,
+ };
+ EState State;
+ TUdpAddress Address;
+
+ TInRequestState()
+ : State(S_WAITING)
+ {
+ }
+ TInRequestState(const TUdpAddress& address)
+ : State(S_WAITING)
+ , Address(address)
+ {
+ }
+ };
+
+ enum EHttpPacket {
+ PKT_REQUEST,
+ PKT_PING,
+ PKT_PING_RESPONSE,
+ PKT_RESPONSE,
+ PKT_GETDEBUGINFO,
+ PKT_LOCAL_REQUEST,
+ PKT_LOCAL_RESPONSE,
+ PKT_CANCEL,
+ };
+
+ class TUdpHttp: public IRequester {
+ enum EDir {
+ DIR_OUT,
+ DIR_IN
+ };
+ struct TTransferPurpose {
+ EDir Dir;
+ TGUID Guid;
+ TTransferPurpose()
+ : Dir(DIR_OUT)
+ {
+ }
+ TTransferPurpose(EDir dir, TGUID guid)
+ : Dir(dir)
+ , Guid(guid)
+ {
+ }
+ };
+
+ struct TSendRequest {
+ TUdpAddress Addr;
+ TAutoPtr<TRopeDataPacket> Data;
+ TGUID ReqGuid;
+ TIntrusivePtr<TWaitResponse> WR;
+ TIntrusivePtr<TRequesterUserQueues> UserQueues;
+ ui32 Crc32;
+
+ TSendRequest()
+ : Crc32(0)
+ {
+ }
+ TSendRequest(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket>* data, const TGUID& reqguid, TWaitResponse* wr, TRequesterUserQueues* userQueues)
+ : Addr(addr)
+ , Data(*data)
+ , ReqGuid(reqguid)
+ , WR(wr)
+ , UserQueues(userQueues)
+ , Crc32(CalcChecksum(Data->GetChain()))
+ {
+ }
+ };
+ struct TSendResponse {
+ TVector<char> Data;
+ TGUID ReqGuid;
+ ui32 DataCrc32;
+ EPacketPriority Priority;
+
+ TSendResponse()
+ : DataCrc32(0)
+ , Priority(PP_NORMAL)
+ {
+ }
+ TSendResponse(const TGUID& reqguid, EPacketPriority prior, TVector<char>* data)
+ : ReqGuid(reqguid)
+ , DataCrc32(0)
+ , Priority(prior)
+ {
+ if (data && !data->empty()) {
+ data->swap(Data);
+ DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize());
+ }
+ }
+ };
+ struct TCancelRequest {
+ TGUID ReqGuid;
+
+ TCancelRequest() = default;
+ TCancelRequest(const TGUID& reqguid)
+ : ReqGuid(reqguid)
+ {
+ }
+ };
+ struct TBreakRequest {
+ TGUID ReqGuid;
+
+ TBreakRequest() = default;
+ TBreakRequest(const TGUID& reqguid)
+ : ReqGuid(reqguid)
+ {
+ }
+ };
+
+ TThread myThread;
+ bool KeepRunning, AbortTransactions;
+ TSpinLock cs;
+ TSystemEvent HasStarted;
+
+ NHPTimer::STime PingsSendT;
+
+ TIntrusivePtr<IUdpHost> Host;
+ TIntrusivePtr<NNetlibaSocket::ISocket> Socket;
+ typedef THashMap<TGUID, TOutRequestState, TGUIDHash> TOutRequestHash;
+ typedef THashMap<TGUID, TInRequestState, TGUIDHash> TInRequestHash;
+ TOutRequestHash OutRequests;
+ TInRequestHash InRequests;
+
+ typedef THashMap<int, TTransferPurpose> TTransferHash;
+ TTransferHash TransferHash;
+
+ typedef THashMap<TGUID, TIntrusivePtr<TWaitResponse>, TGUIDHash> TSyncRequests;
+ TSyncRequests SyncRequests;
+
+ // hold it here to not construct on every DoSends()
+ typedef THashSet<TGUID, TGUIDHash> TAnticipateCancels;
+ TAnticipateCancels AnticipateCancels;
+
+ TLockFreeQueue<TSendRequest*> SendReqList;
+ TLockFreeQueue<TSendResponse*> SendRespList;
+ TLockFreeQueue<TCancelRequest> CancelReqList;
+ TLockFreeQueue<TBreakRequest> BreakReqList;
+
+ TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes;
+ TIntrusivePtr<TRequesterUserQueues> UserQueues;
+
+ struct TStatsRequest: public TThrRefBase {
+ enum EReq {
+ PENDING_SIZE,
+ DEBUG_INFO,
+ HAS_IN_REQUEST,
+ GET_PEER_ADDRESS,
+ GET_PEER_QUEUE_STATS,
+ };
+ EReq Req;
+ TRequesterPendingDataStats PendingDataSize;
+ TString DebugInfo;
+ TGUID RequestId;
+ TUdpAddress PeerAddress;
+ TIntrusivePtr<IPeerQueueStats> QueueStats;
+ bool RequestFound;
+ TSystemEvent Complete;
+
+ TStatsRequest(EReq req)
+ : Req(req)
+ , RequestFound(false)
+ {
+ }
+ };
+ TLockFreeQueue<TIntrusivePtr<TStatsRequest>> StatsReqList;
+
+ bool ReportRequestCancel;
+ bool ReportSendRequestAcc;
+
+ void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TAutoPtr<TRequest> data, const char* error = nullptr) {
+ TOutRequestState& s = i->second;
+ TUdpHttpResponse* res = new TUdpHttpResponse;
+ res->DataHolder = data;
+ res->ReqId = i->first;
+ res->PeerAddress = s.Address;
+ res->Ok = ok;
+ if (ok == TUdpHttpResponse::FAILED)
+ res->Error = error ? error : "request failed";
+ else if (ok == TUdpHttpResponse::CANCELED)
+ res->Error = error ? error : "request cancelled";
+ TSyncRequests::iterator k = SyncRequests.find(res->ReqId);
+ if (k != SyncRequests.end()) {
+ TIntrusivePtr<TWaitResponse>& wr = k->second;
+ wr->SetResponse(res);
+ SyncRequests.erase(k);
+ } else {
+ s.UserQueues->AddResponse(res);
+ }
+
+ OutRequests.erase(i);
+ }
+ int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data) {
+ ui32 crc32 = CalcChecksum(data->GetChain());
+ return Host->Send(addr, data.Release(), crc32, nullptr, PP_HIGH);
+ }
+ void ProcessIncomingPackets() {
+ TVector<TGUID, TCustomAllocator<TGUID>> failedRequests;
+ for (;;) {
+ TAutoPtr<TRequest> req = Host->GetRequest();
+ if (req.Get() == nullptr) {
+ if (!failedRequests.empty()) {
+ // we want to handle following sequence of events
+ // <- send ping
+ // -> send response over IB
+ // -> send ping response (no such request) over UDP
+ // Now if we are lucky enough we can get IB response waiting in the IB receive queue
+ // at the same time response sender will receive "send complete" from IB
+ // indeed, IB delivered message (but it was not parsed by ib_cs.cpp yet)
+ // so after receiving "send response complete" event resposne sender can legally response
+ // to pings with "no such request"
+ // but ping responses can be sent over UDP
+ // So we can run into situation with negative ping response in
+ // UDP receive queue and response waiting unprocessed in IB receive queue
+ // to check that there is no response in the IB queue we have to process IB queues
+ // so we call IBStep()
+ Host->IBStep();
+ req = Host->GetRequest();
+ if (req.Get() == nullptr) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ TBlockChainIterator reqData(req->Data->GetChain());
+ char pktType;
+ reqData.Read(&pktType, 1);
+ switch (pktType) {
+ case PKT_REQUEST:
+ case PKT_LOCAL_REQUEST: {
+ //printf("recv PKT_REQUEST or PKT_LOCAL_REQUEST\n");
+ TGUID reqId = req->Guid;
+ TInRequestHash::iterator z = InRequests.find(reqId);
+ if (z != InRequests.end()) {
+ // oops, this request already exists!
+ // might happen if request can be stored in single packet
+ // and this packet had source IP broken during transmission and managed to pass crc checks
+ // since we already reported wrong source address for this request to the user
+ // the best thing we can do is to stop the program to avoid further complications
+ // but we just report the accident to stderr
+ fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n",
+ GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(),
+ GetAddressAsString(req->Address).c_str());
+ } else {
+ InRequests[reqId] = TInRequestState(req->Address);
+
+ //printf("InReq %s PKT_REQUEST recv ... -> S_WAITING\n", GetGuidAsString(reqId).c_str());
+
+ TUdpHttpRequest* res = new TUdpHttpRequest;
+ res->ReqId = reqId;
+ res->PeerAddress = req->Address;
+ res->DataHolder = req;
+
+ UserQueues->AddRequest(res);
+ }
+ } break;
+ case PKT_PING: {
+ //printf("recv PKT_PING\n");
+ TGUID guid;
+ reqData.Read(&guid, sizeof(guid));
+ bool ok = InRequests.find(guid) != InRequests.end();
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ ms->Write((char)PKT_PING_RESPONSE);
+ ms->Write(guid);
+ ms->Write(ok);
+ SendWithHighPriority(req->Address, ms.Release());
+ //printf("InReq %s PKT_PING recv Sending PKT_PING_RESPONSE\n", GetGuidAsString(guid).c_str());
+ //printf("got PKT_PING, responding %d\n", (int)ok);
+ } break;
+ case PKT_PING_RESPONSE: {
+ //printf("recv PKT_PING_RESPONSE\n");
+ TGUID guid;
+ bool ok;
+ reqData.Read(&guid, sizeof(guid));
+ reqData.Read(&ok, sizeof(ok));
+ TOutRequestHash::iterator i = OutRequests.find(guid);
+ if (i == OutRequests.end()) {
+ ; //Y_ASSERT(0); // actually possible with some packet orders
+ } else {
+ if (!ok) {
+ // can not delete request at this point
+ // since we can receive failed ping and response at the same moment
+ // consider sequence: client sends ping, server sends response
+ // and replies false to ping as reply is sent
+ // we can not receive failed ping_response earlier then response itself
+ // but we can receive them simultaneously
+ failedRequests.push_back(guid);
+ //printf("OutReq %s PKT_PING_RESPONSE recv no such query -> failed\n", GetGuidAsString(guid).c_str());
+ } else {
+ TOutRequestState& s = i->second;
+ switch (s.State) {
+ case TOutRequestState::S_WAITING_PING_SENDING: {
+ Y_ASSERT(s.PingTransferId >= 0);
+ TTransferHash::iterator k = TransferHash.find(s.PingTransferId);
+ if (k != TransferHash.end())
+ TransferHash.erase(k);
+ else
+ Y_ASSERT(0);
+ s.PingTransferId = -1;
+ s.TimePassed = 0;
+ s.State = TOutRequestState::S_WAITING;
+ //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENDING -> S_WAITING\n", GetGuidAsString(guid).c_str());
+ } break;
+ case TOutRequestState::S_WAITING_PING_SENT:
+ s.TimePassed = 0;
+ s.State = TOutRequestState::S_WAITING;
+ //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENT -> S_WAITING\n", GetGuidAsString(guid).c_str());
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ }
+ }
+ } break;
+ case PKT_RESPONSE:
+ case PKT_LOCAL_RESPONSE: {
+ //printf("recv PKT_RESPONSE or PKT_LOCAL_RESPONSE\n");
+ TGUID guid;
+ reqData.Read(&guid, sizeof(guid));
+ TOutRequestHash::iterator i = OutRequests.find(guid);
+ if (i == OutRequests.end()) {
+ ; //Y_ASSERT(0); // does happen
+ //printf("OutReq %s PKT_RESPONSE recv for non-existing req\n", GetGuidAsString(guid).c_str());
+ } else {
+ FinishRequest(i, TUdpHttpResponse::OK, req);
+ //printf("OutReq %s PKT_RESPONSE recv ... -> ok\n", GetGuidAsString(guid).c_str());
+ }
+ } break;
+ case PKT_CANCEL: {
+ //printf("recv PKT_CANCEL\n");
+ TGUID guid;
+ reqData.Read(&guid, sizeof(guid));
+ TInRequestHash::iterator i = InRequests.find(guid);
+ if (i == InRequests.end()) {
+ ; //Y_ASSERT(0); // may happen
+ //printf("InReq %s PKT_CANCEL recv for non-existing req\n", GetGuidAsString(guid).c_str());
+ } else {
+ TInRequestState& s = i->second;
+ if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel)
+ UserQueues->AddCancel(guid);
+ s.State = TInRequestState::S_CANCELED;
+ //printf("InReq %s PKT_CANCEL recv\n", GetGuidAsString(guid).c_str());
+ }
+ } break;
+ case PKT_GETDEBUGINFO: {
+ //printf("recv PKT_GETDEBUGINFO\n");
+ TString dbgInfo = GetDebugInfoLocked();
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ ms->Write(dbgInfo.c_str(), (int)dbgInfo.size());
+ SendWithHighPriority(req->Address, ms);
+ } break;
+ default:
+ Y_ASSERT(0);
+ }
+ }
+ // cleanup failed requests
+ for (size_t k = 0; k < failedRequests.size(); ++k) {
+ const TGUID& guid = failedRequests[k];
+ TOutRequestHash::iterator i = OutRequests.find(guid);
+ if (i != OutRequests.end())
+ FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: recv no such query");
+ }
+ }
+ void AnalyzeSendResults() {
+ TSendResult res;
+ while (Host->GetSendResult(&res)) {
+ //printf("Send result received\n");
+ TTransferHash::iterator k1 = TransferHash.find(res.TransferId);
+ if (k1 != TransferHash.end()) {
+ const TTransferPurpose& tp = k1->second;
+ switch (tp.Dir) {
+ case DIR_OUT: {
+ TOutRequestHash::iterator i = OutRequests.find(tp.Guid);
+ if (i != OutRequests.end()) {
+ const TGUID& reqId = i->first;
+ TOutRequestState& s = i->second;
+ switch (s.State) {
+ case TOutRequestState::S_SENDING:
+ if (!res.Success) {
+ FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING");
+ //printf("OutReq %s AnalyzeSendResults() S_SENDING -> failed\n", GetGuidAsString(reqId).c_str());
+ } else {
+ if (ReportSendRequestAcc) {
+ if (s.UserQueues.Get()) {
+ s.UserQueues->AddSendRequestAcc(reqId);
+ } else {
+ // waitable request?
+ TSyncRequests::iterator k2 = SyncRequests.find(reqId);
+ if (k2 != SyncRequests.end()) {
+ TIntrusivePtr<TWaitResponse>& wr = k2->second;
+ wr->SetRequestSent();
+ }
+ }
+ }
+ s.State = TOutRequestState::S_WAITING;
+ //printf("OutReq %s AnalyzeSendResults() S_SENDING -> S_WAITING\n", GetGuidAsString(reqId).c_str());
+ s.TimePassed = 0;
+ }
+ break;
+ case TOutRequestState::S_CANCEL_AFTER_SENDING:
+ DoSendCancel(s.Address, reqId);
+ FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING");
+ break;
+ case TOutRequestState::S_WAITING:
+ case TOutRequestState::S_WAITING_PING_SENT:
+ Y_ASSERT(0);
+ break;
+ case TOutRequestState::S_WAITING_PING_SENDING:
+ Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId);
+ if (!res.Success) {
+ FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING");
+ //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> failed\n", GetGuidAsString(reqId).c_str());
+ } else {
+ s.PingTransferId = -1;
+ s.State = TOutRequestState::S_WAITING_PING_SENT;
+ //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> S_WAITING_PING_SENT\n", GetGuidAsString(reqId).c_str());
+ s.TimePassed = 0;
+ }
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ }
+ } break;
+ case DIR_IN: {
+ TInRequestHash::iterator i = InRequests.find(tp.Guid);
+ if (i != InRequests.end()) {
+ Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED);
+ InRequests.erase(i);
+ //if (res.Success)
+ // printf("InReq %s AnalyzeSendResults() ... -> finished\n", GetGuidAsString(tp.Guid).c_str());
+ //else
+ // printf("InReq %s AnalyzeSendResults() ... -> failed response send\n", GetGuidAsString(tp.Guid).c_str());
+ }
+ } break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ TransferHash.erase(k1);
+ }
+ }
+ }
+ void SendPingsIfNeeded() {
+ NHPTimer::STime tChk = PingsSendT;
+ float deltaT = (float)NHPTimer::GetTimePassed(&tChk);
+ if (deltaT < 0.05) {
+ return;
+ }
+ PingsSendT = tChk;
+ deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3);
+
+ {
+ for (TOutRequestHash::iterator i = OutRequests.begin(); i != OutRequests.end();) {
+ TOutRequestHash::iterator curIt = i++;
+ TOutRequestState& s = curIt->second;
+ const TGUID& guid = curIt->first;
+ switch (s.State) {
+ case TOutRequestState::S_WAITING:
+ s.TimePassed += deltaT;
+ if (s.TimePassed > HTTP_TIMEOUT) {
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ ms->Write((char)PKT_PING);
+ ms->Write(guid);
+ int transId = SendWithHighPriority(s.Address, ms.Release());
+ TransferHash[transId] = TTransferPurpose(DIR_OUT, guid);
+ s.State = TOutRequestState::S_WAITING_PING_SENDING;
+ //printf("OutReq %s SendPingsIfNeeded() S_WAITING -> S_WAITING_PING_SENDING\n", GetGuidAsString(guid).c_str());
+ s.PingTransferId = transId;
+ }
+ break;
+ case TOutRequestState::S_WAITING_PING_SENT:
+ s.TimePassed += deltaT;
+ if (s.TimePassed > HTTP_TIMEOUT) {
+ //printf("OutReq %s SendPingsIfNeeded() S_WAITING_PING_SENT -> failed\n", GetGuidAsString(guid).c_str());
+ FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT");
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
+ void Step() {
+ {
+ TGuard<TSpinLock> lock(cs);
+ DoSends();
+ }
+ Host->Step();
+ for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) {
+ switch (req->Req) {
+ case TStatsRequest::PENDING_SIZE:
+ Host->GetPendingDataSize(&req->PendingDataSize);
+ break;
+ case TStatsRequest::DEBUG_INFO: {
+ TGuard<TSpinLock> lock(cs);
+ req->DebugInfo = GetDebugInfoLocked();
+ } break;
+ case TStatsRequest::HAS_IN_REQUEST: {
+ TGuard<TSpinLock> lock(cs);
+ req->RequestFound = (InRequests.find(req->RequestId) != InRequests.end());
+ } break;
+ case TStatsRequest::GET_PEER_ADDRESS: {
+ TGuard<TSpinLock> lock(cs);
+ TInRequestHash::const_iterator i = InRequests.find(req->RequestId);
+ if (i != InRequests.end()) {
+ req->PeerAddress = i->second.Address;
+ } else {
+ TOutRequestHash::const_iterator o = OutRequests.find(req->RequestId);
+ if (o != OutRequests.end()) {
+ req->PeerAddress = o->second.Address;
+ } else {
+ req->PeerAddress = TUdpAddress();
+ }
+ }
+ } break;
+ case TStatsRequest::GET_PEER_QUEUE_STATS:
+ req->QueueStats = Host->GetQueueStats(req->PeerAddress);
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ req->Complete.Signal();
+ }
+ {
+ TGuard<TSpinLock> lock(cs);
+ DoSends();
+ ProcessIncomingPackets();
+ AnalyzeSendResults();
+ SendPingsIfNeeded();
+ }
+ }
+ void Wait() {
+ Host->Wait(0.1f);
+ }
+ void DoSendCancel(const TUdpAddress& addr, const TGUID& req) {
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ ms->Write((char)PKT_CANCEL);
+ ms->Write(req);
+ SendWithHighPriority(addr, ms);
+ }
+ void DoSends() {
+ {
+ TBreakRequest rb;
+ while (BreakReqList.Dequeue(&rb)) {
+ InRequests.erase(rb.ReqGuid);
+ }
+ }
+ {
+ // cancelling requests
+ TCancelRequest rc;
+ while (CancelReqList.Dequeue(&rc)) {
+ TOutRequestHash::iterator i = OutRequests.find(rc.ReqGuid);
+ if (i == OutRequests.end()) {
+ AnticipateCancels.insert(rc.ReqGuid);
+ continue; // cancelling non existing request is ok
+ }
+ TOutRequestState& s = i->second;
+ if (s.State == TOutRequestState::S_SENDING) {
+ // we are in trouble - have not sent request and we already have to cancel it, wait send
+ s.State = TOutRequestState::S_CANCEL_AFTER_SENDING;
+ } else {
+ DoSendCancel(s.Address, rc.ReqGuid);
+ FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side");
+ }
+ }
+ }
+ {
+ // sending replies
+ for (TSendResponse* rd = nullptr; SendRespList.Dequeue(&rd); delete rd) {
+ TInRequestHash::iterator i = InRequests.find(rd->ReqGuid);
+ if (i == InRequests.end()) {
+ Y_ASSERT(0);
+ continue;
+ }
+ TInRequestState& s = i->second;
+ if (s.State == TInRequestState::S_CANCELED) {
+ // need not send response for the canceled request
+ InRequests.erase(i);
+ continue;
+ }
+
+ Y_ASSERT(s.State == TInRequestState::S_WAITING);
+ s.State = TInRequestState::S_RESPONSE_SENDING;
+ //printf("InReq %s SendResponse() ... -> S_RESPONSE_SENDING (pkt %s)\n", GetGuidAsString(reqId).c_str(), GetGuidAsString(lowPktGuid).c_str());
+
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ ui32 crc32 = 0;
+ int dataSize = rd->Data.ysize();
+ if (rd->Data.ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(s.Address)) {
+ TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
+ if (shm->Create(dataSize)) {
+ ms->Write((char)PKT_LOCAL_RESPONSE);
+ ms->Write(rd->ReqGuid);
+ memcpy(shm->GetPtr(), &rd->Data[0], dataSize);
+ TVector<char> empty;
+ rd->Data.swap(empty);
+ ms->AttachSharedData(shm);
+ crc32 = CalcChecksum(ms->GetChain());
+ }
+ }
+ if (ms->GetSharedData() == nullptr) {
+ ms->Write((char)PKT_RESPONSE);
+ ms->Write(rd->ReqGuid);
+
+ // to offload crc calcs from inner thread, crc of data[] is calced outside and passed in DataCrc32
+ // this means that we are calculating crc when shared memory is used
+ // it is hard to avoid since in SendResponse() we don't know if shared mem will be used (peer address is not available there)
+ TIncrementalChecksumCalcer csCalcer;
+ AddChain(&csCalcer, ms->GetChain());
+ // here we are replicating the way WriteDestructive serializes data
+ csCalcer.AddBlock(&dataSize, sizeof(dataSize));
+ csCalcer.AddBlockSum(rd->DataCrc32, dataSize);
+ crc32 = csCalcer.CalcChecksum();
+
+ ms->WriteDestructive(&rd->Data);
+ //ui32 chkCrc = CalcChecksum(ms->GetChain()); // can not use since its slow for large responses
+ //Y_ASSERT(chkCrc == crc32);
+ }
+
+ int transId = Host->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority);
+ TransferHash[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid);
+ }
+ }
+ {
+ // sending requests
+ for (TSendRequest* rd = nullptr; SendReqList.Dequeue(&rd); delete rd) {
+ Y_ASSERT(OutRequests.find(rd->ReqGuid) == OutRequests.end());
+
+ {
+ TOutRequestState& s = OutRequests[rd->ReqGuid];
+ s.State = TOutRequestState::S_SENDING;
+ s.Address = rd->Addr;
+ s.UserQueues = rd->UserQueues;
+ //printf("OutReq %s SendRequest() ... -> S_SENDING\n", GetGuidAsString(guid).c_str());
+ }
+
+ if (rd->WR.Get())
+ SyncRequests[rd->ReqGuid] = rd->WR;
+
+ if (AnticipateCancels.find(rd->ReqGuid) != AnticipateCancels.end()) {
+ FinishRequest(OutRequests.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "request canceled before transmitting");
+ } else {
+ TGUID pktGuid = rd->ReqGuid; // request packet id should match request id
+ int transId = Host->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL);
+ TransferHash[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid);
+ }
+ }
+ }
+ if (!AnticipateCancels.empty()) {
+ AnticipateCancels.clear();
+ }
+ }
+
+ public:
+ void SendRequestImpl(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId,
+ TWaitResponse* wr, TRequesterUserQueues* userQueues) {
+ if (data && data->size() > MAX_PACKET_SIZE) {
+ Y_VERIFY(0, "data size is too large");
+ }
+ //printf("SendRequest(%s)\n", url.c_str());
+ if (wr)
+ wr->SetReqId(reqId);
+
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ if (data && data->ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(addr)) {
+ int dataSize = data->ysize();
+ TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
+ if (shm->Create(dataSize)) {
+ ms->Write((char)PKT_LOCAL_REQUEST);
+ ms->WriteStroka(url);
+ memcpy(shm->GetPtr(), &(*data)[0], dataSize);
+ TVector<char> empty;
+ data->swap(empty);
+ ms->AttachSharedData(shm);
+ }
+ }
+ if (ms->GetSharedData() == nullptr) {
+ ms->Write((char)PKT_REQUEST);
+ ms->WriteStroka(url);
+ ms->WriteDestructive(data);
+ }
+
+ SendReqList.Enqueue(new TSendRequest(addr, &ms, reqId, wr, userQueues));
+ Host->CancelWait();
+ }
+
+ void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override {
+ SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get());
+ }
+ void CancelRequest(const TGUID& reqId) override {
+ CancelReqList.Enqueue(TCancelRequest(reqId));
+ Host->CancelWait();
+ }
+ void BreakRequest(const TGUID& reqId) override {
+ BreakReqList.Enqueue(TBreakRequest(reqId));
+ Host->CancelWait();
+ }
+
+ void SendResponseImpl(const TGUID& reqId, EPacketPriority prior, TVector<char>* data) // non-virtual, for direct call from TRequestOps
+ {
+ if (data && data->size() > MAX_PACKET_SIZE) {
+ Y_VERIFY(0, "data size is too large");
+ }
+ SendRespList.Enqueue(new TSendResponse(reqId, prior, data));
+ Host->CancelWait();
+ }
+ void SendResponse(const TGUID& reqId, TVector<char>* data) override {
+ SendResponseImpl(reqId, PP_NORMAL, data);
+ }
+ void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override {
+ SendResponseImpl(reqId, PP_LOW, data);
+ }
+ TUdpHttpRequest* GetRequest() override {
+ return UserQueues->GetRequest();
+ }
+ TUdpHttpResponse* GetResponse() override {
+ return UserQueues->GetResponse();
+ }
+ bool GetRequestCancel(TGUID* req) override {
+ return UserQueues->GetRequestCancel(req);
+ }
+ bool GetSendRequestAcc(TGUID* req) override {
+ return UserQueues->GetSendRequestAcc(req);
+ }
+ TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
+ TIntrusivePtr<TWaitResponse> wr = WaitableRequest(addr, url, data);
+ wr->Wait();
+ return wr->GetResponse();
+ }
+ TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
+ TIntrusivePtr<TWaitResponse> wr = new TWaitResponse;
+ TGUID reqId;
+ CreateGuid(&reqId);
+ SendRequestImpl(addr, url, data, reqId, wr.Get(), nullptr);
+ return wr;
+ }
+ TMuxEvent& GetAsyncEvent() override {
+ return UserQueues->GetAsyncEvent();
+ }
+ int GetPort() override {
+ return Socket.Get() ? Socket->GetPort() : 0;
+ }
+ void StopNoWait() override {
+ AbortTransactions = true;
+ KeepRunning = false;
+ UserQueues->AsyncSignal();
+ // calcel all outgoing requests
+ TGuard<TSpinLock> lock(cs);
+ while (!OutRequests.empty()) {
+ // cancel without informing peer that we are cancelling the request
+ FinishRequest(OutRequests.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()");
+ }
+ }
+ void ExecStatsRequest(TIntrusivePtr<TStatsRequest> req) {
+ StatsReqList.Enqueue(req);
+ Host->CancelWait();
+ req->Complete.Wait();
+ }
+ TUdpAddress GetPeerAddress(const TGUID& reqId) override {
+ TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_ADDRESS);
+ req->RequestId = reqId;
+ ExecStatsRequest(req);
+ return req->PeerAddress;
+ }
+ void GetPendingDataSize(TRequesterPendingDataStats* res) override {
+ TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::PENDING_SIZE);
+ ExecStatsRequest(req);
+ *res = req->PendingDataSize;
+ }
+ bool HasRequest(const TGUID& reqId) override {
+ TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::HAS_IN_REQUEST);
+ req->RequestId = reqId;
+ ExecStatsRequest(req);
+ return req->RequestFound;
+ }
+
+ private:
+ void FinishOutstandingTransactions() {
+ // wait all pending requests, all new requests are canceled
+ while ((!OutRequests.empty() || !InRequests.empty() || !SendRespList.IsEmpty() || !SendReqList.IsEmpty()) && !PanicAttack) {
+ while (TUdpHttpRequest* req = GetRequest()) {
+ TInRequestHash::iterator i = InRequests.find(req->ReqId);
+ //printf("dropping request(%s) (thread %d)\n", req->Url.c_str(), ThreadId());
+ delete req;
+ if (i == InRequests.end()) {
+ Y_ASSERT(0);
+ continue;
+ }
+ InRequests.erase(i);
+ }
+ Step();
+ sleep(0);
+ }
+ }
+ static void* ExecServerThread(void* param) {
+ BindToSocket(0);
+ SetHighestThreadPriority();
+ TUdpHttp* pThis = (TUdpHttp*)param;
+ pThis->Host = CreateUdpHost(pThis->Socket);
+ pThis->HasStarted.Signal();
+ if (!pThis->Host) {
+ pThis->Socket.Drop();
+ return nullptr;
+ }
+ NHPTimer::GetTime(&pThis->PingsSendT);
+ while (pThis->KeepRunning && !PanicAttack) {
+ if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) {
+ NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire);
+ double passed = NHPTimer::GetTimePassed(&chk);
+ if (passed > HeartbeatTimeout.load(std::memory_order_acquire)) {
+ StopAllNetLibaThreads();
+ fprintf(stderr, "%s\tTUdpHttp\tWaiting for %0.2f, time limit %0.2f, commit a suicide!11\n", Now().ToStringUpToSeconds().c_str(), passed, HeartbeatTimeout.load(std::memory_order_acquire));
+ fflush(stderr);
+#ifndef _win_
+ killpg(0, SIGKILL);
+#endif
+ abort();
+ break;
+ }
+ }
+ pThis->Step();
+ pThis->Wait();
+ }
+ if (!pThis->AbortTransactions && !PanicAttack)
+ pThis->FinishOutstandingTransactions();
+ pThis->Host = nullptr;
+ return nullptr;
+ }
+ ~TUdpHttp() override {
+ if (myThread.Running()) {
+ KeepRunning = false;
+ myThread.Join();
+ }
+ for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) {
+ req->Complete.Signal();
+ }
+ }
+
+ public:
+ TUdpHttp()
+ : myThread(TThread::TParams(ExecServerThread, (void*)this).SetName("nl6_udp_host"))
+ , KeepRunning(true)
+ , AbortTransactions(false)
+ , PingsSendT(0)
+ , ReportRequestCancel(false)
+ , ReportSendRequestAcc(false)
+ {
+ NHPTimer::GetTime(&PingsSendT);
+ QueueSizes = new TRequesterUserQueueSizes;
+ UserQueues = new TRequesterUserQueues(QueueSizes.Get());
+ }
+ bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
+ Y_ASSERT(Host.Get() == nullptr);
+ Socket = socket;
+ myThread.Start();
+ HasStarted.Wait();
+
+ if (Host.Get()) {
+ return true;
+ }
+ Socket.Drop();
+ return false;
+ }
+ TString GetDebugInfoLocked() {
+ TString res = KeepRunning ? "State: running\n" : "State: stopping\n";
+ res += Host->GetDebugInfo();
+
+ char buf[1000];
+ TRequesterUserQueueSizes* qs = QueueSizes.Get();
+ sprintf(buf, "\nRequest queue %d (%d bytes)\n", (int)AtomicGet(qs->ReqCount), (int)AtomicGet(qs->ReqQueueSize));
+ res += buf;
+ sprintf(buf, "Response queue %d (%d bytes)\n", (int)AtomicGet(qs->RespCount), (int)AtomicGet(qs->RespQueueSize));
+ res += buf;
+
+ const char* outReqStateNames[] = {
+ "S_SENDING",
+ "S_WAITING",
+ "S_WAITING_PING_SENDING",
+ "S_WAITING_PING_SENT",
+ "S_CANCEL_AFTER_SENDING"};
+ const char* inReqStateNames[] = {
+ "S_WAITING",
+ "S_RESPONSE_SENDING",
+ "S_CANCELED"};
+ res += "\nOut requests:\n";
+ for (TOutRequestHash::const_iterator i = OutRequests.begin(); i != OutRequests.end(); ++i) {
+ const TGUID& gg = i->first;
+ const TOutRequestState& s = i->second;
+ bool isSync = SyncRequests.find(gg) != SyncRequests.end();
+ sprintf(buf, "%s\t%s %s TimePassed: %g %s\n",
+ GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), outReqStateNames[s.State],
+ s.TimePassed * 1000,
+ isSync ? "isSync" : "");
+ res += buf;
+ }
+ res += "\nIn requests:\n";
+ for (TInRequestHash::const_iterator i = InRequests.begin(); i != InRequests.end(); ++i) {
+ const TGUID& gg = i->first;
+ const TInRequestState& s = i->second;
+ sprintf(buf, "%s\t%s %s\n",
+ GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), inReqStateNames[s.State]);
+ res += buf;
+ }
+ return res;
+ }
+ TString GetDebugInfo() override {
+ TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::DEBUG_INFO);
+ ExecStatsRequest(req);
+ return req->DebugInfo;
+ }
+ void GetRequestQueueSize(TRequesterQueueStats* res) override {
+ TRequesterUserQueueSizes* qs = QueueSizes.Get();
+ res->ReqCount = (int)AtomicGet(qs->ReqCount);
+ res->RespCount = (int)AtomicGet(qs->RespCount);
+ res->ReqQueueSize = (int)AtomicGet(qs->ReqQueueSize);
+ res->RespQueueSize = (int)AtomicGet(qs->RespQueueSize);
+ }
+ TRequesterUserQueueSizes* GetQueueSizes() const {
+ return QueueSizes.Get();
+ }
+ IRequestOps* CreateSubRequester() override;
+ void EnableReportRequestCancel() override {
+ ReportRequestCancel = true;
+ }
+ void EnableReportSendRequestAcc() override {
+ ReportSendRequestAcc = true;
+ }
+ TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override {
+ TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_QUEUE_STATS);
+ req->PeerAddress = addr;
+ ExecStatsRequest(req);
+ return req->QueueStats;
+ }
+ };
+
+ //////////////////////////////////////////////////////////////////////////
+ static void ReadShm(TSharedMemory* shm, TVector<char>* data) {
+ Y_ASSERT(shm);
+ int dataSize = shm->GetSize();
+ data->yresize(dataSize);
+ memcpy(&(*data)[0], shm->GetPtr(), dataSize);
+ }
+
+ static void LoadRequestData(TUdpHttpRequest* res) {
+ if (!res)
+ return;
+ {
+ TBlockChainIterator reqData(res->DataHolder->Data->GetChain());
+ char pktType;
+ reqData.Read(&pktType, 1);
+ ReadArr(&reqData, &res->Url);
+ if (pktType == PKT_REQUEST) {
+ ReadYArr(&reqData, &res->Data);
+ } else if (pktType == PKT_LOCAL_REQUEST) {
+ ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data);
+ } else
+ Y_ASSERT(0);
+ if (reqData.HasFailed()) {
+ Y_ASSERT(0 && "wrong format, memory corruption suspected");
+ res->Url = "";
+ res->Data.clear();
+ }
+ }
+ res->DataHolder.Reset(nullptr);
+ }
+
+ static void LoadResponseData(TUdpHttpResponse* res) {
+ if (!res || res->DataHolder.Get() == nullptr)
+ return;
+ {
+ TBlockChainIterator reqData(res->DataHolder->Data->GetChain());
+ char pktType;
+ reqData.Read(&pktType, 1);
+ TGUID guid;
+ reqData.Read(&guid, sizeof(guid));
+ Y_ASSERT(res->ReqId == guid);
+ if (pktType == PKT_RESPONSE) {
+ ReadYArr(&reqData, &res->Data);
+ } else if (pktType == PKT_LOCAL_RESPONSE) {
+ ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data);
+ } else
+ Y_ASSERT(0);
+ if (reqData.HasFailed()) {
+ Y_ASSERT(0 && "wrong format, memory corruption suspected");
+ res->Ok = TUdpHttpResponse::FAILED;
+ res->Data.clear();
+ res->Error = "wrong response format";
+ }
+ }
+ res->DataHolder.Reset(nullptr);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // IRequestOps::TWaitResponse
+ TUdpHttpResponse* IRequestOps::TWaitResponse::GetResponse() {
+ if (!Response)
+ return nullptr;
+ TUdpHttpResponse* res = Response;
+ Response = nullptr;
+ LoadResponseData(res);
+ return res;
+ }
+
+ void IRequestOps::TWaitResponse::SetResponse(TUdpHttpResponse* r) {
+ Y_ASSERT(Response == nullptr || r == nullptr);
+ if (r)
+ Response = r;
+ CompleteEvent.Signal();
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // TRequesterUserQueues
+ TUdpHttpRequest* TRequesterUserQueues::GetRequest() {
+ TUdpHttpRequest* res = nullptr;
+ ReqList.Dequeue(&res);
+ if (res) {
+ AtomicAdd(QueueSizes->ReqCount, -1);
+ AtomicAdd(QueueSizes->ReqQueueSize, -GetPacketSize(res->DataHolder.Get()));
+ }
+ UpdateAsyncSignalState();
+ LoadRequestData(res);
+ return res;
+ }
+
+ TUdpHttpResponse* TRequesterUserQueues::GetResponse() {
+ TUdpHttpResponse* res = nullptr;
+ ResponseList.Dequeue(&res);
+ if (res) {
+ AtomicAdd(QueueSizes->RespCount, -1);
+ AtomicAdd(QueueSizes->RespQueueSize, -GetPacketSize(res->DataHolder.Get()));
+ }
+ UpdateAsyncSignalState();
+ LoadResponseData(res);
+ return res;
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ class TRequestOps: public IRequestOps {
+ TIntrusivePtr<TUdpHttp> Requester;
+ TIntrusivePtr<TRequesterUserQueues> UserQueues;
+
+ public:
+ TRequestOps(TUdpHttp* req)
+ : Requester(req)
+ {
+ UserQueues = new TRequesterUserQueues(req->GetQueueSizes());
+ }
+ void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override {
+ Requester->SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get());
+ }
+ void CancelRequest(const TGUID& reqId) override {
+ Requester->CancelRequest(reqId);
+ }
+ void BreakRequest(const TGUID& reqId) override {
+ Requester->BreakRequest(reqId);
+ }
+
+ void SendResponse(const TGUID& reqId, TVector<char>* data) override {
+ Requester->SendResponseImpl(reqId, PP_NORMAL, data);
+ }
+ void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override {
+ Requester->SendResponseImpl(reqId, PP_LOW, data);
+ }
+ TUdpHttpRequest* GetRequest() override {
+ Y_ASSERT(0);
+ //return UserQueues.GetRequest();
+ return nullptr; // all requests are routed to the main requester
+ }
+ TUdpHttpResponse* GetResponse() override {
+ return UserQueues->GetResponse();
+ }
+ bool GetRequestCancel(TGUID*) override {
+ Y_ASSERT(0);
+ return false; // all request cancels are routed to the main requester
+ }
+ bool GetSendRequestAcc(TGUID* req) override {
+ return UserQueues->GetSendRequestAcc(req);
+ }
+ // sync mode
+ TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
+ return Requester->Request(addr, url, data);
+ }
+ TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
+ return Requester->WaitableRequest(addr, url, data);
+ }
+ //
+ TMuxEvent& GetAsyncEvent() override {
+ return UserQueues->GetAsyncEvent();
+ }
+ };
+
+ IRequestOps* TUdpHttp::CreateSubRequester() {
+ return new TRequestOps(this);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ void AbortOnFailedRequest(TUdpHttpResponse* answer) {
+ if (answer && answer->Ok == TUdpHttpResponse::FAILED) {
+ fprintf(stderr, "Failed request to host %s\n", GetAddressAsString(answer->PeerAddress).data());
+ fprintf(stderr, "Error description: %s\n", answer->Error.data());
+ fflush(nullptr);
+ Y_ASSERT(0);
+ abort();
+ }
+ }
+
+ TString GetDebugInfo(const TUdpAddress& addr, double timeout) {
+ NHPTimer::STime start;
+ NHPTimer::GetTime(&start);
+ TIntrusivePtr<IUdpHost> host = CreateUdpHost(0);
+ {
+ TAutoPtr<TRopeDataPacket> rq = new TRopeDataPacket;
+ rq->Write((char)PKT_GETDEBUGINFO);
+ ui32 crc32 = CalcChecksum(rq->GetChain());
+ host->Send(addr, rq.Release(), crc32, nullptr, PP_HIGH);
+ }
+ for (;;) {
+ TAutoPtr<TRequest> ptr = host->GetRequest();
+ if (ptr.Get()) {
+ TBlockChainIterator reqData(ptr->Data->GetChain());
+ int sz = reqData.GetSize();
+ TString res;
+ res.resize(sz);
+ reqData.Read(res.begin(), sz);
+ return res;
+ }
+ host->Step();
+ host->Wait(0.1f);
+
+ NHPTimer::STime now;
+ NHPTimer::GetTime(&now);
+ if (NHPTimer::GetSeconds(now - start) > timeout) {
+ return TString();
+ }
+ }
+ }
+
+ void Kill(const TUdpAddress& addr) {
+ TIntrusivePtr<IUdpHost> host = CreateUdpHost(0);
+ host->Kill(addr);
+ }
+
+ void StopAllNetLibaThreads() {
+ PanicAttack = true; // AAAA!!!!
+ }
+
+ void SetNetLibaHeartbeatTimeout(double timeoutSec) {
+ NetLibaHeartbeat();
+ HeartbeatTimeout.store(timeoutSec, std::memory_order_release);
+ }
+
+ void NetLibaHeartbeat() {
+ NHPTimer::STime now;
+ NHPTimer::GetTime(&now);
+ LastHeartbeat.store(now, std::memory_order_release);
+ }
+
+ IRequester* CreateHttpUdpRequester(int port) {
+ if (PanicAttack)
+ return nullptr;
+
+ TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateSocket();
+ socket->Open(port);
+ if (!socket->IsValid())
+ return nullptr;
+
+ return CreateHttpUdpRequester(socket);
+ }
+
+ IRequester* CreateHttpUdpRequester(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
+ if (PanicAttack)
+ return nullptr;
+
+ TIntrusivePtr<TUdpHttp> res(new TUdpHttp);
+ if (!res->Start(socket))
+ return nullptr;
+ return res.Release();
+ }
+
+}