aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba/socket/packet_queue.h
blob: a81e956862fadad4da85cff5226d2b6d2fbd7858 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#pragma once

#include "udp_recv_packet.h"

#include <library/cpp/threading/chunk_queue/queue.h>

#include <util/network/init.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/event.h>
#include <util/system/yassert.h>
#include <library/cpp/deprecated/atomic/atomic_ops.h>
#include <utility>

namespace NNetlibaSocket {
    // Addressing metadata stored alongside each queued packet.
    // Pop() hands RemoteAddr back as the packet's source address and
    // MyAddr as its destination address (the local address it arrived on).
    struct TPacketMeta {
        sockaddr_in6 RemoteAddr; // peer (source) address of the packet
        sockaddr_in6 MyAddr;     // local (destination) address the packet was received on
    };

    // Lock-free queue of received UDP packets with a single consumer and
    // TTNumWriterThreads producers. Tracks both packet count and total payload
    // bytes so producers can emulate OS socket-buffer overflow by dropping
    // packets, and exposes a TSystemEvent the consumer can wait on.
    //
    // Thread-safety: Push() may be called from up to TTNumWriterThreads threads
    // concurrently; Pop() is single-consumer (the underlying queues are
    // one-reader). The counters are intentionally only approximately enforced —
    // see the comment in Push().
    template <size_t TTNumWriterThreads>
    class TLockFreePacketQueue {
    private:
        // Hard cap on queued packet count (effectively unlimited).
        static constexpr int MAX_PACKETS_IN_QUEUE = INT_MAX;
        // Byte headroom reserved for command/control packets: IsDataPartFull()
        // reports "full" this much earlier than the absolute limit checked in
        // Push(), so small control traffic can still be enqueued.
        static constexpr int CMD_QUEUE_RESERVE = 1 << 20;
        // Absolute cap on total queued payload bytes (32 MiB).
        static constexpr int MAX_DATA_IN_QUEUE = 32 << 20;

        // Queued item: owned packet pointer plus its addressing metadata.
        typedef std::pair<TUdpRecvPacket*, TPacketMeta> TPacket;
        // Pick the cheaper single-producer queue when there is only one writer,
        // otherwise the many-producer/one-consumer variant.
        typedef std::conditional_t<TTNumWriterThreads == 1, NThreading::TOneOneQueue<TPacket>, NThreading::TManyOneQueue<TPacket, TTNumWriterThreads>> TImpl;

        mutable TImpl Queue;
        mutable TSystemEvent QueueEvent; // signaled when the queue (probably) has data

        mutable TAtomic NumPackets; // current number of queued packets
        TAtomic DataSize;           // sum of DataSize over all queued packets, in bytes

    public:
        TLockFreePacketQueue()
            : NumPackets(0)
            , DataSize(0)
        {
        }

        // Drains the queue and frees any packets still owned by it.
        ~TLockFreePacketQueue() {
            TPacket packet;
            while (Queue.Dequeue(packet)) {
                delete packet.first;
            }
        }

        // True when the data part of the budget is exhausted, leaving only the
        // CMD_QUEUE_RESERVE headroom. Callers use this to throttle bulk data
        // while still admitting control packets via Push().
        bool IsDataPartFull() const {
            return (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE - CMD_QUEUE_RESERVE);
        }

        // Enqueues the packet, taking ownership of it, and wakes the consumer.
        // Returns false (without taking ownership) when the queue is over
        // budget — the caller is then responsible for the packet.
        bool Push(TUdpRecvPacket* packet, const TPacketMeta& meta) {
            // simulate OS behavior on buffer overflow - drop packets.
            // yeah it contains small data race (we can add little bit more packets, but nobody cares)
            if (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE) {
                return false;
            }
            AtomicAdd(NumPackets, 1);
            AtomicAdd(DataSize, packet->DataSize);
            Y_ASSERT(packet->DataStart == 0); // queue expects payload to begin at offset 0

            Queue.Enqueue(TPacket(std::make_pair(packet, meta)));
            QueueEvent.Signal();
            return true;
        }

        // Dequeues one packet; ownership of *packet passes to the caller.
        // Returns false when the queue is empty. On success fills srcAddr from
        // the packet's RemoteAddr and dstAddr from its MyAddr.
        bool Pop(TUdpRecvPacket** packet, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) {
            TPacket p;
            if (!Queue.Dequeue(p)) {
                // Queue looked empty: reset the event, then re-check. A producer
                // may have enqueued (and signaled) between the failed Dequeue and
                // the Reset; the second Dequeue catches that item and re-signals
                // so the wakeup is not lost.
                QueueEvent.Reset();
                if (!Queue.Dequeue(p)) {
                    return false;
                }
                QueueEvent.Signal();
            }
            *packet = p.first;
            *srcAddr = p.second.RemoteAddr;
            *dstAddr = p.second.MyAddr;

            AtomicSub(NumPackets, 1);
            AtomicSub(DataSize, (*packet)->DataSize);
            Y_ASSERT(AtomicGet(NumPackets) >= 0 && AtomicGet(DataSize) >= 0);

            return true;
        }

        // True when no packets are queued.
        // NOTE(review): AtomicAdd(x, 0) is used instead of AtomicGet —
        // presumably for its full-barrier semantics; confirm before "simplifying".
        bool IsEmpty() const {
            return AtomicAdd(NumPackets, 0) == 0;
        }

        // Event the consumer can wait on; signaled by Push(), managed by Pop()
        // to avoid lost wakeups (see Pop()).
        TSystemEvent& GetEvent() const {
            return QueueEvent;
        }
    };
}