aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/storage.h
blob: 694ea2fcfe1958302ce315bb77c9b22164d7d1d7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#pragma once

#include "message_ptr_and_header.h"
#include "moved.h"
#include "ybus.h"

#include <contrib/libs/sparsehash/src/sparsehash/dense_hash_map>

#include <util/generic/deque.h>
#include <util/generic/noncopyable.h>
#include <util/generic/utility.h>

namespace NBus { 
    namespace NPrivate { 
        typedef TVector<TBusMessage*> TMessagesPtrs; 

        class TTimedMessages { 
        public: 
            TTimedMessages(); 
            ~TTimedMessages(); 

            struct TItem { 
                THolder<TBusMessage> Message; 

                void Swap(TItem& that) { 
                    DoSwap(Message, that.Message); 
                } 
            }; 

            typedef TDeque<TMoved<TItem>> TItems; 

            void PushBack(TNonDestroyingAutoPtr<TBusMessage> m); 
            TNonDestroyingAutoPtr<TBusMessage> PopFront(); 
            bool Empty() const; 
            size_t Size() const; 

            void Timeout(TInstant before, TMessagesPtrs* r); 
            void Clear(TMessagesPtrs* r); 

        private: 
            TItems Items; 
        }; 

        class TSyncAckMessages : TNonCopyable { 
        public: 
            TSyncAckMessages(); 
            ~TSyncAckMessages(); 

            void Push(TBusMessagePtrAndHeader& m); 
            TBusMessage* Pop(TBusKey id); 

            void Timeout(TInstant before, TMessagesPtrs* r); 

            void Clear(TMessagesPtrs* r); 

            size_t Size() const { 
                return KeyToMessage.size(); 
            } 

            void RemoveAll(const TMessagesPtrs&); 

            void Gc(); 

            void DumpState(); 

        private: 
            struct TTimedItem { 
                TBusKey Key; 
                TBusInstant SendTime; 
            }; 

            typedef TDeque<TTimedItem> TTimedItems; 
            typedef TDeque<TTimedItem>::iterator TTimedIterator; 

            TTimedItems TimedItems; 

            struct TValue { 
                TBusMessage* Message; 
            }; 

            // keys are already random, no need to hash them further 
            struct TIdHash { 
                size_t operator()(TBusKey value) const { 
                    return value; 
                } 
            }; 

            typedef google::dense_hash_map<TBusKey, TValue, TIdHash> TKeyToMessage;

            TKeyToMessage KeyToMessage; 
        };

    } 
}