summaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba/v6/ib_mem.cpp
diff options
context:
space:
mode:
authormonster <[email protected]>2022-07-07 14:41:37 +0300
committermonster <[email protected]>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/ib_mem.cpp
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/ib_mem.cpp')
-rw-r--r--library/cpp/netliba/v6/ib_mem.cpp167
1 files changed, 167 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/ib_mem.cpp b/library/cpp/netliba/v6/ib_mem.cpp
new file mode 100644
index 00000000000..1e7f55ac578
--- /dev/null
+++ b/library/cpp/netliba/v6/ib_mem.cpp
@@ -0,0 +1,167 @@
+#include "stdafx.h"
+#include "ib_mem.h"
+#include "ib_low.h"
+#include "cpu_affinity.h"
+
+#if defined(_unix_)
+#include <pthread.h>
+#endif
+
+namespace NNetliba {
+ TIBMemSuperBlock::TIBMemSuperBlock(TIBMemPool* pool, size_t szLog)
+ : Pool(pool)
+ , SzLog(szLog)
+ , UseCount(0)
+ {
+ size_t sz = GetSize();
+ MemRegion = new TMemoryRegion(pool->GetIBContext(), sz);
+ //printf("Alloc super block, size %" PRId64 "\n", sz);
+ }
+
+ TIBMemSuperBlock::~TIBMemSuperBlock() {
+ Y_ASSERT(AtomicGet(UseCount) == 0);
+ }
+
+ char* TIBMemSuperBlock::GetData() {
+ return MemRegion->GetData();
+ }
+
+ void TIBMemSuperBlock::DecRef() {
+ if (AtomicAdd(UseCount, -1) == 0) {
+ Pool->Return(this);
+ }
+ }
+
+ TIBMemBlock::~TIBMemBlock() {
+ if (Super.Get()) {
+ Super->DecRef();
+ } else {
+ delete[] Data;
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ TIBMemPool::TIBMemPool(TPtrArg<TIBContext> ctx)
+ : IBCtx(ctx)
+ , AllocCacheSize(0)
+ , CurrentOffset(IB_MEM_LARGE_BLOCK)
+ , WorkThread(TThread::TParams(ThreadFunc, (void*)this).SetName("nl6_ib_mem_pool"))
+ , KeepRunning(true)
+ {
+ WorkThread.Start();
+ HasStarted.Wait();
+ }
+
+ TIBMemPool::~TIBMemPool() {
+ Y_ASSERT(WorkThread.Running());
+ KeepRunning = false;
+ HasWork.Signal();
+ WorkThread.Join();
+ {
+ TJobItem* work = nullptr;
+ while (Requests.Dequeue(&work)) {
+ delete work;
+ }
+ }
+ }
+
+ TIntrusivePtr<TIBMemSuperBlock> TIBMemPool::AllocSuper(size_t szArg) {
+ // assume CacheLock is taken
+ size_t szLog = 12;
+ while ((((size_t)1) << szLog) < szArg) {
+ ++szLog;
+ }
+ TIntrusivePtr<TIBMemSuperBlock> super;
+ {
+ TVector<TIntrusivePtr<TIBMemSuperBlock>>& cc = AllocCache[szLog];
+ if (!cc.empty()) {
+ super = cc.back();
+ cc.resize(cc.size() - 1);
+ AllocCacheSize -= 1ll << super->SzLog;
+ }
+ }
+ if (super.Get() == nullptr) {
+ super = new TIBMemSuperBlock(this, szLog);
+ }
+ return super;
+ }
+
+ TIBMemBlock* TIBMemPool::Alloc(size_t sz) {
+ TGuard<TMutex> gg(CacheLock);
+ if (sz > IB_MEM_LARGE_BLOCK) {
+ TIntrusivePtr<TIBMemSuperBlock> super = AllocSuper(sz);
+ return new TIBMemBlock(super, super->GetData(), sz);
+ } else {
+ if (CurrentOffset + sz > IB_MEM_LARGE_BLOCK) {
+ CurrentBlk.Assign(AllocSuper(IB_MEM_LARGE_BLOCK));
+ CurrentOffset = 0;
+ }
+ CurrentOffset += sz;
+ return new TIBMemBlock(CurrentBlk.Get(), CurrentBlk.Get()->GetData() + CurrentOffset - sz, sz);
+ }
+ }
+
+ void TIBMemPool::Return(TPtrArg<TIBMemSuperBlock> blk) {
+ TGuard<TMutex> gg(CacheLock);
+ Y_ASSERT(AtomicGet(blk->UseCount) == 0);
+ size_t sz = 1ull << blk->SzLog;
+ if (sz + AllocCacheSize > IB_MEM_POOL_SIZE) {
+ AllocCache.clear();
+ AllocCacheSize = 0;
+ }
+ {
+ TVector<TIntrusivePtr<TIBMemSuperBlock>>& cc = AllocCache[blk->SzLog];
+ cc.push_back(blk.Get());
+ AllocCacheSize += sz;
+ }
+ }
+
+ void* TIBMemPool::ThreadFunc(void* param) {
+ BindToSocket(0);
+ SetHighestThreadPriority();
+ TIBMemPool* pThis = (TIBMemPool*)param;
+ pThis->HasStarted.Signal();
+
+ while (pThis->KeepRunning) {
+ TJobItem* work = nullptr;
+ if (!pThis->Requests.Dequeue(&work)) {
+ pThis->HasWork.Reset();
+ if (!pThis->Requests.Dequeue(&work)) {
+ pThis->HasWork.Wait();
+ }
+ }
+ if (work) {
+ //printf("mem copy got work\n");
+ int sz = work->Data->GetSize();
+ work->Block = pThis->Alloc(sz);
+ TBlockChainIterator bc(work->Data->GetChain());
+ bc.Read(work->Block->GetData(), sz);
+ TIntrusivePtr<TCopyResultStorage> dst = work->ResultStorage;
+ work->ResultStorage = nullptr;
+ dst->Results.Enqueue(work);
+ //printf("mem copy completed\n");
+ }
+ }
+ return nullptr;
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ static TMutex IBMemMutex;
+ static TIntrusivePtr<TIBMemPool> IBMemPool;
+ static bool IBWasInitialized;
+
+ TIntrusivePtr<TIBMemPool> GetIBMemPool() {
+ TGuard<TMutex> gg(IBMemMutex);
+ if (IBWasInitialized) {
+ return IBMemPool;
+ }
+ IBWasInitialized = true;
+
+ TIntrusivePtr<TIBPort> ibPort = GetIBDevice();
+ if (ibPort.Get() == nullptr) {
+ return nullptr;
+ }
+ IBMemPool = new TIBMemPool(ibPort->GetCtx());
+ return IBMemPool;
+ }
+}