1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
#pragma once
#include "udp_recv_packet.h"
#include <library/cpp/threading/chunk_queue/queue.h>
#include <util/network/init.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/event.h>
#include <util/system/yassert.h>
#include <library/cpp/deprecated/atomic/atomic_ops.h>
#include <utility>
namespace NNetlibaSocket {
// Pair of socket addresses stored next to every packet held in
// TLockFreePacketQueue.
struct TPacketMeta {
// Copied out by TLockFreePacketQueue::Pop as `srcAddr`; presumably the
// address of the peer that sent the packet - confirm against the receiver.
sockaddr_in6 RemoteAddr;
// Copied out by TLockFreePacketQueue::Pop as `dstAddr`; presumably the local
// address the packet was received on - confirm against the receiver.
sockaddr_in6 MyAddr;
};
template <size_t TTNumWriterThreads>
class TLockFreePacketQueue {
private:
static constexpr int MAX_PACKETS_IN_QUEUE = INT_MAX;
static constexpr int CMD_QUEUE_RESERVE = 1 << 20;
static constexpr int MAX_DATA_IN_QUEUE = 32 << 20;
typedef std::pair<TUdpRecvPacket*, TPacketMeta> TPacket;
typedef std::conditional_t<TTNumWriterThreads == 1, NThreading::TOneOneQueue<TPacket>, NThreading::TManyOneQueue<TPacket, TTNumWriterThreads>> TImpl;
mutable TImpl Queue;
mutable TSystemEvent QueueEvent;
mutable TAtomic NumPackets;
TAtomic DataSize;
public:
TLockFreePacketQueue()
: NumPackets(0)
, DataSize(0)
{
}
~TLockFreePacketQueue() {
TPacket packet;
while (Queue.Dequeue(packet)) {
delete packet.first;
}
}
bool IsDataPartFull() const {
return (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE - CMD_QUEUE_RESERVE);
}
bool Push(TUdpRecvPacket* packet, const TPacketMeta& meta) {
// simulate OS behavior on buffer overflow - drop packets.
// yeah it contains small data race (we can add little bit more packets, but nobody cares)
if (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE) {
return false;
}
AtomicAdd(NumPackets, 1);
AtomicAdd(DataSize, packet->DataSize);
Y_ASSERT(packet->DataStart == 0);
Queue.Enqueue(TPacket(std::make_pair(packet, meta)));
QueueEvent.Signal();
return true;
}
bool Pop(TUdpRecvPacket** packet, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) {
TPacket p;
if (!Queue.Dequeue(p)) {
QueueEvent.Reset();
if (!Queue.Dequeue(p)) {
return false;
}
QueueEvent.Signal();
}
*packet = p.first;
*srcAddr = p.second.RemoteAddr;
*dstAddr = p.second.MyAddr;
AtomicSub(NumPackets, 1);
AtomicSub(DataSize, (*packet)->DataSize);
Y_ASSERT(AtomicGet(NumPackets) >= 0 && AtomicGet(DataSize) >= 0);
return true;
}
bool IsEmpty() const {
return AtomicAdd(NumPackets, 0) == 0;
}
TSystemEvent& GetEvent() const {
return QueueEvent;
}
};
}
|