#pragma once #include "block_chain.h" #include <util/thread/lfqueue.h> #include <util/system/thread.h> namespace NNetliba { // registered memory blocks class TMemoryRegion; class TIBContext; class TIBMemPool; struct TIBMemSuperBlock: public TThrRefBase, TNonCopyable { TIntrusivePtr<TIBMemPool> Pool; size_t SzLog; TAtomic UseCount; TIntrusivePtr<TMemoryRegion> MemRegion; TIBMemSuperBlock(TIBMemPool* pool, size_t szLog); ~TIBMemSuperBlock() override; char* GetData(); size_t GetSize() { return ((ui64)1) << SzLog; } void IncRef() { AtomicAdd(UseCount, 1); } void DecRef(); }; class TIBMemBlock: public TThrRefBase, TNonCopyable { TIntrusivePtr<TIBMemSuperBlock> Super; char* Data; size_t Size; ~TIBMemBlock() override; public: TIBMemBlock(TPtrArg<TIBMemSuperBlock> super, char* data, size_t sz) : Super(super) , Data(data) , Size(sz) { Super->IncRef(); } TIBMemBlock(size_t sz) : Super(nullptr) , Size(sz) { // not really IB mem block, but useful IB code debug without IB Data = new char[sz]; } char* GetData() { return Data; } ui64 GetAddr() { return reinterpret_cast<ui64>(Data) / sizeof(char); } size_t GetSize() { return Size; } TMemoryRegion* GetMemRegion() { return Super.Get() ? Super->MemRegion.Get() : nullptr; } }; const size_t IB_MEM_LARGE_BLOCK_LN = 20; const size_t IB_MEM_LARGE_BLOCK = 1ul << IB_MEM_LARGE_BLOCK_LN; const size_t IB_MEM_POOL_SIZE = 1024 * 1024 * 1024; class TIBMemPool: public TThrRefBase, TNonCopyable { public: struct TCopyResultStorage; private: class TIBMemSuperBlockPtr { TIntrusivePtr<TIBMemSuperBlock> Blk; public: ~TIBMemSuperBlockPtr() { Detach(); } void Assign(TIntrusivePtr<TIBMemSuperBlock> p) { Detach(); Blk = p; if (p.Get()) { AtomicAdd(p->UseCount, 1); } } void Detach() { if (Blk.Get()) { Blk->DecRef(); Blk = nullptr; } } TIBMemSuperBlock* Get() { return Blk.Get(); } }; TIntrusivePtr<TIBContext> IBCtx; THashMap<size_t, TVector<TIntrusivePtr<TIBMemSuperBlock>>> AllocCache; size_t AllocCacheSize; TIBMemSuperBlockPtr CurrentBlk; int CurrentOffset; TMutex CacheLock; TThread WorkThread; TSystemEvent HasStarted; bool KeepRunning; struct TJobItem { TRopeDataPacket* Data; i64 MsgHandle; TIntrusivePtr<TThrRefBase> Context; TIntrusivePtr<TIBMemBlock> Block; TIntrusivePtr<TCopyResultStorage> ResultStorage; TJobItem(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst) : Data(data) , MsgHandle(msgHandle) , Context(context) , ResultStorage(dst) { } }; TLockFreeQueue<TJobItem*> Requests; TSystemEvent HasWork; static void* ThreadFunc(void* param); void Return(TPtrArg<TIBMemSuperBlock> blk); TIntrusivePtr<TIBMemSuperBlock> AllocSuper(size_t sz); ~TIBMemPool() override; public: struct TCopyResultStorage: public TThrRefBase { TLockFreeStack<TJobItem*> Results; ~TCopyResultStorage() override { TJobItem* work; while (Results.Dequeue(&work)) { delete work; } } template <class T> bool GetCopyResult(TIntrusivePtr<TIBMemBlock>* resBlock, i64* resMsgHandle, TIntrusivePtr<T>* context) { TJobItem* work; if (Results.Dequeue(&work)) { *resBlock = work->Block; *resMsgHandle = work->MsgHandle; *context = static_cast<T*>(work->Context.Get()); // caller responsibility to make sure this makes sense delete work; return true; } else { return false; } } }; public: TIBMemPool(TPtrArg<TIBContext> ctx); TIBContext* GetIBContext() { return IBCtx.Get(); } TIBMemBlock* Alloc(size_t sz); void CopyData(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst) { Requests.Enqueue(new TJobItem(data, msgHandle, context, dst)); HasWork.Signal(); } friend class TIBMemBlock; friend struct TIBMemSuperBlock; }; extern TIntrusivePtr<TIBMemPool> GetIBMemPool(); }