aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba/v6/ib_mem.h
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/ib_mem.h
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/ib_mem.h')
-rw-r--r--library/cpp/netliba/v6/ib_mem.h178
1 files changed, 178 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/ib_mem.h b/library/cpp/netliba/v6/ib_mem.h
new file mode 100644
index 0000000000..dfa5b9cd5f
--- /dev/null
+++ b/library/cpp/netliba/v6/ib_mem.h
@@ -0,0 +1,178 @@
+#pragma once
+
+#include "block_chain.h"
+#include <util/thread/lfqueue.h>
+#include <util/system/thread.h>
+
+namespace NNetliba {
+ // registered memory blocks
+ class TMemoryRegion;
+ class TIBContext;
+
+ class TIBMemPool;
+ struct TIBMemSuperBlock: public TThrRefBase, TNonCopyable {
+ TIntrusivePtr<TIBMemPool> Pool;
+ size_t SzLog;
+ TAtomic UseCount;
+ TIntrusivePtr<TMemoryRegion> MemRegion;
+
+ TIBMemSuperBlock(TIBMemPool* pool, size_t szLog);
+ ~TIBMemSuperBlock() override;
+ char* GetData();
+ size_t GetSize() {
+ return ((ui64)1) << SzLog;
+ }
+ void IncRef() {
+ AtomicAdd(UseCount, 1);
+ }
+ void DecRef();
+ };
+
+ class TIBMemBlock: public TThrRefBase, TNonCopyable {
+ TIntrusivePtr<TIBMemSuperBlock> Super;
+ char* Data;
+ size_t Size;
+
+ ~TIBMemBlock() override;
+
+ public:
+ TIBMemBlock(TPtrArg<TIBMemSuperBlock> super, char* data, size_t sz)
+ : Super(super)
+ , Data(data)
+ , Size(sz)
+ {
+ Super->IncRef();
+ }
+ TIBMemBlock(size_t sz)
+ : Super(nullptr)
+ , Size(sz)
+ {
+ // not really IB mem block, but useful IB code debug without IB
+ Data = new char[sz];
+ }
+ char* GetData() {
+ return Data;
+ }
+ ui64 GetAddr() {
+ return Data - (char*)nullptr;
+ }
+ size_t GetSize() {
+ return Size;
+ }
+ TMemoryRegion* GetMemRegion() {
+ return Super.Get() ? Super->MemRegion.Get() : nullptr;
+ }
+ };
+
+ const size_t IB_MEM_LARGE_BLOCK_LN = 20;
+ const size_t IB_MEM_LARGE_BLOCK = 1ul << IB_MEM_LARGE_BLOCK_LN;
+ const size_t IB_MEM_POOL_SIZE = 1024 * 1024 * 1024;
+
+ class TIBMemPool: public TThrRefBase, TNonCopyable {
+ public:
+ struct TCopyResultStorage;
+
+ private:
+ class TIBMemSuperBlockPtr {
+ TIntrusivePtr<TIBMemSuperBlock> Blk;
+
+ public:
+ ~TIBMemSuperBlockPtr() {
+ Detach();
+ }
+ void Assign(TIntrusivePtr<TIBMemSuperBlock> p) {
+ Detach();
+ Blk = p;
+ if (p.Get()) {
+ AtomicAdd(p->UseCount, 1);
+ }
+ }
+ void Detach() {
+ if (Blk.Get()) {
+ Blk->DecRef();
+ Blk = nullptr;
+ }
+ }
+ TIBMemSuperBlock* Get() {
+ return Blk.Get();
+ }
+ };
+
+ TIntrusivePtr<TIBContext> IBCtx;
+ THashMap<size_t, TVector<TIntrusivePtr<TIBMemSuperBlock>>> AllocCache;
+ size_t AllocCacheSize;
+ TIBMemSuperBlockPtr CurrentBlk;
+ int CurrentOffset;
+ TMutex CacheLock;
+ TThread WorkThread;
+ TSystemEvent HasStarted;
+ bool KeepRunning;
+
+ struct TJobItem {
+ TRopeDataPacket* Data;
+ i64 MsgHandle;
+ TIntrusivePtr<TThrRefBase> Context;
+ TIntrusivePtr<TIBMemBlock> Block;
+ TIntrusivePtr<TCopyResultStorage> ResultStorage;
+
+ TJobItem(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst)
+ : Data(data)
+ , MsgHandle(msgHandle)
+ , Context(context)
+ , ResultStorage(dst)
+ {
+ }
+ };
+
+ TLockFreeQueue<TJobItem*> Requests;
+ TSystemEvent HasWork;
+
+ static void* ThreadFunc(void* param);
+
+ void Return(TPtrArg<TIBMemSuperBlock> blk);
+ TIntrusivePtr<TIBMemSuperBlock> AllocSuper(size_t sz);
+ ~TIBMemPool() override;
+
+ public:
+ struct TCopyResultStorage: public TThrRefBase {
+ TLockFreeStack<TJobItem*> Results;
+
+ ~TCopyResultStorage() override {
+ TJobItem* work;
+ while (Results.Dequeue(&work)) {
+ delete work;
+ }
+ }
+ template <class T>
+ bool GetCopyResult(TIntrusivePtr<TIBMemBlock>* resBlock, i64* resMsgHandle, TIntrusivePtr<T>* context) {
+ TJobItem* work;
+ if (Results.Dequeue(&work)) {
+ *resBlock = work->Block;
+ *resMsgHandle = work->MsgHandle;
+ *context = static_cast<T*>(work->Context.Get()); // caller responsibility to make sure this makes sense
+ delete work;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
+
+ public:
+ TIBMemPool(TPtrArg<TIBContext> ctx);
+ TIBContext* GetIBContext() {
+ return IBCtx.Get();
+ }
+ TIBMemBlock* Alloc(size_t sz);
+
+ void CopyData(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst) {
+ Requests.Enqueue(new TJobItem(data, msgHandle, context, dst));
+ HasWork.Signal();
+ }
+
+ friend class TIBMemBlock;
+ friend struct TIBMemSuperBlock;
+ };
+
+ extern TIntrusivePtr<TIBMemPool> GetIBMemPool();
+}