author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300
---|---|---
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/udp_client_server.cpp |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/udp_client_server.cpp')
-rw-r--r-- | library/cpp/netliba/v6/udp_client_server.cpp | 1321 |
1 file changed, 1321 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/udp_client_server.cpp b/library/cpp/netliba/v6/udp_client_server.cpp
new file mode 100644
index 0000000000..3eaf6e5e96
--- /dev/null
+++ b/library/cpp/netliba/v6/udp_client_server.cpp
@@ -0,0 +1,1321 @@
+#include "stdafx.h"
+#include "udp_client_server.h"
+#include "net_acks.h"
+#include <util/generic/guid.h>
+#include <util/system/hp_timer.h>
+#include <util/datetime/cputimer.h>
+#include <util/system/yield.h>
+#include <util/system/unaligned_mem.h>
+#include "block_chain.h"
+#include <util/system/shmat.h>
+#include "udp_debug.h"
+#include "udp_socket.h"
+#include "ib_cs.h"
+
+#include <library/cpp/netliba/socket/socket.h>
+
+#include <util/random/random.h>
+#include <util/system/sanitizers.h>
+
+#include <atomic>
+
+namespace NNetliba {
+    // rely on UDP checksum in packets, check crc only for complete packets
+    // UPDATE: looks like UDP checksum is not enough, network errors do happen, we saw 600+ retransmits of a ~1MB data packet
+
+    const float UDP_TRANSFER_TIMEOUT = 90.0f;
+    const float DEFAULT_MAX_WAIT_TIME = 1;
+    const float UDP_KEEP_PEER_INFO = 600;
+    // traffic can keep flowing while no new data arrives for a particular packet.
+    // This happens when we interrupt a process mid-transfer and restart it on the same port:
+    // the packet then hangs on the receiver. Such a packet is killed by this timeout.
+    const float UDP_MAX_INPUT_DATA_WAIT = UDP_TRANSFER_TIMEOUT * 2;
+
+    enum {
+        UDP_PACKET_SIZE_FULL = 8900,  // used for ping to detect jumbo-frame support
+        UDP_PACKET_SIZE = 8800,       // max data in packet
+        UDP_PACKET_SIZE_SMALL = 1350, // 1180 would be better taking into account that 1280 is guaranteed ipv6 minimum MTU
+        UDP_PACKET_BUF_SIZE = UDP_PACKET_SIZE + 100,
+    };
+
+    //////////////////////////////////////////////////////////////////////////
+    struct TUdpCompleteInTransfer {
+        TGUID PacketGuid;
+    };
+
+    //////////////////////////////////////////////////////////////////////////
+    struct TUdpRecvPacket {
+        int DataStart, DataSize;
+        ui32 BlockSum;
+        // Data[] should be the last member of the struct; this fact is used to create a truncated TUdpRecvPacket in CreateNewSmallPacket()
+        char Data[UDP_PACKET_BUF_SIZE];
+    };
+
+    struct TUdpInTransfer {
+    private:
+        TVector<TUdpRecvPacket*> Packets;
+
+    public:
+        sockaddr_in6 ToAddress;
+        int PacketSize, LastPacketSize;
+        bool HasLastPacket;
+        TVector<int> NewPacketsToAck;
+        TCongestionControlPtr Congestion;
+        float TimeSinceLastRecv;
+        int Attempt;
+        TGUID PacketGuid;
+        int Crc32;
+        TIntrusivePtr<TSharedMemory> SharedData;
+        TRequesterPendingDataStats* Stats;
+
+        TUdpInTransfer()
+            : PacketSize(0)
+            , LastPacketSize(0)
+            , HasLastPacket(false)
+            , TimeSinceLastRecv(0)
+            , Attempt(0)
+            , Crc32(0)
+            , Stats(nullptr)
+        {
+            Zero(ToAddress);
+        }
+        ~TUdpInTransfer() {
+            if (Stats) {
+                Stats->InpCount -= 1;
+            }
+            EraseAllPackets();
+        }
+        void EraseAllPackets() {
+            for (int i = 0; i < Packets.ysize(); ++i) {
+                ErasePacket(i);
+            }
+            Packets.clear();
+            HasLastPacket = false;
+        }
+        void AttachStats(TRequesterPendingDataStats* stats) {
+            Stats = stats;
+            Stats->InpCount += 1;
+            Y_ASSERT(Packets.empty());
+        }
+        void ErasePacket(int id) {
+            TUdpRecvPacket* pkt = Packets[id];
+            if (pkt) {
+                if (Stats) {
+                    Stats->InpDataSize -= PacketSize;
+                }
+                TRopeDataPacket::FreeBuf((char*)pkt);
+                Packets[id] = nullptr;
+            }
+        }
+        void AssignPacket(int id, TUdpRecvPacket* pkt) {
+            ErasePacket(id);
+            if (pkt && Stats) {
+                Stats->InpDataSize += PacketSize;
+            }
+            Packets[id] = pkt;
+        }
+        int GetPacketCount() const {
+            return Packets.ysize();
+        }
+        void SetPacketCount(int n) {
+            Packets.resize(n, nullptr);
+        }
+        const TUdpRecvPacket* GetPacket(int id) const {
+            return Packets[id];
+        }
+        TUdpRecvPacket* ExtractPacket(int id) {
+            TUdpRecvPacket* res = Packets[id];
+            if (res) {
+                if (Stats) {
+                    Stats->InpDataSize -= PacketSize;
+                }
+                Packets[id] = nullptr;
+            }
+            return res;
+        }
+    };
+
+    struct TUdpOutTransfer {
+        sockaddr_in6 ToAddress;
+        TAutoPtr<TRopeDataPacket> Data;
+        int PacketCount;
+        int PacketSize, LastPacketSize;
+        TAckTracker AckTracker;
+        int Attempt;
+        TGUID PacketGuid;
+        int Crc32;
+        EPacketPriority PacketPriority;
+        TRequesterPendingDataStats* Stats;
+
+        TUdpOutTransfer()
+            : PacketCount(0)
+            , PacketSize(0)
+            , LastPacketSize(0)
+            , Attempt(0)
+            , Crc32(0)
+            , PacketPriority(PP_LOW)
+            , Stats(nullptr)
+        {
+            Zero(ToAddress);
+        }
+        ~TUdpOutTransfer() {
+            if (Stats) {
+                Stats->OutCount -= 1;
+                Stats->OutDataSize -= Data->GetSize();
+            }
+        }
+        void AttachStats(TRequesterPendingDataStats* stats) {
+            Stats = stats;
+            Stats->OutCount += 1;
+            Stats->OutDataSize += Data->GetSize();
+        }
+    };
+
+    struct TTransferKey {
+        TUdpAddress Address;
+        int Id;
+    };
+    inline bool operator==(const TTransferKey& a, const TTransferKey& b) {
+        return a.Address == b.Address && a.Id == b.Id;
+    }
+    struct TTransferKeyHash {
+        int operator()(const TTransferKey& k) const {
+            return (ui32)k.Address.Interface + (ui32)k.Address.Port * (ui32)389461 + (ui32)k.Id;
+        }
+    };
+
+    struct TUdpAddressHash {
+        int operator()(const TUdpAddress& addr) const {
+            return (ui32)addr.Interface + (ui32)addr.Port * (ui32)389461;
+        }
+    };
+
+    class TUdpHostRevBufAlloc: public TNonCopyable {
+        TUdpRecvPacket* RecvPktBuf;
+
+        void AllocNewBuf() {
+            RecvPktBuf = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(sizeof(TUdpRecvPacket));
+        }
+
+    public:
+        TUdpHostRevBufAlloc() {
+            AllocNewBuf();
+        }
+        ~TUdpHostRevBufAlloc() {
+            FreeBuf(RecvPktBuf);
+        }
+        void FreeBuf(TUdpRecvPacket* pkt) {
+            TRopeDataPacket::FreeBuf((char*)pkt);
+        }
+        TUdpRecvPacket* ExtractPacket() {
+            TUdpRecvPacket* res = RecvPktBuf;
+            AllocNewBuf();
+            return res;
+        }
+        TUdpRecvPacket* CreateNewSmallPacket(int sz) {
+            int pktStructSz = sizeof(TUdpRecvPacket) - Y_ARRAY_SIZE(RecvPktBuf->Data) + sz;
+            TUdpRecvPacket* pkt = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(pktStructSz);
+            return pkt;
+        }
+        int GetBufSize() const {
+            return Y_ARRAY_SIZE(RecvPktBuf->Data);
+        }
+        char* GetDataPtr() const {
+            return RecvPktBuf->Data;
+        }
+    };
+
+    static TAtomic transferIdCounter = (long)(GetCycleCount() & 0x1fffffff);
+    inline int GetTransferId() {
+        int res = AtomicAdd(transferIdCounter, 1);
+        while (res < 0) {
+            // negative transfer ids are treated as errors, so wrap transfer id
+            AtomicCas(&transferIdCounter, 0, transferIdCounter);
+            res = AtomicAdd(transferIdCounter, 1);
+        }
+        return res;
+    }
+
+    static bool IBDetection = true;
+    class TUdpHost: public IUdpHost {
+        struct TPeerLink {
+            TIntrusivePtr<TCongestionControl> UdpCongestion;
+            TIntrusivePtr<IIBPeer> IBPeer;
+            double TimeNoActiveTransfers;
+
+            TPeerLink()
+                : TimeNoActiveTransfers(0)
+            {
+            }
+            bool Update(float deltaT, const TUdpAddress& toAddress, float* maxWaitTime) {
+                bool updateOk = UdpCongestion->UpdateAlive(toAddress, deltaT, UDP_TRANSFER_TIMEOUT, maxWaitTime);
+                return updateOk;
+            }
+            void StartSleep(const TUdpAddress& toAddress, float* maxWaitTime) {
+                //printf("peer_link start sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
+                UdpCongestion->UpdateAlive(toAddress, 0, UDP_TRANSFER_TIMEOUT, maxWaitTime);
+                UdpCongestion->MarkAlive();
+                TimeNoActiveTransfers = 0;
+            }
+            bool UpdateSleep(float deltaT) {
+                TimeNoActiveTransfers += deltaT;
+                if (IBPeer.Get()) {
+                    //printf("peer_link update sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
+                    if (IBPeer->GetState() == IIBPeer::OK) {
+                        return true;
+                    }
+                    //printf("Drop broken IB connection\n");
+                    IBPeer = nullptr;
+                }
+                return (TimeNoActiveTransfers < UDP_KEEP_PEER_INFO);
+            }
+        };
+
+        TNetSocket s;
+        typedef THashMap<TTransferKey, TUdpInTransfer, TTransferKeyHash> TUdpInXferHash;
+        typedef THashMap<TTransferKey, TUdpOutTransfer, TTransferKeyHash> TUdpOutXferHash;
+        // congestion control per peer
+        typedef THashMap<TUdpAddress, TPeerLink, TUdpAddressHash> TPeerLinkHash;
+        typedef THashMap<TTransferKey, TUdpCompleteInTransfer, TTransferKeyHash> TUdpCompleteInXferHash;
+        typedef THashMap<TUdpAddress, TIntrusivePtr<TPeerQueueStats>, TUdpAddressHash> TQueueStatsHash;
+        TUdpInXferHash RecvQueue;
+        TUdpCompleteInXferHash RecvCompleted;
+        TUdpOutXferHash SendQueue;
+        TPeerLinkHash CongestionTrack, CongestionTrackHistory;
+        TList<TRequest*> ReceivedList;
+        NHPTimer::STime CurrentT;
+        TList<TSendResult> SendResults;
+        TList<TTransferKey> SendOrderLow, SendOrder, SendOrderHighPrior;
+        TAtomic IsWaiting;
+        float MaxWaitTime;
+        std::atomic<float> MaxWaitTime2;
+        float IBIdleTime;
+        TVector<TTransferKey> RecvCompletedQueue, KeepCompletedQueue;
+        float TimeSinceCompletedQueueClean, TimeSinceCongestionHistoryUpdate;
+        TRequesterPendingDataStats PendingDataStats;
+        TQueueStatsHash PeerQueueStats;
+        TIntrusivePtr<IIBClientServer> IB;
+        typedef THashMap<TIBMsgHandle, TTransferKey> TIBtoTransferKeyHash;
+        TIBtoTransferKeyHash IBKeyToTransferKey;
+
+        char PktBuf[UDP_PACKET_BUF_SIZE];
+        TUdpHostRevBufAlloc RecvBuf;
+
+        TPeerLink& GetPeerLink(const TUdpAddress& ip) {
+            TPeerLinkHash::iterator z = CongestionTrack.find(ip);
+            if (z == CongestionTrack.end()) {
+                z = CongestionTrackHistory.find(ip);
+                if (z == CongestionTrackHistory.end()) {
+                    TPeerLink& res = CongestionTrack[ip];
+                    Y_ASSERT(res.UdpCongestion.Get() == nullptr);
+                    res.UdpCongestion = new TCongestionControl;
+                    TQueueStatsHash::iterator zq = PeerQueueStats.find(ip);
+                    if (zq != PeerQueueStats.end()) {
+                        res.UdpCongestion->AttachQueueStats(zq->second);
+                    }
+                    return res;
+                } else {
+                    TPeerLink& res = CongestionTrack[z->first];
+                    res = z->second;
+                    CongestionTrackHistory.erase(z);
+                    return res;
+                }
+            } else {
+                Y_ASSERT(CongestionTrackHistory.find(ip) == CongestionTrackHistory.end());
+                return z->second;
+            }
+        }
+        void SucceededSend(int id) {
+            SendResults.push_back(TSendResult(id, true));
+        }
+        void FailedSend(int id) {
+            SendResults.push_back(TSendResult(id, false));
+        }
+        void SendData(TList<TTransferKey>* order, float deltaT, bool needCheckAlive);
+        void RecvCycle();
+
+    public:
+        TUdpHost()
+            : CurrentT(0)
+            , IsWaiting(0)
+            , MaxWaitTime(DEFAULT_MAX_WAIT_TIME)
+            , MaxWaitTime2(DEFAULT_MAX_WAIT_TIME)
+            , IBIdleTime(0)
+            , TimeSinceCompletedQueueClean(0)
+            , TimeSinceCongestionHistoryUpdate(0)
+        {
+        }
+        ~TUdpHost() override {
+            for (TList<TRequest*>::const_iterator i = ReceivedList.begin(); i != ReceivedList.end(); ++i)
+                delete *i;
+        }
+
+        bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket);
+
+        TRequest* GetRequest() override {
+            if (ReceivedList.empty()) {
+                if (IB.Get()) {
+                    return IB->GetRequest();
+                }
+                return nullptr;
+            }
+            TRequest* res = ReceivedList.front();
+            ReceivedList.pop_front();
+            return res;
+        }
+
+        void AddToSendOrder(const TTransferKey& transferKey, EPacketPriority pp) {
+            if (pp == PP_LOW)
+                SendOrderLow.push_back(transferKey);
+            else if (pp == PP_NORMAL)
+                SendOrder.push_back(transferKey);
+            else if (pp == PP_HIGH)
+                SendOrderHighPrior.push_back(transferKey);
+            else
+                Y_ASSERT(0);
+
+            CancelWait();
+        }
+
+        int Send(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data, int crc32, TGUID* packetGuid, EPacketPriority pp) override {
+            if (addr.Port == 0) {
+                // shortcut for broken addresses
+                if (packetGuid && packetGuid->IsEmpty())
+                    CreateGuid(packetGuid);
+                int reqId = GetTransferId();
+                FailedSend(reqId);
+                return reqId;
+            }
+            TTransferKey transferKey;
+            transferKey.Address = addr;
+            transferKey.Id = GetTransferId();
+            Y_ASSERT(SendQueue.find(transferKey) == SendQueue.end());
+
+            TPeerLink& peerInfo = GetPeerLink(transferKey.Address);
+
+            TUdpOutTransfer& xfer = SendQueue[transferKey];
+            GetWinsockAddr(&xfer.ToAddress, transferKey.Address);
+            xfer.Crc32 = crc32;
+            xfer.PacketPriority = pp;
+            if (!packetGuid || packetGuid->IsEmpty()) {
+                CreateGuid(&xfer.PacketGuid);
+                if (packetGuid)
+                    *packetGuid = xfer.PacketGuid;
+            } else {
+                xfer.PacketGuid = *packetGuid;
+            }
+            xfer.Data.Reset(data.Release());
+            xfer.AttachStats(&PendingDataStats);
+            xfer.AckTracker.AttachCongestionControl(peerInfo.UdpCongestion.Get());
+
+            bool isSentOverIB = false;
+            // we don't support priorities (=service levels in IB terms) currently
+            // so send only PP_NORMAL traffic over IB
+            if (pp == PP_NORMAL && peerInfo.IBPeer.Get() && xfer.Data->GetSharedData() == nullptr) {
+                TIBMsgHandle hndl = IB->Send(peerInfo.IBPeer, xfer.Data.Get(), xfer.PacketGuid);
+                if (hndl >= 0) {
+                    IBKeyToTransferKey[hndl] = transferKey;
+                    isSentOverIB = true;
+                } else {
+                    // so we failed to use IB, ibPeer is either not connected yet or failed
+                    if (peerInfo.IBPeer->GetState() == IIBPeer::FAILED) {
+                        //printf("Disconnect failed IB peer\n");
+                        peerInfo.IBPeer = nullptr;
+                    }
+                }
+            }
+            if (!isSentOverIB) {
+                AddToSendOrder(transferKey, pp);
+            }
+
+            return transferKey.Id;
+        }
+
+        bool GetSendResult(TSendResult* res) override {
+            if (SendResults.empty()) {
+                if (IB.Get()) {
+                    TIBSendResult sr;
+                    if (IB->GetSendResult(&sr)) {
+                        TIBtoTransferKeyHash::iterator z = IBKeyToTransferKey.find(sr.Handle);
+                        if (z == IBKeyToTransferKey.end()) {
+                            Y_VERIFY(0, "unknown handle returned from IB");
+                        }
+                        TTransferKey transferKey = z->second;
+                        IBKeyToTransferKey.erase(z);
+
+                        TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
+                        if (i == SendQueue.end()) {
+                            Y_VERIFY(0, "IBKeyToTransferKey refers nonexisting xfer");
+                        }
+                        if (sr.Success) {
+                            TUdpOutTransfer& xfer = i->second;
+                            xfer.AckTracker.MarkAlive(); // do we really need this?
+                            *res = TSendResult(transferKey.Id, sr.Success);
+                            SendQueue.erase(i);
+                            return true;
+                        } else {
+                            //printf("IB send failed, fall back to regular network\n");
+                            // Houston, we got a problem
+                            // IB failed to send, try to use regular network
+                            TUdpOutTransfer& xfer = i->second;
+                            AddToSendOrder(transferKey, xfer.PacketPriority);
+                        }
+                    }
+                }
+                return false;
+            }
+            *res = SendResults.front();
+            SendResults.pop_front();
+            return true;
+        }
+
+        void Step() override;
+        void IBStep() override;
+
+        void Wait(float seconds) override {
+            if (seconds < 1e-3)
+                seconds = 0;
+            if (seconds > MaxWaitTime)
+                seconds = MaxWaitTime;
+            if (IBIdleTime < 0.010) {
+                seconds = 0;
+            }
+            if (seconds == 0) {
+                ThreadYield();
+            } else {
+                AtomicAdd(IsWaiting, 1);
+                if (seconds > MaxWaitTime2)
+                    seconds = MaxWaitTime2;
+                MaxWaitTime2 = DEFAULT_MAX_WAIT_TIME;
+
+                if (seconds == 0) {
+                    ThreadYield();
+                } else {
+                    if (IB.Get()) {
+                        for (float done = 0; done < seconds;) {
+                            float deltaSleep = Min(seconds - done, 0.002f);
+                            s.Wait(deltaSleep);
+                            NHPTimer::STime tChk;
+                            NHPTimer::GetTime(&tChk);
+                            if (IB->Step(tChk)) {
+                                IBIdleTime = 0;
+                                break;
+                            }
+                            done += deltaSleep;
+                        }
+                    } else {
+                        s.Wait(seconds);
+                    }
+                }
+                AtomicAdd(IsWaiting, -1);
+            }
+        }
+
+        void CancelWait() override {
+            MaxWaitTime2 = 0;
+            if (AtomicAdd(IsWaiting, 0) == 1) {
+                s.SendSelfFakePacket();
+            }
+        }
+
+        void GetPendingDataSize(TRequesterPendingDataStats* res) override {
+            *res = PendingDataStats;
+#ifndef NDEBUG
+            TRequesterPendingDataStats chk;
+            for (TUdpOutXferHash::const_iterator i = SendQueue.begin(); i != SendQueue.end(); ++i) {
+                TRopeDataPacket* pckt = i->second.Data.Get();
+                if (pckt) {
+                    chk.OutDataSize += pckt->GetSize();
+                    ++chk.OutCount;
+                }
+            }
+            for (TUdpInXferHash::const_iterator i = RecvQueue.begin(); i != RecvQueue.end(); ++i) {
+                const TUdpInTransfer& tr = i->second;
+                for (int p = 0; p < tr.GetPacketCount(); ++p) {
+                    if (tr.GetPacket(p)) {
+                        chk.InpDataSize += tr.PacketSize;
+                    }
+                }
+                ++chk.InpCount;
+            }
+            Y_ASSERT(memcmp(&chk, res, sizeof(chk)) == 0);
+#endif
+        }
+        TString GetDebugInfo() override;
+        TString GetPeerLinkDebug(const TPeerLinkHash& ch);
+        void Kill(const TUdpAddress& addr) override;
+        TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override;
+    };
+
+    bool TUdpHost::Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
+        if (s.IsValid()) {
+            Y_ASSERT(0);
+            return false;
+        }
+        s.Open(socket);
+        if (!s.IsValid())
+            return false;
+
+        if (IBDetection)
+            IB = CreateIBClientServer();
+
+        NHPTimer::GetTime(&CurrentT);
+        return true;
+    }
+
+    static bool HasAllPackets(const TUdpInTransfer& res) {
+        if (!res.HasLastPacket)
+            return false;
+        for (int i = res.GetPacketCount() - 1; i >= 0; --i) {
+            if (!res.GetPacket(i))
+                return false;
+        }
+        return true;
+    }
+
+    // grouped acks, first int - packet_id, second int - bit mask for 32 packets preceding packet_id
+    const int SIZEOF_ACK = 8;
+    static int WriteAck(TUdpInTransfer* p, int* dst, int maxAcks) {
+        int ackCount = 0;
+        if (p->NewPacketsToAck.size() > 1)
+            Sort(p->NewPacketsToAck.begin(), p->NewPacketsToAck.end());
+        int lastAcked = 0;
+        for (size_t idx = 0; idx < p->NewPacketsToAck.size(); ++idx) {
+            int pkt = p->NewPacketsToAck[idx];
+            if (idx == p->NewPacketsToAck.size() - 1 || pkt > lastAcked + 30) {
+                *dst++ = pkt;
+                int bitMask = 0;
+                int backPackets = Min(pkt, 32);
+                for (int k = 0; k < backPackets; ++k) {
+                    if (p->GetPacket(pkt - k - 1))
+                        bitMask |= 1 << k;
+                }
+                *dst++ = bitMask;
+                if (++ackCount >= maxAcks)
+                    break;
+                lastAcked = pkt;
+                //printf("sending ack %d (mask %x)\n", pkt, bitMask);
+            }
+        }
+        p->NewPacketsToAck.clear();
+        return ackCount;
+    }
+
+    static void AckPacket(TUdpOutTransfer* p, int pkt, float deltaT, bool updateRTT) {
+        if (pkt < 0 || pkt >= p->PacketCount) {
+            Y_ASSERT(0);
+            return;
+        }
+        p->AckTracker.Ack(pkt, deltaT, updateRTT);
+    }
+
+    static void ReadAcks(TUdpOutTransfer* p, const int* acks, int ackCount, float deltaT) {
+        for (int i = 0; i < ackCount; ++i) {
+            int pkt = *acks++;
+            int bitMask = *acks++;
+            bool updateRTT = i == ackCount - 1; // update RTT using only last packet in the pack
+            AckPacket(p, pkt, deltaT, updateRTT);
+            for (int k = 0; k < 32; ++k) {
+                if (bitMask & (1 << k))
+                    AckPacket(p, pkt - k - 1, deltaT, false);
+            }
+        }
+    }
+
+    using namespace NNetlibaSocket::NNetliba;
+
+    const ui64 KILL_PASSPHRASE1 = 0x98ff9cefb11d9a4cul;
+    const ui64 KILL_PASSPHRASE2 = 0xf7754c29e0be95eaul;
+
+    template <class T>
+    inline T Read(char** data) {
+        T res = ReadUnaligned<T>(*data);
+        *data += sizeof(T);
+        return res;
+    }
+    template <class T>
+    inline void Write(char** data, T res) {
+        WriteUnaligned<T>(*data, res);
+        *data += sizeof(T);
+    }
+
+    static void RequireResend(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
+        char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+        Write(&pktData, transferId);
+        Write(&pktData, (char)ACK_RESEND);
+        Write(&pktData, attempt);
+        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+    }
+
+    static void RequireResendNoShmem(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
+        char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+        Write(&pktData, transferId);
+        Write(&pktData, (char)ACK_RESEND_NOSHMEM);
+        Write(&pktData, attempt);
+        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+    }
+
+    static void AckComplete(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, const TGUID& packetGuid, int packetId) {
+        char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+        Write(&pktData, transferId);
+        Write(&pktData, (char)ACK_COMPLETE);
+        Write(&pktData, packetGuid);
+        Write(&pktData, packetId); // we need packetId to update RTT
+        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+    }
+
+    static void SendPing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
+        char pktBuf[UDP_PACKET_SIZE_FULL];
+        char* pktData = pktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
+        if (NSan::MSanIsOn()) {
+            Zero(pktBuf);
+        }
+        Write(&pktData, (int)0);
+        Write(&pktData, (char)PING);
+        Write(&pktData, selfNetworkOrderPort);
+        s.SendTo(pktBuf, UDP_PACKET_SIZE_FULL, toAddress, FF_DONT_FRAG);
+    }
+
+    // not MTU discovery, just figure out IB address of the peer
+    static void SendFakePing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
+        char buf[100];
+        char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+        Write(&pktData, (int)0);
+        Write(&pktData, (char)PING);
+        Write(&pktData, selfNetworkOrderPort);
+        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+    }
+
+    void TUdpHost::SendData(TList<TTransferKey>* order, float deltaT1, bool needCheckAlive) {
+        for (TList<TTransferKey>::iterator z = order->begin(); z != order->end();) {
+            // pick connection to send
+            const TTransferKey& transferKey = *z;
+            TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
+            if (i == SendQueue.end()) {
+                z = order->erase(z);
+                continue;
+            }
+            ++z;
+
+            // perform sending
+            int transferId = transferKey.Id;
+            TUdpOutTransfer& xfer = i->second;
+
+            if (!xfer.AckTracker.IsInitialized()) {
+                TIntrusivePtr<TCongestionControl> congestion = xfer.AckTracker.GetCongestionControl();
+                Y_ASSERT(congestion.Get() != nullptr);
+                if (!congestion->IsKnownMTU()) {
+                    TLameMTUDiscovery* md = congestion->GetMTUDiscovery();
+                    if (md->IsTimedOut()) {
+                        congestion->SetMTU(UDP_PACKET_SIZE_SMALL);
+                    } else {
+                        if (md->CanSend()) {
+                            SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
+                            md->PingSent();
+                        }
+                        continue;
+                    }
+                }
+                // try to use large mtu, we could have selected small mtu due to connectivity problems
+                if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL || IB.Get() != nullptr) {
+                    // recheck every ~50mb
+                    int chkDenom = (50000000 / xfer.Data->GetSize()) | 1;
+                    if ((NetAckRnd() % chkDenom) == 0) {
+                        //printf("send rechecking ping\n");
+                        if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL) {
+                            SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
+                        } else {
+                            SendFakePing(s, xfer.ToAddress, s.GetNetworkOrderPort());
+                        }
+                    }
+                }
+                xfer.PacketSize = congestion->GetMTU();
+                xfer.LastPacketSize = xfer.Data->GetSize() % xfer.PacketSize;
+                xfer.PacketCount = xfer.Data->GetSize() / xfer.PacketSize + 1;
+                xfer.AckTracker.SetPacketCount(xfer.PacketCount);
+            }
+
+            xfer.AckTracker.Step(deltaT1);
+            MaxWaitTime = Min(MaxWaitTime, xfer.AckTracker.GetTimeToNextPacketTimeout());
+            if (needCheckAlive && !xfer.AckTracker.IsAlive()) {
+                FailedSend(transferId);
+                SendQueue.erase(i);
+                continue;
+            }
+            bool sendBufferOverflow = false;
+            while (xfer.AckTracker.CanSend()) {
+                NHPTimer::STime tCopy = CurrentT;
+                float deltaT2 = (float)NHPTimer::GetTimePassed(&tCopy);
+                deltaT2 = ClampVal(deltaT2, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
+
+                int pkt = xfer.AckTracker.GetPacketToSend(deltaT2);
+                if (pkt == -1) {
+                    break;
+                }
+
+                int dataSize = xfer.PacketSize;
+                if (pkt == xfer.PacketCount - 1)
+                    dataSize = xfer.LastPacketSize;
+
+                char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
+                Write(&pktData, transferId);
+                char pktType = xfer.PacketSize == UDP_PACKET_SIZE ? DATA : DATA_SMALL;
+                TSharedMemory* shm = xfer.Data->GetSharedData();
+                if (shm) {
+                    if (pktType == DATA)
+                        pktType = DATA_SHMEM;
+                    else
+                        pktType = DATA_SMALL_SHMEM;
+                }
+                Write(&pktData, pktType);
+                Write(&pktData, xfer.Attempt);
+                Write(&pktData, pkt);
+                if (pkt == 0) {
+                    Write(&pktData, xfer.PacketGuid);
+                    Write(&pktData, xfer.Crc32);
+                    if (shm) {
+                        Write(&pktData, shm->GetId());
+                        Write(&pktData, shm->GetSize());
+                    }
+                }
+                TBlockChainIterator dataReader(xfer.Data->GetChain());
+                dataReader.Seek(pkt * xfer.PacketSize);
+                dataReader.Read(pktData, dataSize);
+                pktData += dataSize;
+                int sendSize = (int)(pktData - PktBuf);
+                TNetSocket::ESendError sendErr = s.SendTo(PktBuf, sendSize, xfer.ToAddress, FF_ALLOW_FRAG);
+                if (sendErr != TNetSocket::SEND_OK) {
+                    if (sendErr == TNetSocket::SEND_NO_ROUTE_TO_HOST) {
+                        FailedSend(transferId);
+                        SendQueue.erase(i);
+                        break;
+                    } else {
+                        // most probably out of send buffer space (or something terrible has happened)
+                        xfer.AckTracker.AddToResend(pkt);
+                        sendBufferOverflow = true;
+                        MaxWaitTime = 0;
+                        //printf("failed send\n");
+                        break;
+                    }
+                }
+            }
+            if (sendBufferOverflow)
+                break;
+        }
+    }
+
+    void TUdpHost::RecvCycle() {
+        for (;;) {
+            sockaddr_in6 fromAddress;
+            int rv = RecvBuf.GetBufSize();
+            bool recvOk = s.RecvFrom(RecvBuf.GetDataPtr(), &rv, &fromAddress);
+            if (!recvOk)
+                break;
+
+            NHPTimer::STime tCopy = CurrentT;
+            float deltaT = (float)NHPTimer::GetTimePassed(&tCopy);
+            deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
+
+            //int fromIP = fromAddress.sin_addr.s_addr;
+
+            TTransferKey k;
+            char* pktData = RecvBuf.GetDataPtr() + UDP_LOW_LEVEL_HEADER_SIZE;
+            GetUdpAddress(&k.Address, fromAddress);
+            k.Id = Read<int>(&pktData);
+            int transferId = k.Id;
+            int cmd = Read<char>(&pktData);
+            Y_ASSERT(cmd == (int)*(RecvBuf.GetDataPtr() + CMD_POS));
+            switch (cmd) {
+                case DATA:
+                case DATA_SMALL:
+                case DATA_SHMEM:
+                case DATA_SMALL_SHMEM: {
+                    int attempt = Read<int>(&pktData);
+                    int packetId = Read<int>(&pktData);
+                    //printf("data packet %d (trans ID = %d)\n", packetId, transferId);
+                    TUdpCompleteInXferHash::iterator itCompl = RecvCompleted.find(k);
+                    if (itCompl != RecvCompleted.end()) {
+                        Y_ASSERT(RecvQueue.find(k) == RecvQueue.end());
+                        const TUdpCompleteInTransfer& complete = itCompl->second;
+                        bool sendAckComplete = true;
+                        if (packetId == 0) {
+                            // check packet GUID
+                            char* tmpPktData = pktData;
+                            TGUID packetGuid;
+                            packetGuid = Read<TGUID>(&tmpPktData);
+                            if (packetGuid != complete.PacketGuid) {
+                                // we are receiving new data with the same transferId
+                                // in this case we have to flush all the information about the previous transfer
+                                // and start over
+                                //printf("same transferId for a different packet\n");
+                                RecvCompleted.erase(itCompl);
+                                sendAckComplete = false;
+                            }
+                        }
+                        if (sendAckComplete) {
+                            AckComplete(s, fromAddress, transferId, complete.PacketGuid, packetId);
+                            break;
+                        }
+                    }
+                    TUdpInXferHash::iterator rq = RecvQueue.find(k);
+                    if (rq == RecvQueue.end()) {
+                        //printf("new input transfer\n");
+                        TUdpInTransfer& res = RecvQueue[k];
+                        res.ToAddress = fromAddress;
+                        res.Attempt = attempt;
+                        res.Congestion = GetPeerLink(k.Address).UdpCongestion.Get();
+                        res.PacketSize = 0;
+                        res.HasLastPacket = false;
+                        res.AttachStats(&PendingDataStats);
+                        rq = RecvQueue.find(k);
+                        Y_ASSERT(rq != RecvQueue.end());
+                    }
+                    TUdpInTransfer& res = rq->second;
+                    res.Congestion->MarkAlive();
+                    res.TimeSinceLastRecv = 0;
+
+                    if (packetId == 0) {
+                        TGUID packetGuid;
+                        packetGuid = Read<TGUID>(&pktData);
+                        int crc32 = Read<int>(&pktData);
+                        res.Crc32 = crc32;
+                        res.PacketGuid = packetGuid;
+                        if (cmd == DATA_SHMEM || cmd == DATA_SMALL_SHMEM) {
+                            // link to attached shared memory
+                            TGUID shmemId = Read<TGUID>(&pktData);
+                            int shmemSize = Read<int>(&pktData);
+                            if (res.SharedData.Get() == nullptr) {
+                                res.SharedData = new TSharedMemory;
+                                if (!res.SharedData->Open(shmemId, shmemSize)) {
+                                    res.SharedData = nullptr;
+                                    RequireResendNoShmem(s, res.ToAddress, transferId, res.Attempt);
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    if (attempt != res.Attempt) {
+                        RequireResend(s, res.ToAddress, transferId, res.Attempt);
+                        break;
+                    } else {
+                        if (res.PacketSize == 0) {
+                            res.PacketSize = (cmd == DATA || cmd == DATA_SHMEM ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL);
+                        } else {
+                            // check that all data is of the same size
+                            Y_ASSERT(cmd == DATA || cmd == DATA_SMALL);
+                            Y_ASSERT(res.PacketSize == (cmd == DATA ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL));
+                        }
+
+                        int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
+
+                        Y_ASSERT(dataSize <= res.PacketSize);
+                        if (dataSize > res.PacketSize)
+                            break; // mem overrun protection
+                        if (packetId >= res.GetPacketCount())
+                            res.SetPacketCount(packetId + 1);
+                        {
+                            TUdpRecvPacket* pkt = nullptr;
+                            if (res.PacketSize == UDP_PACKET_SIZE_SMALL) {
+                                // save memory by using a smaller buffer at the cost of an additional memcpy
+                                pkt = RecvBuf.CreateNewSmallPacket(dataSize);
+                                memcpy(pkt->Data, pktData, dataSize);
+                                pkt->DataStart = 0;
+                                pkt->DataSize = dataSize;
+                            } else {
+                                int dataStart = (int)(pktData - RecvBuf.GetDataPtr()); // data offset in the packet
+                                pkt = RecvBuf.ExtractPacket();
+                                pkt->DataStart = dataStart;
+                                pkt->DataSize = dataSize;
+                            }
+                            // calc packet sum, will be used to calc whole message crc
+                            pkt->BlockSum = TIncrementalChecksumCalcer::CalcBlockSum(pkt->Data + pkt->DataStart, pkt->DataSize);
+                            res.AssignPacket(packetId, pkt);
+                        }
+
+                        if (dataSize != res.PacketSize) {
+                            res.LastPacketSize = dataSize;
+                            res.HasLastPacket = true;
+                        }
+
+                        if (HasAllPackets(res)) {
+                            //printf("received\n");
+                            TRequest* out = new TRequest;
+                            out->Address = k.Address;
+                            out->Guid = res.PacketGuid;
+                            TIncrementalChecksumCalcer incCS;
+                            int packetCount = res.GetPacketCount();
+                            out->Data.Reset(new TRopeDataPacket);
+                            for (int i = 0; i < packetCount; ++i) {
+                                TUdpRecvPacket* pkt = res.ExtractPacket(i);
+                                Y_ASSERT(pkt->DataSize == ((i == packetCount - 1) ? res.LastPacketSize : res.PacketSize));
+                                out->Data->AddBlock((char*)pkt, pkt->Data + pkt->DataStart, pkt->DataSize);
+                                incCS.AddBlockSum(pkt->BlockSum, pkt->DataSize);
+                            }
+                            out->Data->AttachSharedData(res.SharedData);
+                            res.EraseAllPackets();
+
+                            int crc32 = incCS.CalcChecksum(); // CalcChecksum(out->Data->GetChain());
+#ifdef SIMULATE_NETWORK_FAILURES
+                            bool crcOk = crc32 == res.Crc32 ? (RandomNumber<size_t>() % 10) != 0 : false;
+#else
+                            bool crcOk = crc32 == res.Crc32;
+#endif
+                            if (crcOk) {
+                                ReceivedList.push_back(out);
+                                Y_ASSERT(RecvCompleted.find(k) == RecvCompleted.end());
+                                TUdpCompleteInTransfer& complete = RecvCompleted[k];
+                                RecvCompletedQueue.push_back(k);
+                                complete.PacketGuid = res.PacketGuid;
+                                AckComplete(s, res.ToAddress, transferId, complete.PacketGuid, packetId);
+                                RecvQueue.erase(rq);
+                            } else {
+                                //printf("crc failed, require resend\n");
+                                delete out;
+                                ++res.Attempt;
+                                res.NewPacketsToAck.clear();
+                                RequireResend(s, res.ToAddress, transferId, res.Attempt);
+                            }
+                        } else {
+                            res.NewPacketsToAck.push_back(packetId);
+                        }
+                    }
+                } break;
+                case ACK: {
+                    TUdpOutXferHash::iterator i = SendQueue.find(k);
+                    if (i == SendQueue.end())
+                        break;
+                    TUdpOutTransfer& xfer = i->second;
+                    if (!xfer.AckTracker.IsInitialized())
+                        break;
+                    xfer.AckTracker.MarkAlive();
+                    int attempt = Read<int>(&pktData);
+                    Y_ASSERT(attempt <= xfer.Attempt);
+                    if (attempt != xfer.Attempt)
+                        break;
+                    ReadAcks(&xfer, (int*)pktData, (int)(RecvBuf.GetDataPtr() + rv - pktData) / SIZEOF_ACK, deltaT);
+                    break;
+                }
+                case ACK_COMPLETE: {
+                    TUdpOutXferHash::iterator i = SendQueue.find(k);
+                    if (i == SendQueue.end())
+                        break;
+                    TUdpOutTransfer& xfer = i->second;
+                    xfer.AckTracker.MarkAlive();
+                    TGUID packetGuid;
+                    packetGuid = Read<TGUID>(&pktData);
+                    int packetId = Read<int>(&pktData);
+                    if (packetGuid == xfer.PacketGuid) {
+                        xfer.AckTracker.Ack(packetId, deltaT, true); // update RTT
+                        xfer.AckTracker.AckAll(); // acking packets is required, otherwise they will be treated as lost (see the AckTracker destructor)
+                        SucceededSend(transferId);
+                        SendQueue.erase(i);
+                    } else {
+                        // the peer claims it has received this packet, but the packetGuid is wrong;
+                        // try to resend everything
+                        // ++xfer.Attempt; // should not do this, only the sender can modify the attempt number, otherwise a cycle is possible with out-of-order packets
+                        xfer.AckTracker.Resend();
+                    }
+                    break;
+                } break;
+                case ACK_RESEND: {
+                    TUdpOutXferHash::iterator i = SendQueue.find(k);
+                    if (i == SendQueue.end())
+                        break;
+                    TUdpOutTransfer& xfer = i->second;
+                    xfer.AckTracker.MarkAlive();
+                    int attempt = Read<int>(&pktData);
+                    if (xfer.Attempt != attempt) {
+                        // reset the current transfer & initialize a new one
+                        xfer.Attempt = attempt;
+                        xfer.AckTracker.Resend();
+                    }
+                    break;
+                }
+                case ACK_RESEND_NOSHMEM: {
+                    // abort execution here
+                    // failed to open shmem on the recv side, need to transmit the data without using shmem
+                    Y_VERIFY(0, "not implemented yet");
+                    break;
+                }
+                case PING: {
+                    sockaddr_in6 trueFromAddress = fromAddress;
+                    int port = Read<int>(&pktData);
+                    Y_ASSERT(trueFromAddress.sin6_family == AF_INET6);
+                    trueFromAddress.sin6_port = port;
+                    // can not set MTU for fromAddress here since asymmetrical mtu is possible
+                    char* pktData2 = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
+                    Write(&pktData2, (int)0);
+                    Write(&pktData2, (char)PONG);
+                    if (IB.Get()) {
+                        const TIBConnectInfo& ibConnectInfo = IB->GetConnectInfo();
+                        Write(&pktData2, ibConnectInfo);
+                        Write(&pktData2, trueFromAddress);
+                    }
+                    s.SendTo(PktBuf, pktData2 - PktBuf, trueFromAddress, FF_ALLOW_FRAG);
+                    break;
+                }
+                case PONG: {
+                    TPeerLink& peerInfo = GetPeerLink(k.Address);
+                    peerInfo.UdpCongestion->SetMTU(UDP_PACKET_SIZE);
+                    int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
+                    if (dataSize == sizeof(TIBConnectInfo) + sizeof(sockaddr_in6)) {
+                        if (IB.Get() != nullptr && peerInfo.IBPeer.Get() == nullptr) {
+                            TIBConnectInfo info = Read<TIBConnectInfo>(&pktData);
+                            sockaddr_in6 myAddress = Read<sockaddr_in6>(&pktData);
+                            TUdpAddress myUdpAddress;
+                            GetUdpAddress(&myUdpAddress, myAddress);
+                            peerInfo.IBPeer = IB->ConnectPeer(info, k.Address, myUdpAddress);
+                        }
+                    }
+                    break;
+                }
+                case KILL: {
+                    ui64 p1 = Read<ui64>(&pktData);
+                    ui64 p2 = Read<ui64>(&pktData);
+                    int restSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
+                    if (restSize == 0 && p1 == KILL_PASSPHRASE1 && p2 == KILL_PASSPHRASE2) {
+                        abort();
+                    }
+                    break;
+                }
+                default:
+                    Y_ASSERT(0);
+                    break;
+            }
+        }
+    }
+
+    void TUdpHost::IBStep() {
+        if (IB.Get()) {
+            NHPTimer::STime tChk = CurrentT;
+            float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
+            if (IB->Step(tChk)) {
+                IBIdleTime = -chkDeltaT;
+            }
+        }
+    }
+
+    void TUdpHost::Step() {
+        if (IB.Get()) {
+            NHPTimer::STime tChk = CurrentT;
+            float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
+            if (IB->Step(tChk)) {
+                IBIdleTime = -chkDeltaT;
+            }
+            if (chkDeltaT < 0.0005) {
+                return;
+            }
+        }
+
+        if (UseTOSforAcks) {
+            s.SetTOS(0x20);
+        } else {
+            s.SetTOS(0);
+        }
+
+        RecvCycle();
+
+        float deltaT = (float)NHPTimer::GetTimePassed(&CurrentT);
+        deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
+
+        MaxWaitTime = DEFAULT_MAX_WAIT_TIME;
+        IBIdleTime += deltaT;
+
+        bool needCheckAlive = false;
+
+        // update alive ports
+        const float INACTIVE_CONGESTION_UPDATE_INTERVAL = 1;
+        TimeSinceCongestionHistoryUpdate += deltaT;
+        if (TimeSinceCongestionHistoryUpdate > INACTIVE_CONGESTION_UPDATE_INTERVAL) {
+            for (TPeerLinkHash::iterator i = CongestionTrackHistory.begin(); i != CongestionTrackHistory.end();) {
+                TPeerLink& pl = i->second;
+                if (!pl.UpdateSleep(TimeSinceCongestionHistoryUpdate)) {
+                    TPeerLinkHash::iterator k = i++;
+                    CongestionTrackHistory.erase(k);
+                    needCheckAlive = true;
+                } else {
+                    ++i;
+                }
+            }
+            TimeSinceCongestionHistoryUpdate = 0;
+        }
+        for (TPeerLinkHash::iterator i = CongestionTrack.begin(); i != CongestionTrack.end();) {
+            const TUdpAddress& addr = i->first;
+            TPeerLink& pl = i->second;
+            if (pl.UdpCongestion->GetTransferCount() == 0) {
+                pl.StartSleep(addr, &MaxWaitTime);
+                CongestionTrackHistory[i->first] = i->second;
+                TPeerLinkHash::iterator k = i++;
+                CongestionTrack.erase(k);
+            } else if (!pl.Update(deltaT, addr, &MaxWaitTime)) {
+                TPeerLinkHash::iterator k = i++;
+                CongestionTrack.erase(k);
+                needCheckAlive = true;
+            } else {
+                ++i;
+            }
+        }
+
+        // send acks on received data
+        for (TUdpInXferHash::iterator i = RecvQueue.begin(); i != RecvQueue.end();) {
+            const TTransferKey& transKey = i->first;
+            int transferId = transKey.Id;
+            TUdpInTransfer& xfer = i->second;
+            xfer.TimeSinceLastRecv += deltaT;
+            if (xfer.TimeSinceLastRecv > UDP_MAX_INPUT_DATA_WAIT || (needCheckAlive && !xfer.Congestion->IsAlive())) {
+                TUdpInXferHash::iterator k = i++;
+                RecvQueue.erase(k);
+                continue;
+            }
+            Y_ASSERT(RecvCompleted.find(i->first) == RecvCompleted.end()); // state "Complete & incomplete" is incorrect
+            if (!xfer.NewPacketsToAck.empty()) {
+                char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
+                Write(&pktData, transferId);
+                Write(&pktData, (char)ACK);
+                Write(&pktData, xfer.Attempt);
+                int acks = WriteAck(&xfer, (int*)pktData, (int)(xfer.PacketSize - (pktData - PktBuf)) / SIZEOF_ACK);
+                pktData += acks * SIZEOF_ACK;
+                s.SendTo(PktBuf, (int)(pktData - PktBuf), xfer.ToAddress, FF_ALLOW_FRAG);
+            }
+            ++i;
+        }
+
+        if (UseTOSforAcks) {
+            s.SetTOS(0x60);
+        }
+
+        // send data for outbound connections
+        SendData(&SendOrderHighPrior, deltaT, needCheckAlive);
+        SendData(&SendOrder, deltaT, needCheckAlive);
+        SendData(&SendOrderLow, deltaT, needCheckAlive);
+
+        // roll send order to avoid exotic problems with lots of peers and high traffic
+        SendOrderHighPrior.splice(SendOrderHighPrior.end(), SendOrderHighPrior, SendOrderHighPrior.begin());
+        //SendOrder.splice(SendOrder.end(), SendOrder, SendOrder.begin()); // sending data in order has lower delay and a shorter queue
+
+        // clean completed queue
+        TimeSinceCompletedQueueClean += deltaT;
+        if (TimeSinceCompletedQueueClean > UDP_TRANSFER_TIMEOUT * 1.5) {
+            for (size_t i = 0; i < KeepCompletedQueue.size(); ++i) {
+                TUdpCompleteInXferHash::iterator k = RecvCompleted.find(KeepCompletedQueue[i]);
+                if (k != RecvCompleted.end())
+                    RecvCompleted.erase(k);
+            }
+            KeepCompletedQueue.clear();
+            KeepCompletedQueue.swap(RecvCompletedQueue);
+            TimeSinceCompletedQueueClean = 0;
+        }
+    }
+
+    TString TUdpHost::GetPeerLinkDebug(const TPeerLinkHash& ch) {
+        TString res;
+        char buf[1000];
+        for (const auto& i : ch) {
+            const TUdpAddress& ip = i.first;
+            const TCongestionControl& cc = *i.second.UdpCongestion;
+            IIBPeer* ibPeer = i.second.IBPeer.Get();
+            sprintf(buf, "%s\tIB: %d, RTT: %g Timeout: %g Window: %g MaxWin: %g FailRate: %g TimeSinceLastRecv: %g Transfers: %d MTU: %d\n",
+                    GetAddressAsString(ip).c_str(),
+                    ibPeer ? ibPeer->GetState() : -1,
+                    cc.GetRTT() * 1000, cc.GetTimeout() * 1000, cc.GetWindow(), cc.GetMaxWindow(), cc.GetFailRate(),
+                    cc.GetTimeSinceLastRecv() * 1000, cc.GetTransferCount(), cc.GetMTU());
+            res += buf;
+        }
+        return res;
+    }
+
+    TString TUdpHost::GetDebugInfo() {
+        TString res;
+        char buf[1000];
+        sprintf(buf, "Receiving %d msgs, sending %d high prior, %d regular msgs, %d low prior msgs\n",
+                RecvQueue.ysize(), (int)SendOrderHighPrior.size(), (int)SendOrder.size(), (int)SendOrderLow.size());
+        res += buf;
+
+        TRequesterPendingDataStats pds;
+        GetPendingDataSize(&pds);
+        sprintf(buf, "Pending data size: %" PRIu64 "\n", pds.InpDataSize + pds.OutDataSize);
+        res += buf;
+        sprintf(buf, " in packets: %d, size %" PRIu64 "\n", pds.InpCount, pds.InpDataSize);
+        res += buf;
+        sprintf(buf, " out packets: %d, size %" PRIu64 "\n", pds.OutCount, pds.OutDataSize);
+        res += buf;
+
+        res += "\nCongestion info:\n";
+        res += GetPeerLinkDebug(CongestionTrack);
+        res += "\nCongestion info history:\n";
+        res += GetPeerLinkDebug(CongestionTrackHistory);
+
+        return res;
+    }
+
+    static void SendKill(const TNetSocket& s, const sockaddr_in6& toAddress) {
+        char buf[100];
+        char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+        Write(&pktData, (int)0);
+        Write(&pktData, (char)KILL);
+        Write(&pktData, KILL_PASSPHRASE1);
+        Write(&pktData, KILL_PASSPHRASE2);
+        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+    }
+
+    void TUdpHost::Kill(const TUdpAddress& addr) {
+        sockaddr_in6 target;
+        GetWinsockAddr(&target, addr);
+        SendKill(s, target);
+    }
+
+    TIntrusivePtr<IPeerQueueStats> TUdpHost::GetQueueStats(const TUdpAddress& addr) {
+        TQueueStatsHash::iterator zq = PeerQueueStats.find(addr);
+        if (zq != PeerQueueStats.end()) {
+            return zq->second.Get();
+        }
+        TPeerQueueStats* res = new TPeerQueueStats;
+        PeerQueueStats[addr] = res;
+        // attach to existing congestion tracker
+        TPeerLinkHash::iterator z;
+        z = CongestionTrack.find(addr);
+        if (z != CongestionTrack.end()) {
+            z->second.UdpCongestion->AttachQueueStats(res);
+        }
+        z = CongestionTrackHistory.find(addr);
+        if (z != CongestionTrackHistory.end()) {
+            z->second.UdpCongestion->AttachQueueStats(res);
+        }
+        return res;
+    }
+
+    //////////////////////////////////////////////////////////////////////////
+
+    TIntrusivePtr<IUdpHost> CreateUdpHost(int port) {
+        TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateBestRecvSocket();
+        socket->Open(port);
+        if (!socket->IsValid())
+            return nullptr;
+        return CreateUdpHost(socket);
+    }
+
+    TIntrusivePtr<IUdpHost> CreateUdpHost(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
+        if (!InitLocalIPList()) {
+            Y_ASSERT(0 && "Can not determine self IP address");
+            return nullptr;
+        }
+        TIntrusivePtr<TUdpHost> res = new TUdpHost;
+        if (!res->Start(socket))
+            return nullptr;
+        return res.Get();
+    }
+
+    void SetUdpMaxBandwidthPerIP(float f) {
+        f = Max(0.0f, f);
+        TCongestionControl::MaxPacketRate = f / UDP_PACKET_SIZE;
+    }
+
+    void SetUdpSlowStart(bool enable) {
+        TCongestionControl::StartWindowSize = enable ? 0.5f : 3;
+    }
+
+    void DisableIBDetection() {
+        IBDetection = false;
+    }
+
+}
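
Two details of the grouped-ack format above ("first int - packet_id, second int - bit mask for 32 packets preceding packet_id") are easy to misread: one 8-byte record acknowledges up to 33 packets (the id itself plus the 32 before it), and bit k of the mask refers to packet packet_id - k - 1. Below is a minimal standalone sketch of the same encode/decode idea; TAckRecord, EncodeAck and DecodeAck are illustrative stand-ins for WriteAck/ReadAcks, not netliba API, and a plain std::vector<bool> stands in for the transfer state.

// Sketch of netliba's grouped-ack record: a packet id plus a bitmask where
// bit k means "packet (id - k - 1) was received as well".
#include <algorithm>
#include <cstdio>
#include <vector>

struct TAckRecord {
    int PacketId;
    unsigned BitMask;
};

// received[i] == true means packet i has arrived at the receiver
static TAckRecord EncodeAck(int packetId, const std::vector<bool>& received) {
    TAckRecord rec = {packetId, 0};
    int backPackets = std::min(packetId, 32); // don't look below packet 0
    for (int k = 0; k < backPackets; ++k) {
        if (received[packetId - k - 1])
            rec.BitMask |= 1u << k;
    }
    return rec;
}

// mirrors ReadAcks/AckPacket: marks the named packet and every packet set in the mask
static void DecodeAck(const TAckRecord& rec, std::vector<bool>* acked) {
    (*acked)[rec.PacketId] = true;
    for (int k = 0; k < 32; ++k) {
        int id = rec.PacketId - k - 1;
        if (id >= 0 && (rec.BitMask & (1u << k)))
            (*acked)[id] = true;
    }
}

int main() {
    std::vector<bool> received(40, true);
    received[3] = false; // pretend packet 3 was lost
    TAckRecord rec = EncodeAck(35, received);
    std::vector<bool> acked(40, false);
    DecodeAck(rec, &acked);
    printf("packet 3 acked: %d, packet 34 acked: %d\n", (int)acked[3], (int)acked[34]);
    return 0;
}

This mask coverage is also why WriteAck only emits a record when the next pending id is more than 30 ahead of the last acked one: the intermediate ids are already covered by a later record's bitmask.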
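
For reference, the DATA packet header that SendData writes and RecvCycle parses is, in order: transfer id (int), packet type (char), attempt (int) and packet id (int), with packet 0 additionally carrying the transfer GUID and CRC32 (plus shared-memory id/size for the SHMEM variants, omitted here). The sketch below serializes the same layout with plain memcpy in place of the library's unaligned Write<T>; the 8-byte low-level header size and the 16-byte TGuid are assumptions for illustration, since the real values come from udp_socket.h and util.

// Standalone sketch of the DATA packet header layout used above.
#include <cstdint>
#include <cstdio>
#include <cstring>

const int kLowLevelHeaderSize = 8; // assumed stand-in for UDP_LOW_LEVEL_HEADER_SIZE

struct TGuid {
    uint32_t dw[4]; // 16 bytes, mirroring util's TGUID
};

// Writes: transferId, pktType, attempt, packetId; packet 0 also carries guid + crc32.
// Returns the pointer where payload bytes would start.
static char* WriteDataHeader(char* buf, int transferId, char pktType,
                             int attempt, int packetId,
                             const TGuid& guid, int crc32) {
    char* p = buf + kLowLevelHeaderSize;
    memcpy(p, &transferId, sizeof(transferId));
    p += sizeof(transferId);
    *p++ = pktType;
    memcpy(p, &attempt, sizeof(attempt));
    p += sizeof(attempt);
    memcpy(p, &packetId, sizeof(packetId));
    p += sizeof(packetId);
    if (packetId == 0) {
        memcpy(p, &guid, sizeof(guid));
        p += sizeof(guid);
        memcpy(p, &crc32, sizeof(crc32));
        p += sizeof(crc32);
    }
    return p;
}

int main() {
    char buf[128] = {};
    TGuid guid = {{1, 2, 3, 4}};
    char* payload = WriteDataHeader(buf, 42, /*pktType=*/0, /*attempt=*/0, /*packetId=*/0, guid, 0);
    printf("payload starts at offset %d\n", (int)(payload - buf)); // 8+4+1+4+4+16+4 = 41
    return 0;
}

Because the full header is written only for packet 0, the receiver learns the GUID, CRC and shared-memory info from the first packet; every later packet is pure payload after the 13-byte fixed part that follows the low-level header.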