aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVlad Kuznetsov <va-kuznecov@ydb.tech>2024-10-04 12:49:25 +0200
committerGitHub <noreply@github.com>2024-10-04 12:49:25 +0200
commite55564f7a0a0ff7b638ce199c1b5005b696d349d (patch)
treed1a4e4d60f0659be2939510419871fd3f9b406e9
parentf2805c67d45546a33b9605ea33bb2fde285498c9 (diff)
downloadydb-e55564f7a0a0ff7b638ce199c1b5005b696d349d.tar.gz
Implement configurable number of completion threads (#9715)
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h3
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp221
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp3
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h1
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp35
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h10
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h8
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_driveestimator.cpp2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp2
-rw-r--r--ydb/core/protos/blobstorage_pdisk_config.proto3
10 files changed, 184 insertions, 104 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h
index 8faf1d968d..6713f4d98e 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h
@@ -64,7 +64,8 @@ class TPDisk;
IBlockDevice* CreateRealBlockDevice(const TString &path, TPDiskMon &mon,
ui64 reorderingCycles, ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags,
- ui32 maxQueuedCompletionActions, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk = nullptr);
+ ui32 maxQueuedCompletionActions, ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap,
+ TPDisk * const pdisk = nullptr);
IBlockDevice* CreateRealBlockDeviceWithDefaults(const TString &path, TPDiskMon &mon, TDeviceMode::TFlags flags,
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk = nullptr);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
index 7e81e1ebca..3f7e5d782a 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
@@ -44,41 +44,29 @@ class TRealBlockDevice : public IBlockDevice {
////////////////////////////////////////////////////////
// TCompletionThread
////////////////////////////////////////////////////////
- class TCompletionThread : public TThread {
- static constexpr ui32 NumOfWriters = 2;
+ class TCompletionThread : public ISimpleThread {
public:
- TCompletionThread(TRealBlockDevice &device, ui32 maxQueuedActions)
- : TThread(&ThreadProc, this)
- , Device(device)
- , QueuedActions(0)
- , MaxQueuedActions(maxQueuedActions)
+ TCompletionThread(TRealBlockDevice &device, TString name)
+ : Device(device)
+ , Name(name)
{}
- static void* ThreadProc(void* _this) {
- SetCurrentThreadName("PdCmpl");
- static_cast<TCompletionThread*>(_this)->Exec();
- return nullptr;
- }
-
- void Exec() {
- ui32 exitSignalsReceived = 0;
- Device.Mon.L7.Set(false, AtomicGetAndIncrement(SeqnoL7));
+ void *ThreadProc() override {
+ SetCurrentThreadName(Name.data());
auto prevCycleEnd = HPNow();
bool isWorking = true;
bool stateError = false;
+ auto cpuCounter = Device.Mon.PDiskGroup->GetCounter(Name + "CPU", true);
+
while(isWorking) {
- TAtomicBase actionCount = CompletionActions.GetWaitingSize();
+ TAtomicBase actionCount = Queue.GetWaitingSize();
if (actionCount > 0) {
for (TAtomicBase idx = 0; idx < actionCount; ++idx) {
- TCompletionAction *action = CompletionActions.Pop();
- AtomicDecrement(QueuedActions);
+ TCompletionAction *action = Queue.Pop();
if (action == nullptr) {
- ++exitSignalsReceived;
- if (exitSignalsReceived == NumOfWriters) {
- isWorking = false;
- }
+ isWorking = false;
} else {
if (!stateError && action->CanHandleResult()) {
action->Exec(Device.PCtx->ActorSystem);
@@ -96,8 +84,8 @@ class TRealBlockDevice : public IBlockDevice {
}
}
} else {
- *Device.Mon.CompletionThreadCPU = ThreadCPUTime();
- CompletionActions.ProducedWaitI();
+ *cpuCounter = ThreadCPUTime();
+ Queue.ProducedWaitI();
}
const auto cycleEnd = HPNow();
@@ -106,37 +94,127 @@ class TRealBlockDevice : public IBlockDevice {
}
prevCycleEnd = cycleEnd;
}
+ return nullptr;
+ }
+
+ void Schedule(TCompletionAction *action) {
+ Queue.Push(action);
+ }
+
+ size_t GetQueuedActions() {
+ return Queue.GetWaitingSize();
+ }
+
+ private:
+ TCountedQueueManyOne<TCompletionAction, 4 << 10> Queue;
+ TRealBlockDevice& Device;
+ TString Name;
+ };
+
+ class TCompletionThreads {
+
+ public:
+ std::vector<std::unique_ptr<TCompletionThread>> Threads;
+
+ TCompletionThreads(TRealBlockDevice &device, size_t threadsCount, ui32 maxQueuedActions)
+ : Device(device)
+ , MaxQueuedActions(maxQueuedActions)
+ {
+ for (size_t i = 0; i < threadsCount; ++i) {
+ Threads.push_back(std::make_unique<TCompletionThread>(device, TStringBuilder() << "PdCmpl_" << i));
+ Threads.back()->Start();
+ }
}
// Schedule action execution
// pass action = nullptr to quit
void Schedule(TCompletionAction *action) noexcept {
- if (AtomicGet(QueuedActions) >= MaxQueuedActions) {
- Device.Mon.L7.Set(true, AtomicGetAndIncrement(SeqnoL7));
- while (AtomicGet(QueuedActions) >= MaxQueuedActions) {
- SpinLockPause();
+ if (!action) {
+ StopWork();
+ return;
+ }
+
+ if (!action->ShouldBeExecutedInCompletionThread || Threads.empty()) {
+ ExecuteActionInPlace(action);
+ return;
+ }
+
+ while (true) {
+ auto min_it = Threads.begin();
+ auto minQueueSize = (*min_it)->GetQueuedActions();
+ auto totalSize = minQueueSize;
+ for (auto it = Threads.begin() + 1; it != Threads.end(); ++it) {
+ auto queueSize = (*it)->GetQueuedActions();
+ totalSize += queueSize;
+ if (queueSize < minQueueSize) {
+ minQueueSize = queueSize;
+ min_it = it;
+ }
}
- Device.Mon.L7.Set(false, AtomicGetAndIncrement(SeqnoL7));
+ if (totalSize >= MaxQueuedActions) {
+ // We have a risk to run out of buffers from BufferPool, so MaxQueuedActions is expected
+ // to counter that
+ Device.Mon.L7.Set(true, AtomicGetAndIncrement(SeqnoL7));
+ Sleep(TDuration::MilliSeconds(1));
+ Device.Mon.L7.Set(false, AtomicGetAndIncrement(SeqnoL7));
+ continue;
+ }
+
+ Y_ABORT_UNLESS(min_it != Threads.end());
+ if (action->CanBeExecutedInAdditionalCompletionThread) {
+ (*min_it)->Schedule(action);
+ } else {
+ // actions which are supposed to be executed from single thread always will be executer in 0 thread
+ // scheduled here to pass MaxQueuedActions check above
+ Threads[0]->Schedule(action);
+ }
+
+ return;
}
- AtomicIncrement(QueuedActions);
- CompletionActions.Push(action);
- return;
}
- // Schedule action execution
- // pass action = nullptr to quit
+ void ExecuteActionInPlace(TCompletionAction *action) {
+ if (action->CanHandleResult()) {
+ action->Exec(Device.PCtx->ActorSystem);
+ } else {
+ TString errorReason = action->ErrorReason;
+
+ action->Release(Device.PCtx->ActorSystem);
+
+ if (!Device.QuitCounter.IsBlocked()) {
+ Device.BecomeErrorState(TStringBuilder()
+ << " CompletionAction error, operation info# " << errorReason);
+ }
+ }
+ }
+
+ // This hack make it possible to execute some completion action beside of device but in ComplitionThread
void ScheduleHackForLogReader(TCompletionAction *action) noexcept {
- AtomicIncrement(QueuedActions);
action->Result = EIoResult::Ok;
- CompletionActions.Push(action);
+ if (Threads.empty()) {
+ action->Exec(Device.PCtx->ActorSystem);
+ } else {
+ Threads[0]->Schedule(action);
+ }
return;
}
+ void StopWork() {
+ for (auto& thread : Threads) {
+ thread->Schedule(nullptr);
+ }
+ }
+
+ void Join() {
+ for (auto& thread : Threads) {
+ thread->Join();
+ }
+ Threads.clear();
+ }
+
private:
- TCountedQueueManyOne<TCompletionAction, 4 << 10> CompletionActions;
TRealBlockDevice &Device;
- TAtomic QueuedActions;
- const TAtomicBase MaxQueuedActions;
+ const size_t MaxQueuedActions;
TAtomic SeqnoL7 = 0;
};
@@ -368,25 +446,6 @@ class TRealBlockDevice : public IBlockDevice {
}
}
- void ExecuteOrScheduleCompletion(TCompletionAction *action) {
- if (action->ShouldBeExecutedInCompletionThread) {
- Device.CompletionThread->Schedule(action);
- } else {
- if (action->CanHandleResult()) {
- action->Exec(Device.PCtx->ActorSystem);
- } else {
- TString errorReason = action->ErrorReason;
-
- action->Release(Device.PCtx->ActorSystem);
-
- if (!Device.QuitCounter.IsBlocked()) {
- Device.BecomeErrorState(TStringBuilder()
- << " CompletionAction error, operation info# " << errorReason);
- }
- }
- }
- }
-
void Exec(TAsyncIoOperationResult *event) {
IAsyncIoOperation *op = event->Operation;
// Add up the execution time of all the events
@@ -446,7 +505,7 @@ class TRealBlockDevice : public IBlockDevice {
WaitingNoops[idx % MaxWaitingNoops] = completionAction->FlushAction;
completionAction->FlushAction = nullptr;
}
- ExecuteOrScheduleCompletion(completionAction);
+ Device.CompletionThreads->Schedule(completionAction);
auto seqnoL6 = AtomicGetAndIncrement(Device.Mon.SeqnoL6);
Device.Mon.L6.Set(duration > Device.Reordering, seqnoL6);
}
@@ -464,7 +523,7 @@ class TRealBlockDevice : public IBlockDevice {
LWTRACK(PDiskDeviceGetFromWaiting, WaitingNoops[i]->Orbit);
double durationMs = HPMilliSecondsFloat(HPNow() - WaitingNoops[i]->GetTime);
Device.Mon.DeviceFlushDuration.Increment(durationMs);
- ExecuteOrScheduleCompletion(WaitingNoops[i]);
+ Device.CompletionThreads->Schedule(WaitingNoops[i]);
WaitingNoops[i] = nullptr;
}
++NextPossibleNoop;
@@ -511,7 +570,7 @@ class TRealBlockDevice : public IBlockDevice {
}
}
// Stop the completion thread
- Device.CompletionThread->Schedule(nullptr);
+ Device.CompletionThreads->Schedule(nullptr);
}
};
@@ -696,7 +755,6 @@ class TRealBlockDevice : public IBlockDevice {
for (TAtomicBase idx = 0; idx < actionCount; ++idx) {
IAsyncIoOperation *op = TrimOperations.Pop();
if (op == nullptr) {
- Device.CompletionThread->Schedule(nullptr);
return;
}
Y_ABORT_UNLESS(op->GetType() == IAsyncIoOperation::EType::PTrim);
@@ -721,7 +779,7 @@ class TRealBlockDevice : public IBlockDevice {
LWPROBE(PDiskDeviceTrimDuration, Device.GetPDiskId(), duration, op->GetOffset());
}
completion->SetResult(EIoResult::Ok);
- Device.CompletionThread->Schedule(completion);
+ completion->Exec(Device.PCtx->ActorSystem);
Device.IoContext->DestroyAsyncIoOperation(op);
}
} else {
@@ -744,8 +802,10 @@ protected:
TPDiskMon &Mon;
TString Path;
+ TPDiskConfig cfg{0, 0, 0};
+
private:
- THolder<TCompletionThread> CompletionThread;
+ THolder<TCompletionThreads> CompletionThreads;
THolder<TTrimThread> TrimThread;
THolder<TGetThread> GetEventsThread;
THolder<TSubmitGetThread> SpdkSubmitGetThread;
@@ -758,7 +818,8 @@ private:
ui64 Reordering;
ui64 SeekCostNs;
bool IsTrimEnabled;
- ui32 MaxQueuedCompletionActions;
+ const ui32 MaxQueuedCompletionActions; // for all threads
+ const ui32 CompletionThreadsCount;
TIdleCounter IdleCounter; // Includes reads, writes and trims
@@ -782,10 +843,10 @@ private:
public:
TRealBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles,
ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions,
- TIntrusivePtr<TSectorMap> sectorMap)
+ ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap)
: Mon(mon)
, Path(path)
- , CompletionThread(nullptr)
+ , CompletionThreads(nullptr)
, TrimThread(nullptr)
, GetEventsThread(nullptr)
, SharedCallback(nullptr)
@@ -796,6 +857,7 @@ public:
, SeekCostNs(seekCostNs)
, IsTrimEnabled(true)
, MaxQueuedCompletionActions(maxQueuedCompletionActions)
+ , CompletionThreadsCount(completionThreadsCount)
, IdleCounter(Mon.IdleLight)
, Flags(flags)
, SectorMap(sectorMap)
@@ -857,7 +919,7 @@ protected:
}
if (IsFileOpened) {
IoContext->SetActorSystem(PCtx->ActorSystem);
- CompletionThread = MakeHolder<TCompletionThread>(*this, MaxQueuedCompletionActions);
+ CompletionThreads = MakeHolder<TCompletionThreads>(*this, CompletionThreadsCount, MaxQueuedCompletionActions);
TrimThread = MakeHolder<TTrimThread>(*this);
SharedCallback = MakeHolder<TSharedCallback>(*this);
if (Flags & TDeviceMode::UseSpdk) {
@@ -874,7 +936,6 @@ protected:
GetEventsThread->Start();
}
}
- CompletionThread->Start();
TrimThread->Start();
IsInitialized = true;
}
@@ -1041,7 +1102,7 @@ protected:
}
completionAction->SetResult(EIoResult::Ok);
- CompletionThread->Schedule(completionAction);
+ CompletionThreads->Schedule(completionAction);
}
void NoopAsyncHackForLogReader(TCompletionAction *completionAction, TReqId /*reqId*/) override {
@@ -1056,7 +1117,7 @@ protected:
}
completionAction->SetResult(EIoResult::Ok);
- CompletionThread->ScheduleHackForLogReader(completionAction);
+ CompletionThreads->ScheduleHackForLogReader(completionAction);
}
void TrimAsync(ui32 size, ui64 offset, TCompletionAction *completionAction, TReqId reqId) override {
@@ -1127,7 +1188,7 @@ protected:
if (res.PrevA ^ res.A) { // res.ToggledA()
if (IsInitialized) {
Y_ABORT_UNLESS(TrimThread);
- Y_ABORT_UNLESS(CompletionThread);
+ Y_ABORT_UNLESS(CompletionThreads);
TrimThread->Schedule(nullptr); // Stop the Trim thread
if (Flags & TDeviceMode::UseSpdk) {
Y_ABORT_UNLESS(SpdkSubmitGetThread);
@@ -1145,13 +1206,13 @@ protected:
}
SharedCallback->Destroy();
TrimThread->Join();
- CompletionThread->Join();
+ CompletionThreads->Join();
IsInitialized = false;
} else {
Y_ABORT_UNLESS(SubmitThread.Get() == nullptr);
Y_ABORT_UNLESS(GetEventsThread.Get() == nullptr);
Y_ABORT_UNLESS(TrimThread.Get() == nullptr);
- Y_ABORT_UNLESS(CompletionThread.Get() == nullptr);
+ Y_ABORT_UNLESS(CompletionThreads.Get() == nullptr);
}
if (IsFileOpened) {
EIoResult ret = IoContext->Destroy();
@@ -1287,9 +1348,9 @@ class TCachedBlockDevice : public TRealBlockDevice {
public:
TCachedBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles,
ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions,
- TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk)
+ ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk)
: TRealBlockDevice(path, mon, reorderingCycles, seekCostNs, deviceInFlight, flags,
- maxQueuedCompletionActions, sectorMap)
+ maxQueuedCompletionActions, completionThreadsCount, sectorMap)
, ReadsInFly(0)
, PDisk(pdisk)
{}
@@ -1425,14 +1486,14 @@ public:
IBlockDevice* CreateRealBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles,
ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions,
- TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk) {
+ ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk) {
return new TCachedBlockDevice(path, mon, reorderingCycles, seekCostNs, deviceInFlight, flags,
- maxQueuedCompletionActions, sectorMap, pdisk);
+ maxQueuedCompletionActions, completionThreadsCount, sectorMap, pdisk);
}
IBlockDevice* CreateRealBlockDeviceWithDefaults(const TString &path, TPDiskMon &mon, TDeviceMode::TFlags flags,
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk) {
- IBlockDevice *device = CreateRealBlockDevice(path, mon, 0, 0, 4, flags, 8, sectorMap, pdisk);
+ IBlockDevice *device = CreateRealBlockDevice(path, mon, 0, 0, 4, flags, 8, 1, sectorMap, pdisk);
device->Initialize(std::make_shared<TPDiskCtx>(actorSystem));
return device;
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
index 9068feaa67..c309f90d26 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
@@ -288,11 +288,12 @@ Y_UNIT_TEST_SUITE(TBlockDeviceTest) {
THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(buffSize, bufferPoolSize, false, {}));
ui64 inFlight = 128;
ui32 maxQueuedCompletionActions = bufferPoolSize / 2;
+ ui32 completionThreadsCount = 1;
ui64 diskSize = 32_GB;
TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap(diskSize, NSectorMap::DM_NONE);
THolder<NPDisk::IBlockDevice> device(CreateRealBlockDevice("", *mon, 0, 0, inFlight, TDeviceMode::None,
- maxQueuedCompletionActions, sectorMap));
+ maxQueuedCompletionActions, completionThreadsCount, sectorMap));
device->Initialize(std::make_shared<TPDiskCtx>(creator.GetActorSystem()));
TAtomic counter = 0;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h
index 951dd03390..31dbc6681e 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h
@@ -23,6 +23,7 @@ struct TCompletionAction {
// to BlockDevice from Exec() and it's more safe to use WhiteList to allow only
// LogWrite and ChunkWrite to be executed from GetThread
bool ShouldBeExecutedInCompletionThread = true;
+ bool CanBeExecutedInAdditionalCompletionThread = false;
mutable NLWTrace::TOrbit Orbit;
protected:
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp
index b0a4514d00..4b32f513f9 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp
@@ -128,10 +128,16 @@ TCompletionChunkReadPart::TCompletionChunkReadPart(TPDisk *pDisk, TIntrusivePtr<
, PayloadReadSize(payloadReadSize)
, CommonBufferOffset(commonBufferOffset)
, CumulativeCompletion(cumulativeCompletion)
+ , ChunkNonce(CumulativeCompletion->GetChunkNonce())
, Buffer(PDisk->BufferPool->Pop())
, IsTheLastPart(isTheLastPart)
, Span(std::move(span))
{
+ TCompletionAction::CanBeExecutedInAdditionalCompletionThread = true;
+
+ TBufferWithGaps *commonBuffer = CumulativeCompletion->GetCommonBuffer();
+ Destination = commonBuffer->RawDataPtr(CommonBufferOffset, PayloadReadSize);
+
if (!IsTheLastPart) {
CumulativeCompletion->AddPart();
}
@@ -166,8 +172,6 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
Read->Offset + CommonBufferOffset, PayloadReadSize, firstSector, lastSector, sectorOffset);
Y_ABORT_UNLESS(isOk);
- TBufferWithGaps *commonBuffer = CumulativeCompletion->GetCommonBuffer();
- ui8 *destination = commonBuffer->RawDataPtr(CommonBufferOffset, PayloadReadSize);
ui8* source = Buffer->Data();
@@ -183,8 +187,6 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
sectorOffset = 0;
}
- ui64 chunkNonce = CumulativeCompletion->GetChunkNonce();
-
ui32 beginBadUserOffset = 0xffffffff;
ui32 endBadUserOffset = 0xffffffff;
ui32 userSectorSize = format.SectorPayloadSize();
@@ -193,7 +195,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
TSectorRestorator restorator(false, 1, false,
format, PDisk->PCtx.get(), &PDisk->Mon, PDisk->BufferPool.Get());
- ui64 lastNonce = Min((ui64)0, chunkNonce - 1);
+ ui64 lastNonce = Min((ui64)0, ChunkNonce - 1);
restorator.Restore(source, format.Offset(Read->ChunkIdx, sectorIdx), format.MagicDataChunk, lastNonce,
Read->Owner);
@@ -211,7 +213,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
<< " for owner# " << Read->Owner
<< " beginBadUserOffet# " << beginBadUserOffset << " endBadUserOffset# " << endBadUserOffset
<< " due to multiple sectors with incorrect hashes. Marker# BPC001");
- commonBuffer->AddGap(beginBadUserOffset, endBadUserOffset);
+ CumulativeCompletion->AddGap(beginBadUserOffset, endBadUserOffset);
beginBadUserOffset = 0xffffffff;
endBadUserOffset = 0xffffffff;
}
@@ -221,16 +223,16 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
// Decrypt data
if (beginBadUserOffset != 0xffffffff) {
- memset(destination, 0, sectorPayloadSize);
+ memset(Destination, 0, sectorPayloadSize);
} else {
TDataSectorFooter *footer = (TDataSectorFooter*) (source + format.SectorSize - sizeof(TDataSectorFooter));
- if (footer->Nonce != chunkNonce + sectorIdx) {
+ if (footer->Nonce != ChunkNonce + sectorIdx) {
ui32 userOffset = sectorIdx * userSectorSize;
LOG_INFO_S(*actorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PCtx->PDiskId
<< " ReqId# " << Read->ReqId
<< " Can't read chunk chunkIdx# " << Read->ChunkIdx
<< " for owner# " << Read->Owner
- << " nonce mismatch: expected# " << (ui64)(chunkNonce + sectorIdx)
+ << " nonce mismatch: expected# " << (ui64)(ChunkNonce + sectorIdx)
<< ", on-disk# " << (ui64)footer->Nonce
<< " for userOffset# " << userOffset
<< " ! Marker# BPC002");
@@ -238,18 +240,18 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
beginBadUserOffset = userOffset;
}
endBadUserOffset = beginUserOffset + userSectorSize;
- memset(destination, 0, sectorPayloadSize);
+ memset(Destination, 0, sectorPayloadSize);
} else {
cypher.StartMessage(footer->Nonce);
- if (sectorOffset > 0 || intptr_t(destination) % 32) {
+ if (sectorOffset > 0 || intptr_t(Destination) % 32) {
cypher.InplaceEncrypt(source, sectorOffset + sectorPayloadSize);
if (CommonBufferOffset == 0 || !IsTheLastPart) {
- memcpy(destination, source + sectorOffset, sectorPayloadSize);
+ memcpy(Destination, source + sectorOffset, sectorPayloadSize);
} else {
- memcpy(destination, source, sectorPayloadSize);
+ memcpy(Destination, source, sectorPayloadSize);
}
} else {
- cypher.Encrypt(destination, source, sectorPayloadSize);
+ cypher.Encrypt(Destination, source, sectorPayloadSize);
}
if (CanarySize > 0) {
ui32 canaryPosition = sectorOffset + sectorPayloadSize;
@@ -259,7 +261,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
}
}
}
- destination += sectorPayloadSize;
+ Destination += sectorPayloadSize;
source += format.SectorSize;
PayloadReadSize -= sectorPayloadSize;
sectorPayloadSize = Min(format.SectorPayloadSize(), PayloadReadSize);
@@ -273,7 +275,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
<< " for owner# " << Read->Owner
<< " beginBadUserOffet# " << beginBadUserOffset << " endBadUserOffset# " << endBadUserOffset
<< " due to multiple sectors with incorrect hashes/nonces. Marker# BPC003");
- commonBuffer->AddGap(beginBadUserOffset, endBadUserOffset);
+ CumulativeCompletion->AddGap(beginBadUserOffset, endBadUserOffset);
beginBadUserOffset = 0xffffffff;
endBadUserOffset = 0xffffffff;
}
@@ -407,4 +409,3 @@ void TChunkTrimCompletion::Exec(TActorSystem *actorSystem) {
} // NPDisk
} // NKikimr
-
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h
index 5563007383..bd2d99dfe3 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h
@@ -169,6 +169,7 @@ class TCompletionChunkRead : public TCompletionAction {
TPDisk *PDisk;
TIntrusivePtr<TChunkRead> Read;
TBufferWithGaps CommonBuffer;
+ TMutex CommonBufferMutex; // used to protect CommonBuffer when gaps are being add
TAtomic PartsPending;
TAtomic Deletes;
std::function<void()> OnDestroy;
@@ -206,6 +207,11 @@ public:
return &CommonBuffer;
}
+ void AddGap(ui32 start, ui32 end) {
+ TGuard<TMutex> g(CommonBufferMutex);
+ CommonBuffer.AddGap(start, end);
+ }
+
ui64 GetChunkNonce() {
return ChunkNonce;
}
@@ -228,12 +234,14 @@ class TCompletionChunkReadPart : public TCompletionAction {
ui64 PayloadReadSize;
ui64 CommonBufferOffset;
TCompletionChunkRead *CumulativeCompletion;
+ ui64 ChunkNonce;
+ ui8 *Destination = nullptr;
TBuffer::TPtr Buffer;
bool IsTheLastPart;
NWilson::TSpan Span;
public:
TCompletionChunkReadPart(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, ui64 rawReadSize, ui64 payloadReadSize,
- ui64 commonBufferOffset, TCompletionChunkRead *cumulativeCompletion, bool isTheLastPart,
+ ui64 commonBufferOffset, TCompletionChunkRead *cumulativeCompletion, bool isTheLastPart,
NWilson::TSpan&& span);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h
index b33ae5a9a6..c70a322d66 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h
@@ -156,6 +156,8 @@ struct TPDiskConfig : public TThrRefBase {
NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColorBorder = NKikimrBlobStorage::TPDiskSpaceColor::GREEN;
+ ui32 CompletionThreadsCount = 1;
+
bool MetadataOnly = false;
TPDiskConfig(ui64 pDiskGuid, ui32 pdiskId, ui64 pDiskCategory)
@@ -310,6 +312,7 @@ struct TPDiskConfig : public TThrRefBase {
str << " YellowLogChunksMultiplier# " << YellowLogChunksMultiplier << x;
str << " MaxMetadataMegabytes# " << MaxMetadataMegabytes << x;
str << " SpaceColorBorder# " << SpaceColorBorder << x;
+ str << " CompletionThreadsCount# " << CompletionThreadsCount << x;
str << "}";
return str.Str();
}
@@ -394,8 +397,11 @@ struct TPDiskConfig : public TThrRefBase {
limit = Max<ui32>(13, limit);
ChunkBaseLimit = limit;
}
+
+ if (cfg->HasCompletionThreadsCount()) {
+ CompletionThreadsCount = cfg->GetCompletionThreadsCount();
+ }
}
};
} // NKikimr
-
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_driveestimator.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_driveestimator.cpp
index 641a76cad4..9b945098e2 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_driveestimator.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_driveestimator.cpp
@@ -237,7 +237,7 @@ TDriveEstimator::TDriveEstimator(const TString filename)
, ActorSystemCreator(new TActorSystemCreator)
, ActorSystem(ActorSystemCreator->GetActorSystem())
, QueueDepth(4)
- , Device(CreateRealBlockDevice(filename, PDiskMon, 50, 0, QueueDepth, TDeviceMode::LockFile, 128, nullptr, nullptr))
+ , Device(CreateRealBlockDevice(filename, PDiskMon, 50, 0, QueueDepth, TDeviceMode::LockFile, 128, 1, nullptr, nullptr))
, BufferPool(CreateBufferPool(BufferSize, 1, false, {}))
, Buffer(BufferPool->Pop())
{
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
index 1a16e1d47f..7381abe3f1 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
@@ -47,7 +47,7 @@ TPDisk::TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig
, BlockDevice(CreateRealBlockDevice(cfg->GetDevicePath(), Mon,
HPCyclesMs(ReorderingMs), DriveModel.SeekTimeNs(), cfg->DeviceInFlight,
TDeviceMode::LockFile | (cfg->UseSpdkNvmeDriver ? TDeviceMode::UseSpdk : 0),
- cfg->MaxQueuedCompletionActions, cfg->SectorMap, this))
+ cfg->MaxQueuedCompletionActions, cfg->CompletionThreadsCount, cfg->SectorMap, this))
, Cfg(cfg)
, CreationTime(TInstant::Now())
, ExpectedSlotCount(cfg->ExpectedSlotCount)
diff --git a/ydb/core/protos/blobstorage_pdisk_config.proto b/ydb/core/protos/blobstorage_pdisk_config.proto
index 92aee89e09..2c2dedfabc 100644
--- a/ydb/core/protos/blobstorage_pdisk_config.proto
+++ b/ydb/core/protos/blobstorage_pdisk_config.proto
@@ -91,5 +91,6 @@ message TPDiskConfig {
optional uint64 ExpectedSlotCount = 2001; // Number of slots to calculate per-vdisk disk space limit.
optional uint32 ChunkBaseLimit = 2002; // Free chunk permille that triggers Cyan color (e.g. 100 is 10%). Between 130 (default) and 13.
-};
+ optional uint32 CompletionThreadsCount = 2003;
+};