aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba/v6/net_acks.cpp
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/net_acks.cpp
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/net_acks.cpp')
-rw-r--r--library/cpp/netliba/v6/net_acks.cpp194
1 files changed, 194 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/net_acks.cpp b/library/cpp/netliba/v6/net_acks.cpp
new file mode 100644
index 0000000000..5f4690c264
--- /dev/null
+++ b/library/cpp/netliba/v6/net_acks.cpp
@@ -0,0 +1,194 @@
+#include "stdafx.h"
+#include "net_acks.h"
+#include <util/datetime/cputimer.h>
+
+#include <atomic>
+
+namespace NNetliba {
+ const float RTT_AVERAGE_OVER = 15;
+
+ float TCongestionControl::StartWindowSize = 3;
+ float TCongestionControl::MaxPacketRate = 0; // unlimited
+
+ bool UseTOSforAcks = false; //true;//
+
+ void EnableUseTOSforAcks(bool enable) {
+ UseTOSforAcks = enable;
+ }
+
+ float CONG_CTRL_CHANNEL_INFLATE = 1;
+
+ void SetCongCtrlChannelInflate(float inflate) {
+ CONG_CTRL_CHANNEL_INFLATE = inflate;
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ TPingTracker::TPingTracker()
+ : AvrgRTT(CONG_CTRL_INITIAL_RTT)
+ , AvrgRTT2(CONG_CTRL_INITIAL_RTT * CONG_CTRL_INITIAL_RTT)
+ , RTTCount(0)
+ {
+ }
+
+ void TPingTracker::RegisterRTT(float rtt) {
+ Y_ASSERT(rtt > 0);
+ float keep = RTTCount / (RTTCount + 1);
+ AvrgRTT *= keep;
+ AvrgRTT += (1 - keep) * rtt;
+ AvrgRTT2 *= keep;
+ AvrgRTT2 += (1 - keep) * Sqr(rtt);
+ RTTCount = Min(RTTCount + 1, RTT_AVERAGE_OVER);
+ //static int n;
+ //if ((++n % 1024) == 0)
+ // printf("Average RTT = %g (sko = %g)\n", GetRTT() * 1000, GetRTTSKO() * 1000);
+ }
+
+ void TPingTracker::IncreaseRTT() {
+ const float F_RTT_DECAY_RATE = 1.1f;
+ AvrgRTT *= F_RTT_DECAY_RATE;
+ AvrgRTT2 *= Sqr(F_RTT_DECAY_RATE);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ void TAckTracker::Resend() {
+ CurrentPacket = 0;
+ for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i)
+ Congestion->Failure(); // not actually correct but simplifies logic a lot
+ PacketsInFly.clear();
+ DroppedPackets.clear();
+ ResendQueue.clear();
+ for (size_t i = 0; i < AckReceived.size(); ++i)
+ AckReceived[i] = false;
+ }
+
+ int TAckTracker::SelectPacket() {
+ if (!ResendQueue.empty()) {
+ int res = ResendQueue.back();
+ ResendQueue.pop_back();
+ //printf("resending packet %d\n", res);
+ return res;
+ }
+ if (CurrentPacket == PacketCount) {
+ return -1;
+ }
+ return CurrentPacket++;
+ }
+
+ TAckTracker::~TAckTracker() {
+ for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i)
+ Congestion->Failure();
+ // object will be incorrect state after this (failed packets are not added to resend queue), but who cares
+ }
+
+ int TAckTracker::GetPacketToSend(float deltaT) {
+ int res = SelectPacket();
+ if (res == -1) {
+ // needed to count time even if we don't have anything to send
+ Congestion->HasTriedToSend();
+ return res;
+ }
+ Congestion->LaunchPacket();
+ PacketsInFly[res] = -deltaT; // deltaT is time since last Step(), so for the timing to be correct we should subtract it
+ return res;
+ }
+
+ // called on SendTo() failure
+ void TAckTracker::AddToResend(int pkt) {
+ //printf("AddToResend(%d)\n", pkt);
+ TPacketHash::iterator i = PacketsInFly.find(pkt);
+ if (i != PacketsInFly.end()) {
+ PacketsInFly.erase(i);
+ Congestion->FailureOnSend();
+ ResendQueue.push_back(pkt);
+ } else
+ Y_ASSERT(0);
+ }
+
+ void TAckTracker::Ack(int pkt, float deltaT, bool updateRTT) {
+ Y_ASSERT(pkt >= 0 && pkt < PacketCount);
+ if (AckReceived[pkt])
+ return;
+ AckReceived[pkt] = true;
+ //printf("Ack received for %d\n", pkt);
+ TPacketHash::iterator i = PacketsInFly.find(pkt);
+ if (i == PacketsInFly.end()) {
+ for (size_t k = 0; k < ResendQueue.size(); ++k) {
+ if (ResendQueue[k] == pkt) {
+ ResendQueue[k] = ResendQueue.back();
+ ResendQueue.pop_back();
+ break;
+ }
+ }
+ TPacketHash::iterator z = DroppedPackets.find(pkt);
+ if (z != DroppedPackets.end()) {
+ // late packet arrived
+ if (updateRTT) {
+ float ping = z->second + deltaT;
+ Congestion->RegisterRTT(ping);
+ }
+ DroppedPackets.erase(z);
+ } else {
+ // Y_ASSERT(0); // ack on nonsent packet, possible in resend scenario
+ }
+ return;
+ }
+ if (updateRTT) {
+ float ping = i->second + deltaT;
+ //printf("Register RTT %g\n", ping * 1000);
+ Congestion->RegisterRTT(ping);
+ }
+ PacketsInFly.erase(i);
+ Congestion->Success();
+ }
+
+ void TAckTracker::AckAll() {
+ for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i) {
+ int pkt = i->first;
+ AckReceived[pkt] = true;
+ Congestion->Success();
+ }
+ PacketsInFly.clear();
+ }
+
+ void TAckTracker::Step(float deltaT) {
+ float timeoutVal = Congestion->GetTimeout();
+
+ //static int n;
+ //if ((++n % 1024) == 0)
+ // printf("timeout = %g, window = %g, fail_rate %g, pkt_rate = %g\n", timeoutVal * 1000, Congestion->GetWindow(), Congestion->GetFailRate(), (1 - Congestion->GetFailRate()) * Congestion->GetWindow() / Congestion->GetRTT());
+
+ TimeToNextPacketTimeout = 1000;
+ // для окон меньше единицы мы кидаем рандом один раз за RTT на то, можно ли пускать пакет
+ // поэтому можно ждать максимум RTT, после этого надо кинуть новый random
+ if (Congestion->GetWindow() < 1)
+ TimeToNextPacketTimeout = Congestion->GetRTT();
+
+ for (auto& droppedPacket : DroppedPackets) {
+ float& t = droppedPacket.second;
+ t += deltaT;
+ }
+
+ for (TPacketHash::iterator i = PacketsInFly.begin(); i != PacketsInFly.end();) {
+ float& t = i->second;
+ t += deltaT;
+ if (t > timeoutVal) {
+ //printf("packet %d timed out (timeout = %g)\n", i->first, timeoutVal);
+ ResendQueue.push_back(i->first);
+ DroppedPackets[i->first] = i->second;
+ TPacketHash::iterator k = i++;
+ PacketsInFly.erase(k);
+ Congestion->Failure();
+ } else {
+ TimeToNextPacketTimeout = Min(TimeToNextPacketTimeout, timeoutVal - t);
+ ++i;
+ }
+ }
+ }
+
+ static std::atomic<ui32> netAckRndVal = (ui32)GetCycleCount();
+ ui32 NetAckRnd() {
+ const auto nextNetAckRndVal = static_cast<ui32>(((ui64)netAckRndVal.load(std::memory_order_acquire) * 279470273) % 4294967291);
+ netAckRndVal.store(nextNetAckRndVal, std::memory_order_release);
+ return nextNetAckRndVal;
+ }
+}