aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba/v6/udp_test.cpp
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/udp_test.cpp
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/udp_test.cpp')
-rw-r--r--library/cpp/netliba/v6/udp_test.cpp161
1 files changed, 161 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/udp_test.cpp b/library/cpp/netliba/v6/udp_test.cpp
new file mode 100644
index 0000000000..d0af51a368
--- /dev/null
+++ b/library/cpp/netliba/v6/udp_test.cpp
@@ -0,0 +1,161 @@
+#include "stdafx.h"
+#include "udp_test.h"
+#include "udp_client_server.h"
+#include "udp_http.h"
+#include "cpu_affinity.h"
+#include <util/system/hp_timer.h>
+#include <util/datetime/cputimer.h>
+#include <util/random/random.h>
+#include <util/random/fast.h>
+
+namespace NNetliba {
+ //static void PacketLevelTest(bool client)
+ //{
+ // int port = client ? 0 : 13013;
+ // TIntrusivePtr<IUdpHost> host = CreateUdpHost(&port);
+ //
+ // if(host == 0) {
+ // exit(-1);
+ // }
+ // TUdpAddress serverAddr = CreateAddress("localhost", 13013);
+ // vector<char> dummyPacket;
+ // dummyPacket.resize(10000);
+ // srand(GetCycleCount());
+ //
+ // for (int i = 0; i < dummyPacket.size(); ++i)
+ // dummyPacket[i] = rand();
+ // bool cont = true, hasReply = true;
+ // int reqCount = 1;
+ // for (int i = 0; cont; ++i) {
+ // host->Step();
+ // if (client) {
+ // //while (host->HasPendingData(serverAddr))
+ // // Sleep(0);
+ // if (hasReply) {
+ // printf("request %d\n", reqCount);
+ // *(int*)&dummyPacket[0] = reqCount;
+ // host->Send(serverAddr, dummyPacket, 0, PP_NORMAL);
+ // hasReply = false;
+ // ++reqCount;
+ // }else
+ // sleep(0);
+ //
+ // TRequest *req;
+ // while (req = host->GetRequest()) {
+ // int n = *(int*)&req->Data[0];
+ // printf("received response %d\n", n);
+ // Y_ASSERT(memcmp(&req->Data[4], &dummyPacket[4], dummyPacket.size() - 4) == 0);
+ // delete req;
+ // hasReply = true;
+ // }
+ // TSendResult sr;
+ // while (host->GetSendResult(&sr)) {
+ // if (!sr.Success) {
+ // printf("Send failed!\n");
+ // //Sleep(INFINITE);
+ // hasReply = true;
+ // }
+ // }
+ // } else {
+ // while (TRequest *req = host->GetRequest()) {
+ // int n = *(int*)&req->Data[0];
+ // printf("responding %d\n", n);
+ // host->Send(req->Address, req->Data, 0, PP_NORMAL);
+ // delete req;
+ // }
+ // TSendResult sr;
+ // while (host->GetSendResult(&sr)) {
+ // if (!sr.Success) {
+ // printf("Send failed!\n");
+ // sleep(0);
+ // }
+ // }
+ // sleep(0);
+ // }
+ // }
+ //}
+
+ static void SessionLevelTest(bool client, const char* serverName, int packetSize, int packetsInFly, int srcPort) {
+ BindToSocket(0);
+ TIntrusivePtr<IRequester> reqHost;
+ // reqHost = CreateHttpUdpRequester(13013);
+ reqHost = CreateHttpUdpRequester(client ? srcPort : 13013);
+ TUdpAddress serverAddr = CreateAddress(serverName, 13013);
+ TVector<char> dummyPacket;
+ dummyPacket.resize(packetSize);
+ TReallyFastRng32 rr((unsigned int)GetCycleCount());
+ for (size_t i = 0; i < dummyPacket.size(); ++i)
+ dummyPacket[i] = (char)rr.Uniform(256);
+ bool cont = true;
+ NHPTimer::STime t;
+ NHPTimer::GetTime(&t);
+ THashMap<TGUID, bool, TGUIDHash> seenReqs;
+ if (client) {
+ THashMap<TGUID, bool, TGUIDHash> reqList;
+ int packetsSentCount = 0;
+ TUdpHttpRequest* udpReq;
+ for (int i = 1; cont; ++i) {
+ for (;;) {
+ udpReq = reqHost->GetRequest();
+ if (udpReq == nullptr)
+ break;
+ udpReq->Data.resize(10);
+ reqHost->SendResponse(udpReq->ReqId, &udpReq->Data);
+ delete udpReq;
+ }
+ while (TUdpHttpResponse* res = reqHost->GetResponse()) {
+ THashMap<TGUID, bool, TGUIDHash>::iterator z = reqList.find(res->ReqId);
+ if (z == reqList.end()) {
+ printf("Unexpected response\n");
+ abort();
+ }
+ reqList.erase(z);
+ if (res->Ok) {
+ ++packetsSentCount;
+ //Y_ASSERT(res->Data == dummyPacket);
+ NHPTimer::STime tChk = t;
+ if (NHPTimer::GetTimePassed(&tChk) > 1) {
+ printf("packet size = %d\n", dummyPacket.ysize());
+ double passedTime = NHPTimer::GetTimePassed(&t);
+ double rate = packetsSentCount / passedTime;
+ printf("packet rate %g, transfer %gmb\n", rate, rate * dummyPacket.size() / 1000000);
+ packetsSentCount = 0;
+ }
+ } else {
+ printf("Failed request!\n");
+ //Sleep(INFINITE);
+ }
+ delete res;
+ }
+ while (reqList.ysize() < packetsInFly) {
+ *(int*)&dummyPacket[0] = i;
+ TVector<char> fakePacket = dummyPacket;
+ TGUID req2 = reqHost->SendRequest(serverAddr, "blaxuz", &fakePacket);
+ reqList[req2];
+ }
+ reqHost->GetAsyncEvent().Wait();
+ }
+ } else {
+ TUdpHttpRequest* req;
+ for (;;) {
+ req = reqHost->GetRequest();
+ if (req) {
+ if (seenReqs.find(req->ReqId) != seenReqs.end()) {
+ printf("Request %s recieved twice!\n", GetGuidAsString(req->ReqId).c_str());
+ }
+ seenReqs[req->ReqId];
+ req->Data.resize(10);
+ reqHost->SendResponse(req->ReqId, &req->Data);
+ delete req;
+ } else {
+ reqHost->GetAsyncEvent().Wait();
+ }
+ }
+ }
+ }
+
+ void RunUdpTest(bool client, const char* serverName, int packetSize, int packetsInFly, int srcPort) {
+ //PacketLevelTest(client);
+ SessionLevelTest(client, serverName, packetSize, packetsInFly, srcPort);
+ }
+}