#include "stdafx.h"
#include "udp_http.h"
#include "udp_client_server.h"
#include "udp_socket.h"
#include "cpu_affinity.h"
#include <library/cpp/threading/atomic/bool.h>
#include <util/system/hp_timer.h>
#include <util/thread/lfqueue.h>
#include <util/system/thread.h>
#include <util/system/spinlock.h>
#if !defined(_win_)
#include <signal.h>
#include <pthread.h>
#endif
#include "block_chain.h"
#include <util/system/shmat.h>
#include <atomic>
namespace NNetliba {
const float HTTP_TIMEOUT = 15.0f; // seconds of silence before a waiting request is pinged (and before a sent ping is declared failed)
const int MIN_SHARED_MEM_PACKET = 1000; // payloads larger than this use shared memory when the peer is local
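// PanicAttack is the process-wide kill switch set by StopAllNetLibaThreads();
// LastHeartbeat and HeartbeatTimeout drive the watchdog checked in ExecServerThread().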
static ::NAtomic::TBool PanicAttack;
static std::atomic<NHPTimer::STime> LastHeartbeat;
static std::atomic<double> HeartbeatTimeout;
static int GetPacketSize(TRequest* req) {
if (req && req->Data.Get())
return req->Data->GetSize();
return 0;
}
static bool IsLocalFast(const TUdpAddress& addr) {
if (addr.IsIPv4()) {
return IsLocalIPv4(addr.GetIPv4());
} else {
return IsLocalIPv6(addr.Network, addr.Interface);
}
}
bool IsLocal(const TUdpAddress& addr) {
InitLocalIPList();
return IsLocalFast(addr);
}
TUdpHttpRequest::~TUdpHttpRequest() {
}
TUdpHttpResponse::~TUdpHttpResponse() {
}
class TRequesterUserQueueSizes: public TThrRefBase {
public:
TAtomic ReqCount, RespCount;
TAtomic ReqQueueSize, RespQueueSize;
TRequesterUserQueueSizes()
: ReqCount(0)
, RespCount(0)
, ReqQueueSize(0)
, RespQueueSize(0)
{
}
};
template <class T>
void EraseList(TLockFreeQueue<T*>* data) {
T* ptr = nullptr;
while (data->Dequeue(&ptr)) {
delete ptr;
}
}
class TRequesterUserQueues: public TThrRefBase {
TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes;
TLockFreeQueue<TUdpHttpRequest*> ReqList;
TLockFreeQueue<TUdpHttpResponse*> ResponseList;
TLockFreeStack<TGUID> CancelList, SendRequestAccList; // any order will do
TMuxEvent AsyncEvent;
void UpdateAsyncSignalState() {
// AsyncEvent.Reset() is assumed to act as a memory barrier here: reset only when
// all queues look empty, then re-check and re-signal in case something was
// enqueued between the emptiness check and the Reset().
if (ReqList.IsEmpty() && ResponseList.IsEmpty() && CancelList.IsEmpty() && SendRequestAccList.IsEmpty()) {
AsyncEvent.Reset();
if (!ReqList.IsEmpty() || !ResponseList.IsEmpty() || !CancelList.IsEmpty() || !SendRequestAccList.IsEmpty())
AsyncEvent.Signal();
}
}
~TRequesterUserQueues() override {
EraseList(&ReqList);
EraseList(&ResponseList);
}
public:
TRequesterUserQueues(TRequesterUserQueueSizes* queueSizes)
: QueueSizes(queueSizes)
{
}
TUdpHttpRequest* GetRequest();
TUdpHttpResponse* GetResponse();
bool GetRequestCancel(TGUID* req) {
bool res = CancelList.Dequeue(req);
UpdateAsyncSignalState();
return res;
}
bool GetSendRequestAcc(TGUID* req) {
bool res = SendRequestAccList.Dequeue(req);
UpdateAsyncSignalState();
return res;
}
void AddRequest(TUdpHttpRequest* req) {
AtomicAdd(QueueSizes->ReqCount, 1);
AtomicAdd(QueueSizes->ReqQueueSize, GetPacketSize(req->DataHolder.Get()));
ReqList.Enqueue(req);
AsyncEvent.Signal();
}
void AddResponse(TUdpHttpResponse* res) {
AtomicAdd(QueueSizes->RespCount, 1);
AtomicAdd(QueueSizes->RespQueueSize, GetPacketSize(res->DataHolder.Get()));
ResponseList.Enqueue(res);
AsyncEvent.Signal();
}
void AddCancel(const TGUID& req) {
CancelList.Enqueue(req);
AsyncEvent.Signal();
}
void AddSendRequestAcc(const TGUID& req) {
SendRequestAccList.Enqueue(req);
AsyncEvent.Signal();
}
TMuxEvent& GetAsyncEvent() {
return AsyncEvent;
}
void AsyncSignal() {
AsyncEvent.Signal();
}
};
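// Out-request state machine (driven by AnalyzeSendResults() and SendPingsIfNeeded()):
//   S_SENDING -> S_WAITING once the send succeeds (failure fails the request);
//   S_WAITING -> S_WAITING_PING_SENDING after HTTP_TIMEOUT seconds of silence;
//   S_WAITING_PING_SENDING -> S_WAITING_PING_SENT once the ping is sent;
//   S_WAITING_PING_SENT -> failed after another HTTP_TIMEOUT seconds;
//   a positive ping response returns the request to S_WAITING;
//   S_CANCEL_AFTER_SENDING defers a user cancel until the send completes.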
struct TOutRequestState {
enum EState {
S_SENDING,
S_WAITING,
S_WAITING_PING_SENDING,
S_WAITING_PING_SENT,
S_CANCEL_AFTER_SENDING
};
EState State;
TUdpAddress Address;
double TimePassed;
int PingTransferId;
TIntrusivePtr<TRequesterUserQueues> UserQueues;
TOutRequestState()
: State(S_SENDING)
, TimePassed(0)
, PingTransferId(-1)
{
}
};
struct TInRequestState {
enum EState {
S_WAITING,
S_RESPONSE_SENDING,
S_CANCELED,
};
EState State;
TUdpAddress Address;
TInRequestState()
: State(S_WAITING)
{
}
TInRequestState(const TUdpAddress& address)
: State(S_WAITING)
, Address(address)
{
}
};
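// Wire format: the first byte of each packet is the EHttpPacket type tag.
// PKT_REQUEST/PKT_LOCAL_REQUEST are followed by the url and the data
// (the PKT_LOCAL_* variants attach the payload as shared memory instead);
// PKT_PING, PKT_RESPONSE, PKT_LOCAL_RESPONSE and PKT_CANCEL are followed by the request TGUID;
// PKT_PING_RESPONSE carries the TGUID plus a bool "request known";
// PKT_GETDEBUGINFO has no payload, and its reply is the raw debug text with no type tag.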
enum EHttpPacket {
PKT_REQUEST,
PKT_PING,
PKT_PING_RESPONSE,
PKT_RESPONSE,
PKT_GETDEBUGINFO,
PKT_LOCAL_REQUEST,
PKT_LOCAL_RESPONSE,
PKT_CANCEL,
};
class TUdpHttp: public IRequester {
enum EDir {
DIR_OUT,
DIR_IN
};
struct TTransferPurpose {
EDir Dir;
TGUID Guid;
TTransferPurpose()
: Dir(DIR_OUT)
{
}
TTransferPurpose(EDir dir, TGUID guid)
: Dir(dir)
, Guid(guid)
{
}
};
struct TSendRequest {
TUdpAddress Addr;
TAutoPtr<TRopeDataPacket> Data;
TGUID ReqGuid;
TIntrusivePtr<TWaitResponse> WR;
TIntrusivePtr<TRequesterUserQueues> UserQueues;
ui32 Crc32;
TSendRequest()
: Crc32(0)
{
}
TSendRequest(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket>* data, const TGUID& reqguid, TWaitResponse* wr, TRequesterUserQueues* userQueues)
: Addr(addr)
, Data(*data)
, ReqGuid(reqguid)
, WR(wr)
, UserQueues(userQueues)
, Crc32(CalcChecksum(Data->GetChain()))
{
}
};
struct TSendResponse {
TVector<char> Data;
TGUID ReqGuid;
ui32 DataCrc32;
EPacketPriority Priority;
TSendResponse()
: DataCrc32(0)
, Priority(PP_NORMAL)
{
}
TSendResponse(const TGUID& reqguid, EPacketPriority prior, TVector<char>* data)
: ReqGuid(reqguid)
, DataCrc32(0)
, Priority(prior)
{
if (data && !data->empty()) {
data->swap(Data);
DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize());
}
}
};
struct TCancelRequest {
TGUID ReqGuid;
TCancelRequest() = default;
TCancelRequest(const TGUID& reqguid)
: ReqGuid(reqguid)
{
}
};
struct TBreakRequest {
TGUID ReqGuid;
TBreakRequest() = default;
TBreakRequest(const TGUID& reqguid)
: ReqGuid(reqguid)
{
}
};
TThread myThread;
bool KeepRunning, AbortTransactions;
TSpinLock cs;
TSystemEvent HasStarted;
NHPTimer::STime PingsSendT;
TIntrusivePtr<IUdpHost> Host;
TIntrusivePtr<NNetlibaSocket::ISocket> Socket;
typedef THashMap<TGUID, TOutRequestState, TGUIDHash> TOutRequestHash;
typedef THashMap<TGUID, TInRequestState, TGUIDHash> TInRequestHash;
TOutRequestHash OutRequests;
TInRequestHash InRequests;
typedef THashMap<int, TTransferPurpose> TTransferHash;
TTransferHash TransferHash;
typedef THashMap<TGUID, TIntrusivePtr<TWaitResponse>, TGUIDHash> TSyncRequests;
TSyncRequests SyncRequests;
// kept here to avoid reconstructing it on every DoSends() call
typedef THashSet<TGUID, TGUIDHash> TAnticipateCancels;
TAnticipateCancels AnticipateCancels;
TLockFreeQueue<TSendRequest*> SendReqList;
TLockFreeQueue<TSendResponse*> SendRespList;
TLockFreeQueue<TCancelRequest> CancelReqList;
TLockFreeQueue<TBreakRequest> BreakReqList;
TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes;
TIntrusivePtr<TRequesterUserQueues> UserQueues;
struct TStatsRequest: public TThrRefBase {
enum EReq {
PENDING_SIZE,
DEBUG_INFO,
HAS_IN_REQUEST,
GET_PEER_ADDRESS,
GET_PEER_QUEUE_STATS,
};
EReq Req;
TRequesterPendingDataStats PendingDataSize;
TString DebugInfo;
TGUID RequestId;
TUdpAddress PeerAddress;
TIntrusivePtr<IPeerQueueStats> QueueStats;
bool RequestFound;
TSystemEvent Complete;
TStatsRequest(EReq req)
: Req(req)
, RequestFound(false)
{
}
};
TLockFreeQueue<TIntrusivePtr<TStatsRequest>> StatsReqList;
bool ReportRequestCancel;
bool ReportSendRequestAcc;
void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TAutoPtr<TRequest> data, const char* error = nullptr) {
TOutRequestState& s = i->second;
TUdpHttpResponse* res = new TUdpHttpResponse;
res->DataHolder = data;
res->ReqId = i->first;
res->PeerAddress = s.Address;
res->Ok = ok;
if (ok == TUdpHttpResponse::FAILED)
res->Error = error ? error : "request failed";
else if (ok == TUdpHttpResponse::CANCELED)
res->Error = error ? error : "request cancelled";
TSyncRequests::iterator k = SyncRequests.find(res->ReqId);
if (k != SyncRequests.end()) {
TIntrusivePtr<TWaitResponse>& wr = k->second;
wr->SetResponse(res);
SyncRequests.erase(k);
} else {
s.UserQueues->AddResponse(res);
}
OutRequests.erase(i);
}
int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data) {
ui32 crc32 = CalcChecksum(data->GetChain());
return Host->Send(addr, data.Release(), crc32, nullptr, PP_HIGH);
}
void ProcessIncomingPackets() {
TVector<TGUID, TCustomAllocator<TGUID>> failedRequests;
for (;;) {
TAutoPtr<TRequest> req = Host->GetRequest();
if (req.Get() == nullptr) {
if (!failedRequests.empty()) {
// we want to handle the following sequence of events:
// <- send ping
// -> send response over IB
// -> send ping response (no such request) over UDP
// If we are unlucky, the IB response can sit waiting in the IB receive queue
// while the response sender receives "send complete" from IB;
// IB has indeed delivered the message, but ib_cs.cpp has not parsed it yet,
// so after the "send response complete" event the response sender can
// legitimately reply to pings with "no such request",
// and those ping responses are sent over UDP.
// We can therefore end up with a negative ping response in the
// UDP receive queue while the response itself waits unprocessed in the IB receive queue.
// To make sure there is no response hiding in the IB queue we have to process the IB queues,
// so we call IBStep()
Host->IBStep();
req = Host->GetRequest();
if (req.Get() == nullptr) {
break;
}
} else {
break;
}
}
TBlockChainIterator reqData(req->Data->GetChain());
char pktType;
reqData.Read(&pktType, 1);
switch (pktType) {
case PKT_REQUEST:
case PKT_LOCAL_REQUEST: {
//printf("recv PKT_REQUEST or PKT_LOCAL_REQUEST\n");
TGUID reqId = req->Guid;
TInRequestHash::iterator z = InRequests.find(reqId);
if (z != InRequests.end()) {
// oops, this request already exists!
// this can happen when a request fits into a single packet whose source IP
// was corrupted in transit and still passed the CRC checks.
// Since we have already reported the wrong source address for this request
// to the user, the safest option would be to stop the program to avoid
// further complications, but we merely report the incident to stderr.
fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n",
GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(),
GetAddressAsString(req->Address).c_str());
} else {
InRequests[reqId] = TInRequestState(req->Address);
//printf("InReq %s PKT_REQUEST recv ... -> S_WAITING\n", GetGuidAsString(reqId).c_str());
TUdpHttpRequest* res = new TUdpHttpRequest;
res->ReqId = reqId;
res->PeerAddress = req->Address;
res->DataHolder = req;
UserQueues->AddRequest(res);
}
} break;
case PKT_PING: {
//printf("recv PKT_PING\n");
TGUID guid;
reqData.Read(&guid, sizeof(guid));
bool ok = InRequests.find(guid) != InRequests.end();
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
ms->Write((char)PKT_PING_RESPONSE);
ms->Write(guid);
ms->Write(ok);
SendWithHighPriority(req->Address, ms.Release());
//printf("InReq %s PKT_PING recv Sending PKT_PING_RESPONSE\n", GetGuidAsString(guid).c_str());
//printf("got PKT_PING, responding %d\n", (int)ok);
} break;
case PKT_PING_RESPONSE: {
//printf("recv PKT_PING_RESPONSE\n");
TGUID guid;
bool ok;
reqData.Read(&guid, sizeof(guid));
reqData.Read(&ok, sizeof(ok));
TOutRequestHash::iterator i = OutRequests.find(guid);
if (i == OutRequests.end()) {
; //Y_ASSERT(0); // actually possible with some packet orders
} else {
if (!ok) {
// we can not delete the request at this point,
// since a failed ping response and the response itself can arrive at the same moment.
// Consider this sequence: the client sends a ping, the server sends the response
// and then replies false to the ping because the reply has already been sent.
// The failed ping response can not arrive earlier than the response itself,
// but the two can arrive simultaneously.
failedRequests.push_back(guid);
//printf("OutReq %s PKT_PING_RESPONSE recv no such query -> failed\n", GetGuidAsString(guid).c_str());
} else {
TOutRequestState& s = i->second;
switch (s.State) {
case TOutRequestState::S_WAITING_PING_SENDING: {
Y_ASSERT(s.PingTransferId >= 0);
TTransferHash::iterator k = TransferHash.find(s.PingTransferId);
if (k != TransferHash.end())
TransferHash.erase(k);
else
Y_ASSERT(0);
s.PingTransferId = -1;
s.TimePassed = 0;
s.State = TOutRequestState::S_WAITING;
//printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENDING -> S_WAITING\n", GetGuidAsString(guid).c_str());
} break;
case TOutRequestState::S_WAITING_PING_SENT:
s.TimePassed = 0;
s.State = TOutRequestState::S_WAITING;
//printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENT -> S_WAITING\n", GetGuidAsString(guid).c_str());
break;
default:
Y_ASSERT(0);
break;
}
}
}
} break;
case PKT_RESPONSE:
case PKT_LOCAL_RESPONSE: {
//printf("recv PKT_RESPONSE or PKT_LOCAL_RESPONSE\n");
TGUID guid;
reqData.Read(&guid, sizeof(guid));
TOutRequestHash::iterator i = OutRequests.find(guid);
if (i == OutRequests.end()) {
; //Y_ASSERT(0); // does happen
//printf("OutReq %s PKT_RESPONSE recv for non-existing req\n", GetGuidAsString(guid).c_str());
} else {
FinishRequest(i, TUdpHttpResponse::OK, req);
//printf("OutReq %s PKT_RESPONSE recv ... -> ok\n", GetGuidAsString(guid).c_str());
}
} break;
case PKT_CANCEL: {
//printf("recv PKT_CANCEL\n");
TGUID guid;
reqData.Read(&guid, sizeof(guid));
TInRequestHash::iterator i = InRequests.find(guid);
if (i == InRequests.end()) {
; //Y_ASSERT(0); // may happen
//printf("InReq %s PKT_CANCEL recv for non-existing req\n", GetGuidAsString(guid).c_str());
} else {
TInRequestState& s = i->second;
if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel)
UserQueues->AddCancel(guid);
s.State = TInRequestState::S_CANCELED;
//printf("InReq %s PKT_CANCEL recv\n", GetGuidAsString(guid).c_str());
}
} break;
case PKT_GETDEBUGINFO: {
//printf("recv PKT_GETDEBUGINFO\n");
TString dbgInfo = GetDebugInfoLocked();
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
ms->Write(dbgInfo.c_str(), (int)dbgInfo.size());
SendWithHighPriority(req->Address, ms);
} break;
default:
Y_ASSERT(0);
}
}
// cleanup failed requests
for (size_t k = 0; k < failedRequests.size(); ++k) {
const TGUID& guid = failedRequests[k];
TOutRequestHash::iterator i = OutRequests.find(guid);
if (i != OutRequests.end())
FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: recv no such query");
}
}
void AnalyzeSendResults() {
TSendResult res;
while (Host->GetSendResult(&res)) {
//printf("Send result received\n");
TTransferHash::iterator k1 = TransferHash.find(res.TransferId);
if (k1 != TransferHash.end()) {
const TTransferPurpose& tp = k1->second;
switch (tp.Dir) {
case DIR_OUT: {
TOutRequestHash::iterator i = OutRequests.find(tp.Guid);
if (i != OutRequests.end()) {
const TGUID& reqId = i->first;
TOutRequestState& s = i->second;
switch (s.State) {
case TOutRequestState::S_SENDING:
if (!res.Success) {
FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING");
//printf("OutReq %s AnalyzeSendResults() S_SENDING -> failed\n", GetGuidAsString(reqId).c_str());
} else {
if (ReportSendRequestAcc) {
if (s.UserQueues.Get()) {
s.UserQueues->AddSendRequestAcc(reqId);
} else {
// no user queue; maybe it is a waitable (synchronous) request
TSyncRequests::iterator k2 = SyncRequests.find(reqId);
if (k2 != SyncRequests.end()) {
TIntrusivePtr<TWaitResponse>& wr = k2->second;
wr->SetRequestSent();
}
}
}
s.State = TOutRequestState::S_WAITING;
//printf("OutReq %s AnalyzeSendResults() S_SENDING -> S_WAITING\n", GetGuidAsString(reqId).c_str());
s.TimePassed = 0;
}
break;
case TOutRequestState::S_CANCEL_AFTER_SENDING:
DoSendCancel(s.Address, reqId);
FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING");
break;
case TOutRequestState::S_WAITING:
case TOutRequestState::S_WAITING_PING_SENT:
Y_ASSERT(0);
break;
case TOutRequestState::S_WAITING_PING_SENDING:
Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId);
if (!res.Success) {
FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING");
//printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> failed\n", GetGuidAsString(reqId).c_str());
} else {
s.PingTransferId = -1;
s.State = TOutRequestState::S_WAITING_PING_SENT;
//printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> S_WAITING_PING_SENT\n", GetGuidAsString(reqId).c_str());
s.TimePassed = 0;
}
break;
default:
Y_ASSERT(0);
break;
}
}
} break;
case DIR_IN: {
TInRequestHash::iterator i = InRequests.find(tp.Guid);
if (i != InRequests.end()) {
Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED);
InRequests.erase(i);
//if (res.Success)
// printf("InReq %s AnalyzeSendResults() ... -> finished\n", GetGuidAsString(tp.Guid).c_str());
//else
// printf("InReq %s AnalyzeSendResults() ... -> failed response send\n", GetGuidAsString(tp.Guid).c_str());
}
} break;
default:
Y_ASSERT(0);
break;
}
TransferHash.erase(k1);
}
}
}
void SendPingsIfNeeded() {
NHPTimer::STime tChk = PingsSendT;
float deltaT = (float)NHPTimer::GetTimePassed(&tChk);
if (deltaT < 0.05) {
return;
}
PingsSendT = tChk;
deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3);
{
for (TOutRequestHash::iterator i = OutRequests.begin(); i != OutRequests.end();) {
TOutRequestHash::iterator curIt = i++;
TOutRequestState& s = curIt->second;
const TGUID& guid = curIt->first;
switch (s.State) {
case TOutRequestState::S_WAITING:
s.TimePassed += deltaT;
if (s.TimePassed > HTTP_TIMEOUT) {
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
ms->Write((char)PKT_PING);
ms->Write(guid);
int transId = SendWithHighPriority(s.Address, ms.Release());
TransferHash[transId] = TTransferPurpose(DIR_OUT, guid);
s.State = TOutRequestState::S_WAITING_PING_SENDING;
//printf("OutReq %s SendPingsIfNeeded() S_WAITING -> S_WAITING_PING_SENDING\n", GetGuidAsString(guid).c_str());
s.PingTransferId = transId;
}
break;
case TOutRequestState::S_WAITING_PING_SENT:
s.TimePassed += deltaT;
if (s.TimePassed > HTTP_TIMEOUT) {
//printf("OutReq %s SendPingsIfNeeded() S_WAITING_PING_SENT -> failed\n", GetGuidAsString(guid).c_str());
FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT");
}
break;
default:
break;
}
}
}
}
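// One iteration of the network thread: flush queued user commands, pump the
// UDP host, serve stats requests from other threads, then flush again and
// process incoming packets, send results and pings.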
void Step() {
{
TGuard<TSpinLock> lock(cs);
DoSends();
}
Host->Step();
for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) {
switch (req->Req) {
case TStatsRequest::PENDING_SIZE:
Host->GetPendingDataSize(&req->PendingDataSize);
break;
case TStatsRequest::DEBUG_INFO: {
TGuard<TSpinLock> lock(cs);
req->DebugInfo = GetDebugInfoLocked();
} break;
case TStatsRequest::HAS_IN_REQUEST: {
TGuard<TSpinLock> lock(cs);
req->RequestFound = (InRequests.find(req->RequestId) != InRequests.end());
} break;
case TStatsRequest::GET_PEER_ADDRESS: {
TGuard<TSpinLock> lock(cs);
TInRequestHash::const_iterator i = InRequests.find(req->RequestId);
if (i != InRequests.end()) {
req->PeerAddress = i->second.Address;
} else {
TOutRequestHash::const_iterator o = OutRequests.find(req->RequestId);
if (o != OutRequests.end()) {
req->PeerAddress = o->second.Address;
} else {
req->PeerAddress = TUdpAddress();
}
}
} break;
case TStatsRequest::GET_PEER_QUEUE_STATS:
req->QueueStats = Host->GetQueueStats(req->PeerAddress);
break;
default:
Y_ASSERT(0);
break;
}
req->Complete.Signal();
}
{
TGuard<TSpinLock> lock(cs);
DoSends();
ProcessIncomingPackets();
AnalyzeSendResults();
SendPingsIfNeeded();
}
}
void Wait() {
Host->Wait(0.1f);
}
void DoSendCancel(const TUdpAddress& addr, const TGUID& req) {
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
ms->Write((char)PKT_CANCEL);
ms->Write(req);
SendWithHighPriority(addr, ms);
}
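// Drains the user-side lock-free command queues (breaks, cancels, responses,
// requests) on the network thread; always called with cs held.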
void DoSends() {
{
TBreakRequest rb;
while (BreakReqList.Dequeue(&rb)) {
InRequests.erase(rb.ReqGuid);
}
}
{
// cancelling requests
TCancelRequest rc;
while (CancelReqList.Dequeue(&rc)) {
TOutRequestHash::iterator i = OutRequests.find(rc.ReqGuid);
if (i == OutRequests.end()) {
AnticipateCancels.insert(rc.ReqGuid);
continue; // cancelling a non-existent request is ok
}
TOutRequestState& s = i->second;
if (s.State == TOutRequestState::S_SENDING) {
// we are in trouble: the request has not been sent yet but already has to be canceled, so wait for the send to finish
s.State = TOutRequestState::S_CANCEL_AFTER_SENDING;
} else {
DoSendCancel(s.Address, rc.ReqGuid);
FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side");
}
}
}
{
// sending replies
for (TSendResponse* rd = nullptr; SendRespList.Dequeue(&rd); delete rd) {
TInRequestHash::iterator i = InRequests.find(rd->ReqGuid);
if (i == InRequests.end()) {
Y_ASSERT(0);
continue;
}
TInRequestState& s = i->second;
if (s.State == TInRequestState::S_CANCELED) {
// no response needs to be sent for a canceled request
InRequests.erase(i);
continue;
}
Y_ASSERT(s.State == TInRequestState::S_WAITING);
s.State = TInRequestState::S_RESPONSE_SENDING;
//printf("InReq %s SendResponse() ... -> S_RESPONSE_SENDING (pkt %s)\n", GetGuidAsString(reqId).c_str(), GetGuidAsString(lowPktGuid).c_str());
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
ui32 crc32 = 0;
int dataSize = rd->Data.ysize();
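// responses above MIN_SHARED_MEM_PACKET to local peers are handed over via
// shared memory instead of being copied through the UDP stack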
if (rd->Data.ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(s.Address)) {
TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
if (shm->Create(dataSize)) {
ms->Write((char)PKT_LOCAL_RESPONSE);
ms->Write(rd->ReqGuid);
memcpy(shm->GetPtr(), &rd->Data[0], dataSize);
TVector<char> empty;
rd->Data.swap(empty);
ms->AttachSharedData(shm);
crc32 = CalcChecksum(ms->GetChain());
}
}
if (ms->GetSharedData() == nullptr) {
ms->Write((char)PKT_RESPONSE);
ms->Write(rd->ReqGuid);
// To offload CRC calculation from this inner thread, the CRC of data[] is computed
// by the caller and passed in DataCrc32.
// This means the precomputed CRC is wasted whenever shared memory is used;
// that is hard to avoid, since SendResponse() does not know whether shared
// memory will be used (the peer address is not available there).
TIncrementalChecksumCalcer csCalcer;
AddChain(&csCalcer, ms->GetChain());
// here we are replicating the way WriteDestructive serializes data
csCalcer.AddBlock(&dataSize, sizeof(dataSize));
csCalcer.AddBlockSum(rd->DataCrc32, dataSize);
crc32 = csCalcer.CalcChecksum();
ms->WriteDestructive(&rd->Data);
//ui32 chkCrc = CalcChecksum(ms->GetChain()); // can not use since its slow for large responses
//Y_ASSERT(chkCrc == crc32);
}
int transId = Host->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority);
TransferHash[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid);
}
}
{
// sending requests
for (TSendRequest* rd = nullptr; SendReqList.Dequeue(&rd); delete rd) {
Y_ASSERT(OutRequests.find(rd->ReqGuid) == OutRequests.end());
{
TOutRequestState& s = OutRequests[rd->ReqGuid];
s.State = TOutRequestState::S_SENDING;
s.Address = rd->Addr;
s.UserQueues = rd->UserQueues;
//printf("OutReq %s SendRequest() ... -> S_SENDING\n", GetGuidAsString(guid).c_str());
}
if (rd->WR.Get())
SyncRequests[rd->ReqGuid] = rd->WR;
if (AnticipateCancels.find(rd->ReqGuid) != AnticipateCancels.end()) {
FinishRequest(OutRequests.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "request canceled before transmitting");
} else {
TGUID pktGuid = rd->ReqGuid; // request packet id should match request id
int transId = Host->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL);
TransferHash[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid);
}
}
}
if (!AnticipateCancels.empty()) {
AnticipateCancels.clear();
}
}
public:
void SendRequestImpl(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId,
TWaitResponse* wr, TRequesterUserQueues* userQueues) {
if (data && data->size() > MAX_PACKET_SIZE) {
Y_VERIFY(0, "data size is too large");
}
//printf("SendRequest(%s)\n", url.c_str());
if (wr)
wr->SetReqId(reqId);
TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
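// requests above MIN_SHARED_MEM_PACKET to local peers travel via shared memory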
if (data && data->ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(addr)) {
int dataSize = data->ysize();
TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
if (shm->Create(dataSize)) {
ms->Write((char)PKT_LOCAL_REQUEST);
ms->WriteStroka(url);
memcpy(shm->GetPtr(), &(*data)[0], dataSize);
TVector<char> empty;
data->swap(empty);
ms->AttachSharedData(shm);
}
}
if (ms->GetSharedData() == nullptr) {
ms->Write((char)PKT_REQUEST);
ms->WriteStroka(url);
ms->WriteDestructive(data);
}
SendReqList.Enqueue(new TSendRequest(addr, &ms, reqId, wr, userQueues));
Host->CancelWait();
}
void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override {
SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get());
}
void CancelRequest(const TGUID& reqId) override {
CancelReqList.Enqueue(TCancelRequest(reqId));
Host->CancelWait();
}
void BreakRequest(const TGUID& reqId) override {
BreakReqList.Enqueue(TBreakRequest(reqId));
Host->CancelWait();
}
void SendResponseImpl(const TGUID& reqId, EPacketPriority prior, TVector<char>* data) // non-virtual, for direct call from TRequestOps
{
if (data && data->size() > MAX_PACKET_SIZE) {
Y_VERIFY(0, "data size is too large");
}
SendRespList.Enqueue(new TSendResponse(reqId, prior, data));
Host->CancelWait();
}
void SendResponse(const TGUID& reqId, TVector<char>* data) override {
SendResponseImpl(reqId, PP_NORMAL, data);
}
void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override {
SendResponseImpl(reqId, PP_LOW, data);
}
TUdpHttpRequest* GetRequest() override {
return UserQueues->GetRequest();
}
TUdpHttpResponse* GetResponse() override {
return UserQueues->GetResponse();
}
bool GetRequestCancel(TGUID* req) override {
return UserQueues->GetRequestCancel(req);
}
bool GetSendRequestAcc(TGUID* req) override {
return UserQueues->GetSendRequestAcc(req);
}
TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
TIntrusivePtr<TWaitResponse> wr = WaitableRequest(addr, url, data);
wr->Wait();
return wr->GetResponse();
}
TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
TIntrusivePtr<TWaitResponse> wr = new TWaitResponse;
TGUID reqId;
CreateGuid(&reqId);
SendRequestImpl(addr, url, data, reqId, wr.Get(), nullptr);
return wr;
}
TMuxEvent& GetAsyncEvent() override {
return UserQueues->GetAsyncEvent();
}
int GetPort() override {
return Socket.Get() ? Socket->GetPort() : 0;
}
void StopNoWait() override {
AbortTransactions = true;
KeepRunning = false;
UserQueues->AsyncSignal();
// cancel all outgoing requests
TGuard<TSpinLock> lock(cs);
while (!OutRequests.empty()) {
// cancel without informing peer that we are cancelling the request
FinishRequest(OutRequests.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()");
}
}
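// Stats are collected on the network thread: enqueue the request, wake the
// host, then block until the network thread fills it in and signals Complete.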
void ExecStatsRequest(TIntrusivePtr<TStatsRequest> req) {
StatsReqList.Enqueue(req);
Host->CancelWait();
req->Complete.Wait();
}
TUdpAddress GetPeerAddress(const TGUID& reqId) override {
TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_ADDRESS);
req->RequestId = reqId;
ExecStatsRequest(req);
return req->PeerAddress;
}
void GetPendingDataSize(TRequesterPendingDataStats* res) override {
TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::PENDING_SIZE);
ExecStatsRequest(req);
*res = req->PendingDataSize;
}
bool HasRequest(const TGUID& reqId) override {
TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::HAS_IN_REQUEST);
req->RequestId = reqId;
ExecStatsRequest(req);
return req->RequestFound;
}
private:
void FinishOutstandingTransactions() {
// wait for all pending requests to complete; newly arriving requests are dropped
while ((!OutRequests.empty() || !InRequests.empty() || !SendRespList.IsEmpty() || !SendReqList.IsEmpty()) && !PanicAttack) {
while (TUdpHttpRequest* req = GetRequest()) {
TInRequestHash::iterator i = InRequests.find(req->ReqId);
//printf("dropping request(%s) (thread %d)\n", req->Url.c_str(), ThreadId());
delete req;
if (i == InRequests.end()) {
Y_ASSERT(0);
continue;
}
InRequests.erase(i);
}
Step();
sleep(0);
}
}
static void* ExecServerThread(void* param) {
BindToSocket(0);
SetHighestThreadPriority();
TUdpHttp* pThis = (TUdpHttp*)param;
pThis->Host = CreateUdpHost(pThis->Socket);
pThis->HasStarted.Signal();
if (!pThis->Host) {
pThis->Socket.Drop();
return nullptr;
}
NHPTimer::GetTime(&pThis->PingsSendT);
while (pThis->KeepRunning && !PanicAttack) {
if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) {
NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire);
double passed = NHPTimer::GetTimePassed(&chk);
if (passed > HeartbeatTimeout.load(std::memory_order_acquire)) {
StopAllNetLibaThreads();
fprintf(stderr, "%s\tTUdpHttp\tWaiting for %0.2f, time limit %0.2f, commit a suicide!11\n", Now().ToStringUpToSeconds().c_str(), passed, HeartbeatTimeout.load(std::memory_order_acquire));
fflush(stderr);
#ifndef _win_
killpg(0, SIGKILL);
#endif
abort();
break;
}
}
pThis->Step();
pThis->Wait();
}
if (!pThis->AbortTransactions && !PanicAttack)
pThis->FinishOutstandingTransactions();
pThis->Host = nullptr;
return nullptr;
}
~TUdpHttp() override {
if (myThread.Running()) {
KeepRunning = false;
myThread.Join();
}
for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) {
req->Complete.Signal();
}
}
public:
TUdpHttp()
: myThread(TThread::TParams(ExecServerThread, (void*)this).SetName("nl6_udp_host"))
, KeepRunning(true)
, AbortTransactions(false)
, PingsSendT(0)
, ReportRequestCancel(false)
, ReportSendRequestAcc(false)
{
NHPTimer::GetTime(&PingsSendT);
QueueSizes = new TRequesterUserQueueSizes;
UserQueues = new TRequesterUserQueues(QueueSizes.Get());
}
bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
Y_ASSERT(Host.Get() == nullptr);
Socket = socket;
myThread.Start();
HasStarted.Wait();
if (Host.Get()) {
return true;
}
Socket.Drop();
return false;
}
TString GetDebugInfoLocked() {
TString res = KeepRunning ? "State: running\n" : "State: stopping\n";
res += Host->GetDebugInfo();
char buf[1000];
TRequesterUserQueueSizes* qs = QueueSizes.Get();
sprintf(buf, "\nRequest queue %d (%d bytes)\n", (int)AtomicGet(qs->ReqCount), (int)AtomicGet(qs->ReqQueueSize));
res += buf;
sprintf(buf, "Response queue %d (%d bytes)\n", (int)AtomicGet(qs->RespCount), (int)AtomicGet(qs->RespQueueSize));
res += buf;
const char* outReqStateNames[] = {
"S_SENDING",
"S_WAITING",
"S_WAITING_PING_SENDING",
"S_WAITING_PING_SENT",
"S_CANCEL_AFTER_SENDING"};
const char* inReqStateNames[] = {
"S_WAITING",
"S_RESPONSE_SENDING",
"S_CANCELED"};
res += "\nOut requests:\n";
for (TOutRequestHash::const_iterator i = OutRequests.begin(); i != OutRequests.end(); ++i) {
const TGUID& gg = i->first;
const TOutRequestState& s = i->second;
bool isSync = SyncRequests.find(gg) != SyncRequests.end();
sprintf(buf, "%s\t%s %s TimePassed: %g %s\n",
GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), outReqStateNames[s.State],
s.TimePassed * 1000,
isSync ? "isSync" : "");
res += buf;
}
res += "\nIn requests:\n";
for (TInRequestHash::const_iterator i = InRequests.begin(); i != InRequests.end(); ++i) {
const TGUID& gg = i->first;
const TInRequestState& s = i->second;
sprintf(buf, "%s\t%s %s\n",
GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), inReqStateNames[s.State]);
res += buf;
}
return res;
}
TString GetDebugInfo() override {
TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::DEBUG_INFO);
ExecStatsRequest(req);
return req->DebugInfo;
}
void GetRequestQueueSize(TRequesterQueueStats* res) override {
TRequesterUserQueueSizes* qs = QueueSizes.Get();
res->ReqCount = (int)AtomicGet(qs->ReqCount);
res->RespCount = (int)AtomicGet(qs->RespCount);
res->ReqQueueSize = (int)AtomicGet(qs->ReqQueueSize);
res->RespQueueSize = (int)AtomicGet(qs->RespQueueSize);
}
TRequesterUserQueueSizes* GetQueueSizes() const {
return QueueSizes.Get();
}
IRequestOps* CreateSubRequester() override;
void EnableReportRequestCancel() override {
ReportRequestCancel = true;
}
void EnableReportSendRequestAcc() override {
ReportSendRequestAcc = true;
}
TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override {
TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_QUEUE_STATS);
req->PeerAddress = addr;
ExecStatsRequest(req);
return req->QueueStats;
}
};
//////////////////////////////////////////////////////////////////////////
static void ReadShm(TSharedMemory* shm, TVector<char>* data) {
Y_ASSERT(shm);
int dataSize = shm->GetSize();
data->yresize(dataSize);
memcpy(&(*data)[0], shm->GetPtr(), dataSize);
}
static void LoadRequestData(TUdpHttpRequest* res) {
if (!res)
return;
{
TBlockChainIterator reqData(res->DataHolder->Data->GetChain());
char pktType;
reqData.Read(&pktType, 1);
ReadArr(&reqData, &res->Url);
if (pktType == PKT_REQUEST) {
ReadYArr(&reqData, &res->Data);
} else if (pktType == PKT_LOCAL_REQUEST) {
ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data);
} else
Y_ASSERT(0);
if (reqData.HasFailed()) {
Y_ASSERT(0 && "wrong format, memory corruption suspected");
res->Url = "";
res->Data.clear();
}
}
res->DataHolder.Reset(nullptr);
}
static void LoadResponseData(TUdpHttpResponse* res) {
if (!res || res->DataHolder.Get() == nullptr)
return;
{
TBlockChainIterator reqData(res->DataHolder->Data->GetChain());
char pktType;
reqData.Read(&pktType, 1);
TGUID guid;
reqData.Read(&guid, sizeof(guid));
Y_ASSERT(res->ReqId == guid);
if (pktType == PKT_RESPONSE) {
ReadYArr(&reqData, &res->Data);
} else if (pktType == PKT_LOCAL_RESPONSE) {
ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data);
} else
Y_ASSERT(0);
if (reqData.HasFailed()) {
Y_ASSERT(0 && "wrong format, memory corruption suspected");
res->Ok = TUdpHttpResponse::FAILED;
res->Data.clear();
res->Error = "wrong response format";
}
}
res->DataHolder.Reset(nullptr);
}
//////////////////////////////////////////////////////////////////////////
// IRequestOps::TWaitResponse
TUdpHttpResponse* IRequestOps::TWaitResponse::GetResponse() {
if (!Response)
return nullptr;
TUdpHttpResponse* res = Response;
Response = nullptr;
LoadResponseData(res);
return res;
}
void IRequestOps::TWaitResponse::SetResponse(TUdpHttpResponse* r) {
Y_ASSERT(Response == nullptr || r == nullptr);
if (r)
Response = r;
CompleteEvent.Signal();
}
//////////////////////////////////////////////////////////////////////////
// TRequesterUserQueues
TUdpHttpRequest* TRequesterUserQueues::GetRequest() {
TUdpHttpRequest* res = nullptr;
ReqList.Dequeue(&res);
if (res) {
AtomicAdd(QueueSizes->ReqCount, -1);
AtomicAdd(QueueSizes->ReqQueueSize, -GetPacketSize(res->DataHolder.Get()));
}
UpdateAsyncSignalState();
LoadRequestData(res);
return res;
}
TUdpHttpResponse* TRequesterUserQueues::GetResponse() {
TUdpHttpResponse* res = nullptr;
ResponseList.Dequeue(&res);
if (res) {
AtomicAdd(QueueSizes->RespCount, -1);
AtomicAdd(QueueSizes->RespQueueSize, -GetPacketSize(res->DataHolder.Get()));
}
UpdateAsyncSignalState();
LoadResponseData(res);
return res;
}
//////////////////////////////////////////////////////////////////////////
class TRequestOps: public IRequestOps {
TIntrusivePtr<TUdpHttp> Requester;
TIntrusivePtr<TRequesterUserQueues> UserQueues;
public:
TRequestOps(TUdpHttp* req)
: Requester(req)
{
UserQueues = new TRequesterUserQueues(req->GetQueueSizes());
}
void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override {
Requester->SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get());
}
void CancelRequest(const TGUID& reqId) override {
Requester->CancelRequest(reqId);
}
void BreakRequest(const TGUID& reqId) override {
Requester->BreakRequest(reqId);
}
void SendResponse(const TGUID& reqId, TVector<char>* data) override {
Requester->SendResponseImpl(reqId, PP_NORMAL, data);
}
void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override {
Requester->SendResponseImpl(reqId, PP_LOW, data);
}
TUdpHttpRequest* GetRequest() override {
Y_ASSERT(0);
//return UserQueues.GetRequest();
return nullptr; // all requests are routed to the main requester
}
TUdpHttpResponse* GetResponse() override {
return UserQueues->GetResponse();
}
bool GetRequestCancel(TGUID*) override {
Y_ASSERT(0);
return false; // all request cancels are routed to the main requester
}
bool GetSendRequestAcc(TGUID* req) override {
return UserQueues->GetSendRequestAcc(req);
}
// sync mode
TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
return Requester->Request(addr, url, data);
}
TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
return Requester->WaitableRequest(addr, url, data);
}
//
TMuxEvent& GetAsyncEvent() override {
return UserQueues->GetAsyncEvent();
}
};
IRequestOps* TUdpHttp::CreateSubRequester() {
return new TRequestOps(this);
}
//////////////////////////////////////////////////////////////////////////
void AbortOnFailedRequest(TUdpHttpResponse* answer) {
if (answer && answer->Ok == TUdpHttpResponse::FAILED) {
fprintf(stderr, "Failed request to host %s\n", GetAddressAsString(answer->PeerAddress).data());
fprintf(stderr, "Error description: %s\n", answer->Error.data());
fflush(nullptr);
Y_ASSERT(0);
abort();
}
}
TString GetDebugInfo(const TUdpAddress& addr, double timeout) {
NHPTimer::STime start;
NHPTimer::GetTime(&start);
TIntrusivePtr<IUdpHost> host = CreateUdpHost(0);
{
TAutoPtr<TRopeDataPacket> rq = new TRopeDataPacket;
rq->Write((char)PKT_GETDEBUGINFO);
ui32 crc32 = CalcChecksum(rq->GetChain());
host->Send(addr, rq.Release(), crc32, nullptr, PP_HIGH);
}
for (;;) {
TAutoPtr<TRequest> ptr = host->GetRequest();
if (ptr.Get()) {
TBlockChainIterator reqData(ptr->Data->GetChain());
int sz = reqData.GetSize();
TString res;
res.resize(sz);
reqData.Read(res.begin(), sz);
return res;
}
host->Step();
host->Wait(0.1f);
NHPTimer::STime now;
NHPTimer::GetTime(&now);
if (NHPTimer::GetSeconds(now - start) > timeout) {
return TString();
}
}
}
void Kill(const TUdpAddress& addr) {
TIntrusivePtr<IUdpHost> host = CreateUdpHost(0);
host->Kill(addr);
}
void StopAllNetLibaThreads() {
PanicAttack = true; // AAAA!!!!
}
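// Watchdog usage sketch (DoUsefulWork() is a hypothetical application callback):
//   SetNetLibaHeartbeatTimeout(30.0); // arm the watchdog
//   for (;;) {
//       DoUsefulWork();
//       NetLibaHeartbeat(); // must be called more often than the timeout
//   }
// If heartbeats stop for longer than timeoutSec, the network thread calls
// StopAllNetLibaThreads() and aborts the process (see ExecServerThread()).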
void SetNetLibaHeartbeatTimeout(double timeoutSec) {
NetLibaHeartbeat();
HeartbeatTimeout.store(timeoutSec, std::memory_order_release);
}
void NetLibaHeartbeat() {
NHPTimer::STime now;
NHPTimer::GetTime(&now);
LastHeartbeat.store(now, std::memory_order_release);
}
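// Minimal synchronous usage sketch (error handling omitted; addr and the
// payload are placeholders):
//   TIntrusivePtr<IRequester> r(CreateHttpUdpRequester(12345));
//   TVector<char> data(1, 'x');
//   TAutoPtr<TUdpHttpResponse> answer(r->Request(addr, "url", &data));
//   AbortOnFailedRequest(answer.Get());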
IRequester* CreateHttpUdpRequester(int port) {
if (PanicAttack)
return nullptr;
TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateSocket();
socket->Open(port);
if (!socket->IsValid())
return nullptr;
return CreateHttpUdpRequester(socket);
}
IRequester* CreateHttpUdpRequester(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
if (PanicAttack)
return nullptr;
TIntrusivePtr<TUdpHttp> res(new TUdpHttp);
if (!res->Start(socket))
return nullptr;
return res.Release();
}
}