author    monster <monster@ydb.tech>  2022-07-07 14:41:37 +0300
committer monster <monster@ydb.tech>  2022-07-07 14:41:37 +0300
commit    06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree      75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/netliba_udp_http.cpp
parent    03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
download  ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/netliba_udp_http.cpp')
-rw-r--r--  library/cpp/neh/netliba_udp_http.cpp  808
1 file changed, 808 insertions, 0 deletions
diff --git a/library/cpp/neh/netliba_udp_http.cpp b/library/cpp/neh/netliba_udp_http.cpp
new file mode 100644
index 0000000000..a4df426f02
--- /dev/null
+++ b/library/cpp/neh/netliba_udp_http.cpp
@@ -0,0 +1,808 @@
+#include "netliba_udp_http.h"
+#include "utils.h"
+
+#include <library/cpp/netliba/v6/cpu_affinity.h>
+#include <library/cpp/netliba/v6/stdafx.h>
+#include <library/cpp/netliba/v6/udp_client_server.h>
+#include <library/cpp/netliba/v6/udp_socket.h>
+
+#include <library/cpp/netliba/v6/block_chain.h> // depends on the headers above
+
+#include <util/system/hp_timer.h>
+#include <util/system/shmat.h>
+#include <util/system/spinlock.h>
+#include <util/system/thread.h>
+#include <util/system/types.h>
+#include <util/system/yassert.h>
+#include <util/thread/lfqueue.h>
+
+#include <atomic>
+
+#if !defined(_win_)
+#include <signal.h>
+#include <pthread.h>
+#endif
+
+using namespace NNetliba;
+
+namespace {
+ const float HTTP_TIMEOUT = 15.0f;
+ const size_t MIN_SHARED_MEM_PACKET = 1000;
+ const size_t MAX_PACKET_SIZE = 0x70000000;
+
+ NNeh::TAtomicBool PanicAttack;
+ std::atomic<NHPTimer::STime> LastHeartbeat;
+ std::atomic<double> HeartbeatTimeout;
+
+ bool IsLocal(const TUdpAddress& addr) {
+ return addr.IsIPv4() ? IsLocalIPv4(addr.GetIPv4()) : IsLocalIPv6(addr.Network, addr.Interface);
+ }
+
+ void StopAllNetLibaThreads() {
+        PanicAttack = true; // signal every netliba thread to stop
+ }
+
+ void ReadShm(TSharedMemory* shm, TVector<char>* data) {
+ Y_ASSERT(shm);
+ int dataSize = shm->GetSize();
+ data->yresize(dataSize);
+ memcpy(&(*data)[0], shm->GetPtr(), dataSize);
+ }
+
+ void ReadShm(TSharedMemory* shm, TString* data) {
+ Y_ASSERT(shm);
+ size_t dataSize = shm->GetSize();
+ data->ReserveAndResize(dataSize);
+ memcpy(data->begin(), shm->GetPtr(), dataSize);
+ }
+
+ template <class T>
+ void EraseList(TLockFreeQueue<T*>* data) {
+        T* ptr = nullptr;
+ while (data->Dequeue(&ptr)) {
+ delete ptr;
+ }
+ }
+
+ enum EHttpPacket {
+ PKT_REQUEST,
+ PKT_PING,
+ PKT_PING_RESPONSE,
+ PKT_RESPONSE,
+ PKT_LOCAL_REQUEST,
+ PKT_LOCAL_RESPONSE,
+ PKT_CANCEL,
+ };
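+
+    // Packet wire layouts, as implied by the (de)serialization code below
+    // (an illustrative summary, not an authoritative spec):
+    //   PKT_REQUEST / PKT_LOCAL_REQUEST:   [pktType:1][url][data]  (local variant: data rides in shared memory)
+    //   PKT_PING:                          [pktType:1][reqGuid]
+    //   PKT_PING_RESPONSE:                 [pktType:1][reqGuid][ok:bool]
+    //   PKT_RESPONSE / PKT_LOCAL_RESPONSE: [pktType:1][reqGuid][data]
+    //   PKT_CANCEL:                        [pktType:1][reqGuid]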
+}
+
+namespace NNehNetliba {
+ TUdpHttpMessage::TUdpHttpMessage(const TGUID& reqId, const TUdpAddress& peerAddr)
+ : ReqId(reqId)
+ , PeerAddress(peerAddr)
+ {
+ }
+
+ TUdpHttpRequest::TUdpHttpRequest(TAutoPtr<TRequest>& dataHolder, const TGUID& reqId, const TUdpAddress& peerAddr)
+ : TUdpHttpMessage(reqId, peerAddr)
+ {
+ TBlockChainIterator reqData(dataHolder->Data->GetChain());
+ char pktType;
+ reqData.Read(&pktType, 1);
+ ReadArr(&reqData, &Url);
+ if (pktType == PKT_REQUEST) {
+ ReadYArr(&reqData, &Data);
+ } else if (pktType == PKT_LOCAL_REQUEST) {
+ ReadShm(dataHolder->Data->GetSharedData(), &Data);
+ } else {
+ Y_ASSERT(0);
+ }
+
+ if (reqData.HasFailed()) {
+ Y_ASSERT(0 && "wrong format, memory corruption suspected");
+ Url = "";
+ Data.clear();
+ }
+ }
+
+ TUdpHttpResponse::TUdpHttpResponse(TAutoPtr<TRequest>& dataHolder, const TGUID& reqId, const TUdpAddress& peerAddr, EResult result, const char* error)
+ : TUdpHttpMessage(reqId, peerAddr)
+ , Ok(result)
+ {
+ if (result == TUdpHttpResponse::FAILED) {
+ Error = error ? error : "request failed";
+ } else if (result == TUdpHttpResponse::CANCELED) {
+ Error = error ? error : "request cancelled";
+ } else {
+ TBlockChainIterator reqData(dataHolder->Data->GetChain());
+ if (Y_UNLIKELY(reqData.HasFailed())) {
+ Y_ASSERT(0 && "wrong format, memory corruption suspected");
+ Ok = TUdpHttpResponse::FAILED;
+ Data.clear();
+ Error = "wrong response format";
+ } else {
+ char pktType;
+ reqData.Read(&pktType, 1);
+ TGUID guid;
+ reqData.Read(&guid, sizeof(guid));
+ Y_ASSERT(ReqId == guid);
+ if (pktType == PKT_RESPONSE) {
+ ReadArr<TString>(&reqData, &Data);
+ } else if (pktType == PKT_LOCAL_RESPONSE) {
+ ReadShm(dataHolder->Data->GetSharedData(), &Data);
+ } else {
+ Y_ASSERT(0);
+ }
+ }
+ }
+ }
+
+ class TUdpHttp: public IRequester {
+ enum EDir {
+ DIR_OUT,
+ DIR_IN
+ };
+
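+        // Incoming-request lifecycle (a sketch inferred from ProcessIncomingPackets()
+        // and DoSends() below, not a formal spec):
+        //   S_WAITING          -- request delivered to the user, no response submitted yet
+        //   S_RESPONSE_SENDING -- SendResponse() queued a reply; the entry is erased when the send completes
+        //   S_CANCELED         -- peer sent PKT_CANCEL; any queued reply is dropped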
+ struct TInRequestState {
+ enum EState {
+ S_WAITING,
+ S_RESPONSE_SENDING,
+ S_CANCELED,
+ };
+
+ TInRequestState()
+ : State(S_WAITING)
+ {
+ }
+
+ TInRequestState(const TUdpAddress& address)
+ : State(S_WAITING)
+ , Address(address)
+ {
+ }
+
+ EState State;
+ TUdpAddress Address;
+ };
+
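+        // Outgoing-request state machine (a sketch inferred from AnalyzeSendResults()
+        // and SendPingsIfNeeded() below):
+        //   S_SENDING              -> S_WAITING on successful send, FAILED otherwise
+        //   S_WAITING              -> S_WAITING_PING_SENDING after HTTP_TIMEOUT with no response (a PKT_PING goes out)
+        //   S_WAITING_PING_SENDING -> S_WAITING_PING_SENT once the ping is actually sent
+        //   S_WAITING_PING_SENT    -> S_WAITING on a positive ping reply, FAILED on another HTTP_TIMEOUT
+        //   S_CANCEL_AFTER_SENDING -> request reported CANCELED as soon as the pending send finishes
+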
+ struct TOutRequestState {
+ enum EState {
+ S_SENDING,
+ S_WAITING,
+ S_WAITING_PING_SENDING,
+ S_WAITING_PING_SENT,
+ S_CANCEL_AFTER_SENDING
+ };
+
+ TOutRequestState()
+ : State(S_SENDING)
+ , TimePassed(0)
+ , PingTransferId(-1)
+ {
+ }
+
+ EState State;
+ TUdpAddress Address;
+ double TimePassed;
+ int PingTransferId;
+ IEventsCollectorRef EventsCollector;
+ };
+
+ struct TTransferPurpose {
+ EDir Dir;
+ TGUID Guid;
+
+ TTransferPurpose()
+ : Dir(DIR_OUT)
+ {
+ }
+
+ TTransferPurpose(EDir dir, TGUID guid)
+ : Dir(dir)
+ , Guid(guid)
+ {
+ }
+ };
+
+ struct TSendRequest {
+ TSendRequest() = default;
+
+ TSendRequest(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket>* data, const TGUID& reqGuid, const IEventsCollectorRef& eventsCollector)
+ : Addr(addr)
+ , Data(*data)
+ , ReqGuid(reqGuid)
+ , EventsCollector(eventsCollector)
+ , Crc32(CalcChecksum(Data->GetChain()))
+ {
+ }
+
+ TUdpAddress Addr;
+ TAutoPtr<TRopeDataPacket> Data;
+ TGUID ReqGuid;
+ IEventsCollectorRef EventsCollector;
+ ui32 Crc32;
+ };
+
+ struct TSendResponse {
+ TSendResponse() = default;
+
+ TSendResponse(const TGUID& reqGuid, EPacketPriority prior, TVector<char>* data)
+ : ReqGuid(reqGuid)
+ , DataCrc32(0)
+ , Priority(prior)
+ {
+ if (data && !data->empty()) {
+ data->swap(Data);
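+                // the crc is computed here, on the caller's thread, to keep the work off
+                // the network thread (see the matching comment in DoSends())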
+ DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize());
+ }
+ }
+
+ TVector<char> Data;
+ TGUID ReqGuid;
+ ui32 DataCrc32;
+ EPacketPriority Priority;
+ };
+
+ typedef THashMap<TGUID, TOutRequestState, TGUIDHash> TOutRequestHash;
+ typedef THashMap<TGUID, TInRequestState, TGUIDHash> TInRequestHash;
+
+ public:
+ TUdpHttp(const IEventsCollectorRef& eventsCollector)
+ : MyThread_(ExecServerThread, (void*)this)
+ , AbortTransactions_(false)
+ , Port_(0)
+ , EventCollector_(eventsCollector)
+ , ReportRequestCancel_(false)
+            , ReportRequestAck_(false)
+ , PhysicalCpu_(-1)
+ {
+ }
+
+ ~TUdpHttp() override {
+ if (MyThread_.Running()) {
+ AtomicSet(KeepRunning_, 0);
+ MyThread_.Join();
+ }
+ }
+
+ bool Start(int port, int physicalCpu) {
+ Y_ASSERT(Host_.Get() == nullptr);
+ Port_ = port;
+ PhysicalCpu_ = physicalCpu;
+ MyThread_.Start();
+ HasStarted_.Wait();
+ return Host_.Get() != nullptr;
+ }
+
+ void EnableReportRequestCancel() override {
+ ReportRequestCancel_ = true;
+ }
+
+ void EnableReportRequestAck() override {
+            ReportRequestAck_ = true;
+ }
+
+ void SendRequest(const TUdpAddress& addr, const TString& url, const TString& data, const TGUID& reqId) override {
+ Y_VERIFY(
+ data.size() < MAX_PACKET_SIZE,
+ "data size is too large; data.size()=%" PRISZT ", MAX_PACKET_SIZE=%" PRISZT,
+ data.size(), MAX_PACKET_SIZE);
+
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
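+            // large payloads addressed to a local peer are handed over via shared memory
+            // instead of being copied through the UDP stack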
+ if (data.size() > MIN_SHARED_MEM_PACKET && IsLocal(addr)) {
+ TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
+ if (shm->Create(data.size())) {
+ ms->Write((char)PKT_LOCAL_REQUEST);
+ ms->WriteStroka(url);
+ memcpy(shm->GetPtr(), data.begin(), data.size());
+ ms->AttachSharedData(shm);
+ }
+ }
+ if (ms->GetSharedData() == nullptr) {
+ ms->Write((char)PKT_REQUEST);
+ ms->WriteStroka(url);
+ struct TStrokaStorage: public TThrRefBase, public TString {
+ TStrokaStorage(const TString& s)
+ : TString(s)
+ {
+ }
+ };
+ TStrokaStorage* ss = new TStrokaStorage(data);
+ ms->Write((int)ss->size());
+ ms->AddBlock(ss, ss->begin(), ss->size());
+ }
+
+ SendReqList_.Enqueue(new TSendRequest(addr, &ms, reqId, EventCollector_));
+ Host_->CancelWait();
+ }
+
+ void CancelRequest(const TGUID& reqId) override {
+ CancelReqList_.Enqueue(reqId);
+ Host_->CancelWait();
+ }
+
+ void SendResponse(const TGUID& reqId, TVector<char>* data) override {
+ if (data && data->size() > MAX_PACKET_SIZE) {
+ Y_FAIL(
+ "data size is too large; data->size()=%" PRISZT ", MAX_PACKET_SIZE=%" PRISZT,
+ data->size(), MAX_PACKET_SIZE);
+ }
+ SendRespList_.Enqueue(new TSendResponse(reqId, PP_NORMAL, data));
+ Host_->CancelWait();
+ }
+
+ void StopNoWait() override {
+ AbortTransactions_ = true;
+ AtomicSet(KeepRunning_, 0);
+            // cancel all outgoing requests
+ TGuard<TSpinLock> lock(Spn_);
+ while (!OutRequests_.empty()) {
+ // cancel without informing peer that we are cancelling the request
+ FinishRequest(OutRequests_.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()");
+ }
+ }
+
+ private:
+ void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TRequestPtr data, const char* error = nullptr) {
+ TOutRequestState& s = i->second;
+ s.EventsCollector->AddResponse(new TUdpHttpResponse(data, i->first, s.Address, ok, error));
+ OutRequests_.erase(i);
+ }
+
+ int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data) {
+ ui32 crc32 = CalcChecksum(data->GetChain());
+ return Host_->Send(addr, data.Release(), crc32, nullptr, PP_HIGH);
+ }
+
+ void ProcessIncomingPackets() {
+ TVector<TGUID> failedRequests;
+ for (;;) {
+ TAutoPtr<TRequest> req = Host_->GetRequest();
+ if (req.Get() == nullptr)
+ break;
+
+ TBlockChainIterator reqData(req->Data->GetChain());
+ char pktType;
+ reqData.Read(&pktType, 1);
+ switch (pktType) {
+ case PKT_REQUEST:
+ case PKT_LOCAL_REQUEST: {
+ TGUID reqId = req->Guid;
+ TInRequestHash::iterator z = InRequests_.find(reqId);
+ if (z != InRequests_.end()) {
+                        // oops, this request already exists!
+                        // this can happen if the request fits in a single packet whose source IP
+                        // was corrupted in transit and still passed the crc checks;
+                        // since we have already reported the wrong source address for this request
+                        // to the user, the safest reaction would be to stop the program to avoid
+                        // further complications, but we merely report the incident to stderr
+ fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n",
+ GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(),
+ GetAddressAsString(req->Address).c_str());
+ } else {
+ InRequests_[reqId] = TInRequestState(req->Address);
+ EventCollector_->AddRequest(new TUdpHttpRequest(req, reqId, req->Address));
+ }
+ } break;
+ case PKT_PING: {
+ TGUID guid;
+ reqData.Read(&guid, sizeof(guid));
+ bool ok = InRequests_.find(guid) != InRequests_.end();
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ ms->Write((char)PKT_PING_RESPONSE);
+ ms->Write(guid);
+ ms->Write(ok);
+ SendWithHighPriority(req->Address, ms.Release());
+ } break;
+ case PKT_PING_RESPONSE: {
+ TGUID guid;
+ bool ok;
+ reqData.Read(&guid, sizeof(guid));
+ reqData.Read(&ok, sizeof(ok));
+ TOutRequestHash::iterator i = OutRequests_.find(guid);
+ if (i == OutRequests_.end())
+ ; //Y_ASSERT(0); // actually possible with some packet orders
+ else {
+ if (!ok) {
+                            // we cannot delete the request at this point,
+                            // because a failed ping and the response may arrive at the same moment;
+                            // consider this sequence: the client sends a ping, the server sends the
+                            // response and answers the ping with false since the reply is already sent;
+                            // the failed ping_response can never arrive earlier than the response itself,
+                            // but the two can arrive simultaneously
+ failedRequests.push_back(guid);
+ } else {
+ TOutRequestState& s = i->second;
+ switch (s.State) {
+ case TOutRequestState::S_WAITING_PING_SENDING: {
+ Y_ASSERT(s.PingTransferId >= 0);
+ TTransferHash::iterator k = TransferHash_.find(s.PingTransferId);
+ if (k != TransferHash_.end())
+ TransferHash_.erase(k);
+ else
+ Y_ASSERT(0);
+ s.PingTransferId = -1;
+ s.TimePassed = 0;
+ s.State = TOutRequestState::S_WAITING;
+ } break;
+ case TOutRequestState::S_WAITING_PING_SENT:
+ s.TimePassed = 0;
+ s.State = TOutRequestState::S_WAITING;
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ }
+ }
+ } break;
+ case PKT_RESPONSE:
+ case PKT_LOCAL_RESPONSE: {
+ TGUID guid;
+ reqData.Read(&guid, sizeof(guid));
+ TOutRequestHash::iterator i = OutRequests_.find(guid);
+ if (i == OutRequests_.end()) {
+ ; //Y_ASSERT(0); // does happen
+ } else {
+ FinishRequest(i, TUdpHttpResponse::OK, req);
+ }
+ } break;
+ case PKT_CANCEL: {
+ TGUID guid;
+ reqData.Read(&guid, sizeof(guid));
+ TInRequestHash::iterator i = InRequests_.find(guid);
+ if (i == InRequests_.end()) {
+ ; //Y_ASSERT(0); // may happen
+ } else {
+ TInRequestState& s = i->second;
+ if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel_)
+ EventCollector_->AddCancel(guid);
+ s.State = TInRequestState::S_CANCELED;
+ }
+ } break;
+ default:
+ Y_ASSERT(0);
+ }
+ }
+ // cleanup failed requests
+ for (size_t k = 0; k < failedRequests.size(); ++k) {
+ const TGUID& guid = failedRequests[k];
+ TOutRequestHash::iterator i = OutRequests_.find(guid);
+ if (i != OutRequests_.end())
+ FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "failed udp ping");
+ }
+ }
+
+ void AnalyzeSendResults() {
+ TSendResult res;
+ while (Host_->GetSendResult(&res)) {
+ TTransferHash::iterator k = TransferHash_.find(res.TransferId);
+ if (k != TransferHash_.end()) {
+ const TTransferPurpose& tp = k->second;
+ switch (tp.Dir) {
+ case DIR_OUT: {
+ TOutRequestHash::iterator i = OutRequests_.find(tp.Guid);
+ if (i != OutRequests_.end()) {
+ const TGUID& reqId = i->first;
+ TOutRequestState& s = i->second;
+ switch (s.State) {
+ case TOutRequestState::S_SENDING:
+ if (!res.Success) {
+ FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING");
+ } else {
+                                    if (ReportRequestAck_ && !!s.EventsCollector) {
+ s.EventsCollector->AddRequestAck(reqId);
+ }
+ s.State = TOutRequestState::S_WAITING;
+ s.TimePassed = 0;
+ }
+ break;
+ case TOutRequestState::S_CANCEL_AFTER_SENDING:
+ DoSendCancel(s.Address, reqId);
+ FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING");
+ break;
+ case TOutRequestState::S_WAITING:
+ case TOutRequestState::S_WAITING_PING_SENT:
+ Y_ASSERT(0);
+ break;
+ case TOutRequestState::S_WAITING_PING_SENDING:
+ Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId);
+ if (!res.Success) {
+ FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING");
+ } else {
+ s.PingTransferId = -1;
+ s.State = TOutRequestState::S_WAITING_PING_SENT;
+ s.TimePassed = 0;
+ }
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ }
+ } break;
+ case DIR_IN: {
+ TInRequestHash::iterator i = InRequests_.find(tp.Guid);
+ if (i != InRequests_.end()) {
+ Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED);
+ InRequests_.erase(i);
+ }
+ } break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ TransferHash_.erase(k);
+ }
+ }
+ }
+
+ void SendPingsIfNeeded() {
+ NHPTimer::STime tChk = PingsSendT_;
+ float deltaT = (float)NHPTimer::GetTimePassed(&tChk);
+ if (deltaT < 0.05) {
+ return;
+ }
+ PingsSendT_ = tChk;
+ deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3);
+
+ {
+ for (TOutRequestHash::iterator i = OutRequests_.begin(); i != OutRequests_.end();) {
+ TOutRequestHash::iterator curIt = i++;
+ TOutRequestState& s = curIt->second;
+ const TGUID& guid = curIt->first;
+ switch (s.State) {
+ case TOutRequestState::S_WAITING:
+ s.TimePassed += deltaT;
+ if (s.TimePassed > HTTP_TIMEOUT) {
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ ms->Write((char)PKT_PING);
+ ms->Write(guid);
+ int transId = SendWithHighPriority(s.Address, ms.Release());
+ TransferHash_[transId] = TTransferPurpose(DIR_OUT, guid);
+ s.State = TOutRequestState::S_WAITING_PING_SENDING;
+ s.PingTransferId = transId;
+ }
+ break;
+ case TOutRequestState::S_WAITING_PING_SENT:
+ s.TimePassed += deltaT;
+ if (s.TimePassed > HTTP_TIMEOUT) {
+ FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT");
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
+
+ void Step() {
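+            // DoSends() runs both before and after Host_->Step(), presumably so that operations
+            // queued by user threads go out before the network step, and anything that arrived
+            // during the step is handled without waiting for the next iteration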
+ {
+ TGuard<TSpinLock> lock(Spn_);
+ DoSends();
+ }
+ Host_->Step();
+ {
+ TGuard<TSpinLock> lock(Spn_);
+ DoSends();
+ ProcessIncomingPackets();
+ AnalyzeSendResults();
+ SendPingsIfNeeded();
+ }
+ }
+
+ void Wait() {
+ Host_->Wait(0.1f);
+ }
+
+ void DoSendCancel(const TUdpAddress& addr, const TGUID& req) {
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ ms->Write((char)PKT_CANCEL);
+ ms->Write(req);
+ SendWithHighPriority(addr, ms);
+ }
+
+ void DoSends() {
+ {
+ // cancelling requests
+ TGUID reqGuid;
+ while (CancelReqList_.Dequeue(&reqGuid)) {
+ TOutRequestHash::iterator i = OutRequests_.find(reqGuid);
+ if (i == OutRequests_.end()) {
+ AnticipateCancels_.insert(reqGuid);
+                    continue; // cancelling a non-existent request is ok
+ }
+ TOutRequestState& s = i->second;
+ if (s.State == TOutRequestState::S_SENDING) {
+                    // awkward case: the request has not been sent yet and we already have to
+                    // cancel it; defer the cancel until the send completes
+ s.State = TOutRequestState::S_CANCEL_AFTER_SENDING;
+ s.EventsCollector->AddCancel(i->first);
+ } else {
+ DoSendCancel(s.Address, reqGuid);
+ FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side");
+ }
+ }
+ }
+ {
+ // sending replies
+ for (TSendResponse* rd = nullptr; SendRespList_.Dequeue(&rd); delete rd) {
+ TInRequestHash::iterator i = InRequests_.find(rd->ReqGuid);
+ if (i == InRequests_.end()) {
+ Y_ASSERT(0);
+ continue;
+ }
+ TInRequestState& s = i->second;
+ if (s.State == TInRequestState::S_CANCELED) {
+                    // no need to send a response for a canceled request
+ InRequests_.erase(i);
+ continue;
+ }
+
+ Y_ASSERT(s.State == TInRequestState::S_WAITING);
+ s.State = TInRequestState::S_RESPONSE_SENDING;
+
+ TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
+ ui32 crc32 = 0;
+ int dataSize = rd->Data.ysize();
+ if (rd->Data.size() > MIN_SHARED_MEM_PACKET && IsLocal(s.Address)) {
+ TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
+ if (shm->Create(dataSize)) {
+ ms->Write((char)PKT_LOCAL_RESPONSE);
+ ms->Write(rd->ReqGuid);
+ memcpy(shm->GetPtr(), &rd->Data[0], dataSize);
+ TVector<char> empty;
+ rd->Data.swap(empty);
+ ms->AttachSharedData(shm);
+ crc32 = CalcChecksum(ms->GetChain());
+ }
+ }
+ if (ms->GetSharedData() == nullptr) {
+ ms->Write((char)PKT_RESPONSE);
+ ms->Write(rd->ReqGuid);
+
+                    // the crc of data[] is computed in SendResponse(), outside this thread, and
+                    // passed in DataCrc32, to offload crc calculation from the inner thread;
+                    // the downside is that the crc gets computed even when shared memory ends up
+                    // being used; that is hard to avoid, since SendResponse() cannot know whether
+                    // shared memory will be used (the peer address is not available there)
+ TIncrementalChecksumCalcer csCalcer;
+ AddChain(&csCalcer, ms->GetChain());
+ // here we are replicating the way WriteDestructive serializes data
+ csCalcer.AddBlock(&dataSize, sizeof(dataSize));
+ csCalcer.AddBlockSum(rd->DataCrc32, dataSize);
+ crc32 = csCalcer.CalcChecksum();
+
+ ms->WriteDestructive(&rd->Data);
+                    // ui32 chkCrc = CalcChecksum(ms->GetChain()); // cannot be used: too slow for large responses
+                    // Y_ASSERT(chkCrc == crc32);
+ }
+
+ int transId = Host_->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority);
+ TransferHash_[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid);
+ }
+ }
+ {
+ // sending requests
+ for (TSendRequest* rd = nullptr; SendReqList_.Dequeue(&rd); delete rd) {
+ Y_ASSERT(OutRequests_.find(rd->ReqGuid) == OutRequests_.end());
+
+ {
+ TOutRequestState& s = OutRequests_[rd->ReqGuid];
+ s.State = TOutRequestState::S_SENDING;
+ s.Address = rd->Addr;
+ s.EventsCollector = rd->EventsCollector;
+ }
+
+ if (AnticipateCancels_.find(rd->ReqGuid) != AnticipateCancels_.end()) {
+ FinishRequest(OutRequests_.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "Canceled (before transmit)");
+ } else {
+ TGUID pktGuid = rd->ReqGuid; // request packet id should match request id
+ int transId = Host_->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL);
+ TransferHash_[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid);
+ }
+ }
+ }
+ if (!AnticipateCancels_.empty()) {
+ AnticipateCancels_.clear();
+ }
+ }
+
+ void FinishOutstandingTransactions() {
+            // wait for all pending requests to finish; any new requests are canceled
+ while ((!OutRequests_.empty() || !InRequests_.empty() || !SendRespList_.IsEmpty() || !SendReqList_.IsEmpty()) && !PanicAttack) {
+ Step();
+ sleep(0);
+ }
+ }
+
+ static void* ExecServerThread(void* param) {
+ TUdpHttp* pThis = (TUdpHttp*)param;
+ if (pThis->GetPhysicalCpu() >= 0) {
+ BindToSocket(pThis->GetPhysicalCpu());
+ }
+ SetHighestThreadPriority();
+
+ TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateSocket();
+ socket->Open(pThis->Port_);
+ if (socket->IsValid()) {
+ pThis->Port_ = socket->GetPort();
+ pThis->Host_ = CreateUdpHost(socket);
+ } else {
+ pThis->Host_ = nullptr;
+ }
+
+ pThis->HasStarted_.Signal();
+ if (!pThis->Host_)
+ return nullptr;
+
+ NHPTimer::GetTime(&pThis->PingsSendT_);
+ while (AtomicGet(pThis->KeepRunning_) && !PanicAttack) {
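+            // heartbeat watchdog: if no heartbeat has been recorded for longer than
+            // HeartbeatTimeout, assume the process is wedged and kill everything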
+ if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) {
+ NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire);
+ if (NHPTimer::GetTimePassed(&chk) > HeartbeatTimeout.load(std::memory_order_acquire)) {
+ StopAllNetLibaThreads();
+#ifndef _win_
+ killpg(0, SIGKILL);
+#endif
+ abort();
+ break;
+ }
+ }
+ pThis->Step();
+ pThis->Wait();
+ }
+ if (!pThis->AbortTransactions_ && !PanicAttack) {
+ pThis->FinishOutstandingTransactions();
+ }
+ pThis->Host_ = nullptr;
+ return nullptr;
+ }
+
+ int GetPhysicalCpu() const noexcept {
+ return PhysicalCpu_;
+ }
+
+ private:
+ TThread MyThread_;
+ TAtomic KeepRunning_ = 1;
+ bool AbortTransactions_;
+ TSpinLock Spn_;
+ TSystemEvent HasStarted_;
+
+ NHPTimer::STime PingsSendT_;
+
+ TIntrusivePtr<IUdpHost> Host_;
+ int Port_;
+ TOutRequestHash OutRequests_;
+ TInRequestHash InRequests_;
+
+ typedef THashMap<int, TTransferPurpose> TTransferHash;
+ TTransferHash TransferHash_;
+
+        // kept as a member to avoid reconstructing the set on every DoSends() call
+ typedef THashSet<TGUID, TGUIDHash> TAnticipateCancels;
+ TAnticipateCancels AnticipateCancels_;
+
+ TLockFreeQueue<TSendRequest*> SendReqList_;
+ TLockFreeQueue<TSendResponse*> SendRespList_;
+ TLockFreeQueue<TGUID> CancelReqList_;
+
+ TIntrusivePtr<IEventsCollector> EventCollector_;
+
+ bool ReportRequestCancel_;
+        bool ReportRequestAck_;
+ int PhysicalCpu_;
+ };
+
+ IRequesterRef CreateHttpUdpRequester(int port, const IEventsCollectorRef& ec, int physicalCpu) {
+ TUdpHttp* udpHttp = new TUdpHttp(ec);
+ IRequesterRef res(udpHttp);
+ if (!udpHttp->Start(port, physicalCpu)) {
+ if (port) {
+ ythrow yexception() << "netliba can't bind port=" << port;
+ } else {
+ ythrow yexception() << "netliba can't bind random port";
+ }
+ }
+ return res;
+ }
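+
+    // Usage sketch (illustrative only; TMyEventsCollector and peerAddr are assumptions,
+    // not part of this file):
+    //   IEventsCollectorRef collector = new TMyEventsCollector;             // hypothetical IEventsCollector impl
+    //   IRequesterRef requester = CreateHttpUdpRequester(0, collector, -1); // random port, no cpu pinning
+    //   TGUID reqId;
+    //   CreateGuid(&reqId);                                                 // from util/generic/guid.h
+    //   requester->SendRequest(peerAddr, "handler", payload, reqId);        // peerAddr: a resolved TUdpAddress
+    //   ...the response arrives through collector as a TUdpHttpResponse...
+    //   requester->StopNoWait();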
+}