diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/ib_mem.h | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/ib_mem.h')
-rw-r--r-- | library/cpp/netliba/v6/ib_mem.h | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/ib_mem.h b/library/cpp/netliba/v6/ib_mem.h new file mode 100644 index 0000000000..dfa5b9cd5f --- /dev/null +++ b/library/cpp/netliba/v6/ib_mem.h @@ -0,0 +1,178 @@ +#pragma once + +#include "block_chain.h" +#include <util/thread/lfqueue.h> +#include <util/system/thread.h> + +namespace NNetliba { + // registered memory blocks + class TMemoryRegion; + class TIBContext; + + class TIBMemPool; + struct TIBMemSuperBlock: public TThrRefBase, TNonCopyable { + TIntrusivePtr<TIBMemPool> Pool; + size_t SzLog; + TAtomic UseCount; + TIntrusivePtr<TMemoryRegion> MemRegion; + + TIBMemSuperBlock(TIBMemPool* pool, size_t szLog); + ~TIBMemSuperBlock() override; + char* GetData(); + size_t GetSize() { + return ((ui64)1) << SzLog; + } + void IncRef() { + AtomicAdd(UseCount, 1); + } + void DecRef(); + }; + + class TIBMemBlock: public TThrRefBase, TNonCopyable { + TIntrusivePtr<TIBMemSuperBlock> Super; + char* Data; + size_t Size; + + ~TIBMemBlock() override; + + public: + TIBMemBlock(TPtrArg<TIBMemSuperBlock> super, char* data, size_t sz) + : Super(super) + , Data(data) + , Size(sz) + { + Super->IncRef(); + } + TIBMemBlock(size_t sz) + : Super(nullptr) + , Size(sz) + { + // not really IB mem block, but useful IB code debug without IB + Data = new char[sz]; + } + char* GetData() { + return Data; + } + ui64 GetAddr() { + return Data - (char*)nullptr; + } + size_t GetSize() { + return Size; + } + TMemoryRegion* GetMemRegion() { + return Super.Get() ? Super->MemRegion.Get() : nullptr; + } + }; + + const size_t IB_MEM_LARGE_BLOCK_LN = 20; + const size_t IB_MEM_LARGE_BLOCK = 1ul << IB_MEM_LARGE_BLOCK_LN; + const size_t IB_MEM_POOL_SIZE = 1024 * 1024 * 1024; + + class TIBMemPool: public TThrRefBase, TNonCopyable { + public: + struct TCopyResultStorage; + + private: + class TIBMemSuperBlockPtr { + TIntrusivePtr<TIBMemSuperBlock> Blk; + + public: + ~TIBMemSuperBlockPtr() { + Detach(); + } + void Assign(TIntrusivePtr<TIBMemSuperBlock> p) { + Detach(); + Blk = p; + if (p.Get()) { + AtomicAdd(p->UseCount, 1); + } + } + void Detach() { + if (Blk.Get()) { + Blk->DecRef(); + Blk = nullptr; + } + } + TIBMemSuperBlock* Get() { + return Blk.Get(); + } + }; + + TIntrusivePtr<TIBContext> IBCtx; + THashMap<size_t, TVector<TIntrusivePtr<TIBMemSuperBlock>>> AllocCache; + size_t AllocCacheSize; + TIBMemSuperBlockPtr CurrentBlk; + int CurrentOffset; + TMutex CacheLock; + TThread WorkThread; + TSystemEvent HasStarted; + bool KeepRunning; + + struct TJobItem { + TRopeDataPacket* Data; + i64 MsgHandle; + TIntrusivePtr<TThrRefBase> Context; + TIntrusivePtr<TIBMemBlock> Block; + TIntrusivePtr<TCopyResultStorage> ResultStorage; + + TJobItem(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst) + : Data(data) + , MsgHandle(msgHandle) + , Context(context) + , ResultStorage(dst) + { + } + }; + + TLockFreeQueue<TJobItem*> Requests; + TSystemEvent HasWork; + + static void* ThreadFunc(void* param); + + void Return(TPtrArg<TIBMemSuperBlock> blk); + TIntrusivePtr<TIBMemSuperBlock> AllocSuper(size_t sz); + ~TIBMemPool() override; + + public: + struct TCopyResultStorage: public TThrRefBase { + TLockFreeStack<TJobItem*> Results; + + ~TCopyResultStorage() override { + TJobItem* work; + while (Results.Dequeue(&work)) { + delete work; + } + } + template <class T> + bool GetCopyResult(TIntrusivePtr<TIBMemBlock>* resBlock, i64* resMsgHandle, TIntrusivePtr<T>* context) { + TJobItem* work; + if (Results.Dequeue(&work)) { + *resBlock = work->Block; + *resMsgHandle = work->MsgHandle; + *context = static_cast<T*>(work->Context.Get()); // caller responsibility to make sure this makes sense + delete work; + return true; + } else { + return false; + } + } + }; + + public: + TIBMemPool(TPtrArg<TIBContext> ctx); + TIBContext* GetIBContext() { + return IBCtx.Get(); + } + TIBMemBlock* Alloc(size_t sz); + + void CopyData(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst) { + Requests.Enqueue(new TJobItem(data, msgHandle, context, dst)); + HasWork.Signal(); + } + + friend class TIBMemBlock; + friend struct TIBMemSuperBlock; + }; + + extern TIntrusivePtr<TIBMemPool> GetIBMemPool(); +} |