summaryrefslogtreecommitdiffstats
path: root/ydb/library/pdisk_io/aio_map.cpp
diff options
context:
space:
mode:
authoryuryalekseev <[email protected]>2022-07-01 11:52:41 +0300
committeryuryalekseev <[email protected]>2022-07-01 11:52:41 +0300
commitff2a4854d72ceb0ba86fce4cd64aa382f59332dd (patch)
tree4295eb45daf41bb24a349b023e9165707acbdb0f /ydb/library/pdisk_io/aio_map.cpp
parent3c9bba8e28fdf3bca1abbce2a72e27c542c2fac4 (diff)
KIKIMR-12068: Add TSectorOperationThrottler to TSectorMap.
ref:e6b40b9e7aa176088d0226cfdecdc6d7037e0dd3
Diffstat (limited to 'ydb/library/pdisk_io/aio_map.cpp')
-rw-r--r--ydb/library/pdisk_io/aio_map.cpp38
1 files changed, 32 insertions, 6 deletions
diff --git a/ydb/library/pdisk_io/aio_map.cpp b/ydb/library/pdisk_io/aio_map.cpp
index 19062a7264b..72d831ddc0e 100644
--- a/ydb/library/pdisk_io/aio_map.cpp
+++ b/ydb/library/pdisk_io/aio_map.cpp
@@ -3,12 +3,14 @@
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_countedqueueoneone.h>
#include <util/random/random.h>
+#include <util/system/spinlock.h>
#include <util/thread/pool.h>
namespace NKikimr {
namespace NPDisk {
struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation {
+ IAsyncIoContext &AsyncIoContext;
TSectorMap &SectorMap;
TCountedQueueOneOne<IAsyncIoOperation*, 4 << 10> &CompleteQueue;
void *Cookie;
@@ -22,10 +24,13 @@ struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation {
TInstant Deadline;
- TAsyncIoOperationMap(TSectorMap &sectorMap,
+ bool PrevAsyncIoOperationIsInProgress = false;
+
+ TAsyncIoOperationMap(IAsyncIoContext &asyncIoContext, TSectorMap &sectorMap,
TCountedQueueOneOne<IAsyncIoOperation*, 4 << 10> &completeQueue,
void *cookie, TReqId reqId, NWilson::TTraceId *traceId)
- : SectorMap(sectorMap)
+ : AsyncIoContext(asyncIoContext)
+ , SectorMap(sectorMap)
, CompleteQueue(completeQueue)
, Cookie(cookie)
, ReqId(reqId)
@@ -67,17 +72,18 @@ struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation {
switch (Type) {
case IAsyncIoOperation::EType::PRead:
{
- SectorMap.Read((ui8*)Data, Size, Offset);
+ SectorMap.Read((ui8*)Data, Size, Offset, PrevAsyncIoOperationIsInProgress);
break;
}
case IAsyncIoOperation::EType::PWrite:
{
- SectorMap.Write((ui8*)Data, Size, Offset);
+ SectorMap.Write((ui8*)Data, Size, Offset, PrevAsyncIoOperationIsInProgress);
break;
}
default:
Y_FAIL_S("Unexpected op type# " << (i64)Type);
}
+ AsyncIoContext.OnAsyncIoOperationCompletion(this);
CompleteQueue.Push(this);
}
@@ -209,6 +215,9 @@ class TAsyncIoContextMap : public IAsyncIoContext {
int LastErrno = 0;
TPDiskDebugInfo PDiskInfo;
+
+ TSpinLock SpinLock;
+ IAsyncIoOperation* LastOngoingAsyncIoOperation = nullptr;
public:
TAsyncIoContextMap(const TString &path, ui32 pDiskId, TIntrusivePtr<TSectorMap> sectorMap)
@@ -224,7 +233,7 @@ public:
}
IAsyncIoOperation* CreateAsyncIoOperation(void* cookie, TReqId reqId, NWilson::TTraceId *traceId) override {
- IAsyncIoOperation *operation = new TAsyncIoOperationMap(*SectorMap, CompleteQueue, cookie, reqId, traceId);
+ IAsyncIoOperation *operation = new TAsyncIoOperationMap(*this, *SectorMap, CompleteQueue, cookie, reqId, traceId);
return operation;
}
@@ -323,7 +332,8 @@ public:
if (SectorMap->ImitateRandomWait) {
Queue = new TRandomWaitThreadPool(*SectorMap->ImitateRandomWait);
} else {
- Queue = CreateThreadPool(1, MaxEvents);
+ Queue = new TThreadPool();
+ Queue->Start(1, MaxEvents);
}
return EIoResult::Ok;
}
@@ -331,6 +341,15 @@ public:
EIoResult Submit(IAsyncIoOperation *op, ICallback *callback) override {
op->SetCallback(callback);
TAsyncIoOperationMap *operation = static_cast<TAsyncIoOperationMap*>(op);
+
+ {
+ TGuard<TSpinLock> guard(SpinLock);
+ if (LastOngoingAsyncIoOperation != nullptr) {
+ operation->PrevAsyncIoOperationIsInProgress = true;
+ }
+ LastOngoingAsyncIoOperation = operation;
+ }
+
bool isOk = Queue->Add(operation);
return isOk ? EIoResult::Ok : EIoResult::TryAgain;
}
@@ -349,6 +368,13 @@ public:
TFileHandle *GetFileHandle() override {
return nullptr;
}
+
+ void OnAsyncIoOperationCompletion(IAsyncIoOperation *op) override {
+ TGuard<TSpinLock> guard(SpinLock);
+ if (LastOngoingAsyncIoOperation == op) {
+ LastOngoingAsyncIoOperation = nullptr;
+ }
+ }
};
std::unique_ptr<IAsyncIoContext> CreateAsyncIoContextMap(const TString &path, ui32 pDiskId, TIntrusivePtr<TSectorMap> sectorMap) {