aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: yuryalekseev <yuryalekseev@yandex-team.ru>, 2022-07-01 11:52:41 +0300
committer: yuryalekseev <yuryalekseev@yandex-team.ru>, 2022-07-01 11:52:41 +0300
commit: ff2a4854d72ceb0ba86fce4cd64aa382f59332dd (patch)
tree: 4295eb45daf41bb24a349b023e9165707acbdb0f
parent: 3c9bba8e28fdf3bca1abbce2a72e27c542c2fac4 (diff)
download: ydb-ff2a4854d72ceb0ba86fce4cd64aa382f59332dd.tar.gz
KIKIMR-12068: Add TSectorOperationThrottler to TSectorMap.
ref:e6b40b9e7aa176088d0226cfdecdc6d7037e0dd3
-rw-r--r-- ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp | 8
-rw-r--r-- ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp | 97
-rw-r--r-- ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h | 4
-rw-r--r-- ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp | 36
-rw-r--r-- ydb/library/pdisk_io/aio.h | 1
-rw-r--r-- ydb/library/pdisk_io/aio_linux.cpp | 3
-rw-r--r-- ydb/library/pdisk_io/aio_map.cpp | 38
-rw-r--r-- ydb/library/pdisk_io/sector_map.h | 202
8 files changed, 350 insertions, 39 deletions
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp
index 10e6069c9b..edabea0c15 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp
@@ -51,9 +51,15 @@ namespace NKikimr::NStorage {
if (splitted.size() >= 3) {
size = Max(size, FromStringWithDefault<ui64>(splitted[2], size) << 30);
}
+
+ auto diskMode = NPDisk::NSectorMap::DM_NONE;
+ if (splitted.size() >= 4) {
+ diskMode = NPDisk::NSectorMap::DiskModeFromString(splitted[3]);
+ }
+
auto& maps = Cfg->SectorMaps;
if (auto it = maps.find(path); it == maps.end()) {
- maps[path] = new NPDisk::TSectorMap(size);
+ maps[path] = new NPDisk::TSectorMap(size, diskMode);
maps[path]->ZeroInit(1000); // Format PDisk
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
index 7257a47a11..9f408f6e4c 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
@@ -111,6 +111,25 @@ public:
}
};
+// Test-only completion action: atomically increments an external counter when
+// it is executed, then destroys itself.
+class TCompletionCounter : public NPDisk::TCompletionAction {
+public:
+    // `counter` is stored by reference, so it must outlive this action.
+    // explicit: a TAtomic must never convert silently into a completion action.
+    explicit TCompletionCounter(TAtomic& counter)
+        : Counter(counter)
+    {}
+
+    // Execution path: count the finished operation, then self-delete.
+    void Exec(TActorSystem *) override {
+        AtomicIncrement(Counter);
+        delete this;
+    }
+
+    // Release path: self-delete without touching the counter.
+    void Release(TActorSystem *) override {
+        delete this;
+    }
+
+private:
+    TAtomic& Counter;
+};
+
static TString MakeDatabasePath(const char *dir) {
TString databaseDirectory = Sprintf("%s/yard", dir);
return databaseDirectory;
@@ -212,6 +231,32 @@ void RunTestDestructionWithMultipleFlushesFromCompletionAction() {
Ctest << "Done" << Endl;
}
+// Issues asynchronous zero-filled writes of `bufferSize` bytes over a block
+// device backed by an in-memory TSectorMap of `diskSize` bytes created in
+// `diskMode`, then waits until every submitted write has been counted as
+// completed.
+//
+// sequential == true  : consecutive writes are `bufferSize` bytes apart.
+// sequential == false : consecutive writes are `2 * bufferSize` bytes apart.
+void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 diskSize, ui32 bufferSize, bool sequential = true) {
+    // A zero stride would never advance the submit loop below.
+    UNIT_ASSERT(bufferSize > 0);
+
+    const TIntrusivePtr<NMonitoring::TDynamicCounters> counters = new NMonitoring::TDynamicCounters;
+    THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));
+
+    TActorSystemCreator creator;
+    TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap(diskSize, diskMode);
+    THolder<NPDisk::IBlockDevice> device(NPDisk::CreateRealBlockDeviceWithDefaults(
+        /* path can be empty when sector map is used */ "", *mon, NPDisk::TDeviceMode::None, sectorMap, creator.GetActorSystem()));
+
+    NPDisk::TAlignedData data(bufferSize);
+    memset(data.Get(), 0, data.Size());
+
+    TAtomic completedWrites = 0;
+
+    // Computed in 64 bits so that doubling a large buffer size cannot wrap around.
+    const ui64 offsetIncrement = sequential ? ui64(bufferSize) : 2 * ui64(bufferSize);
+
+    // Count the writes actually submitted instead of deriving the number with a
+    // floor division: the two disagree when diskSize is not a multiple of the stride.
+    ui64 submittedWrites = 0;
+    for (ui64 offset = 0; offset < diskSize; offset += offsetIncrement) {
+        device->PwriteAsync(data.Get(), data.Size(), offset, new TCompletionCounter(completedWrites), NPDisk::TReqId(NPDisk::TReqId::Test1, 0), {});
+        ++submittedWrites;
+    }
+
+    TAtomic expectedWrites = submittedWrites;
+    WaitForValue(&completedWrites, TIMEOUT, expectedWrites);
+
+    device.Destroy();
+}
+
Y_UNIT_TEST_SUITE(TBlockDeviceTest) {
Y_UNIT_TEST(TestMultipleRequestsFromCompletionAction) {
@@ -243,6 +288,58 @@ Y_UNIT_TEST_SUITE(TBlockDeviceTest) {
Ctest << "Done" << Endl;
}
+ // The tests below all drive RunWriteTestWithSectorMap over a 2 GiB sector map;
+ // they differ only in disk mode, write buffer size and write stride.
+
+ // DM_NONE, 8 MiB buffers, default (sequential) stride.
+ Y_UNIT_TEST(TestWriteWithNoneSectorMap2GbDisk8MbBuffer) {
+ ui32 diskSize = 2 * 1024 * 1024 * 1024u;
+ ui32 bufferSize = 8 * 1024 * 1024u;
+ RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_NONE, diskSize, bufferSize);
+ Ctest << "Done" << Endl;
+ }
+
+ // DM_NONE, 8 MiB buffers, non-sequential stride (sequential == false).
+ Y_UNIT_TEST(TestWriteWithNoneSectorMap2GbDisk8MbBufferNonSequential) {
+ ui32 diskSize = 2 * 1024 * 1024 * 1024u;
+ ui32 bufferSize = 8 * 1024 * 1024u;
+ bool sequential = false;
+ RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_NONE, diskSize, bufferSize, sequential);
+ Ctest << "Done" << Endl;
+ }
+
+ // DM_NONE, 32 KiB buffers, default (sequential) stride.
+ Y_UNIT_TEST(TestWriteWithNoneSectorMap2GbDisk32KbBuffer) {
+ ui32 diskSize = 2 * 1024 * 1024 * 1024u;
+ ui32 bufferSize = 32 * 1024u;
+ RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_NONE, diskSize, bufferSize);
+ Ctest << "Done" << Endl;
+ }
+
+ // DM_NONE, 32 KiB buffers, non-sequential stride (sequential == false).
+ Y_UNIT_TEST(TestWriteWithNoneSectorMap2GbDisk32KbBufferNonSequential) {
+ ui32 diskSize = 2 * 1024 * 1024 * 1024u;
+ ui32 bufferSize = 32 * 1024u;
+ bool sequential = false;
+ RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_NONE, diskSize, bufferSize, sequential);
+ Ctest << "Done" << Endl;
+ }
+
+ // DM_HDD, 8 MiB buffers, default (sequential) stride.
+ Y_UNIT_TEST(TestWriteWithHddSectorMap2GbDisk8MbBuffer) {
+ ui32 diskSize = 2 * 1024 * 1024 * 1024u;
+ ui32 bufferSize = 8 * 1024 * 1024u;
+ RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_HDD, diskSize, bufferSize);
+ Ctest << "Done" << Endl;
+ }
+
+ // DM_HDD, 8 MiB buffers, non-sequential stride (sequential == false).
+ Y_UNIT_TEST(TestWriteWithHddSectorMap2GbDisk8MbBufferNonSequential) {
+ ui32 diskSize = 2 * 1024 * 1024 * 1024u;
+ ui32 bufferSize = 8 * 1024 * 1024u;
+ bool sequential = false;
+ RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_HDD, diskSize, bufferSize, sequential);
+ Ctest << "Done" << Endl;
+ }
+
+ // DM_HDD, 32 KiB buffers, default (sequential) stride.
+ Y_UNIT_TEST(TestWriteWithHddSectorMap2GbDisk32KbBuffer) {
+ ui32 diskSize = 2 * 1024 * 1024 * 1024u;
+ ui32 bufferSize = 32 * 1024u;
+ RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_HDD, diskSize, bufferSize);
+ Ctest << "Done" << Endl;
+ }
+
/*
Y_UNIT_TEST(TestRabbitCompletionAction) {
const TIntrusivePtr<NMonitoring::TDynamicCounters> counters = new NMonitoring::TDynamicCounters;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h
index 24000dcbd0..ee61f6df88 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h
@@ -15,13 +15,13 @@ public:
TIntrusivePtr<NPDisk::TSectorMap> SectorMap;
THolder<TTempDir> TempDir;
- TTestContext(bool makeTempDir, bool useSectorMap) {
+ TTestContext(bool makeTempDir, bool useSectorMap, NPDisk::NSectorMap::EDiskMode diskMode = NPDisk::NSectorMap::DM_NONE) {
if (makeTempDir) {
TempDir.Reset(new TTempDir);
Dir = TempDir->Name().c_str();
}
if (useSectorMap) {
- SectorMap = new NPDisk::TSectorMap;
+ SectorMap = new NPDisk::TSectorMap(0, diskMode);
}
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp
index cab783a62b..1bc21e3f4e 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp
@@ -102,6 +102,12 @@ YARD_UNIT_TEST(TestLogWriteReadMedium) {
Run<TTestLogWriteRead<6000>>(&tc, 1, MIN_CHUNK_SIZE);
}
+// Runs TTestLogWriteRead<6000> with a test context whose sector map is created
+// in DM_HDD mode, after filling the device with zeroes.
+YARD_UNIT_TEST(TestLogWriteReadMediumWithHddSectorMap) {
+ TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD);
+ FillDeviceWithZeroes(&tc, MIN_CHUNK_SIZE);
+ Run<TTestLogWriteRead<6000>>(&tc, 1, MIN_CHUNK_SIZE);
+}
+
YARD_UNIT_TEST(TestLogWriteReadLarge) {
TTestContext tc(false, true);
Run<TTestLogWriteRead<9000>>(&tc, 1, MIN_CHUNK_SIZE);
@@ -203,6 +209,11 @@ YARD_UNIT_TEST(TestChunkWriteRead) {
Run<TTestChunkWriteRead<30000, 2 << 20>>(&tc, 1, 5 << 20);
}
+// Runs TTestChunkWriteRead<30000, 2 << 20> with a test context whose sector map
+// is created in DM_HDD mode.
+YARD_UNIT_TEST(TestChunkWriteReadWithHddSectorMap) {
+ TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD);
+ Run<TTestChunkWriteRead<30000, 2 << 20>>(&tc, 1, 5 << 20);
+}
+
YARD_UNIT_TEST(TestChunkWriteReadMultiple) {
{
TTestContext tc(false, true);
@@ -222,12 +233,37 @@ YARD_UNIT_TEST(TestChunkWriteReadMultiple) {
}
}
+// Runs four TTestChunkWriteRead configurations back to back. Each one gets its
+// own scope and therefore a fresh TTestContext with a DM_HDD sector map.
+YARD_UNIT_TEST(TestChunkWriteReadMultipleWithHddSectorMap) {
+ {
+ TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD);
+ Run<TTestChunkWriteRead<6000000, 6500000>>(&tc, 1, 16 << 20, false);
+ }
+ {
+ TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD);
+ Run<TTestChunkWriteRead<3000000, 3500000>>(&tc, 1, 8 << 20, false);
+ }
+ {
+ TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD);
+ Run<TTestChunkWriteRead<2 << 20, 2 << 20>>(&tc, 1, 8 << 20, false);
+ }
+ {
+ TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD);
+ Run<TTestChunkWriteRead<1000000, 1500000>>(&tc, 1, 4 << 20, false);
+ }
+}
+
YARD_UNIT_TEST(TestChunkWriteReadWhole) {
TTestContext tc(false, true);
FillDeviceWithZeroes(&tc, MIN_CHUNK_SIZE);
Run<TTestChunkWriteReadWhole>(&tc, 1, MIN_CHUNK_SIZE);
}
+// Runs TTestChunkWriteReadWhole with a test context whose sector map is created
+// in DM_HDD mode, after filling the device with zeroes.
+YARD_UNIT_TEST(TestChunkWriteReadWholeWithHddSectorMap) {
+ TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD);
+ FillDeviceWithZeroes(&tc, MIN_CHUNK_SIZE);
+ Run<TTestChunkWriteReadWhole>(&tc, 1, MIN_CHUNK_SIZE);
+}
+
YARD_UNIT_TEST(TestChunkWrite20Read02) {
TTestContext tc(false, true);
// 2 << 20 is the read/write burst size, that's why.
diff --git a/ydb/library/pdisk_io/aio.h b/ydb/library/pdisk_io/aio.h
index 3dfd861459..f42064ca73 100644
--- a/ydb/library/pdisk_io/aio.h
+++ b/ydb/library/pdisk_io/aio.h
@@ -130,6 +130,7 @@ public:
virtual int GetLastErrno() = 0;
virtual TString GetPDiskInfo() = 0;
virtual TFileHandle *GetFileHandle() = 0;
+ // Completion hook: notifies the context that `op` has finished executing.
+ // The sector-map operation calls it right before pushing itself to the
+ // completion queue.
+ virtual void OnAsyncIoOperationCompletion(IAsyncIoOperation *op) = 0;
};
std::unique_ptr<IAsyncIoContext> CreateAsyncIoContextReal(const TString &path, ui32 pDiskId, TDeviceMode::TFlags flags);
diff --git a/ydb/library/pdisk_io/aio_linux.cpp b/ydb/library/pdisk_io/aio_linux.cpp
index 6f097bb8f4..418321a7c8 100644
--- a/ydb/library/pdisk_io/aio_linux.cpp
+++ b/ydb/library/pdisk_io/aio_linux.cpp
@@ -342,6 +342,9 @@ public:
TFileHandle *GetFileHandle() override {
return File.Get();
}
+
+ // No-op in this context: the completion hook does nothing here.
+ void OnAsyncIoOperationCompletion(IAsyncIoOperation *) override {
+ }
};
diff --git a/ydb/library/pdisk_io/aio_map.cpp b/ydb/library/pdisk_io/aio_map.cpp
index 19062a7264..72d831ddc0 100644
--- a/ydb/library/pdisk_io/aio_map.cpp
+++ b/ydb/library/pdisk_io/aio_map.cpp
@@ -3,12 +3,14 @@
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_countedqueueoneone.h>
#include <util/random/random.h>
+#include <util/system/spinlock.h>
#include <util/thread/pool.h>
namespace NKikimr {
namespace NPDisk {
struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation {
+ IAsyncIoContext &AsyncIoContext;
TSectorMap &SectorMap;
TCountedQueueOneOne<IAsyncIoOperation*, 4 << 10> &CompleteQueue;
void *Cookie;
@@ -22,10 +24,13 @@ struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation {
TInstant Deadline;
- TAsyncIoOperationMap(TSectorMap &sectorMap,
+ bool PrevAsyncIoOperationIsInProgress = false;
+
+ TAsyncIoOperationMap(IAsyncIoContext &asyncIoContext, TSectorMap &sectorMap,
TCountedQueueOneOne<IAsyncIoOperation*, 4 << 10> &completeQueue,
void *cookie, TReqId reqId, NWilson::TTraceId *traceId)
- : SectorMap(sectorMap)
+ : AsyncIoContext(asyncIoContext)
+ , SectorMap(sectorMap)
, CompleteQueue(completeQueue)
, Cookie(cookie)
, ReqId(reqId)
@@ -67,17 +72,18 @@ struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation {
switch (Type) {
case IAsyncIoOperation::EType::PRead:
{
- SectorMap.Read((ui8*)Data, Size, Offset);
+ SectorMap.Read((ui8*)Data, Size, Offset, PrevAsyncIoOperationIsInProgress);
break;
}
case IAsyncIoOperation::EType::PWrite:
{
- SectorMap.Write((ui8*)Data, Size, Offset);
+ SectorMap.Write((ui8*)Data, Size, Offset, PrevAsyncIoOperationIsInProgress);
break;
}
default:
Y_FAIL_S("Unexpected op type# " << (i64)Type);
}
+ AsyncIoContext.OnAsyncIoOperationCompletion(this);
CompleteQueue.Push(this);
}
@@ -209,6 +215,9 @@ class TAsyncIoContextMap : public IAsyncIoContext {
int LastErrno = 0;
TPDiskDebugInfo PDiskInfo;
+
+ TSpinLock SpinLock;
+ IAsyncIoOperation* LastOngoingAsyncIoOperation = nullptr;
public:
TAsyncIoContextMap(const TString &path, ui32 pDiskId, TIntrusivePtr<TSectorMap> sectorMap)
@@ -224,7 +233,7 @@ public:
}
IAsyncIoOperation* CreateAsyncIoOperation(void* cookie, TReqId reqId, NWilson::TTraceId *traceId) override {
- IAsyncIoOperation *operation = new TAsyncIoOperationMap(*SectorMap, CompleteQueue, cookie, reqId, traceId);
+ IAsyncIoOperation *operation = new TAsyncIoOperationMap(*this, *SectorMap, CompleteQueue, cookie, reqId, traceId);
return operation;
}
@@ -323,7 +332,8 @@ public:
if (SectorMap->ImitateRandomWait) {
Queue = new TRandomWaitThreadPool(*SectorMap->ImitateRandomWait);
} else {
- Queue = CreateThreadPool(1, MaxEvents);
+ Queue = new TThreadPool();
+ Queue->Start(1, MaxEvents);
}
return EIoResult::Ok;
}
@@ -331,6 +341,15 @@ public:
+    // Registers `op` as the most recently submitted operation and hands it to
+    // the worker queue.
+    //
+    // Returns EIoResult::Ok when the operation was queued and
+    // EIoResult::TryAgain when the queue refused it. On refusal the
+    // registration is rolled back: the operation will never execute, so it
+    // would never be cleared by OnAsyncIoOperationCompletion.
EIoResult Submit(IAsyncIoOperation *op, ICallback *callback) override {
op->SetCallback(callback);
TAsyncIoOperationMap *operation = static_cast<TAsyncIoOperationMap*>(op);
+
+        {
+            TGuard<TSpinLock> guard(SpinLock);
+            // Assign (rather than only ever set to true) so that a resubmitted
+            // operation does not carry a stale flag from an earlier attempt.
+            operation->PrevAsyncIoOperationIsInProgress = (LastOngoingAsyncIoOperation != nullptr);
+            LastOngoingAsyncIoOperation = operation;
+        }
+
bool isOk = Queue->Add(operation);
+        if (!isOk) {
+            // The operation was not queued. Drop the registration so that a
+            // retry is not treated as its own predecessor. Tracking of an
+            // earlier in-flight operation is lost as well, which only makes the
+            // next operation pay the seek delay.
+            TGuard<TSpinLock> guard(SpinLock);
+            if (LastOngoingAsyncIoOperation == operation) {
+                LastOngoingAsyncIoOperation = nullptr;
+            }
+            operation->PrevAsyncIoOperationIsInProgress = false;
+        }
return isOk ? EIoResult::Ok : EIoResult::TryAgain;
}
@@ -349,6 +368,13 @@ public:
TFileHandle *GetFileHandle() override {
return nullptr;
}
+
+    // Completion hook: forgets `op` if it is still recorded as the last
+    // submitted operation; any other operation leaves the record untouched.
+    void OnAsyncIoOperationCompletion(IAsyncIoOperation *op) override {
+        TGuard<TSpinLock> guard(SpinLock);
+        const bool isLastSubmitted = (op == LastOngoingAsyncIoOperation);
+        if (!isLastSubmitted) {
+            return;
+        }
+        LastOngoingAsyncIoOperation = nullptr;
+    }
};
std::unique_ptr<IAsyncIoContext> CreateAsyncIoContextMap(const TString &path, ui32 pDiskId, TIntrusivePtr<TSectorMap> sectorMap) {
diff --git a/ydb/library/pdisk_io/sector_map.h b/ydb/library/pdisk_io/sector_map.h
index f86e8fb6d0..d4d2919bd8 100644
--- a/ydb/library/pdisk_io/sector_map.h
+++ b/ydb/library/pdisk_io/sector_map.h
@@ -3,28 +3,143 @@
#include <ydb/core/util/yverify_stream.h>
#include <library/cpp/actors/util/ticket_lock.h>
+#include <util/datetime/base.h>
+#include <util/datetime/cputimer.h>
#include <util/generic/guid.h>
#include <util/generic/hash.h>
#include <util/generic/string.h>
#include <util/stream/file.h>
#include <util/stream/format.h>
#include <util/system/mutex.h>
+#include <util/system/types.h>
#include <contrib/libs/lz4/lz4.h>
+#include <array>
#include <atomic>
#include <optional>
namespace NKikimr {
namespace NPDisk {
+namespace NSectorMap {
-constexpr ui64 SectorMapSectorSize = 4096;
+// Device profile emulated by the sector map.
+// The values are used as indices into TSectorOperationThrottler::DiskModeParams,
+// so they must stay dense and start at zero.
+enum EDiskMode {
+ // No emulation: TSectorMap creates no throttler for this mode.
+ DM_NONE = 0,
+ // Operations are delayed by a seek time and a position-dependent transfer rate.
+ DM_HDD = 1,
+ // Number of modes; sizes the parameter table. Not a real mode.
+ DM_COUNT = 2
+};
-class TSectorMap : public TThrRefBase {
- THashMap<ui64, TString> Map;
+// Parses a disk mode name. "HDD" selects DM_HDD; every other string,
+// including "NONE" and unrecognised input, yields DM_NONE.
+inline EDiskMode DiskModeFromString(const TString& diskMode) {
+    const bool isHdd = (diskMode == "HDD");
+    return isHdd ? DM_HDD : DM_NONE;
+}
+
+// Returns the printable name of a disk mode: "NONE" or "HDD".
+// Any other value (including DM_COUNT) is reported as "UNKNOWN".
+inline TString DiskModeToString(EDiskMode diskMode) {
+    if (diskMode == DM_NONE) {
+        return "NONE";
+    }
+    if (diskMode == DM_HDD) {
+        return "HDD";
+    }
+    return "UNKNOWN";
+}
+
+// Size of one sector in bytes. TSectorMap requires every read/write/trim size
+// and offset to be a multiple of this value.
+constexpr ui64 SECTOR_SIZE = 4096;
+
+/* TSectorOperationThrottler
+ *
+ * Delays read/write operations so that an in-memory sector map behaves like
+ * the selected disk mode: every operation may pay a seek delay and then a
+ * transfer delay derived from its size and its position on the device.
+ *
+ * ThrottleRead/ThrottleWrite guard the head position with a mutex and may be
+ * called concurrently. Init is not synchronized against them.
+ */
+
+class TSectorOperationThrottler {
+private:
+    struct TDiskModeParams {
+        TDuration SeekSleep;     // delay charged when an operation has to seek
+        double FirstSectorRate;  // transfer rate at sector 0, bytes per second
+        double LastSectorRate;   // transfer rate at the last sector, bytes per second
+    };
+
+    // Indexed by EDiskMode.
+    static constexpr std::array<TDiskModeParams, NSectorMap::DM_COUNT> DiskModeParams = {
+        {
+            {TDuration::MilliSeconds(0), 0, 0}, // DM_NONE
+            {TDuration::MilliSeconds(9), 200 * 1024 * 1024, 66 * 1024 * 1024} // DM_HDD
+        }
+    };
public:
+    // `sectors` is the device size in sectors and must be positive.
+    TSectorOperationThrottler(ui64 sectors, NSectorMap::EDiskMode diskMode) {
+        Init(sectors, diskMode);
+    }
+
+    // (Re)configures the throttler and resets the remembered head position.
+    void Init(ui64 sectors, NSectorMap::EDiskMode diskMode) {
+        Y_VERIFY(sectors > 0);
+
+        DiskMode = diskMode;
+        MaxSector = sectors - 1;
+        MostRecentlyUsedSector = 0;
+    }
+
+    // Blocks the caller for as long as a read of `size` bytes at `offset` is
+    // modelled to take.
+    void ThrottleRead(i64 size, ui64 offset, bool prevOperationIsInProgress) {
+        ThrottleOperation(size, offset, prevOperationIsInProgress);
+    }
+
+    // Blocks the caller for as long as a write of `size` bytes at `offset` is
+    // modelled to take.
+    void ThrottleWrite(i64 size, ui64 offset, bool prevOperationIsInProgress) {
+        ThrottleOperation(size, offset, prevOperationIsInProgress);
+    }
+
+private:
+    /* throttle read/write operation */
+    void ThrottleOperation(i64 size, ui64 offset, bool prevOperationIsInProgress) {
+        if (size <= 0) {
+            return;
+        }
+
+        const ui64 beginSector = offset / NSectorMap::SECTOR_SIZE;
+        // One past the last sector touched by the operation.
+        const ui64 endSector = (offset + size + NSectorMap::SECTOR_SIZE - 1) / NSectorMap::SECTOR_SIZE;
+        const ui64 midSector = (beginSector + endSector) / 2;
+
+        Y_VERIFY(DiskMode < DiskModeParams.size());
+        const auto& diskModeParams = DiskModeParams[DiskMode];
+
+        {
+            TGuard<TMutex> guard(Mutex);
+            // A seek is charged unless the operation starts right after the
+            // previously used sector while the previous operation is still
+            // in progress. The sleep happens under the mutex, so seeks of
+            // concurrent callers are serialized.
+            if (beginSector != MostRecentlyUsedSector + 1 || !prevOperationIsInProgress) {
+                Sleep(diskModeParams.SeekSleep);
+            }
+
+            MostRecentlyUsedSector = endSector - 1;
+        }
+
+        const double rate = CalcRate(diskModeParams.FirstSectorRate, diskModeParams.LastSectorRate, midSector, MaxSector);
+        if (rate <= 0) {
+            // No transfer-rate limit is configured (DM_NONE parameters);
+            // dividing by the rate would produce an infinite delay.
+            return;
+        }
+
+        // Microsecond resolution: with whole milliseconds a small operation
+        // (e.g. 32 KiB at 200 MiB/s, about 0.16 ms) would truncate to zero and
+        // never be throttled.
+        const double microSecondsToWait = static_cast<double>(size) * 1000000.0 / rate;
+        Sleep(TDuration::MicroSeconds(static_cast<ui64>(microSecondsToWait)));
+    }
+
+    // Linearly interpolates the transfer rate between the first and the last
+    // sector of the device for the given sector.
+    static double CalcRate(double firstSectorRate, double lastSectorRate, double sector, double lastSector) {
+        Y_VERIFY(sector <= lastSector, "%lf %lf", sector, lastSector);
+        Y_VERIFY(lastSectorRate <= firstSectorRate, "%lf %lf", firstSectorRate, lastSectorRate);
+        if (lastSector <= 0) {
+            // Single-sector device: there is nothing to interpolate and
+            // sector / lastSector would be 0 / 0.
+            return firstSectorRate;
+        }
+        return firstSectorRate - (sector / lastSector) * (firstSectorRate - lastSectorRate);
+    }
+
+private:
+    TMutex Mutex;
+    ui64 MaxSector = 0;
+    ui64 MostRecentlyUsedSector = 0;
+    NSectorMap::EDiskMode DiskMode = NSectorMap::DM_NONE;
+};
+
+} // NSectorMap
+
+/* TSectorMap */
+
+class TSectorMap : public TThrRefBase {
+public:
TString Serial = CreateGuidAsString();
ui64 DeviceSize;
@@ -36,13 +151,16 @@ public:
std::atomic<ui64> AllocatedBytes;
- TSectorMap(ui64 deviceSize = 0)
+ TSectorMap(ui64 deviceSize = 0, NSectorMap::EDiskMode diskMode = NSectorMap::DM_NONE)
: DeviceSize(deviceSize)
, IsLocked(false)
, ImitateIoErrorProbability(0.0)
, ImitateReadIoErrorProbability(0.0)
, AllocatedBytes(0)
- {}
+ , DiskMode(diskMode)
+ {
+ InitSectorOperationThrottler();
+ }
bool Lock() {
return !IsLocked.exchange(true);
@@ -59,42 +177,60 @@ public:
Y_VERIFY_S(offset + 4096 <= DeviceSize, "It is not possible to shrink TSectorMap with data");
}
}
+
+ InitSectorOperationThrottler();
+ }
+
+    // (Re)creates the operation throttler for the current DeviceSize and
+    // DiskMode. No throttler is kept when the device is empty or the mode is
+    // DM_NONE.
+    void InitSectorOperationThrottler() {
+        const bool throttlingEnabled = DeviceSize > 0 && DiskMode != NSectorMap::DM_NONE;
+        if (!throttlingEnabled) {
+            SectorOperationThrottler.Reset();
+            return;
+        }
+
+        // Device size in sectors, rounded up.
+        const ui64 sectorCount = (DeviceSize + NSectorMap::SECTOR_SIZE - 1) / NSectorMap::SECTOR_SIZE;
+        SectorOperationThrottler = MakeHolder<NSectorMap::TSectorOperationThrottler>(sectorCount, DiskMode);
+    }
void ZeroInit(ui64 sectors) {
- ui64 bytes = sectors * SectorMapSectorSize;
+ ui64 bytes = sectors * NSectorMap::SECTOR_SIZE;
TString str = TString::Uninitialized(bytes);
memset(str.Detach(), 0, bytes);
Write((ui8*)str.Detach(), bytes, 0);
}
- void Read(ui8 *data, i64 size, ui64 offset) {
- Y_VERIFY(size % SectorMapSectorSize == 0);
- Y_VERIFY(offset % SectorMapSectorSize == 0);
+ void Read(ui8 *data, i64 size, ui64 offset, bool prevOperationIsInProgress = false) {
+ Y_VERIFY(size % NSectorMap::SECTOR_SIZE == 0);
+ Y_VERIFY(offset % NSectorMap::SECTOR_SIZE == 0);
+
+ if (SectorOperationThrottler.Get() != nullptr) {
+ SectorOperationThrottler->ThrottleRead(size, offset, prevOperationIsInProgress);
+ }
TGuard<TTicketLock> guard(MapLock);
- for (; size > 0; size -= SectorMapSectorSize) {
+ for (; size > 0; size -= NSectorMap::SECTOR_SIZE) {
if (auto it = Map.find(offset); it == Map.end()) {
- memset(data, 0x33, SectorMapSectorSize);
+ memset(data, 0x33, NSectorMap::SECTOR_SIZE);
} else {
- char tmp[4 * SectorMapSectorSize];
- int processed = LZ4_decompress_safe(it->second.data(), tmp, it->second.size(), 4 * SectorMapSectorSize);
- Y_VERIFY_S(processed == SectorMapSectorSize, "processed# " << processed);
- memcpy(data, tmp, SectorMapSectorSize);
+ char tmp[4 * NSectorMap::SECTOR_SIZE];
+ int processed = LZ4_decompress_safe(it->second.data(), tmp, it->second.size(), 4 * NSectorMap::SECTOR_SIZE);
+ Y_VERIFY_S(processed == NSectorMap::SECTOR_SIZE, "processed# " << processed);
+ memcpy(data, tmp, NSectorMap::SECTOR_SIZE);
}
- offset += SectorMapSectorSize;
- data += SectorMapSectorSize;
+ offset += NSectorMap::SECTOR_SIZE;
+ data += NSectorMap::SECTOR_SIZE;
}
}
- void Write(const ui8 *data, i64 size, ui64 offset) {
- Y_VERIFY(size % SectorMapSectorSize == 0);
- Y_VERIFY(offset % SectorMapSectorSize == 0);
+ void Write(const ui8 *data, i64 size, ui64 offset, bool prevOperationIsInProgress = false) {
+ Y_VERIFY(size % NSectorMap::SECTOR_SIZE == 0);
+ Y_VERIFY(offset % NSectorMap::SECTOR_SIZE == 0);
+
+ if (SectorOperationThrottler.Get() != nullptr) {
+ SectorOperationThrottler->ThrottleWrite(size, offset, prevOperationIsInProgress);
+ }
TGuard<TTicketLock> guard(MapLock);
- for (; size > 0; size -= SectorMapSectorSize) {
- char tmp[4 * SectorMapSectorSize];
- int written = LZ4_compress_default((const char*)data, tmp, SectorMapSectorSize, 4 * SectorMapSectorSize);
+ for (; size > 0; size -= NSectorMap::SECTOR_SIZE) {
+ char tmp[4 * NSectorMap::SECTOR_SIZE];
+ int written = LZ4_compress_default((const char*)data, tmp, NSectorMap::SECTOR_SIZE, 4 * NSectorMap::SECTOR_SIZE);
Y_VERIFY_S(written > 0, "written# " << written);
TString str = TString::Uninitialized(written);
memcpy(str.Detach(), tmp, written);
@@ -105,26 +241,26 @@ public:
Map[offset] = str;
}
AllocatedBytes.fetch_add(Map[offset].size());
- offset += SectorMapSectorSize;
- data += SectorMapSectorSize;
+ offset += NSectorMap::SECTOR_SIZE;
+ data += NSectorMap::SECTOR_SIZE;
}
}
void Trim(i64 size, ui64 offset) {
TGuard<TTicketLock> guard(MapLock);
- Y_VERIFY(size % SectorMapSectorSize == 0);
- Y_VERIFY(offset % SectorMapSectorSize == 0);
- for (; size > 0; size -= SectorMapSectorSize) {
+ Y_VERIFY(size % NSectorMap::SECTOR_SIZE == 0);
+ Y_VERIFY(offset % NSectorMap::SECTOR_SIZE == 0);
+ for (; size > 0; size -= NSectorMap::SECTOR_SIZE) {
if (auto it = Map.find(offset); it != Map.end()) {
AllocatedBytes.fetch_sub(it->second.size());
Map.erase(it);
}
- offset += SectorMapSectorSize;
+ offset += NSectorMap::SECTOR_SIZE;
}
}
ui64 DataBytes() const {
- return Map.size() * 4096;
+ return Map.size() * NSectorMap::SECTOR_SIZE;
}
TString ToString() const {
@@ -140,12 +276,18 @@ public:
str << "ImitateIoErrorProbability# " << ImitateIoErrorProbability.load() << "\n";
str << "AllocatedBytes (approx.)# " << HumanReadableSize(AllocatedBytes.load(), SF_QUANTITY) << "\n";
str << "DataBytes# " << HumanReadableSize(DataBytes(), SF_QUANTITY) << "\n";
+ str << "DiskMode# " << DiskModeToString(DiskMode) << "\n";
return str.Str();
}
// Requires proto information, so should be defined in cpp
void LoadFromFile(const TString& path);
void StoreToFile(const TString& path);
+
+private:
+ THashMap<ui64, TString> Map;
+ NSectorMap::EDiskMode DiskMode = NSectorMap::DM_NONE;
+ THolder<NSectorMap::TSectorOperationThrottler> SectorOperationThrottler;
};
} // NPDisk