diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/socket/packet_queue.h | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/netliba/socket/packet_queue.h')
-rw-r--r-- | library/cpp/netliba/socket/packet_queue.h | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/library/cpp/netliba/socket/packet_queue.h b/library/cpp/netliba/socket/packet_queue.h new file mode 100644 index 0000000000..58a84709c2 --- /dev/null +++ b/library/cpp/netliba/socket/packet_queue.h @@ -0,0 +1,97 @@ +#pragma once + +#include "udp_recv_packet.h" + +#include <library/cpp/threading/chunk_queue/queue.h> + +#include <util/network/init.h> +#include <library/cpp/deprecated/atomic/atomic.h> +#include <util/system/event.h> +#include <util/system/yassert.h> +#include <library/cpp/deprecated/atomic/atomic_ops.h> +#include <utility> + +namespace NNetlibaSocket { + struct TPacketMeta { + sockaddr_in6 RemoteAddr; + sockaddr_in6 MyAddr; + }; + + template <size_t TTNumWriterThreads> + class TLockFreePacketQueue { + private: + enum { MAX_PACKETS_IN_QUEUE = INT_MAX, + CMD_QUEUE_RESERVE = 1 << 20, + MAX_DATA_IN_QUEUE = 32 << 20 }; + + typedef std::pair<TUdpRecvPacket*, TPacketMeta> TPacket; + typedef std::conditional_t<TTNumWriterThreads == 1, NThreading::TOneOneQueue<TPacket>, NThreading::TManyOneQueue<TPacket, TTNumWriterThreads>> TImpl; + + mutable TImpl Queue; + mutable TSystemEvent QueueEvent; + + mutable TAtomic NumPackets; + TAtomic DataSize; + + public: + TLockFreePacketQueue() + : NumPackets(0) + , DataSize(0) + { + } + + ~TLockFreePacketQueue() { + TPacket packet; + while (Queue.Dequeue(packet)) { + delete packet.first; + } + } + + bool IsDataPartFull() const { + return (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE - CMD_QUEUE_RESERVE); + } + + bool Push(TUdpRecvPacket* packet, const TPacketMeta& meta) { + // simulate OS behavior on buffer overflow - drop packets. + // yeah it contains small data race (we can add little bit more packets, but nobody cares) + if (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE) { + return false; + } + AtomicAdd(NumPackets, 1); + AtomicAdd(DataSize, packet->DataSize); + Y_ASSERT(packet->DataStart == 0); + + Queue.Enqueue(TPacket(std::make_pair(packet, meta))); + QueueEvent.Signal(); + return true; + } + + bool Pop(TUdpRecvPacket** packet, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) { + TPacket p; + if (!Queue.Dequeue(p)) { + QueueEvent.Reset(); + if (!Queue.Dequeue(p)) { + return false; + } + QueueEvent.Signal(); + } + *packet = p.first; + *srcAddr = p.second.RemoteAddr; + *dstAddr = p.second.MyAddr; + + AtomicSub(NumPackets, 1); + AtomicSub(DataSize, (*packet)->DataSize); + Y_ASSERT(AtomicGet(NumPackets) >= 0 && AtomicGet(DataSize) >= 0); + + return true; + } + + bool IsEmpty() const { + return AtomicAdd(NumPackets, 0) == 0; + } + + TSystemEvent& GetEvent() const { + return QueueEvent; + } + }; +} |