diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/net_acks.cpp | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/net_acks.cpp')
-rw-r--r-- | library/cpp/netliba/v6/net_acks.cpp | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/net_acks.cpp b/library/cpp/netliba/v6/net_acks.cpp new file mode 100644 index 0000000000..5f4690c264 --- /dev/null +++ b/library/cpp/netliba/v6/net_acks.cpp @@ -0,0 +1,194 @@ +#include "stdafx.h" +#include "net_acks.h" +#include <util/datetime/cputimer.h> + +#include <atomic> + +namespace NNetliba { + const float RTT_AVERAGE_OVER = 15; + + float TCongestionControl::StartWindowSize = 3; + float TCongestionControl::MaxPacketRate = 0; // unlimited + + bool UseTOSforAcks = false; //true;// + + void EnableUseTOSforAcks(bool enable) { + UseTOSforAcks = enable; + } + + float CONG_CTRL_CHANNEL_INFLATE = 1; + + void SetCongCtrlChannelInflate(float inflate) { + CONG_CTRL_CHANNEL_INFLATE = inflate; + } + + ////////////////////////////////////////////////////////////////////////// + TPingTracker::TPingTracker() + : AvrgRTT(CONG_CTRL_INITIAL_RTT) + , AvrgRTT2(CONG_CTRL_INITIAL_RTT * CONG_CTRL_INITIAL_RTT) + , RTTCount(0) + { + } + + void TPingTracker::RegisterRTT(float rtt) { + Y_ASSERT(rtt > 0); + float keep = RTTCount / (RTTCount + 1); + AvrgRTT *= keep; + AvrgRTT += (1 - keep) * rtt; + AvrgRTT2 *= keep; + AvrgRTT2 += (1 - keep) * Sqr(rtt); + RTTCount = Min(RTTCount + 1, RTT_AVERAGE_OVER); + //static int n; + //if ((++n % 1024) == 0) + // printf("Average RTT = %g (sko = %g)\n", GetRTT() * 1000, GetRTTSKO() * 1000); + } + + void TPingTracker::IncreaseRTT() { + const float F_RTT_DECAY_RATE = 1.1f; + AvrgRTT *= F_RTT_DECAY_RATE; + AvrgRTT2 *= Sqr(F_RTT_DECAY_RATE); + } + + ////////////////////////////////////////////////////////////////////////// + void TAckTracker::Resend() { + CurrentPacket = 0; + for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i) + Congestion->Failure(); // not actually correct but simplifies logic a lot + PacketsInFly.clear(); + DroppedPackets.clear(); + ResendQueue.clear(); + for (size_t i = 0; i < AckReceived.size(); ++i) + AckReceived[i] = false; + } + + int TAckTracker::SelectPacket() { + if (!ResendQueue.empty()) { + int res = ResendQueue.back(); + ResendQueue.pop_back(); + //printf("resending packet %d\n", res); + return res; + } + if (CurrentPacket == PacketCount) { + return -1; + } + return CurrentPacket++; + } + + TAckTracker::~TAckTracker() { + for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i) + Congestion->Failure(); + // object will be incorrect state after this (failed packets are not added to resend queue), but who cares + } + + int TAckTracker::GetPacketToSend(float deltaT) { + int res = SelectPacket(); + if (res == -1) { + // needed to count time even if we don't have anything to send + Congestion->HasTriedToSend(); + return res; + } + Congestion->LaunchPacket(); + PacketsInFly[res] = -deltaT; // deltaT is time since last Step(), so for the timing to be correct we should subtract it + return res; + } + + // called on SendTo() failure + void TAckTracker::AddToResend(int pkt) { + //printf("AddToResend(%d)\n", pkt); + TPacketHash::iterator i = PacketsInFly.find(pkt); + if (i != PacketsInFly.end()) { + PacketsInFly.erase(i); + Congestion->FailureOnSend(); + ResendQueue.push_back(pkt); + } else + Y_ASSERT(0); + } + + void TAckTracker::Ack(int pkt, float deltaT, bool updateRTT) { + Y_ASSERT(pkt >= 0 && pkt < PacketCount); + if (AckReceived[pkt]) + return; + AckReceived[pkt] = true; + //printf("Ack received for %d\n", pkt); + TPacketHash::iterator i = PacketsInFly.find(pkt); + if (i == PacketsInFly.end()) { + for (size_t k = 0; k < ResendQueue.size(); ++k) { + if (ResendQueue[k] == pkt) { + ResendQueue[k] = ResendQueue.back(); + ResendQueue.pop_back(); + break; + } + } + TPacketHash::iterator z = DroppedPackets.find(pkt); + if (z != DroppedPackets.end()) { + // late packet arrived + if (updateRTT) { + float ping = z->second + deltaT; + Congestion->RegisterRTT(ping); + } + DroppedPackets.erase(z); + } else { + // Y_ASSERT(0); // ack on nonsent packet, possible in resend scenario + } + return; + } + if (updateRTT) { + float ping = i->second + deltaT; + //printf("Register RTT %g\n", ping * 1000); + Congestion->RegisterRTT(ping); + } + PacketsInFly.erase(i); + Congestion->Success(); + } + + void TAckTracker::AckAll() { + for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i) { + int pkt = i->first; + AckReceived[pkt] = true; + Congestion->Success(); + } + PacketsInFly.clear(); + } + + void TAckTracker::Step(float deltaT) { + float timeoutVal = Congestion->GetTimeout(); + + //static int n; + //if ((++n % 1024) == 0) + // printf("timeout = %g, window = %g, fail_rate %g, pkt_rate = %g\n", timeoutVal * 1000, Congestion->GetWindow(), Congestion->GetFailRate(), (1 - Congestion->GetFailRate()) * Congestion->GetWindow() / Congestion->GetRTT()); + + TimeToNextPacketTimeout = 1000; + // для окон меньше единицы мы кидаем рандом один раз за RTT на то, можно ли пускать пакет + // поэтому можно ждать максимум RTT, после этого надо кинуть новый random + if (Congestion->GetWindow() < 1) + TimeToNextPacketTimeout = Congestion->GetRTT(); + + for (auto& droppedPacket : DroppedPackets) { + float& t = droppedPacket.second; + t += deltaT; + } + + for (TPacketHash::iterator i = PacketsInFly.begin(); i != PacketsInFly.end();) { + float& t = i->second; + t += deltaT; + if (t > timeoutVal) { + //printf("packet %d timed out (timeout = %g)\n", i->first, timeoutVal); + ResendQueue.push_back(i->first); + DroppedPackets[i->first] = i->second; + TPacketHash::iterator k = i++; + PacketsInFly.erase(k); + Congestion->Failure(); + } else { + TimeToNextPacketTimeout = Min(TimeToNextPacketTimeout, timeoutVal - t); + ++i; + } + } + } + + static std::atomic<ui32> netAckRndVal = (ui32)GetCycleCount(); + ui32 NetAckRnd() { + const auto nextNetAckRndVal = static_cast<ui32>(((ui64)netAckRndVal.load(std::memory_order_acquire) * 279470273) % 4294967291); + netAckRndVal.store(nextNetAckRndVal, std::memory_order_release); + return nextNetAckRndVal; + } +} |