aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba/v6/ib_mem.h
blob: 5adcc59dd7e9ceb03bda44979f82b36d4b722c88 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#pragma once

#include "block_chain.h"
#include <util/thread/lfqueue.h>
#include <util/system/thread.h>

namespace NNetliba {
    // registered memory blocks
    class TMemoryRegion;
    class TIBContext;

    class TIBMemPool;
    // Super block: a region of 2^SzLog bytes (see GetSize) tied to a pool and to a
    // TMemoryRegion. Besides the TThrRefBase reference count it carries a separate
    // atomic UseCount that is raised by IncRef() and lowered by DecRef().
    struct TIBMemSuperBlock: public TThrRefBase, TNonCopyable {
        // Pool this super block is associated with.
        TIntrusivePtr<TIBMemPool> Pool;
        // Base-2 logarithm of the block size in bytes.
        size_t SzLog;
        // User counter; independent of the intrusive reference count.
        TAtomic UseCount;
        // Memory region exposed to users through TIBMemBlock::GetMemRegion().
        TIntrusivePtr<TMemoryRegion> MemRegion;

        // Defined out of line.
        TIBMemSuperBlock(TPtrArg_placeholder_guard_unused* = nullptr) = delete;
        TIBMemSuperBlock(TIBMemPool* pool, size_t szLog);
        ~TIBMemSuperBlock() override;

        // Start of the block's memory; defined out of line.
        char* GetData();

        // Size of the block in bytes, i.e. 2^SzLog.
        size_t GetSize() {
            const ui64 one = 1;
            return one << SzLog;
        }

        // Registers one more user of this super block.
        void IncRef() {
            AtomicAdd(UseCount, 1);
        }

        // Counterpart of IncRef(); defined out of line.
        void DecRef();
    };

    // A contiguous piece of memory of Size bytes starting at Data.
    //
    // Two flavours exist:
    //  * a slice of a TIBMemSuperBlock (Super is set, GetMemRegion() returns the
    //    super block's region);
    //  * a plain heap buffer (Super is null, GetMemRegion() returns nullptr), meant
    //    for debugging IB code without IB.
    //
    // The destructor is private and defined out of line, so instances cannot be
    // destroyed directly by outside code.
    class TIBMemBlock: public TThrRefBase, TNonCopyable {
        TIntrusivePtr<TIBMemSuperBlock> Super;
        char* Data;
        size_t Size;

        ~TIBMemBlock() override;

    public:
        // Wraps [data, data + sz) and registers this block as a user of `super`.
        // `super` must not be null: it is dereferenced unconditionally.
        TIBMemBlock(TPtrArg<TIBMemSuperBlock> super, char* data, size_t sz)
            : Super(super)
            , Data(data)
            , Size(sz)
        {
            Super->IncRef();
        }

        // not really IB mem block, but useful IB code debug without IB
        //
        // Allocates sz bytes with new[]. Marked explicit so that a bare size_t is
        // never silently converted into a memory block.
        explicit TIBMemBlock(size_t sz)
            : Super(nullptr)
            , Data(new char[sz])
            , Size(sz)
        {
        }

        // Pointer to the first byte of the block.
        char* GetData() {
            return Data;
        }

        // Address of the first byte as an integer.
        ui64 GetAddr() {
            return reinterpret_cast<ui64>(Data);
        }

        // Size of the block in bytes.
        size_t GetSize() {
            return Size;
        }

        // Memory region of the owning super block, or nullptr for a heap-backed block.
        TMemoryRegion* GetMemRegion() {
            return Super.Get() ? Super->MemRegion.Get() : nullptr;
        }
    };

    // Base-2 logarithm of IB_MEM_LARGE_BLOCK.
    const size_t IB_MEM_LARGE_BLOCK_LN = 20;
    // 2^IB_MEM_LARGE_BLOCK_LN bytes (1 MiB).
    const size_t IB_MEM_LARGE_BLOCK = static_cast<size_t>(1) << IB_MEM_LARGE_BLOCK_LN;
    // 2^30 (1 GiB).
    const size_t IB_MEM_POOL_SIZE = static_cast<size_t>(1) << 30;

    // Pool of IB memory. Hands out TIBMemBlock objects via Alloc() and accepts
    // copy requests via CopyData(); finished requests are picked up by the caller
    // through a TCopyResultStorage.
    //
    // The destructor is private and defined out of line.
    class TIBMemPool: public TThrRefBase, TNonCopyable {
    public:
        struct TCopyResultStorage;

    private:
        // Holds a super block together with one unit of its UseCount: Assign()
        // raises the count of the block it stores, Detach() (and the destructor)
        // lowers it again through DecRef().
        class TIBMemSuperBlockPtr {
            TIntrusivePtr<TIBMemSuperBlock> Blk;

