commit 06e5c21a835c0e923506c4ff27929f34e00761c2
parent 03f024c4412e3aa613bb543cf1660176320ba8f4
Author: monster <monster@ydb.tech>
Date:   2022-07-07 14:41:37 +0300

    fix ya.make

 library/cpp/netliba/v6/udp_client_server.cpp | 1321 +++++++++++++++++++++++++
 1 file changed, 1321 insertions(+), 0 deletions(-)
diff --git a/library/cpp/netliba/v6/udp_client_server.cpp b/library/cpp/netliba/v6/udp_client_server.cpp
new file mode 100644
index 0000000000..3eaf6e5e96
--- /dev/null
+++ b/library/cpp/netliba/v6/udp_client_server.cpp
@@ -0,0 +1,1321 @@
+#include "stdafx.h"
+#include "udp_client_server.h"
+#include "net_acks.h"
+#include <util/generic/guid.h>
+#include <util/system/hp_timer.h>
+#include <util/datetime/cputimer.h>
+#include <util/system/yield.h>
+#include <util/system/unaligned_mem.h>
+#include "block_chain.h"
+#include <util/system/shmat.h>
+#include "udp_debug.h"
+#include "udp_socket.h"
+#include "ib_cs.h"
+
+#include <library/cpp/netliba/socket/socket.h>
+
+#include <util/random/random.h>
+#include <util/system/sanitizers.h>
+
+#include <atomic>
+
+namespace NNetliba {
+ // We rely on the UDP checksum of individual packets and check the CRC only for complete messages.
+ // UPDATE: the UDP checksum alone is not enough, network errors do happen; we saw 600+ retransmits of a ~1MB data packet
+
+ const float UDP_TRANSFER_TIMEOUT = 90.0f;
+ const float DEFAULT_MAX_WAIT_TIME = 1;
+ const float UDP_KEEP_PEER_INFO = 600;
+ // Traffic may keep flowing while no new data arrives for a particular transfer.
+ // This happens when the sending process is interrupted mid-transfer and restarted on the same port:
+ // a partially received packet then hangs on the receiver. Such a packet is killed after this timeout.
+ const float UDP_MAX_INPUT_DATA_WAIT = UDP_TRANSFER_TIMEOUT * 2;
+
+ enum {
+ UDP_PACKET_SIZE_FULL = 8900, // used for ping to detect jumbo-frame support
+ UDP_PACKET_SIZE = 8800, // max data in packet
+ UDP_PACKET_SIZE_SMALL = 1350, // 1180 would be better, given that 1280 is the guaranteed IPv6 minimum MTU
+ UDP_PACKET_BUF_SIZE = UDP_PACKET_SIZE + 100,
+ };
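+ // note that UDP_PACKET_BUF_SIZE == UDP_PACKET_SIZE_FULL (8900), so a jumbo-frame ping exactly fills the receive buffer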
+
+ //////////////////////////////////////////////////////////////////////////
+ struct TUdpCompleteInTransfer {
+ TGUID PacketGuid;
+ };
+
+ //////////////////////////////////////////////////////////////////////////
+ struct TUdpRecvPacket {
+ int DataStart, DataSize;
+ ui32 BlockSum;
+ // Data[] must be the last member of the struct; CreateNewSmallPacket() relies on this to allocate truncated TUdpRecvPacket objects
+ char Data[UDP_PACKET_BUF_SIZE];
+ };
+
+ struct TUdpInTransfer {
+ private:
+ TVector<TUdpRecvPacket*> Packets;
+
+ public:
+ sockaddr_in6 ToAddress;
+ int PacketSize, LastPacketSize;
+ bool HasLastPacket;
+ TVector<int> NewPacketsToAck;
+ TCongestionControlPtr Congestion;
+ float TimeSinceLastRecv;
+ int Attempt;
+ TGUID PacketGuid;
+ int Crc32;
+ TIntrusivePtr<TSharedMemory> SharedData;
+ TRequesterPendingDataStats* Stats;
+
+ TUdpInTransfer()
+ : PacketSize(0)
+ , LastPacketSize(0)
+ , HasLastPacket(false)
+ , TimeSinceLastRecv(0)
+ , Attempt(0)
+ , Crc32(0)
+ , Stats(nullptr)
+ {
+ Zero(ToAddress);
+ }
+ ~TUdpInTransfer() {
+ if (Stats) {
+ Stats->InpCount -= 1;
+ }
+ EraseAllPackets();
+ }
+ void EraseAllPackets() {
+ for (int i = 0; i < Packets.ysize(); ++i) {
+ ErasePacket(i);
+ }
+ Packets.clear();
+ HasLastPacket = false;
+ }
+ void AttachStats(TRequesterPendingDataStats* stats) {
+ Stats = stats;
+ Stats->InpCount += 1;
+ Y_ASSERT(Packets.empty());
+ }
+ void ErasePacket(int id) {
+ TUdpRecvPacket* pkt = Packets[id];
+ if (pkt) {
+ if (Stats) {
+ Stats->InpDataSize -= PacketSize;
+ }
+ TRopeDataPacket::FreeBuf((char*)pkt);
+ Packets[id] = nullptr;
+ }
+ }
+ void AssignPacket(int id, TUdpRecvPacket* pkt) {
+ ErasePacket(id);
+ if (pkt && Stats) {
+ Stats->InpDataSize += PacketSize;
+ }
+ Packets[id] = pkt;
+ }
+ int GetPacketCount() const {
+ return Packets.ysize();
+ }
+ void SetPacketCount(int n) {
+ Packets.resize(n, nullptr);
+ }
+ const TUdpRecvPacket* GetPacket(int id) const {
+ return Packets[id];
+ }
+ TUdpRecvPacket* ExtractPacket(int id) {
+ TUdpRecvPacket* res = Packets[id];
+ if (res) {
+ if (Stats) {
+ Stats->InpDataSize -= PacketSize;
+ }
+ Packets[id] = nullptr;
+ }
+ return res;
+ }
+ };
+
+ struct TUdpOutTransfer {
+ sockaddr_in6 ToAddress;
+ TAutoPtr<TRopeDataPacket> Data;
+ int PacketCount;
+ int PacketSize, LastPacketSize;
+ TAckTracker AckTracker;
+ int Attempt;
+ TGUID PacketGuid;
+ int Crc32;
+ EPacketPriority PacketPriority;
+ TRequesterPendingDataStats* Stats;
+
+ TUdpOutTransfer()
+ : PacketCount(0)
+ , PacketSize(0)
+ , LastPacketSize(0)
+ , Attempt(0)
+ , Crc32(0)
+ , PacketPriority(PP_LOW)
+ , Stats(nullptr)
+ {
+ Zero(ToAddress);
+ }
+ ~TUdpOutTransfer() {
+ if (Stats) {
+ Stats->OutCount -= 1;
+ Stats->OutDataSize -= Data->GetSize();
+ }
+ }
+ void AttachStats(TRequesterPendingDataStats* stats) {
+ Stats = stats;
+ Stats->OutCount += 1;
+ Stats->OutDataSize += Data->GetSize();
+ }
+ };
+
+ struct TTransferKey {
+ TUdpAddress Address;
+ int Id;
+ };
+ inline bool operator==(const TTransferKey& a, const TTransferKey& b) {
+ return a.Address == b.Address && a.Id == b.Id;
+ }
+ struct TTransferKeyHash {
+ int operator()(const TTransferKey& k) const {
+ return (ui32)k.Address.Interface + (ui32)k.Address.Port * (ui32)389461 + (ui32)k.Id;
+ }
+ };
+
+ struct TUdpAddressHash {
+ int operator()(const TUdpAddress& addr) const {
+ return (ui32)addr.Interface + (ui32)addr.Port * (ui32)389461;
+ }
+ };
+
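+ // Owns a single preallocated receive buffer: ExtractPacket() hands the filled buffer to the
+ // caller and immediately allocates a fresh one, so a buffer is always ready for the next recv.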
+ class TUdpHostRevBufAlloc: public TNonCopyable {
+ TUdpRecvPacket* RecvPktBuf;
+
+ void AllocNewBuf() {
+ RecvPktBuf = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(sizeof(TUdpRecvPacket));
+ }
+
+ public:
+ TUdpHostRevBufAlloc() {
+ AllocNewBuf();
+ }
+ ~TUdpHostRevBufAlloc() {
+ FreeBuf(RecvPktBuf);
+ }
+ void FreeBuf(TUdpRecvPacket* pkt) {
+ TRopeDataPacket::FreeBuf((char*)pkt);
+ }
+ TUdpRecvPacket* ExtractPacket() {
+ TUdpRecvPacket* res = RecvPktBuf;
+ AllocNewBuf();
+ return res;
+ }
+ TUdpRecvPacket* CreateNewSmallPacket(int sz) {
+ int pktStructSz = sizeof(TUdpRecvPacket) - Y_ARRAY_SIZE(RecvPktBuf->Data) + sz;
+ TUdpRecvPacket* pkt = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(pktStructSz);
+ return pkt;
+ }
+ int GetBufSize() const {
+ return Y_ARRAY_SIZE(RecvPktBuf->Data);
+ }
+ char* GetDataPtr() const {
+ return RecvPktBuf->Data;
+ }
+ };
+
+ static TAtomic transferIdCounter = (long)(GetCycleCount() & 0x1fffffff);
+ inline int GetTransferId() {
+ int res = AtomicAdd(transferIdCounter, 1);
+ while (res < 0) {
+ // negative transfer ids are treated as errors, so wrap transfer id
+ AtomicCas(&transferIdCounter, 0, transferIdCounter);
+ res = AtomicAdd(transferIdCounter, 1);
+ }
+ return res;
+ }
+
+ static bool IBDetection = true;
+ class TUdpHost: public IUdpHost {
+ struct TPeerLink {
+ TIntrusivePtr<TCongestionControl> UdpCongestion;
+ TIntrusivePtr<IIBPeer> IBPeer;
+ double TimeNoActiveTransfers;
+
+ TPeerLink()
+ : TimeNoActiveTransfers(0)
+ {
+ }
+ bool Update(float deltaT, const TUdpAddress& toAddress, float* maxWaitTime) {
+ bool updateOk = UdpCongestion->UpdateAlive(toAddress, deltaT, UDP_TRANSFER_TIMEOUT, maxWaitTime);
+ return updateOk;
+ }
+ void StartSleep(const TUdpAddress& toAddress, float* maxWaitTime) {
+ //printf("peer_link start sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
+ UdpCongestion->UpdateAlive(toAddress, 0, UDP_TRANSFER_TIMEOUT, maxWaitTime);
+ UdpCongestion->MarkAlive();
+ TimeNoActiveTransfers = 0;
+ }
+ bool UpdateSleep(float deltaT) {
+ TimeNoActiveTransfers += deltaT;
+ if (IBPeer.Get()) {
+ //printf("peer_link update sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
+ if (IBPeer->GetState() == IIBPeer::OK) {
+ return true;
+ }
+ //printf("Drop broken IB connection\n");
+ IBPeer = nullptr;
+ }
+ return (TimeNoActiveTransfers < UDP_KEEP_PEER_INFO);
+ }
+ };
+
+ TNetSocket s;
+ typedef THashMap<TTransferKey, TUdpInTransfer, TTransferKeyHash> TUdpInXferHash;
+ typedef THashMap<TTransferKey, TUdpOutTransfer, TTransferKeyHash> TUdpOutXferHash;
+ // congestion control per peer: CongestionTrack holds peers with active transfers,
+ // CongestionTrackHistory keeps recently idle peers for up to UDP_KEEP_PEER_INFO seconds
+ typedef THashMap<TUdpAddress, TPeerLink, TUdpAddressHash> TPeerLinkHash;
+ typedef THashMap<TTransferKey, TUdpCompleteInTransfer, TTransferKeyHash> TUdpCompleteInXferHash;
+ typedef THashMap<TUdpAddress, TIntrusivePtr<TPeerQueueStats>, TUdpAddressHash> TQueueStatsHash;
+ TUdpInXferHash RecvQueue;
+ TUdpCompleteInXferHash RecvCompleted;
+ TUdpOutXferHash SendQueue;
+ TPeerLinkHash CongestionTrack, CongestionTrackHistory;
+ TList<TRequest*> ReceivedList;
+ NHPTimer::STime CurrentT;
+ TList<TSendResult> SendResults;
+ TList<TTransferKey> SendOrderLow, SendOrder, SendOrderHighPrior;
+ TAtomic IsWaiting;
+ float MaxWaitTime;
+ std::atomic<float> MaxWaitTime2;
+ float IBIdleTime;
+ TVector<TTransferKey> RecvCompletedQueue, KeepCompletedQueue;
+ float TimeSinceCompletedQueueClean, TimeSinceCongestionHistoryUpdate;
+ TRequesterPendingDataStats PendingDataStats;
+ TQueueStatsHash PeerQueueStats;
+ TIntrusivePtr<IIBClientServer> IB;
+ typedef THashMap<TIBMsgHandle, TTransferKey> TIBtoTransferKeyHash;
+ TIBtoTransferKeyHash IBKeyToTransferKey;
+
+ char PktBuf[UDP_PACKET_BUF_SIZE];
+ TUdpHostRevBufAlloc RecvBuf;
+
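+ // Returns the congestion state for a peer: checks active entries first, then the idle history
+ // (reviving the entry into the active map), and creates a fresh entry if neither map has one.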
+ TPeerLink& GetPeerLink(const TUdpAddress& ip) {
+ TPeerLinkHash::iterator z = CongestionTrack.find(ip);
+ if (z == CongestionTrack.end()) {
+ z = CongestionTrackHistory.find(ip);
+ if (z == CongestionTrackHistory.end()) {
+ TPeerLink& res = CongestionTrack[ip];
+ Y_ASSERT(res.UdpCongestion.Get() == nullptr);
+ res.UdpCongestion = new TCongestionControl;
+ TQueueStatsHash::iterator zq = PeerQueueStats.find(ip);
+ if (zq != PeerQueueStats.end()) {
+ res.UdpCongestion->AttachQueueStats(zq->second);
+ }
+ return res;
+ } else {
+ TPeerLink& res = CongestionTrack[z->first];
+ res = z->second;
+ CongestionTrackHistory.erase(z);
+ return res;
+ }
+ } else {
+ Y_ASSERT(CongestionTrackHistory.find(ip) == CongestionTrackHistory.end());
+ return z->second;
+ }
+ }
+ void SucceededSend(int id) {
+ SendResults.push_back(TSendResult(id, true));
+ }
+ void FailedSend(int id) {
+ SendResults.push_back(TSendResult(id, false));
+ }
+ void SendData(TList<TTransferKey>* order, float deltaT, bool needCheckAlive);
+ void RecvCycle();
+
+ public:
+ TUdpHost()
+ : CurrentT(0)
+ , IsWaiting(0)
+ , MaxWaitTime(DEFAULT_MAX_WAIT_TIME)
+ , MaxWaitTime2(DEFAULT_MAX_WAIT_TIME)
+ , IBIdleTime(0)
+ , TimeSinceCompletedQueueClean(0)
+ , TimeSinceCongestionHistoryUpdate(0)
+ {
+ }
+ ~TUdpHost() override {
+ for (TList<TRequest*>::const_iterator i = ReceivedList.begin(); i != ReceivedList.end(); ++i)
+ delete *i;
+ }
+
+ bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket);
+
+ TRequest* GetRequest() override {
+ if (ReceivedList.empty()) {
+ if (IB.Get()) {
+ return IB->GetRequest();
+ }
+ return nullptr;
+ }
+ TRequest* res = ReceivedList.front();
+ ReceivedList.pop_front();
+ return res;
+ }
+
+ void AddToSendOrder(const TTransferKey& transferKey, EPacketPriority pp) {
+ if (pp == PP_LOW)
+ SendOrderLow.push_back(transferKey);
+ else if (pp == PP_NORMAL)
+ SendOrder.push_back(transferKey);
+ else if (pp == PP_HIGH)
+ SendOrderHighPrior.push_back(transferKey);
+ else
+ Y_ASSERT(0);
+
+ CancelWait();
+ }
+
+ int Send(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data, int crc32, TGUID* packetGuid, EPacketPriority pp) override {
+ if (addr.Port == 0) {
+ // shortcut for broken addresses
+ if (packetGuid && packetGuid->IsEmpty())
+ CreateGuid(packetGuid);
+ int reqId = GetTransferId();
+ FailedSend(reqId);
+ return reqId;
+ }
+ TTransferKey transferKey;
+ transferKey.Address = addr;
+ transferKey.Id = GetTransferId();
+ Y_ASSERT(SendQueue.find(transferKey) == SendQueue.end());
+
+ TPeerLink& peerInfo = GetPeerLink(transferKey.Address);
+
+ TUdpOutTransfer& xfer = SendQueue[transferKey];
+ GetWinsockAddr(&xfer.ToAddress, transferKey.Address);
+ xfer.Crc32 = crc32;
+ xfer.PacketPriority = pp;
+ if (!packetGuid || packetGuid->IsEmpty()) {
+ CreateGuid(&xfer.PacketGuid);
+ if (packetGuid)
+ *packetGuid = xfer.PacketGuid;
+ } else {
+ xfer.PacketGuid = *packetGuid;
+ }
+ xfer.Data.Reset(data.Release());
+ xfer.AttachStats(&PendingDataStats);
+ xfer.AckTracker.AttachCongestionControl(peerInfo.UdpCongestion.Get());
+
+ bool isSentOverIB = false;
+ // we don't support priorities (=service levels in IB terms) currently
+ // so send only PP_NORMAL traffic over IB
+ if (pp == PP_NORMAL && peerInfo.IBPeer.Get() && xfer.Data->GetSharedData() == nullptr) {
+ TIBMsgHandle hndl = IB->Send(peerInfo.IBPeer, xfer.Data.Get(), xfer.PacketGuid);
+ if (hndl >= 0) {
+ IBKeyToTransferKey[hndl] = transferKey;
+ isSentOverIB = true;
+ } else {
+ // so we failed to use IB, ibPeer is either not connected yet or failed
+ if (peerInfo.IBPeer->GetState() == IIBPeer::FAILED) {
+ //printf("Disconnect failed IB peer\n");
+ peerInfo.IBPeer = nullptr;
+ }
+ }
+ }
+ if (!isSentOverIB) {
+ AddToSendOrder(transferKey, pp);
+ }
+
+ return transferKey.Id;
+ }
+
+ bool GetSendResult(TSendResult* res) override {
+ if (SendResults.empty()) {
+ if (IB.Get()) {
+ TIBSendResult sr;
+ if (IB->GetSendResult(&sr)) {
+ TIBtoTransferKeyHash::iterator z = IBKeyToTransferKey.find(sr.Handle);
+ if (z == IBKeyToTransferKey.end()) {
+ Y_VERIFY(0, "unknown handle returned from IB");
+ }
+ TTransferKey transferKey = z->second;
+ IBKeyToTransferKey.erase(z);
+
+ TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
+ if (i == SendQueue.end()) {
+ Y_VERIFY(0, "IBKeyToTransferKey refers nonexisting xfer");
+ }
+ if (sr.Success) {
+ TUdpOutTransfer& xfer = i->second;
+ xfer.AckTracker.MarkAlive(); // do we really need this?
+ *res = TSendResult(transferKey.Id, sr.Success);
+ SendQueue.erase(i);
+ return true;
+ } else {
+ //printf("IB send failed, fall back to regular network\n");
+ // Houston, we got a problem
+ // IB failed to send, try to use regular network
+ TUdpOutTransfer& xfer = i->second;
+ AddToSendOrder(transferKey, xfer.PacketPriority);
+ }
+ }
+ }
+ return false;
+ }
+ *res = SendResults.front();
+ SendResults.pop_front();
+ return true;
+ }
+
+ void Step() override;
+ void IBStep() override;
+
+ void Wait(float seconds) override {
+ if (seconds < 1e-3)
+ seconds = 0;
+ if (seconds > MaxWaitTime)
+ seconds = MaxWaitTime;
+ if (IBIdleTime < 0.010) {
+ seconds = 0;
+ }
+ if (seconds == 0) {
+ ThreadYield();
+ } else {
+ AtomicAdd(IsWaiting, 1);
+ if (seconds > MaxWaitTime2)
+ seconds = MaxWaitTime2;
+ MaxWaitTime2 = DEFAULT_MAX_WAIT_TIME;
+
+ if (seconds == 0) {
+ ThreadYield();
+ } else {
+ if (IB.Get()) {
+ for (float done = 0; done < seconds;) {
+ float deltaSleep = Min(seconds - done, 0.002f);
+ s.Wait(deltaSleep);
+ NHPTimer::STime tChk;
+ NHPTimer::GetTime(&tChk);
+ if (IB->Step(tChk)) {
+ IBIdleTime = 0;
+ break;
+ }
+ done += deltaSleep;
+ }
+ } else {
+ s.Wait(seconds);
+ }
+ }
+ AtomicAdd(IsWaiting, -1);
+ }
+ }
+
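+ // Wakes a thread blocked in Wait(): zeroing MaxWaitTime2 shortens any wait about to start,
+ // and the fake self-addressed packet interrupts a wait already blocked on the socket.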
+ void CancelWait() override {
+ MaxWaitTime2 = 0;
+ if (AtomicAdd(IsWaiting, 0) == 1) {
+ s.SendSelfFakePacket();
+ }
+ }
+
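+ // Returns cached pending-data statistics; debug builds recompute them from the send/recv queues
+ // and assert that the cached values match.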
+ void GetPendingDataSize(TRequesterPendingDataStats* res) override {
+ *res = PendingDataStats;
+#ifndef NDEBUG
+ TRequesterPendingDataStats chk;
+ for (TUdpOutXferHash::const_iterator i = SendQueue.begin(); i != SendQueue.end(); ++i) {
+ TRopeDataPacket* pckt = i->second.Data.Get();
+ if (pckt) {
+ chk.OutDataSize += pckt->GetSize();
+ ++chk.OutCount;
+ }
+ }
+ for (TUdpInXferHash::const_iterator i = RecvQueue.begin(); i != RecvQueue.end(); ++i) {
+ const TUdpInTransfer& tr = i->second;
+ for (int p = 0; p < tr.GetPacketCount(); ++p) {
+ if (tr.GetPacket(p)) {
+ chk.InpDataSize += tr.PacketSize;
+ }
+ }
+ ++chk.InpCount;
+ }
+ Y_ASSERT(memcmp(&chk, res, sizeof(chk)) == 0);
+#endif
+ }
+ TString GetDebugInfo() override;
+ TString GetPeerLinkDebug(const TPeerLinkHash& ch);
+ void Kill(const TUdpAddress& addr) override;
+ TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override;
+ };
+
+ bool TUdpHost::Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
+ if (s.IsValid()) {
+ Y_ASSERT(0);
+ return false;
+ }
+ s.Open(socket);
+ if (!s.IsValid())
+ return false;
+
+ if (IBDetection)
+ IB = CreateIBClientServer();
+
+ NHPTimer::GetTime(&CurrentT);
+ return true;
+ }
+
+ static bool HasAllPackets(const TUdpInTransfer& res) {
+ if (!res.HasLastPacket)
+ return false;
+ for (int i = res.GetPacketCount() - 1; i >= 0; --i) {
+ if (!res.GetPacket(i))
+ return false;
+ }
+ return true;
+ }
+
+ // grouped acks: each ack is two ints, packet_id followed by a bitmask covering the 32 packets preceding packet_id
+ const int SIZEOF_ACK = 8;
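+ // Example: the pair (40, 0b101) acks packet 40 plus packets 39 (bit 0) and 37 (bit 2),
+ // since bit k of the mask corresponds to packet (packet_id - k - 1).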
+ static int WriteAck(TUdpInTransfer* p, int* dst, int maxAcks) {
+ int ackCount = 0;
+ if (p->NewPacketsToAck.size() > 1)
+ Sort(p->NewPacketsToAck.begin(), p->NewPacketsToAck.end());
+ int lastAcked = 0;
+ for (size_t idx = 0; idx < p->NewPacketsToAck.size(); ++idx) {
+ int pkt = p->NewPacketsToAck[idx];
+ if (idx == p->NewPacketsToAck.size() - 1 || pkt > lastAcked + 30) {
+ *dst++ = pkt;
+ int bitMask = 0;
+ int backPackets = Min(pkt, 32);
+ for (int k = 0; k < backPackets; ++k) {
+ if (p->GetPacket(pkt - k - 1))
+ bitMask |= 1 << k;
+ }
+ *dst++ = bitMask;
+ if (++ackCount >= maxAcks)
+ break;
+ lastAcked = pkt;
+ //printf("sending ack %d (mask %x)\n", pkt, bitMask);
+ }
+ }
+ p->NewPacketsToAck.clear();
+ return ackCount;
+ }
+
+ static void AckPacket(TUdpOutTransfer* p, int pkt, float deltaT, bool updateRTT) {
+ if (pkt < 0 || pkt >= p->PacketCount) {
+ Y_ASSERT(0);
+ return;
+ }
+ p->AckTracker.Ack(pkt, deltaT, updateRTT);
+ }
+
+ static void ReadAcks(TUdpOutTransfer* p, const int* acks, int ackCount, float deltaT) {
+ for (int i = 0; i < ackCount; ++i) {
+ int pkt = *acks++;
+ int bitMask = *acks++;
+ bool updateRTT = i == ackCount - 1; // update RTT using only last packet in the pack
+ AckPacket(p, pkt, deltaT, updateRTT);
+ for (int k = 0; k < 32; ++k) {
+ if (bitMask & (1 << k))
+ AckPacket(p, pkt - k - 1, deltaT, false);
+ }
+ }
+ }
+
+ using namespace NNetlibaSocket::NNetliba;
+
+ const ui64 KILL_PASSPHRASE1 = 0x98ff9cefb11d9a4cul;
+ const ui64 KILL_PASSPHRASE2 = 0xf7754c29e0be95eaul;
+
+ template <class T>
+ inline T Read(char** data) {
+ T res = ReadUnaligned<T>(*data);
+ *data += sizeof(T);
+ return res;
+ }
+ template <class T>
+ inline void Write(char** data, T res) {
+ WriteUnaligned<T>(*data, res);
+ *data += sizeof(T);
+ }
+
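+ // Wire format: every packet starts with [transferId:int][command:char] after the low-level header.
+ // DATA* packets continue with [attempt:int][packetId:int]; packet 0 additionally carries the
+ // packet GUID and CRC32 (plus shared-memory id and size for the *_SHMEM variants) before the payload.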
+ static void RequireResend(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
+ char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+ Write(&pktData, transferId);
+ Write(&pktData, (char)ACK_RESEND);
+ Write(&pktData, attempt);
+ s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+ }
+
+ static void RequireResendNoShmem(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
+ char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+ Write(&pktData, transferId);
+ Write(&pktData, (char)ACK_RESEND_NOSHMEM);
+ Write(&pktData, attempt);
+ s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+ }
+
+ static void AckComplete(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, const TGUID& packetGuid, int packetId) {
+ char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+ Write(&pktData, transferId);
+ Write(&pktData, (char)ACK_COMPLETE);
+ Write(&pktData, packetGuid);
+ Write(&pktData, packetId); // we need packetId to update RTT
+ s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+ }
+
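+ // Jumbo-frame probe: sends a full-size packet with fragmentation disabled; if it gets through,
+ // the peer answers with PONG and the congestion control upgrades the link to UDP_PACKET_SIZE.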
+ static void SendPing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
+ char pktBuf[UDP_PACKET_SIZE_FULL];
+ char* pktData = pktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
+ if (NSan::MSanIsOn()) {
+ Zero(pktBuf);
+ }
+ Write(&pktData, (int)0);
+ Write(&pktData, (char)PING);
+ Write(&pktData, selfNetworkOrderPort);
+ s.SendTo(pktBuf, UDP_PACKET_SIZE_FULL, toAddress, FF_DONT_FRAG);
+ }
+
+ // not MTU discovery, just figure out IB address of the peer
+ static void SendFakePing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
+ char buf[100];
+ char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+ Write(&pktData, (int)0);
+ Write(&pktData, (char)PING);
+ Write(&pktData, selfNetworkOrderPort);
+ s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+ }
+
+ void TUdpHost::SendData(TList<TTransferKey>* order, float deltaT1, bool needCheckAlive) {
+ for (TList<TTransferKey>::iterator z = order->begin(); z != order->end();) {
+ // pick connection to send
+ const TTransferKey& transferKey = *z;
+ TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
+ if (i == SendQueue.end()) {
+ z = order->erase(z);
+ continue;
+ }
+ ++z;
+
+ // perform sending
+ int transferId = transferKey.Id;
+ TUdpOutTransfer& xfer = i->second;
+
+ if (!xfer.AckTracker.IsInitialized()) {
+ TIntrusivePtr<TCongestionControl> congestion = xfer.AckTracker.GetCongestionControl();
+ Y_ASSERT(congestion.Get() != nullptr);
+ if (!congestion->IsKnownMTU()) {
+ TLameMTUDiscovery* md = congestion->GetMTUDiscovery();
+ if (md->IsTimedOut()) {
+ congestion->SetMTU(UDP_PACKET_SIZE_SMALL);
+ } else {
+ if (md->CanSend()) {
+ SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
+ md->PingSent();
+ }
+ continue;
+ }
+ }
+ // try to use the large MTU; we could have selected the small MTU due to transient connectivity problems
+ if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL || IB.Get() != nullptr) {
+ // recheck roughly every 50MB of data sent
+ int chkDenom = (50000000 / xfer.Data->GetSize()) | 1;
+ if ((NetAckRnd() % chkDenom) == 0) {
+ //printf("send rechecking ping\n");
+ if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL) {
+ SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
+ } else {
+ SendFakePing(s, xfer.ToAddress, s.GetNetworkOrderPort());
+ }
+ }
+ }
+ xfer.PacketSize = congestion->GetMTU();
+ xfer.LastPacketSize = xfer.Data->GetSize() % xfer.PacketSize;
+ xfer.PacketCount = xfer.Data->GetSize() / xfer.PacketSize + 1;
+ xfer.AckTracker.SetPacketCount(xfer.PacketCount);
+ }
+
+ xfer.AckTracker.Step(deltaT1);
+ MaxWaitTime = Min(MaxWaitTime, xfer.AckTracker.GetTimeToNextPacketTimeout());
+ if (needCheckAlive && !xfer.AckTracker.IsAlive()) {
+ FailedSend(transferId);
+ SendQueue.erase(i);
+ continue;
+ }
+ bool sendBufferOverflow = false;
+ while (xfer.AckTracker.CanSend()) {
+ NHPTimer::STime tCopy = CurrentT;
+ float deltaT2 = (float)NHPTimer::GetTimePassed(&tCopy);
+ deltaT2 = ClampVal(deltaT2, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
+
+ int pkt = xfer.AckTracker.GetPacketToSend(deltaT2);
+ if (pkt == -1) {
+ break;
+ }
+
+ int dataSize = xfer.PacketSize;
+ if (pkt == xfer.PacketCount - 1)
+ dataSize = xfer.LastPacketSize;
+
+ char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
+ Write(&pktData, transferId);
+ char pktType = xfer.PacketSize == UDP_PACKET_SIZE ? DATA : DATA_SMALL;
+ TSharedMemory* shm = xfer.Data->GetSharedData();
+ if (shm) {
+ if (pktType == DATA)
+ pktType = DATA_SHMEM;
+ else
+ pktType = DATA_SMALL_SHMEM;
+ }
+ Write(&pktData, pktType);
+ Write(&pktData, xfer.Attempt);
+ Write(&pktData, pkt);
+ if (pkt == 0) {
+ Write(&pktData, xfer.PacketGuid);
+ Write(&pktData, xfer.Crc32);
+ if (shm) {
+ Write(&pktData, shm->GetId());
+ Write(&pktData, shm->GetSize());
+ }
+ }
+ TBlockChainIterator dataReader(xfer.Data->GetChain());
+ dataReader.Seek(pkt * xfer.PacketSize);
+ dataReader.Read(pktData, dataSize);
+ pktData += dataSize;
+ int sendSize = (int)(pktData - PktBuf);
+ TNetSocket::ESendError sendErr = s.SendTo(PktBuf, sendSize, xfer.ToAddress, FF_ALLOW_FRAG);
+ if (sendErr != TNetSocket::SEND_OK) {
+ if (sendErr == TNetSocket::SEND_NO_ROUTE_TO_HOST) {
+ FailedSend(transferId);
+ SendQueue.erase(i);
+ break;
+ } else {
+ // most probably out of send buffer space (or something terrible has happened)
+ xfer.AckTracker.AddToResend(pkt);
+ sendBufferOverflow = true;
+ MaxWaitTime = 0;
+ //printf("failed send\n");
+ break;
+ }
+ }
+ }
+ if (sendBufferOverflow)
+ break;
+ }
+ }
+
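+ // Drains the socket: receives datagrams until none are pending, dispatching each by its command byte.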
+ void TUdpHost::RecvCycle() {
+ for (;;) {
+ sockaddr_in6 fromAddress;
+ int rv = RecvBuf.GetBufSize();
+ bool recvOk = s.RecvFrom(RecvBuf.GetDataPtr(), &rv, &fromAddress);
+ if (!recvOk)
+ break;
+
+ NHPTimer::STime tCopy = CurrentT;
+ float deltaT = (float)NHPTimer::GetTimePassed(&tCopy);
+ deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
+
+ //int fromIP = fromAddress.sin_addr.s_addr;
+
+ TTransferKey k;
+ char* pktData = RecvBuf.GetDataPtr() + UDP_LOW_LEVEL_HEADER_SIZE;
+ GetUdpAddress(&k.Address, fromAddress);
+ k.Id = Read<int>(&pktData);
+ int transferId = k.Id;
+ int cmd = Read<char>(&pktData);
+ Y_ASSERT(cmd == (int)*(RecvBuf.GetDataPtr() + CMD_POS));
+ switch (cmd) {
+ case DATA:
+ case DATA_SMALL:
+ case DATA_SHMEM:
+ case DATA_SMALL_SHMEM: {
+ int attempt = Read<int>(&pktData);
+ int packetId = Read<int>(&pktData);
+ //printf("data packet %d (trans ID = %d)\n", packetId, transferId);
+ TUdpCompleteInXferHash::iterator itCompl = RecvCompleted.find(k);
+ if (itCompl != RecvCompleted.end()) {
+ Y_ASSERT(RecvQueue.find(k) == RecvQueue.end());
+ const TUdpCompleteInTransfer& complete = itCompl->second;
+ bool sendAckComplete = true;
+ if (packetId == 0) {
+ // check packet GUID
+ char* tmpPktData = pktData;
+ TGUID packetGuid;
+ packetGuid = Read<TGUID>(&tmpPktData);
+ if (packetGuid != complete.PacketGuid) {
+ // we are receiving new data with the same transferId;
+ // in this case we have to flush all information about the previous transfer
+ // and start over
+ //printf("same transferId for a different packet\n");
+ RecvCompleted.erase(itCompl);
+ sendAckComplete = false;
+ }
+ }
+ if (sendAckComplete) {
+ AckComplete(s, fromAddress, transferId, complete.PacketGuid, packetId);
+ break;
+ }
+ }
+ TUdpInXferHash::iterator rq = RecvQueue.find(k);
+ if (rq == RecvQueue.end()) {
+ //printf("new input transfer\n");
+ TUdpInTransfer& res = RecvQueue[k];
+ res.ToAddress = fromAddress;
+ res.Attempt = attempt;
+ res.Congestion = GetPeerLink(k.Address).UdpCongestion.Get();
+ res.PacketSize = 0;
+ res.HasLastPacket = false;
+ res.AttachStats(&PendingDataStats);
+ rq = RecvQueue.find(k);
+ Y_ASSERT(rq != RecvQueue.end());
+ }
+ TUdpInTransfer& res = rq->second;
+ res.Congestion->MarkAlive();
+ res.TimeSinceLastRecv = 0;
+
+ if (packetId == 0) {
+ TGUID packetGuid;
+ packetGuid = Read<TGUID>(&pktData);
+ int crc32 = Read<int>(&pktData);
+ res.Crc32 = crc32;
+ res.PacketGuid = packetGuid;
+ if (cmd == DATA_SHMEM || cmd == DATA_SMALL_SHMEM) {
+ // link to attached shared memory
+ TGUID shmemId = Read<TGUID>(&pktData);
+ int shmemSize = Read<int>(&pktData);
+ if (res.SharedData.Get() == nullptr) {
+ res.SharedData = new TSharedMemory;
+ if (!res.SharedData->Open(shmemId, shmemSize)) {
+ res.SharedData = nullptr;
+ RequireResendNoShmem(s, res.ToAddress, transferId, res.Attempt);
+ break;
+ }
+ }
+ }
+ }
+ if (attempt != res.Attempt) {
+ RequireResend(s, res.ToAddress, transferId, res.Attempt);
+ break;
+ } else {
+ if (res.PacketSize == 0) {
+ res.PacketSize = (cmd == DATA || cmd == DATA_SHMEM ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL);
+ } else {
+ // check that all packets of this transfer have the same size
+ Y_ASSERT(cmd == DATA || cmd == DATA_SMALL);
+ Y_ASSERT(res.PacketSize == (cmd == DATA ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL));
+ }
+
+ int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
+
+ Y_ASSERT(dataSize <= res.PacketSize);
+ if (dataSize > res.PacketSize)
+ break; // mem overrun protection
+ if (packetId >= res.GetPacketCount())
+ res.SetPacketCount(packetId + 1);
+ {
+ TUdpRecvPacket* pkt = nullptr;
+ if (res.PacketSize == UDP_PACKET_SIZE_SMALL) {
+ // save memory by using a smaller buffer at the cost of an extra memcpy
+ pkt = RecvBuf.CreateNewSmallPacket(dataSize);
+ memcpy(pkt->Data, pktData, dataSize);
+ pkt->DataStart = 0;
+ pkt->DataSize = dataSize;
+ } else {
+ int dataStart = (int)(pktData - RecvBuf.GetDataPtr()); // data offset in the packet
+ pkt = RecvBuf.ExtractPacket();
+ pkt->DataStart = dataStart;
+ pkt->DataSize = dataSize;
+ }
+ // compute the per-packet block sum; it is later combined into the whole-message CRC
+ pkt->BlockSum = TIncrementalChecksumCalcer::CalcBlockSum(pkt->Data + pkt->DataStart, pkt->DataSize);
+ res.AssignPacket(packetId, pkt);
+ }
+
+ if (dataSize != res.PacketSize) {
+ res.LastPacketSize = dataSize;
+ res.HasLastPacket = true;
+ }
+
+ if (HasAllPackets(res)) {
+ //printf("received\n");
+ TRequest* out = new TRequest;
+ out->Address = k.Address;
+ out->Guid = res.PacketGuid;
+ TIncrementalChecksumCalcer incCS;
+ int packetCount = res.GetPacketCount();
+ out->Data.Reset(new TRopeDataPacket);
+ for (int i = 0; i < packetCount; ++i) {
+ TUdpRecvPacket* pkt = res.ExtractPacket(i);
+ Y_ASSERT(pkt->DataSize == ((i == packetCount - 1) ? res.LastPacketSize : res.PacketSize));
+ out->Data->AddBlock((char*)pkt, pkt->Data + pkt->DataStart, pkt->DataSize);
+ incCS.AddBlockSum(pkt->BlockSum, pkt->DataSize);
+ }
+ out->Data->AttachSharedData(res.SharedData);
+ res.EraseAllPackets();
+
+ int crc32 = incCS.CalcChecksum(); // CalcChecksum(out->Data->GetChain());
+#ifdef SIMULATE_NETWORK_FAILURES
+ bool crcOk = crc32 == res.Crc32 ? (RandomNumber<size_t>() % 10) != 0 : false;
+#else
+ bool crcOk = crc32 == res.Crc32;
+#endif
+ if (crcOk) {
+ ReceivedList.push_back(out);
+ Y_ASSERT(RecvCompleted.find(k) == RecvCompleted.end());
+ TUdpCompleteInTransfer& complete = RecvCompleted[k];
+ RecvCompletedQueue.push_back(k);
+ complete.PacketGuid = res.PacketGuid;
+ AckComplete(s, res.ToAddress, transferId, complete.PacketGuid, packetId);
+ RecvQueue.erase(rq);
+ } else {
+ //printf("crc failed, require resend\n");
+ delete out;
+ ++res.Attempt;
+ res.NewPacketsToAck.clear();
+ RequireResend(s, res.ToAddress, transferId, res.Attempt);
+ }
+ } else {
+ res.NewPacketsToAck.push_back(packetId);
+ }
+ }
+ } break;
+ case ACK: {
+ TUdpOutXferHash::iterator i = SendQueue.find(k);
+ if (i == SendQueue.end())
+ break;
+ TUdpOutTransfer& xfer = i->second;
+ if (!xfer.AckTracker.IsInitialized())
+ break;
+ xfer.AckTracker.MarkAlive();
+ int attempt = Read<int>(&pktData);
+ Y_ASSERT(attempt <= xfer.Attempt);
+ if (attempt != xfer.Attempt)
+ break;
+ ReadAcks(&xfer, (int*)pktData, (int)(RecvBuf.GetDataPtr() + rv - pktData) / SIZEOF_ACK, deltaT);
+ break;
+ }
+ case ACK_COMPLETE: {
+ TUdpOutXferHash::iterator i = SendQueue.find(k);
+ if (i == SendQueue.end())
+ break;
+ TUdpOutTransfer& xfer = i->second;
+ xfer.AckTracker.MarkAlive();
+ TGUID packetGuid;
+ packetGuid = Read<TGUID>(&pktData);
+ int packetId = Read<int>(&pktData);
+ if (packetGuid == xfer.PacketGuid) {
+ xfer.AckTracker.Ack(packetId, deltaT, true); // update RTT
+ xfer.AckTracker.AckAll(); // acking all packets is required, otherwise they are treated as lost (see the TAckTracker destructor)
+ SucceededSend(transferId);
+ SendQueue.erase(i);
+ } else {
+ // the peer claims it has received this packet, but the packetGuid is wrong;
+ // try to resend everything
+ // ++xfer.Attempt; // must not do this: only the sender may modify the attempt number, otherwise out-of-order packets can cause a cycle
+ xfer.AckTracker.Resend();
+ }
+ break;
+ }
+ case ACK_RESEND: {
+ TUdpOutXferHash::iterator i = SendQueue.find(k);
+ if (i == SendQueue.end())
+ break;
+ TUdpOutTransfer& xfer = i->second;
+ xfer.AckTracker.MarkAlive();
+ int attempt = Read<int>(&pktData);
+ if (xfer.Attempt != attempt) {
+ // reset the current transfer & initialize a new one
+ xfer.Attempt = attempt;
+ xfer.AckTracker.Resend();
+ }
+ break;
+ }
+ case ACK_RESEND_NOSHMEM: {
+ // the receiver failed to open shared memory on its side; transmitting without shmem is not implemented yet, so abort
+ Y_VERIFY(0, "not implemented yet");
+ break;
+ }
+ case PING: {
+ sockaddr_in6 trueFromAddress = fromAddress;
+ int port = Read<int>(&pktData);
+ Y_ASSERT(trueFromAddress.sin6_family == AF_INET6);
+ trueFromAddress.sin6_port = port;
+ // cannot set the MTU for fromAddress here since asymmetric MTUs are possible
+ char* pktData2 = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
+ Write(&pktData2, (int)0);
+ Write(&pktData2, (char)PONG);
+ if (IB.Get()) {
+ const TIBConnectInfo& ibConnectInfo = IB->GetConnectInfo();
+ Write(&pktData2, ibConnectInfo);
+ Write(&pktData2, trueFromAddress);
+ }
+ s.SendTo(PktBuf, pktData2 - PktBuf, trueFromAddress, FF_ALLOW_FRAG);
+ break;
+ }
+ case PONG: {
+ TPeerLink& peerInfo = GetPeerLink(k.Address);
+ peerInfo.UdpCongestion->SetMTU(UDP_PACKET_SIZE);
+ int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
+ if (dataSize == sizeof(TIBConnectInfo) + sizeof(sockaddr_in6)) {
+ if (IB.Get() != nullptr && peerInfo.IBPeer.Get() == nullptr) {
+ TIBConnectInfo info = Read<TIBConnectInfo>(&pktData);
+ sockaddr_in6 myAddress = Read<sockaddr_in6>(&pktData);
+ TUdpAddress myUdpAddress;
+ GetUdpAddress(&myUdpAddress, myAddress);
+ peerInfo.IBPeer = IB->ConnectPeer(info, k.Address, myUdpAddress);
+ }
+ }
+ break;
+ }
+ case KILL: {
+ ui64 p1 = Read<ui64>(&pktData);
+ ui64 p2 = Read<ui64>(&pktData);
+ int restSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
+ if (restSize == 0 && p1 == KILL_PASSPHRASE1 && p2 == KILL_PASSPHRASE2) {
+ abort();
+ }
+ break;
+ }
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ }
+ }
+
+ void TUdpHost::IBStep() {
+ if (IB.Get()) {
+ NHPTimer::STime tChk = CurrentT;
+ float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
+ if (IB->Step(tChk)) {
+ IBIdleTime = -chkDeltaT;
+ }
+ }
+ }
+
+ void TUdpHost::Step() {
+ if (IB.Get()) {
+ NHPTimer::STime tChk = CurrentT;
+ float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
+ if (IB->Step(tChk)) {
+ IBIdleTime = -chkDeltaT;
+ }
+ if (chkDeltaT < 0.0005) {
+ return;
+ }
+ }
+
+ if (UseTOSforAcks) {
+ s.SetTOS(0x20);
+ } else {
+ s.SetTOS(0);
+ }
+
+ RecvCycle();
+
+ float deltaT = (float)NHPTimer::GetTimePassed(&CurrentT);
+ deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
+
+ MaxWaitTime = DEFAULT_MAX_WAIT_TIME;
+ IBIdleTime += deltaT;
+
+ bool needCheckAlive = false;
+
+ // update alive ports
+ const float INACTIVE_CONGESTION_UPDATE_INTERVAL = 1;
+ TimeSinceCongestionHistoryUpdate += deltaT;
+ if (TimeSinceCongestionHistoryUpdate > INACTIVE_CONGESTION_UPDATE_INTERVAL) {
+ for (TPeerLinkHash::iterator i = CongestionTrackHistory.begin(); i != CongestionTrackHistory.end();) {
+ TPeerLink& pl = i->second;
+ if (!pl.UpdateSleep(TimeSinceCongestionHistoryUpdate)) {
+ TPeerLinkHash::iterator k = i++;
+ CongestionTrackHistory.erase(k);
+ needCheckAlive = true;
+ } else {
+ ++i;
+ }
+ }
+ TimeSinceCongestionHistoryUpdate = 0;
+ }
+ for (TPeerLinkHash::iterator i = CongestionTrack.begin(); i != CongestionTrack.end();) {
+ const TUdpAddress& addr = i->first;
+ TPeerLink& pl = i->second;
+ if (pl.UdpCongestion->GetTransferCount() == 0) {
+ pl.StartSleep(addr, &MaxWaitTime);
+ CongestionTrackHistory[i->first] = i->second;
+ TPeerLinkHash::iterator k = i++;
+ CongestionTrack.erase(k);
+ } else if (!pl.Update(deltaT, addr, &MaxWaitTime)) {
+ TPeerLinkHash::iterator k = i++;
+ CongestionTrack.erase(k);
+ needCheckAlive = true;
+ } else {
+ ++i;
+ }
+ }
+
+ // send acks on received data
+ for (TUdpInXferHash::iterator i = RecvQueue.begin(); i != RecvQueue.end();) {
+ const TTransferKey& transKey = i->first;
+ int transferId = transKey.Id;
+ TUdpInTransfer& xfer = i->second;
+ xfer.TimeSinceLastRecv += deltaT;
+ if (xfer.TimeSinceLastRecv > UDP_MAX_INPUT_DATA_WAIT || (needCheckAlive && !xfer.Congestion->IsAlive())) {
+ TUdpInXferHash::iterator k = i++;
+ RecvQueue.erase(k);
+ continue;
+ }
+ Y_ASSERT(RecvCompleted.find(i->first) == RecvCompleted.end()); // a transfer must not be both completed and still receiving
+ if (!xfer.NewPacketsToAck.empty()) {
+ char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
+ Write(&pktData, transferId);
+ Write(&pktData, (char)ACK);
+ Write(&pktData, xfer.Attempt);
+ int acks = WriteAck(&xfer, (int*)pktData, (int)(xfer.PacketSize - (pktData - PktBuf)) / SIZEOF_ACK);
+ pktData += acks * SIZEOF_ACK;
+ s.SendTo(PktBuf, (int)(pktData - PktBuf), xfer.ToAddress, FF_ALLOW_FRAG);
+ }
+ ++i;
+ }
+
+ if (UseTOSforAcks) {
+ s.SetTOS(0x60);
+ }
+
+ // send data for outbound connections
+ SendData(&SendOrderHighPrior, deltaT, needCheckAlive);
+ SendData(&SendOrder, deltaT, needCheckAlive);
+ SendData(&SendOrderLow, deltaT, needCheckAlive);
+
+ // rotate the send order so the same transfer is not always served first with many peers and high traffic
+ SendOrderHighPrior.splice(SendOrderHighPrior.end(), SendOrderHighPrior, SendOrderHighPrior.begin());
+ //SendOrder.splice(SendOrder.end(), SendOrder, SendOrder.begin()); // sending data in order has lower delay and shorter queue
+
+ // clean the completed-transfers registry; entries survive one extra cleanup period in KeepCompletedQueue,
+ // so duplicates of a finished transfer keep getting ACK_COMPLETE for a while
+ TimeSinceCompletedQueueClean += deltaT;
+ if (TimeSinceCompletedQueueClean > UDP_TRANSFER_TIMEOUT * 1.5) {
+ for (size_t i = 0; i < KeepCompletedQueue.size(); ++i) {
+ TUdpCompleteInXferHash::iterator k = RecvCompleted.find(KeepCompletedQueue[i]);
+ if (k != RecvCompleted.end())
+ RecvCompleted.erase(k);
+ }
+ KeepCompletedQueue.clear();
+ KeepCompletedQueue.swap(RecvCompletedQueue);
+ TimeSinceCompletedQueueClean = 0;
+ }
+ }
+
+ TString TUdpHost::GetPeerLinkDebug(const TPeerLinkHash& ch) {
+ TString res;
+ char buf[1000];
+ for (const auto& i : ch) {
+ const TUdpAddress& ip = i.first;
+ const TCongestionControl& cc = *i.second.UdpCongestion;
+ IIBPeer* ibPeer = i.second.IBPeer.Get();
+ sprintf(buf, "%s\tIB: %d, RTT: %g Timeout: %g Window: %g MaxWin: %g FailRate: %g TimeSinceLastRecv: %g Transfers: %d MTU: %d\n",
+ GetAddressAsString(ip).c_str(),
+ ibPeer ? ibPeer->GetState() : -1,
+ cc.GetRTT() * 1000, cc.GetTimeout() * 1000, cc.GetWindow(), cc.GetMaxWindow(), cc.GetFailRate(),
+ cc.GetTimeSinceLastRecv() * 1000, cc.GetTransferCount(), cc.GetMTU());
+ res += buf;
+ }
+ return res;
+ }
+
+ TString TUdpHost::GetDebugInfo() {
+ TString res;
+ char buf[1000];
+ sprintf(buf, "Receiving %d msgs, sending %d high prior, %d regular msgs, %d low prior msgs\n",
+ RecvQueue.ysize(), (int)SendOrderHighPrior.size(), (int)SendOrder.size(), (int)SendOrderLow.size());
+ res += buf;
+
+ TRequesterPendingDataStats pds;
+ GetPendingDataSize(&pds);
+ sprintf(buf, "Pending data size: %" PRIu64 "\n", pds.InpDataSize + pds.OutDataSize);
+ res += buf;
+ sprintf(buf, " in packets: %d, size %" PRIu64 "\n", pds.InpCount, pds.InpDataSize);
+ res += buf;
+ sprintf(buf, " out packets: %d, size %" PRIu64 "\n", pds.OutCount, pds.OutDataSize);
+ res += buf;
+
+ res += "\nCongestion info:\n";
+ res += GetPeerLinkDebug(CongestionTrack);
+ res += "\nCongestion info history:\n";
+ res += GetPeerLinkDebug(CongestionTrackHistory);
+
+ return res;
+ }
+
+ static void SendKill(const TNetSocket& s, const sockaddr_in6& toAddress) {
+ char buf[100];
+ char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
+ Write(&pktData, (int)0);
+ Write(&pktData, (char)KILL);
+ Write(&pktData, KILL_PASSPHRASE1);
+ Write(&pktData, KILL_PASSPHRASE2);
+ s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
+ }
+
+ void TUdpHost::Kill(const TUdpAddress& addr) {
+ sockaddr_in6 target;
+ GetWinsockAddr(&target, addr);
+ SendKill(s, target);
+ }
+
+ TIntrusivePtr<IPeerQueueStats> TUdpHost::GetQueueStats(const TUdpAddress& addr) {
+ TQueueStatsHash::iterator zq = PeerQueueStats.find(addr);
+ if (zq != PeerQueueStats.end()) {
+ return zq->second.Get();
+ }
+ TPeerQueueStats* res = new TPeerQueueStats;
+ PeerQueueStats[addr] = res;
+ // attach to existing congestion tracker
+ TPeerLinkHash::iterator z;
+ z = CongestionTrack.find(addr);
+ if (z != CongestionTrack.end()) {
+ z->second.UdpCongestion->AttachQueueStats(res);
+ }
+ z = CongestionTrackHistory.find(addr);
+ if (z != CongestionTrackHistory.end()) {
+ z->second.UdpCongestion->AttachQueueStats(res);
+ }
+ return res;
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+
+ TIntrusivePtr<IUdpHost> CreateUdpHost(int port) {
+ TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateBestRecvSocket();
+ socket->Open(port);
+ if (!socket->IsValid())
+ return nullptr;
+ return CreateUdpHost(socket);
+ }
+
+ TIntrusivePtr<IUdpHost> CreateUdpHost(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
+ if (!InitLocalIPList()) {
+ Y_ASSERT(0 && "Can not determine self IP address");
+ return nullptr;
+ }
+ TIntrusivePtr<TUdpHost> res = new TUdpHost;
+ if (!res->Start(socket))
+ return nullptr;
+ return res.Get();
+ }
+
+ void SetUdpMaxBandwidthPerIP(float f) {
+ f = Max(0.0f, f);
+ TCongestionControl::MaxPacketRate = f / UDP_PACKET_SIZE;
+ }
+
+ void SetUdpSlowStart(bool enable) {
+ TCongestionControl::StartWindowSize = enable ? 0.5f : 3;
+ }
+
+ void DisableIBDetection() {
+ IBDetection = false;
+ }
+
+}