diff options
author | yuryalekseev <[email protected]> | 2022-07-01 11:52:41 +0300 |
---|---|---|
committer | yuryalekseev <[email protected]> | 2022-07-01 11:52:41 +0300 |
commit | ff2a4854d72ceb0ba86fce4cd64aa382f59332dd (patch) | |
tree | 4295eb45daf41bb24a349b023e9165707acbdb0f /ydb/library/pdisk_io/aio_map.cpp | |
parent | 3c9bba8e28fdf3bca1abbce2a72e27c542c2fac4 (diff) |
KIKIMR-12068: Add TSectorOperationThrottler to TSectorMap.
ref:e6b40b9e7aa176088d0226cfdecdc6d7037e0dd3
Diffstat (limited to 'ydb/library/pdisk_io/aio_map.cpp')
-rw-r--r-- | ydb/library/pdisk_io/aio_map.cpp | 38 |
1 files changed, 32 insertions, 6 deletions
diff --git a/ydb/library/pdisk_io/aio_map.cpp b/ydb/library/pdisk_io/aio_map.cpp index 19062a7264b..72d831ddc0e 100644 --- a/ydb/library/pdisk_io/aio_map.cpp +++ b/ydb/library/pdisk_io/aio_map.cpp @@ -3,12 +3,14 @@ #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_countedqueueoneone.h> #include <util/random/random.h> +#include <util/system/spinlock.h> #include <util/thread/pool.h> namespace NKikimr { namespace NPDisk { struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation { + IAsyncIoContext &AsyncIoContext; TSectorMap &SectorMap; TCountedQueueOneOne<IAsyncIoOperation*, 4 << 10> &CompleteQueue; void *Cookie; @@ -22,10 +24,13 @@ struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation { TInstant Deadline; - TAsyncIoOperationMap(TSectorMap §orMap, + bool PrevAsyncIoOperationIsInProgress = false; + + TAsyncIoOperationMap(IAsyncIoContext &asyncIoContext, TSectorMap §orMap, TCountedQueueOneOne<IAsyncIoOperation*, 4 << 10> &completeQueue, void *cookie, TReqId reqId, NWilson::TTraceId *traceId) - : SectorMap(sectorMap) + : AsyncIoContext(asyncIoContext) + , SectorMap(sectorMap) , CompleteQueue(completeQueue) , Cookie(cookie) , ReqId(reqId) @@ -67,17 +72,18 @@ struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation { switch (Type) { case IAsyncIoOperation::EType::PRead: { - SectorMap.Read((ui8*)Data, Size, Offset); + SectorMap.Read((ui8*)Data, Size, Offset, PrevAsyncIoOperationIsInProgress); break; } case IAsyncIoOperation::EType::PWrite: { - SectorMap.Write((ui8*)Data, Size, Offset); + SectorMap.Write((ui8*)Data, Size, Offset, PrevAsyncIoOperationIsInProgress); break; } default: Y_FAIL_S("Unexpected op type# " << (i64)Type); } + AsyncIoContext.OnAsyncIoOperationCompletion(this); CompleteQueue.Push(this); } @@ -209,6 +215,9 @@ class TAsyncIoContextMap : public IAsyncIoContext { int LastErrno = 0; TPDiskDebugInfo PDiskInfo; + + TSpinLock SpinLock; + IAsyncIoOperation* LastOngoingAsyncIoOperation = nullptr; public: TAsyncIoContextMap(const TString &path, ui32 pDiskId, TIntrusivePtr<TSectorMap> sectorMap) @@ -224,7 +233,7 @@ public: } IAsyncIoOperation* CreateAsyncIoOperation(void* cookie, TReqId reqId, NWilson::TTraceId *traceId) override { - IAsyncIoOperation *operation = new TAsyncIoOperationMap(*SectorMap, CompleteQueue, cookie, reqId, traceId); + IAsyncIoOperation *operation = new TAsyncIoOperationMap(*this, *SectorMap, CompleteQueue, cookie, reqId, traceId); return operation; } @@ -323,7 +332,8 @@ public: if (SectorMap->ImitateRandomWait) { Queue = new TRandomWaitThreadPool(*SectorMap->ImitateRandomWait); } else { - Queue = CreateThreadPool(1, MaxEvents); + Queue = new TThreadPool(); + Queue->Start(1, MaxEvents); } return EIoResult::Ok; } @@ -331,6 +341,15 @@ public: EIoResult Submit(IAsyncIoOperation *op, ICallback *callback) override { op->SetCallback(callback); TAsyncIoOperationMap *operation = static_cast<TAsyncIoOperationMap*>(op); + + { + TGuard<TSpinLock> guard(SpinLock); + if (LastOngoingAsyncIoOperation != nullptr) { + operation->PrevAsyncIoOperationIsInProgress = true; + } + LastOngoingAsyncIoOperation = operation; + } + bool isOk = Queue->Add(operation); return isOk ? EIoResult::Ok : EIoResult::TryAgain; } @@ -349,6 +368,13 @@ public: TFileHandle *GetFileHandle() override { return nullptr; } + + void OnAsyncIoOperationCompletion(IAsyncIoOperation *op) override { + TGuard<TSpinLock> guard(SpinLock); + if (LastOngoingAsyncIoOperation == op) { + LastOngoingAsyncIoOperation = nullptr; + } + } }; std::unique_ptr<IAsyncIoContext> CreateAsyncIoContextMap(const TString &path, ui32 pDiskId, TIntrusivePtr<TSectorMap> sectorMap) { |