aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVlad Kuznetsov <va-kuznecov@ydb.tech>2024-10-03 16:46:31 +0200
committerGitHub <noreply@github.com>2024-10-03 16:46:31 +0200
commit9b97c8ff2469b21f21163c4ee70118b3152a15c0 (patch)
tree3f13c0f5289eae6773b3f664c8e44081a8634fbc
parent1faa700fdac58d295c2b813ffbd47b9f7cadf2d6 (diff)
downloadydb-9b97c8ff2469b21f21163c4ee70118b3152a15c0.tar.gz
Fix unimportant deadlock in TBlockDevice and add new UT (#9991)
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp4
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp60
2 files changed, 59 insertions, 5 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
index 2731863137..7e81e1ebca 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
@@ -111,14 +111,14 @@ class TRealBlockDevice : public IBlockDevice {
// Schedule action execution
// pass action = nullptr to quit
void Schedule(TCompletionAction *action) noexcept {
- TAtomicBase queueActions = AtomicIncrement(QueuedActions);
- if (queueActions >= MaxQueuedActions) {
+ if (AtomicGet(QueuedActions) >= MaxQueuedActions) {
Device.Mon.L7.Set(true, AtomicGetAndIncrement(SeqnoL7));
while (AtomicGet(QueuedActions) >= MaxQueuedActions) {
SpinLockPause();
}
Device.Mon.L7.Set(false, AtomicGetAndIncrement(SeqnoL7));
}
+ AtomicIncrement(QueuedActions);
CompletionActions.Push(action);
return;
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
index a38f392910..aebfb7c78c 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
@@ -112,23 +112,27 @@ public:
}
};
-class TCompletionCounter : public NPDisk::TCompletionAction {
+class TCompletionWorkerWithCounter : public NPDisk::TCompletionAction {
public:
- TCompletionCounter(TAtomic& counter)
+ TCompletionWorkerWithCounter(TAtomic& counter, TDuration workTime = TDuration::Zero())
: Counter(counter)
+ , WorkTime(workTime)
{}
void Exec(TActorSystem *) override {
+ Sleep(WorkTime);
AtomicIncrement(Counter);
delete this;
}
void Release(TActorSystem *) override {
+ AtomicIncrement(Counter);
delete this;
}
private:
TAtomic& Counter;
+ TDuration WorkTime;
};
static TString MakeDatabasePath(const char *dir) {
@@ -250,7 +254,7 @@ void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 disk
TAtomic expectedWrites = diskSize / (offsetIncrement);
for (ui64 offset = 0; offset < diskSize; offset += offsetIncrement) {
- device->PwriteAsync(data.Get(), data.Size(), offset, new TCompletionCounter(completedWrites), NPDisk::TReqId(NPDisk::TReqId::Test1, 0), {});
+ device->PwriteAsync(data.Get(), data.Size(), offset, new TCompletionWorkerWithCounter(completedWrites), NPDisk::TReqId(NPDisk::TReqId::Test1, 0), {});
}
WaitForValue(&completedWrites, TIMEOUT, expectedWrites);
@@ -341,6 +345,56 @@ Y_UNIT_TEST_SUITE(TBlockDeviceTest) {
Ctest << "Done" << Endl;
}
+ Y_UNIT_TEST(WriteReadRestart) {
+ using namespace NPDisk;
+
+ TActorSystemCreator creator;
+ auto start = TMonotonic::Now();
+ while ((TMonotonic::Now() - start).Seconds() < 5) {
+ const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
+ THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));
+
+ ui32 buffSize = 64_KB;
+ ui32 bufferPoolSize = 512;
+ THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(buffSize, bufferPoolSize, false, {}));
+ ui64 inFlight = 128;
+ ui32 maxQueuedCompletionActions = bufferPoolSize / 2;
+ ui64 diskSize = 32_GB;
+
+ TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap(diskSize, NSectorMap::DM_NONE);
+ THolder<NPDisk::IBlockDevice> device(CreateRealBlockDevice("", *mon, 0, 0, inFlight, TDeviceMode::None,
+ maxQueuedCompletionActions, sectorMap));
+ device->Initialize(std::make_shared<TPDiskCtx>(creator.GetActorSystem()));
+
+ TAtomic counter = 0;
+ const i64 totalRequests = 500;
+ for (i64 i = 0; i < totalRequests; i++) {
+ auto *completion = new TCompletionWorkerWithCounter(counter, TDuration::MicroSeconds(100));
+ NPDisk::TBuffer::TPtr buffer(bufferPool->Pop());
+ buffer->FlushAction = completion;
+ auto* data = buffer->Data();
+ switch (RandomNumber<ui32>(3)) {
+ case 0:
+ device->PreadAsync(data, 32_KB, 0, buffer.Release(), TReqId(), nullptr);
+ break;
+ case 1:
+ device->PwriteAsync(data, 32_KB, 0, buffer.Release(), TReqId(), nullptr);
+ break;
+ case 2:
+ device->FlushAsync(completion, TReqId());
+ buffer->FlushAction = nullptr;
+ break;
+ default:
+ break;
+ }
+ }
+
+ Cerr << AtomicGet(counter) << Endl;
+ device.Destroy();
+ UNIT_ASSERT(AtomicGet(counter) == totalRequests);
+ }
+ }
+
/*
Y_UNIT_TEST(TestRabbitCompletionAction) {
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;