diff options
author | Vlad Kuznetsov <va-kuznecov@ydb.tech> | 2024-10-03 16:46:31 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-03 16:46:31 +0200 |
commit | 9b97c8ff2469b21f21163c4ee70118b3152a15c0 (patch) | |
tree | 3f13c0f5289eae6773b3f664c8e44081a8634fbc | |
parent | 1faa700fdac58d295c2b813ffbd47b9f7cadf2d6 (diff) | |
download | ydb-9b97c8ff2469b21f21163c4ee70118b3152a15c0.tar.gz |
Fix unimportant deadlock in TBlockDevice and add new UT (#9991)
-rw-r--r-- | ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp | 60 |
2 files changed, 59 insertions, 5 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp index 2731863137..7e81e1ebca 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp @@ -111,14 +111,14 @@ class TRealBlockDevice : public IBlockDevice { // Schedule action execution // pass action = nullptr to quit void Schedule(TCompletionAction *action) noexcept { - TAtomicBase queueActions = AtomicIncrement(QueuedActions); - if (queueActions >= MaxQueuedActions) { + if (AtomicGet(QueuedActions) >= MaxQueuedActions) { Device.Mon.L7.Set(true, AtomicGetAndIncrement(SeqnoL7)); while (AtomicGet(QueuedActions) >= MaxQueuedActions) { SpinLockPause(); } Device.Mon.L7.Set(false, AtomicGetAndIncrement(SeqnoL7)); } + AtomicIncrement(QueuedActions); CompletionActions.Push(action); return; } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp index a38f392910..aebfb7c78c 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp @@ -112,23 +112,27 @@ public: } }; -class TCompletionCounter : public NPDisk::TCompletionAction { +class TCompletionWorkerWithCounter : public NPDisk::TCompletionAction { public: - TCompletionCounter(TAtomic& counter) + TCompletionWorkerWithCounter(TAtomic& counter, TDuration workTime = TDuration::Zero()) : Counter(counter) + , WorkTime(workTime) {} void Exec(TActorSystem *) override { + Sleep(WorkTime); AtomicIncrement(Counter); delete this; } void Release(TActorSystem *) override { + AtomicIncrement(Counter); delete this; } private: TAtomic& Counter; + TDuration WorkTime; }; static TString MakeDatabasePath(const char *dir) { @@ -250,7 +254,7 @@ void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 disk TAtomic expectedWrites = diskSize / (offsetIncrement); for (ui64 offset = 0; offset < diskSize; offset += offsetIncrement) { - device->PwriteAsync(data.Get(), data.Size(), offset, new TCompletionCounter(completedWrites), NPDisk::TReqId(NPDisk::TReqId::Test1, 0), {}); + device->PwriteAsync(data.Get(), data.Size(), offset, new TCompletionWorkerWithCounter(completedWrites), NPDisk::TReqId(NPDisk::TReqId::Test1, 0), {}); } WaitForValue(&completedWrites, TIMEOUT, expectedWrites); @@ -341,6 +345,56 @@ Y_UNIT_TEST_SUITE(TBlockDeviceTest) { Ctest << "Done" << Endl; } + Y_UNIT_TEST(WriteReadRestart) { + using namespace NPDisk; + + TActorSystemCreator creator; + auto start = TMonotonic::Now(); + while ((TMonotonic::Now() - start).Seconds() < 5) { + const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters; + THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr)); + + ui32 buffSize = 64_KB; + ui32 bufferPoolSize = 512; + THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(buffSize, bufferPoolSize, false, {})); + ui64 inFlight = 128; + ui32 maxQueuedCompletionActions = bufferPoolSize / 2; + ui64 diskSize = 32_GB; + + TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap(diskSize, NSectorMap::DM_NONE); + THolder<NPDisk::IBlockDevice> device(CreateRealBlockDevice("", *mon, 0, 0, inFlight, TDeviceMode::None, + maxQueuedCompletionActions, sectorMap)); + device->Initialize(std::make_shared<TPDiskCtx>(creator.GetActorSystem())); + + TAtomic counter = 0; + const i64 totalRequests = 500; + for (i64 i = 0; i < totalRequests; i++) { + auto *completion = new TCompletionWorkerWithCounter(counter, TDuration::MicroSeconds(100)); + NPDisk::TBuffer::TPtr buffer(bufferPool->Pop()); + buffer->FlushAction = completion; + auto* data = buffer->Data(); + switch (RandomNumber<ui32>(3)) { + case 0: + device->PreadAsync(data, 32_KB, 0, buffer.Release(), TReqId(), nullptr); + break; + case 1: + device->PwriteAsync(data, 32_KB, 0, buffer.Release(), TReqId(), nullptr); + break; + case 2: + device->FlushAsync(completion, TReqId()); + buffer->FlushAction = nullptr; + break; + default: + break; + } + } + + Cerr << AtomicGet(counter) << Endl; + device.Destroy(); + UNIT_ASSERT(AtomicGet(counter) == totalRequests); + } + } + /* Y_UNIT_TEST(TestRabbitCompletionAction) { const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters; |