        public:
            TIBMemSuperBlockPtr() = default;
            // A copy would share Blk without a matching UseCount increment, so the
            // two destructors would call DecRef() twice for a single increment.
            TIBMemSuperBlockPtr(const TIBMemSuperBlockPtr&) = delete;
            TIBMemSuperBlockPtr& operator=(const TIBMemSuperBlockPtr&) = delete;

            ~TIBMemSuperBlockPtr() {
                Detach();
            }

            // Replaces the held block with p (which may be null).
            void Assign(TIntrusivePtr<TIBMemSuperBlock> p) {
                // Take the use count on the incoming block BEFORE releasing the
                // current one. If p is the block already held, releasing first
                // would lower its count (and run DecRef()) while it is still in
                // use.
                if (p.Get()) {
                    AtomicAdd(p->UseCount, 1);
                }
                Detach();
                Blk = p;
            }

            // Releases the held block, if any, and resets the holder to null.
            void Detach() {
                if (Blk.Get()) {
                    Blk->DecRef();
                    Blk = nullptr;
                }
            }

            // Raw pointer to the held block, or nullptr.
            TIBMemSuperBlock* Get() {
                return Blk.Get();
            }
        };

        TIntrusivePtr<TIBContext> IBCtx;
        // Super blocks keyed by a size_t value (NOTE(review): presumably the block
        // size or its log2 - confirm against the .cpp).
        THashMap<size_t, TVector<TIntrusivePtr<TIBMemSuperBlock>>> AllocCache;
        size_t AllocCacheSize;
        TIBMemSuperBlockPtr CurrentBlk;
        int CurrentOffset;
        TMutex CacheLock;
        TThread WorkThread;
        TSystemEvent HasStarted;
        bool KeepRunning;

        // One copy request. Block is left null by the constructor; the other
        // fields are taken from the CopyData() arguments.
        struct TJobItem {
            TRopeDataPacket* Data;
            i64 MsgHandle;
            TIntrusivePtr<TThrRefBase> Context;
            TIntrusivePtr<TIBMemBlock> Block;
            TIntrusivePtr<TCopyResultStorage> ResultStorage;

            TJobItem(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst)
                : Data(data)
                , MsgHandle(msgHandle)
                , Context(context)
                , ResultStorage(dst)
            {
            }
        };

        // Pending requests; filled by CopyData(), which also signals HasWork.
        TLockFreeQueue<TJobItem*> Requests;
        TSystemEvent HasWork;

        // NOTE(review): presumably the entry point of WorkThread - defined out of line.
        static void* ThreadFunc(void* param);

        void Return(TPtrArg<TIBMemSuperBlock> blk);
        TIntrusivePtr<TIBMemSuperBlock> AllocSuper(size_t sz);
        ~TIBMemPool() override;

    public:
        // Collection point for processed TJobItem objects. Items stored in Results
        // are owned by this storage: they are deleted either when fetched with
        // GetCopyResult() or in the destructor.
        struct TCopyResultStorage: public TThrRefBase {
            TLockFreeStack<TJobItem*> Results;

            ~TCopyResultStorage() override {
                // Drop whatever nobody collected.
                TJobItem* work;
                while (Results.Dequeue(&work)) {
                    delete work;
                }
            }

            // Fetches one result if available.
            // On success fills *resBlock, *resMsgHandle and *context from the job,
            // deletes the job and returns true; otherwise returns false and leaves
            // the outputs untouched.
            // T must be the actual type of the context passed to CopyData(): the
            // stored TThrRefBase pointer is static_cast to T* without any check.
            template <class T>
            bool GetCopyResult(TIntrusivePtr<TIBMemBlock>* resBlock, i64* resMsgHandle, TIntrusivePtr<T>* context) {
                TJobItem* work;
                if (Results.Dequeue(&work)) {
                    *resBlock = work->Block;
                    *resMsgHandle = work->MsgHandle;
                    *context = static_cast<T*>(work->Context.Get()); // caller responsibility to make sure this makes sense
                    delete work;
                    return true;
                } else {
                    return false;
                }
            }
        };

    public:
        TIBMemPool(TPtrArg<TIBContext> ctx);

        // IB context held by this pool.
        TIBContext* GetIBContext() {
            return IBCtx.Get();
        }

        // Allocates a block of sz bytes; defined out of line.
        TIBMemBlock* Alloc(size_t sz);

        // Queues a copy request and signals HasWork. The heap-allocated TJobItem is
        // handed to the Requests queue.
        // NOTE(review): the job is presumably moved to dst->Results once processed -
        // confirm in the .cpp.
        void CopyData(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst) {
            Requests.Enqueue(new TJobItem(data, msgHandle, context, dst));
            HasWork.Signal();
        }

        friend class TIBMemBlock;
        friend struct TIBMemSuperBlock;
    };

    // Returns an IB memory pool; defined elsewhere.
    // NOTE(review): presumably a shared, process-wide instance - confirm in the .cpp.
    TIntrusivePtr<TIBMemPool> GetIBMemPool();
}