diff options
author | yuryalekseev <yuryalekseev@yandex-team.ru> | 2022-07-01 11:52:41 +0300 |
---|---|---|
committer | yuryalekseev <yuryalekseev@yandex-team.ru> | 2022-07-01 11:52:41 +0300 |
commit | ff2a4854d72ceb0ba86fce4cd64aa382f59332dd (patch) | |
tree | 4295eb45daf41bb24a349b023e9165707acbdb0f | |
parent | 3c9bba8e28fdf3bca1abbce2a72e27c542c2fac4 (diff) | |
download | ydb-ff2a4854d72ceb0ba86fce4cd64aa382f59332dd.tar.gz |
KIKIMR-12068: Add TSectorOperationThrottler to TSectorMap.
ref:e6b40b9e7aa176088d0226cfdecdc6d7037e0dd3
-rw-r--r-- | ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp | 8 | ||||
-rw-r--r-- | ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp | 97 | ||||
-rw-r--r-- | ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h | 4 | ||||
-rw-r--r-- | ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp | 36 | ||||
-rw-r--r-- | ydb/library/pdisk_io/aio.h | 1 | ||||
-rw-r--r-- | ydb/library/pdisk_io/aio_linux.cpp | 3 | ||||
-rw-r--r-- | ydb/library/pdisk_io/aio_map.cpp | 38 | ||||
-rw-r--r-- | ydb/library/pdisk_io/sector_map.h | 202 |
8 files changed, 350 insertions, 39 deletions
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp index 10e6069c9b..edabea0c15 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp @@ -51,9 +51,15 @@ namespace NKikimr::NStorage { if (splitted.size() >= 3) { size = Max(size, FromStringWithDefault<ui64>(splitted[2], size) << 30); } + + auto diskMode = NPDisk::NSectorMap::DM_NONE; + if (splitted.size() >= 4) { + diskMode = NPDisk::NSectorMap::DiskModeFromString(splitted[3]); + } + auto& maps = Cfg->SectorMaps; if (auto it = maps.find(path); it == maps.end()) { - maps[path] = new NPDisk::TSectorMap(size); + maps[path] = new NPDisk::TSectorMap(size, diskMode); maps[path]->ZeroInit(1000); // Format PDisk } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp index 7257a47a11..9f408f6e4c 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp @@ -111,6 +111,25 @@ public: } }; +class TCompletionCounter : public NPDisk::TCompletionAction { +public: + TCompletionCounter(TAtomic& counter) + : Counter(counter) + {} + + void Exec(TActorSystem *) override { + AtomicIncrement(Counter); + delete this; + } + + void Release(TActorSystem *) override { + delete this; + } + +private: + TAtomic& Counter; +}; + static TString MakeDatabasePath(const char *dir) { TString databaseDirectory = Sprintf("%s/yard", dir); return databaseDirectory; @@ -212,6 +231,32 @@ void RunTestDestructionWithMultipleFlushesFromCompletionAction() { Ctest << "Done" << Endl; } +void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 diskSize, ui32 bufferSize, bool sequential = true) { + const TIntrusivePtr<NMonitoring::TDynamicCounters> counters = new NMonitoring::TDynamicCounters; + THolder<TPDiskMon> mon(new 
TPDiskMon(counters, 0, nullptr)); + + TActorSystemCreator creator; + TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap(diskSize, diskMode); + THolder<NPDisk::IBlockDevice> device(NPDisk::CreateRealBlockDeviceWithDefaults( + /* path can be empty when sector map is used */ "", *mon, NPDisk::TDeviceMode::None, sectorMap, creator.GetActorSystem())); + + NPDisk::TAlignedData data(bufferSize); + memset(data.Get(), 0, data.Size()); + + TAtomic completedWrites = 0; + + ui32 offsetIncrement = sequential ? bufferSize : 2 * bufferSize; + TAtomic expectedWrites = diskSize / (offsetIncrement); + + for (ui64 offset = 0; offset < diskSize; offset += offsetIncrement) { + device->PwriteAsync(data.Get(), data.Size(), offset, new TCompletionCounter(completedWrites), NPDisk::TReqId(NPDisk::TReqId::Test1, 0), {}); + } + + WaitForValue(&completedWrites, TIMEOUT, expectedWrites); + + device.Destroy(); +} + Y_UNIT_TEST_SUITE(TBlockDeviceTest) { Y_UNIT_TEST(TestMultipleRequestsFromCompletionAction) { @@ -243,6 +288,58 @@ Y_UNIT_TEST_SUITE(TBlockDeviceTest) { Ctest << "Done" << Endl; } + Y_UNIT_TEST(TestWriteWithNoneSectorMap2GbDisk8MbBuffer) { + ui32 diskSize = 2 * 1024 * 1024 * 1024u; + ui32 bufferSize = 8 * 1024 * 1024u; + RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_NONE, diskSize, bufferSize); + Ctest << "Done" << Endl; + } + + Y_UNIT_TEST(TestWriteWithNoneSectorMap2GbDisk8MbBufferNonSequential) { + ui32 diskSize = 2 * 1024 * 1024 * 1024u; + ui32 bufferSize = 8 * 1024 * 1024u; + bool sequential = false; + RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_NONE, diskSize, bufferSize, sequential); + Ctest << "Done" << Endl; + } + + Y_UNIT_TEST(TestWriteWithNoneSectorMap2GbDisk32KbBuffer) { + ui32 diskSize = 2 * 1024 * 1024 * 1024u; + ui32 bufferSize = 32 * 1024u; + RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_NONE, diskSize, bufferSize); + Ctest << "Done" << Endl; + } + + Y_UNIT_TEST(TestWriteWithNoneSectorMap2GbDisk32KbBufferNonSequential) { + ui32 diskSize = 2 
* 1024 * 1024 * 1024u; + ui32 bufferSize = 32 * 1024u; + bool sequential = false; + RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_NONE, diskSize, bufferSize, sequential); + Ctest << "Done" << Endl; + } + + Y_UNIT_TEST(TestWriteWithHddSectorMap2GbDisk8MbBuffer) { + ui32 diskSize = 2 * 1024 * 1024 * 1024u; + ui32 bufferSize = 8 * 1024 * 1024u; + RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_HDD, diskSize, bufferSize); + Ctest << "Done" << Endl; + } + + Y_UNIT_TEST(TestWriteWithHddSectorMap2GbDisk8MbBufferNonSequential) { + ui32 diskSize = 2 * 1024 * 1024 * 1024u; + ui32 bufferSize = 8 * 1024 * 1024u; + bool sequential = false; + RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_HDD, diskSize, bufferSize, sequential); + Ctest << "Done" << Endl; + } + + Y_UNIT_TEST(TestWriteWithHddSectorMap2GbDisk32KbBuffer) { + ui32 diskSize = 2 * 1024 * 1024 * 1024u; + ui32 bufferSize = 32 * 1024u; + RunWriteTestWithSectorMap(NPDisk::NSectorMap::DM_HDD, diskSize, bufferSize); + Ctest << "Done" << Endl; + } + /* Y_UNIT_TEST(TestRabbitCompletionAction) { const TIntrusivePtr<NMonitoring::TDynamicCounters> counters = new NMonitoring::TDynamicCounters; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h index 24000dcbd0..ee61f6df88 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_context.h @@ -15,13 +15,13 @@ public: TIntrusivePtr<NPDisk::TSectorMap> SectorMap; THolder<TTempDir> TempDir; - TTestContext(bool makeTempDir, bool useSectorMap) { + TTestContext(bool makeTempDir, bool useSectorMap, NPDisk::NSectorMap::EDiskMode diskMode = NPDisk::NSectorMap::DM_NONE) { if (makeTempDir) { TempDir.Reset(new TTempDir); Dir = TempDir->Name().c_str(); } if (useSectorMap) { - SectorMap = new NPDisk::TSectorMap; + SectorMap = new NPDisk::TSectorMap(0, diskMode); } } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp 
b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp index cab783a62b..1bc21e3f4e 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp @@ -102,6 +102,12 @@ YARD_UNIT_TEST(TestLogWriteReadMedium) { Run<TTestLogWriteRead<6000>>(&tc, 1, MIN_CHUNK_SIZE); } +YARD_UNIT_TEST(TestLogWriteReadMediumWithHddSectorMap) { + TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD); + FillDeviceWithZeroes(&tc, MIN_CHUNK_SIZE); + Run<TTestLogWriteRead<6000>>(&tc, 1, MIN_CHUNK_SIZE); +} + YARD_UNIT_TEST(TestLogWriteReadLarge) { TTestContext tc(false, true); Run<TTestLogWriteRead<9000>>(&tc, 1, MIN_CHUNK_SIZE); @@ -203,6 +209,11 @@ YARD_UNIT_TEST(TestChunkWriteRead) { Run<TTestChunkWriteRead<30000, 2 << 20>>(&tc, 1, 5 << 20); } +YARD_UNIT_TEST(TestChunkWriteReadWithHddSectorMap) { + TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD); + Run<TTestChunkWriteRead<30000, 2 << 20>>(&tc, 1, 5 << 20); +} + YARD_UNIT_TEST(TestChunkWriteReadMultiple) { { TTestContext tc(false, true); @@ -222,12 +233,37 @@ YARD_UNIT_TEST(TestChunkWriteReadMultiple) { } } +YARD_UNIT_TEST(TestChunkWriteReadMultipleWithHddSectorMap) { + { + TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD); + Run<TTestChunkWriteRead<6000000, 6500000>>(&tc, 1, 16 << 20, false); + } + { + TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD); + Run<TTestChunkWriteRead<3000000, 3500000>>(&tc, 1, 8 << 20, false); + } + { + TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD); + Run<TTestChunkWriteRead<2 << 20, 2 << 20>>(&tc, 1, 8 << 20, false); + } + { + TTestContext tc(false, true, NPDisk::NSectorMap::DM_HDD); + Run<TTestChunkWriteRead<1000000, 1500000>>(&tc, 1, 4 << 20, false); + } +} + YARD_UNIT_TEST(TestChunkWriteReadWhole) { TTestContext tc(false, true); FillDeviceWithZeroes(&tc, MIN_CHUNK_SIZE); Run<TTestChunkWriteReadWhole>(&tc, 1, MIN_CHUNK_SIZE); } +YARD_UNIT_TEST(TestChunkWriteReadWholeWithHddSectorMap) { + TTestContext 
tc(false, true, NPDisk::NSectorMap::DM_HDD); + FillDeviceWithZeroes(&tc, MIN_CHUNK_SIZE); + Run<TTestChunkWriteReadWhole>(&tc, 1, MIN_CHUNK_SIZE); +} + YARD_UNIT_TEST(TestChunkWrite20Read02) { TTestContext tc(false, true); // 2 << 20 is the read/write burst size, that's why. diff --git a/ydb/library/pdisk_io/aio.h b/ydb/library/pdisk_io/aio.h index 3dfd861459..f42064ca73 100644 --- a/ydb/library/pdisk_io/aio.h +++ b/ydb/library/pdisk_io/aio.h @@ -130,6 +130,7 @@ public: virtual int GetLastErrno() = 0; virtual TString GetPDiskInfo() = 0; virtual TFileHandle *GetFileHandle() = 0; + virtual void OnAsyncIoOperationCompletion(IAsyncIoOperation *op) = 0; }; std::unique_ptr<IAsyncIoContext> CreateAsyncIoContextReal(const TString &path, ui32 pDiskId, TDeviceMode::TFlags flags); diff --git a/ydb/library/pdisk_io/aio_linux.cpp b/ydb/library/pdisk_io/aio_linux.cpp index 6f097bb8f4..418321a7c8 100644 --- a/ydb/library/pdisk_io/aio_linux.cpp +++ b/ydb/library/pdisk_io/aio_linux.cpp @@ -342,6 +342,9 @@ public: TFileHandle *GetFileHandle() override { return File.Get(); } + + void OnAsyncIoOperationCompletion(IAsyncIoOperation *) override { + } }; diff --git a/ydb/library/pdisk_io/aio_map.cpp b/ydb/library/pdisk_io/aio_map.cpp index 19062a7264..72d831ddc0 100644 --- a/ydb/library/pdisk_io/aio_map.cpp +++ b/ydb/library/pdisk_io/aio_map.cpp @@ -3,12 +3,14 @@ #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_countedqueueoneone.h> #include <util/random/random.h> +#include <util/system/spinlock.h> #include <util/thread/pool.h> namespace NKikimr { namespace NPDisk { struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation { + IAsyncIoContext &AsyncIoContext; TSectorMap &SectorMap; TCountedQueueOneOne<IAsyncIoOperation*, 4 << 10> &CompleteQueue; void *Cookie; @@ -22,10 +24,13 @@ struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation { TInstant Deadline; - TAsyncIoOperationMap(TSectorMap &sectorMap, + bool PrevAsyncIoOperationIsInProgress = false; + + 
TAsyncIoOperationMap(IAsyncIoContext &asyncIoContext, TSectorMap &sectorMap, TCountedQueueOneOne<IAsyncIoOperation*, 4 << 10> &completeQueue, void *cookie, TReqId reqId, NWilson::TTraceId *traceId) - : SectorMap(sectorMap) + : AsyncIoContext(asyncIoContext) + , SectorMap(sectorMap) , CompleteQueue(completeQueue) , Cookie(cookie) , ReqId(reqId) @@ -67,17 +72,18 @@ struct TAsyncIoOperationMap : IObjectInQueue, IAsyncIoOperation { switch (Type) { case IAsyncIoOperation::EType::PRead: { - SectorMap.Read((ui8*)Data, Size, Offset); + SectorMap.Read((ui8*)Data, Size, Offset, PrevAsyncIoOperationIsInProgress); break; } case IAsyncIoOperation::EType::PWrite: { - SectorMap.Write((ui8*)Data, Size, Offset); + SectorMap.Write((ui8*)Data, Size, Offset, PrevAsyncIoOperationIsInProgress); break; } default: Y_FAIL_S("Unexpected op type# " << (i64)Type); } + AsyncIoContext.OnAsyncIoOperationCompletion(this); CompleteQueue.Push(this); } @@ -209,6 +215,9 @@ class TAsyncIoContextMap : public IAsyncIoContext { int LastErrno = 0; TPDiskDebugInfo PDiskInfo; + + TSpinLock SpinLock; + IAsyncIoOperation* LastOngoingAsyncIoOperation = nullptr; public: TAsyncIoContextMap(const TString &path, ui32 pDiskId, TIntrusivePtr<TSectorMap> sectorMap) @@ -224,7 +233,7 @@ public: } IAsyncIoOperation* CreateAsyncIoOperation(void* cookie, TReqId reqId, NWilson::TTraceId *traceId) override { - IAsyncIoOperation *operation = new TAsyncIoOperationMap(*SectorMap, CompleteQueue, cookie, reqId, traceId); + IAsyncIoOperation *operation = new TAsyncIoOperationMap(*this, *SectorMap, CompleteQueue, cookie, reqId, traceId); return operation; } @@ -323,7 +332,8 @@ public: if (SectorMap->ImitateRandomWait) { Queue = new TRandomWaitThreadPool(*SectorMap->ImitateRandomWait); } else { - Queue = CreateThreadPool(1, MaxEvents); + Queue = new TThreadPool(); + Queue->Start(1, MaxEvents); } return EIoResult::Ok; } @@ -331,6 +341,15 @@ public: EIoResult Submit(IAsyncIoOperation *op, ICallback *callback) override { 
op->SetCallback(callback); TAsyncIoOperationMap *operation = static_cast<TAsyncIoOperationMap*>(op); + + { + TGuard<TSpinLock> guard(SpinLock); + if (LastOngoingAsyncIoOperation != nullptr) { + operation->PrevAsyncIoOperationIsInProgress = true; + } + LastOngoingAsyncIoOperation = operation; + } + bool isOk = Queue->Add(operation); return isOk ? EIoResult::Ok : EIoResult::TryAgain; } @@ -349,6 +368,13 @@ public: TFileHandle *GetFileHandle() override { return nullptr; } + + void OnAsyncIoOperationCompletion(IAsyncIoOperation *op) override { + TGuard<TSpinLock> guard(SpinLock); + if (LastOngoingAsyncIoOperation == op) { + LastOngoingAsyncIoOperation = nullptr; + } + } }; std::unique_ptr<IAsyncIoContext> CreateAsyncIoContextMap(const TString &path, ui32 pDiskId, TIntrusivePtr<TSectorMap> sectorMap) { diff --git a/ydb/library/pdisk_io/sector_map.h b/ydb/library/pdisk_io/sector_map.h index f86e8fb6d0..d4d2919bd8 100644 --- a/ydb/library/pdisk_io/sector_map.h +++ b/ydb/library/pdisk_io/sector_map.h @@ -3,28 +3,143 @@ #include <ydb/core/util/yverify_stream.h> #include <library/cpp/actors/util/ticket_lock.h> +#include <util/datetime/base.h> +#include <util/datetime/cputimer.h> #include <util/generic/guid.h> #include <util/generic/hash.h> #include <util/generic/string.h> #include <util/stream/file.h> #include <util/stream/format.h> #include <util/system/mutex.h> +#include <util/system/types.h> #include <contrib/libs/lz4/lz4.h> +#include <array> #include <atomic> #include <optional> namespace NKikimr { namespace NPDisk { +namespace NSectorMap { -constexpr ui64 SectorMapSectorSize = 4096; +enum EDiskMode { + DM_NONE = 0, + DM_HDD = 1, + DM_COUNT = 2 +}; -class TSectorMap : public TThrRefBase { - THashMap<ui64, TString> Map; +inline EDiskMode DiskModeFromString(const TString& diskMode) { + if (diskMode == "HDD") { + return DM_HDD; + } else if (diskMode == "NONE") { + return DM_NONE; + } + + return DM_NONE; +} + +inline TString DiskModeToString(EDiskMode diskMode) { + switch 
(diskMode) { + case DM_NONE: + return "NONE"; + case DM_HDD: + return "HDD"; + default: + return "UNKNOWN"; + } +} + +constexpr ui64 SECTOR_SIZE = 4096; + +/* TSectorOperationThrottler: thread-safe */ + +class TSectorOperationThrottler { +private: + struct TDiskModeParams { + TDuration SeekSleep; + double FirstSectorRate; + double LastSectorRate; + }; + + static constexpr std::array<TDiskModeParams, NSectorMap::DM_COUNT> DiskModeParams = { + { + {TDuration::MilliSeconds(0), 0, 0}, // DM_NONE + {TDuration::MilliSeconds(9), 200 * 1024 * 1024, 66 * 1024 * 1024} // DM_HDD + } + }; public: + TSectorOperationThrottler(ui64 sectors, NSectorMap::EDiskMode diskMode) { + Init(sectors, diskMode); + } + + void Init(ui64 sectors, NSectorMap::EDiskMode diskMode) { + Y_VERIFY(sectors > 0); + + DiskMode = diskMode; + MaxSector = sectors - 1; + MostRecentlyUsedSector = 0; + } + + void ThrottleRead(i64 size, ui64 offset, bool prevOperationIsInProgress) { + ThrottleOperation(size, offset, prevOperationIsInProgress); + } + + void ThrottleWrite(i64 size, ui64 offset, bool prevOperationIsInProgress) { + ThrottleOperation(size, offset, prevOperationIsInProgress); + } + +private: + /* throttle read/write operation */ + void ThrottleOperation(i64 size, ui64 offset, bool prevOperationIsInProgress) { + if (size == 0) { + return; + } + + ui64 beginSector = offset / NSectorMap::SECTOR_SIZE; + ui64 endSector = (offset + size + NSectorMap::SECTOR_SIZE - 1) / NSectorMap::SECTOR_SIZE; + ui64 midSector = (beginSector + endSector) / 2; + + Y_VERIFY(DiskMode < DiskModeParams.size()); + const auto& diskModeParams = DiskModeParams[DiskMode]; + + { + TGuard<TMutex> guard(Mutex); + if (beginSector != MostRecentlyUsedSector + 1 || !prevOperationIsInProgress) { + Sleep(diskModeParams.SeekSleep); + } + + MostRecentlyUsedSector = endSector - 1; + } + + auto rate = CalcRate(diskModeParams.FirstSectorRate, diskModeParams.LastSectorRate, midSector, MaxSector); + + auto rateByMilliSeconds = rate / 1000; + auto 
milliSecondsToWait = (double)size / rateByMilliSeconds; + Sleep(TDuration::MilliSeconds(milliSecondsToWait)); + } + + static double CalcRate(double firstSectorRate, double lastSectorRate, double sector, double lastSector) { + Y_VERIFY(sector <= lastSector, "%lf %lf", sector, lastSector); + Y_VERIFY(lastSectorRate <= firstSectorRate, "%lf %lf", firstSectorRate, lastSectorRate); + return firstSectorRate - (sector / lastSector) * (firstSectorRate - lastSectorRate); + } + +private: + TMutex Mutex; + ui64 MaxSector = 0; + ui64 MostRecentlyUsedSector = 0; + NSectorMap::EDiskMode DiskMode = NSectorMap::DM_NONE; +}; + +} // NSectorMap + +/* TSectorMap */ + +class TSectorMap : public TThrRefBase { +public: TString Serial = CreateGuidAsString(); ui64 DeviceSize; @@ -36,13 +151,16 @@ public: std::atomic<ui64> AllocatedBytes; - TSectorMap(ui64 deviceSize = 0) + TSectorMap(ui64 deviceSize = 0, NSectorMap::EDiskMode diskMode = NSectorMap::DM_NONE) : DeviceSize(deviceSize) , IsLocked(false) , ImitateIoErrorProbability(0.0) , ImitateReadIoErrorProbability(0.0) , AllocatedBytes(0) - {} + , DiskMode(diskMode) + { + InitSectorOperationThrottler(); + } bool Lock() { return !IsLocked.exchange(true); @@ -59,42 +177,60 @@ public: Y_VERIFY_S(offset + 4096 <= DeviceSize, "It is not possible to shrink TSectorMap with data"); } } + + InitSectorOperationThrottler(); + } + + void InitSectorOperationThrottler() { + if (DeviceSize > 0 && DiskMode != NSectorMap::DM_NONE) { + SectorOperationThrottler = MakeHolder<NSectorMap::TSectorOperationThrottler>((DeviceSize + NSectorMap::SECTOR_SIZE - 1) / NSectorMap::SECTOR_SIZE, DiskMode); + } else { + SectorOperationThrottler.Reset(); + } } void ZeroInit(ui64 sectors) { - ui64 bytes = sectors * SectorMapSectorSize; + ui64 bytes = sectors * NSectorMap::SECTOR_SIZE; TString str = TString::Uninitialized(bytes); memset(str.Detach(), 0, bytes); Write((ui8*)str.Detach(), bytes, 0); } - void Read(ui8 *data, i64 size, ui64 offset) { - Y_VERIFY(size % 
SectorMapSectorSize == 0); - Y_VERIFY(offset % SectorMapSectorSize == 0); + void Read(ui8 *data, i64 size, ui64 offset, bool prevOperationIsInProgress = false) { + Y_VERIFY(size % NSectorMap::SECTOR_SIZE == 0); + Y_VERIFY(offset % NSectorMap::SECTOR_SIZE == 0); + + if (SectorOperationThrottler.Get() != nullptr) { + SectorOperationThrottler->ThrottleRead(size, offset, prevOperationIsInProgress); + } TGuard<TTicketLock> guard(MapLock); - for (; size > 0; size -= SectorMapSectorSize) { + for (; size > 0; size -= NSectorMap::SECTOR_SIZE) { if (auto it = Map.find(offset); it == Map.end()) { - memset(data, 0x33, SectorMapSectorSize); + memset(data, 0x33, NSectorMap::SECTOR_SIZE); } else { - char tmp[4 * SectorMapSectorSize]; - int processed = LZ4_decompress_safe(it->second.data(), tmp, it->second.size(), 4 * SectorMapSectorSize); - Y_VERIFY_S(processed == SectorMapSectorSize, "processed# " << processed); - memcpy(data, tmp, SectorMapSectorSize); + char tmp[4 * NSectorMap::SECTOR_SIZE]; + int processed = LZ4_decompress_safe(it->second.data(), tmp, it->second.size(), 4 * NSectorMap::SECTOR_SIZE); + Y_VERIFY_S(processed == NSectorMap::SECTOR_SIZE, "processed# " << processed); + memcpy(data, tmp, NSectorMap::SECTOR_SIZE); } - offset += SectorMapSectorSize; - data += SectorMapSectorSize; + offset += NSectorMap::SECTOR_SIZE; + data += NSectorMap::SECTOR_SIZE; } } - void Write(const ui8 *data, i64 size, ui64 offset) { - Y_VERIFY(size % SectorMapSectorSize == 0); - Y_VERIFY(offset % SectorMapSectorSize == 0); + void Write(const ui8 *data, i64 size, ui64 offset, bool prevOperationIsInProgress = false) { + Y_VERIFY(size % NSectorMap::SECTOR_SIZE == 0); + Y_VERIFY(offset % NSectorMap::SECTOR_SIZE == 0); + + if (SectorOperationThrottler.Get() != nullptr) { + SectorOperationThrottler->ThrottleWrite(size, offset, prevOperationIsInProgress); + } TGuard<TTicketLock> guard(MapLock); - for (; size > 0; size -= SectorMapSectorSize) { - char tmp[4 * SectorMapSectorSize]; - int written = 
LZ4_compress_default((const char*)data, tmp, SectorMapSectorSize, 4 * SectorMapSectorSize); + for (; size > 0; size -= NSectorMap::SECTOR_SIZE) { + char tmp[4 * NSectorMap::SECTOR_SIZE]; + int written = LZ4_compress_default((const char*)data, tmp, NSectorMap::SECTOR_SIZE, 4 * NSectorMap::SECTOR_SIZE); Y_VERIFY_S(written > 0, "written# " << written); TString str = TString::Uninitialized(written); memcpy(str.Detach(), tmp, written); @@ -105,26 +241,26 @@ public: Map[offset] = str; } AllocatedBytes.fetch_add(Map[offset].size()); - offset += SectorMapSectorSize; - data += SectorMapSectorSize; + offset += NSectorMap::SECTOR_SIZE; + data += NSectorMap::SECTOR_SIZE; } } void Trim(i64 size, ui64 offset) { TGuard<TTicketLock> guard(MapLock); - Y_VERIFY(size % SectorMapSectorSize == 0); - Y_VERIFY(offset % SectorMapSectorSize == 0); - for (; size > 0; size -= SectorMapSectorSize) { + Y_VERIFY(size % NSectorMap::SECTOR_SIZE == 0); + Y_VERIFY(offset % NSectorMap::SECTOR_SIZE == 0); + for (; size > 0; size -= NSectorMap::SECTOR_SIZE) { if (auto it = Map.find(offset); it != Map.end()) { AllocatedBytes.fetch_sub(it->second.size()); Map.erase(it); } - offset += SectorMapSectorSize; + offset += NSectorMap::SECTOR_SIZE; } } ui64 DataBytes() const { - return Map.size() * 4096; + return Map.size() * NSectorMap::SECTOR_SIZE; } TString ToString() const { @@ -140,12 +276,18 @@ public: str << "ImitateIoErrorProbability# " << ImitateIoErrorProbability.load() << "\n"; str << "AllocatedBytes (approx.)# " << HumanReadableSize(AllocatedBytes.load(), SF_QUANTITY) << "\n"; str << "DataBytes# " << HumanReadableSize(DataBytes(), SF_QUANTITY) << "\n"; + str << "DiskMode# " << DiskModeToString(DiskMode) << "\n"; return str.Str(); } // Requires proto information, so should be defined in cpp void LoadFromFile(const TString& path); void StoreToFile(const TString& path); + +private: + THashMap<ui64, TString> Map; + NSectorMap::EDiskMode DiskMode = NSectorMap::DM_NONE; + 
THolder<NSectorMap::TSectorOperationThrottler> SectorOperationThrottler; }; } // NPDisk |