aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/poller_tcp_unit.h
blob: d3f0d0fd0a1aa2306c90e36ca8c09e92964cbca8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#pragma once

#include <util/system/thread.h>
#include <library/cpp/actors/util/funnel_queue.h>

#include "interconnect_stream.h"

#include <memory>
#include <functional>
#include <unordered_map>

namespace NInterconnect {
    using NActors::TFDDelegate; 
    using NActors::TSharedDescriptor; 

    class TPollerUnit { 
    public: 
        typedef std::unique_ptr<TPollerUnit> TPtr; 

        static TPtr Make(bool useSelect); 

        void Start(); 
        void Stop(); 

        virtual void StartReadOperation( 
            const TIntrusivePtr<TSharedDescriptor>& stream, 
            TFDDelegate&& operation); 

        virtual void StartWriteOperation( 
            const TIntrusivePtr<TSharedDescriptor>& stream, 
            TFDDelegate&& operation); 

        virtual ~TPollerUnit(); 

    private: 
        virtual void ProcessRead() = 0; 
        virtual void ProcessWrite() = 0; 

        template <bool IsWrite> 
        static void* IdleThread(void* param); 

        template <bool IsWrite> 
        void RunLoop(); 

        volatile bool StopFlag; 
        TThread ReadLoop, WriteLoop; 

    protected: 
        TPollerUnit(); 

        struct TSide { 
            using TOperations = 
                std::unordered_map<SOCKET, 
                                   std::pair<TIntrusivePtr<TSharedDescriptor>, TFDDelegate>>; 

            TOperations Operations; 
            using TItem = TOperations::mapped_type; 
            TFunnelQueue<TItem> InputQueue; 

            void ProcessInput(); 
        } Read, Write; 

        template <bool IsWrite> 
        TSide& GetSide(); 
    }; 

}