diff options
author | Vlad Kuznetsov <va-kuznecov@ydb.tech> | 2024-10-04 12:49:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-04 12:49:25 +0200 |
commit | e55564f7a0a0ff7b638ce199c1b5005b696d349d (patch) | |
tree | d1a4e4d60f0659be2939510419871fd3f9b406e9 | |
parent | f2805c67d45546a33b9605ea33bb2fde285498c9 (diff) | |
download | ydb-e55564f7a0a0ff7b638ce199c1b5005b696d349d.tar.gz |
Implement configurable number of completion threads (#9715)
10 files changed, 184 insertions, 104 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h index 8faf1d968d..6713f4d98e 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h @@ -64,7 +64,8 @@ class TPDisk; IBlockDevice* CreateRealBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles, ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, - ui32 maxQueuedCompletionActions, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk = nullptr); + ui32 maxQueuedCompletionActions, ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, + TPDisk * const pdisk = nullptr); IBlockDevice* CreateRealBlockDeviceWithDefaults(const TString &path, TPDiskMon &mon, TDeviceMode::TFlags flags, TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk = nullptr); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp index 7e81e1ebca..3f7e5d782a 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp @@ -44,41 +44,29 @@ class TRealBlockDevice : public IBlockDevice { //////////////////////////////////////////////////////// // TCompletionThread //////////////////////////////////////////////////////// - class TCompletionThread : public TThread { - static constexpr ui32 NumOfWriters = 2; + class TCompletionThread : public ISimpleThread { public: - TCompletionThread(TRealBlockDevice &device, ui32 maxQueuedActions) - : TThread(&ThreadProc, this) - , Device(device) - , QueuedActions(0) - , MaxQueuedActions(maxQueuedActions) + TCompletionThread(TRealBlockDevice &device, TString name) + : Device(device) + , Name(name) {} - static void* ThreadProc(void* _this) { - SetCurrentThreadName("PdCmpl"); - static_cast<TCompletionThread*>(_this)->Exec(); - return nullptr; - } - - void Exec() { - ui32 exitSignalsReceived = 0; - Device.Mon.L7.Set(false, AtomicGetAndIncrement(SeqnoL7)); + void *ThreadProc() override { + SetCurrentThreadName(Name.data()); auto prevCycleEnd = HPNow(); bool isWorking = true; bool stateError = false; + auto cpuCounter = Device.Mon.PDiskGroup->GetCounter(Name + "CPU", true); + while(isWorking) { - TAtomicBase actionCount = CompletionActions.GetWaitingSize(); + TAtomicBase actionCount = Queue.GetWaitingSize(); if (actionCount > 0) { for (TAtomicBase idx = 0; idx < actionCount; ++idx) { - TCompletionAction *action = CompletionActions.Pop(); - AtomicDecrement(QueuedActions); + TCompletionAction *action = Queue.Pop(); if (action == nullptr) { - ++exitSignalsReceived; - if (exitSignalsReceived == NumOfWriters) { - isWorking = false; - } + isWorking = false; } else { if (!stateError && action->CanHandleResult()) { action->Exec(Device.PCtx->ActorSystem); @@ -96,8 +84,8 @@ class TRealBlockDevice : public IBlockDevice { } } } else { - *Device.Mon.CompletionThreadCPU = ThreadCPUTime(); - CompletionActions.ProducedWaitI(); + *cpuCounter = ThreadCPUTime(); + Queue.ProducedWaitI(); } const auto cycleEnd = HPNow(); @@ -106,37 +94,127 @@ class TRealBlockDevice : public IBlockDevice { } prevCycleEnd = cycleEnd; } + return nullptr; + } + + void Schedule(TCompletionAction *action) { + Queue.Push(action); + } + + size_t GetQueuedActions() { + return Queue.GetWaitingSize(); + } + + private: + TCountedQueueManyOne<TCompletionAction, 4 << 10> Queue; + TRealBlockDevice& Device; + TString Name; + }; + + class TCompletionThreads { + + public: + std::vector<std::unique_ptr<TCompletionThread>> Threads; + + TCompletionThreads(TRealBlockDevice &device, size_t threadsCount, ui32 maxQueuedActions) + : Device(device) + , MaxQueuedActions(maxQueuedActions) + { + for (size_t i = 0; i < threadsCount; ++i) { + Threads.push_back(std::make_unique<TCompletionThread>(device, TStringBuilder() << "PdCmpl_" << i)); + Threads.back()->Start(); + } } // Schedule action execution // pass action = nullptr to quit void Schedule(TCompletionAction *action) noexcept { - if (AtomicGet(QueuedActions) >= MaxQueuedActions) { - Device.Mon.L7.Set(true, AtomicGetAndIncrement(SeqnoL7)); - while (AtomicGet(QueuedActions) >= MaxQueuedActions) { - SpinLockPause(); + if (!action) { + StopWork(); + return; + } + + if (!action->ShouldBeExecutedInCompletionThread || Threads.empty()) { + ExecuteActionInPlace(action); + return; + } + + while (true) { + auto min_it = Threads.begin(); + auto minQueueSize = (*min_it)->GetQueuedActions(); + auto totalSize = minQueueSize; + for (auto it = Threads.begin() + 1; it != Threads.end(); ++it) { + auto queueSize = (*it)->GetQueuedActions(); + totalSize += queueSize; + if (queueSize < minQueueSize) { + minQueueSize = queueSize; + min_it = it; + } } - Device.Mon.L7.Set(false, AtomicGetAndIncrement(SeqnoL7)); + if (totalSize >= MaxQueuedActions) { + // We have a risk to run out of buffers from BufferPool, so MaxQueuedActions is expected + // to counter that + Device.Mon.L7.Set(true, AtomicGetAndIncrement(SeqnoL7)); + Sleep(TDuration::MilliSeconds(1)); + Device.Mon.L7.Set(false, AtomicGetAndIncrement(SeqnoL7)); + continue; + } + + Y_ABORT_UNLESS(min_it != Threads.end()); + if (action->CanBeExecutedInAdditionalCompletionThread) { + (*min_it)->Schedule(action); + } else { + // actions which are supposed to be executed from single thread always will be executer in 0 thread + // scheduled here to pass MaxQueuedActions check above + Threads[0]->Schedule(action); + } + + return; } - AtomicIncrement(QueuedActions); - CompletionActions.Push(action); - return; } - // Schedule action execution - // pass action = nullptr to quit + void ExecuteActionInPlace(TCompletionAction *action) { + if (action->CanHandleResult()) { + action->Exec(Device.PCtx->ActorSystem); + } else { + TString errorReason = action->ErrorReason; + + action->Release(Device.PCtx->ActorSystem); + + if (!Device.QuitCounter.IsBlocked()) { + Device.BecomeErrorState(TStringBuilder() + << " CompletionAction error, operation info# " << errorReason); + } + } + } + + // This hack make it possible to execute some completion action beside of device but in ComplitionThread void ScheduleHackForLogReader(TCompletionAction *action) noexcept { - AtomicIncrement(QueuedActions); action->Result = EIoResult::Ok; - CompletionActions.Push(action); + if (Threads.empty()) { + action->Exec(Device.PCtx->ActorSystem); + } else { + Threads[0]->Schedule(action); + } return; } + void StopWork() { + for (auto& thread : Threads) { + thread->Schedule(nullptr); + } + } + + void Join() { + for (auto& thread : Threads) { + thread->Join(); + } + Threads.clear(); + } + private: - TCountedQueueManyOne<TCompletionAction, 4 << 10> CompletionActions; TRealBlockDevice &Device; - TAtomic QueuedActions; - const TAtomicBase MaxQueuedActions; + const size_t MaxQueuedActions; TAtomic SeqnoL7 = 0; }; @@ -368,25 +446,6 @@ class TRealBlockDevice : public IBlockDevice { } } - void ExecuteOrScheduleCompletion(TCompletionAction *action) { - if (action->ShouldBeExecutedInCompletionThread) { - Device.CompletionThread->Schedule(action); - } else { - if (action->CanHandleResult()) { - action->Exec(Device.PCtx->ActorSystem); - } else { - TString errorReason = action->ErrorReason; - - action->Release(Device.PCtx->ActorSystem); - - if (!Device.QuitCounter.IsBlocked()) { - Device.BecomeErrorState(TStringBuilder() - << " CompletionAction error, operation info# " << errorReason); - } - } - } - } - void Exec(TAsyncIoOperationResult *event) { IAsyncIoOperation *op = event->Operation; // Add up the execution time of all the events @@ -446,7 +505,7 @@ class TRealBlockDevice : public IBlockDevice { WaitingNoops[idx % MaxWaitingNoops] = completionAction->FlushAction; completionAction->FlushAction = nullptr; } - ExecuteOrScheduleCompletion(completionAction); + Device.CompletionThreads->Schedule(completionAction); auto seqnoL6 = AtomicGetAndIncrement(Device.Mon.SeqnoL6); Device.Mon.L6.Set(duration > Device.Reordering, seqnoL6); } @@ -464,7 +523,7 @@ class TRealBlockDevice : public IBlockDevice { LWTRACK(PDiskDeviceGetFromWaiting, WaitingNoops[i]->Orbit); double durationMs = HPMilliSecondsFloat(HPNow() - WaitingNoops[i]->GetTime); Device.Mon.DeviceFlushDuration.Increment(durationMs); - ExecuteOrScheduleCompletion(WaitingNoops[i]); + Device.CompletionThreads->Schedule(WaitingNoops[i]); WaitingNoops[i] = nullptr; } ++NextPossibleNoop; @@ -511,7 +570,7 @@ class TRealBlockDevice : public IBlockDevice { } } // Stop the completion thread - Device.CompletionThread->Schedule(nullptr); + Device.CompletionThreads->Schedule(nullptr); } }; @@ -696,7 +755,6 @@ class TRealBlockDevice : public IBlockDevice { for (TAtomicBase idx = 0; idx < actionCount; ++idx) { IAsyncIoOperation *op = TrimOperations.Pop(); if (op == nullptr) { - Device.CompletionThread->Schedule(nullptr); return; } Y_ABORT_UNLESS(op->GetType() == IAsyncIoOperation::EType::PTrim); @@ -721,7 +779,7 @@ class TRealBlockDevice : public IBlockDevice { LWPROBE(PDiskDeviceTrimDuration, Device.GetPDiskId(), duration, op->GetOffset()); } completion->SetResult(EIoResult::Ok); - Device.CompletionThread->Schedule(completion); + completion->Exec(Device.PCtx->ActorSystem); Device.IoContext->DestroyAsyncIoOperation(op); } } else { @@ -744,8 +802,10 @@ protected: TPDiskMon &Mon; TString Path; + TPDiskConfig cfg{0, 0, 0}; + private: - THolder<TCompletionThread> CompletionThread; + THolder<TCompletionThreads> CompletionThreads; THolder<TTrimThread> TrimThread; THolder<TGetThread> GetEventsThread; THolder<TSubmitGetThread> SpdkSubmitGetThread; @@ -758,7 +818,8 @@ private: ui64 Reordering; ui64 SeekCostNs; bool IsTrimEnabled; - ui32 MaxQueuedCompletionActions; + const ui32 MaxQueuedCompletionActions; // for all threads + const ui32 CompletionThreadsCount; TIdleCounter IdleCounter; // Includes reads, writes and trims @@ -782,10 +843,10 @@ private: public: TRealBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles, ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions, - TIntrusivePtr<TSectorMap> sectorMap) + ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap) : Mon(mon) , Path(path) - , CompletionThread(nullptr) + , CompletionThreads(nullptr) , TrimThread(nullptr) , GetEventsThread(nullptr) , SharedCallback(nullptr) @@ -796,6 +857,7 @@ public: , SeekCostNs(seekCostNs) , IsTrimEnabled(true) , MaxQueuedCompletionActions(maxQueuedCompletionActions) + , CompletionThreadsCount(completionThreadsCount) , IdleCounter(Mon.IdleLight) , Flags(flags) , SectorMap(sectorMap) @@ -857,7 +919,7 @@ protected: } if (IsFileOpened) { IoContext->SetActorSystem(PCtx->ActorSystem); - CompletionThread = MakeHolder<TCompletionThread>(*this, MaxQueuedCompletionActions); + CompletionThreads = MakeHolder<TCompletionThreads>(*this, CompletionThreadsCount, MaxQueuedCompletionActions); TrimThread = MakeHolder<TTrimThread>(*this); SharedCallback = MakeHolder<TSharedCallback>(*this); if (Flags & TDeviceMode::UseSpdk) { @@ -874,7 +936,6 @@ protected: GetEventsThread->Start(); } } - CompletionThread->Start(); TrimThread->Start(); IsInitialized = true; } @@ -1041,7 +1102,7 @@ protected: } completionAction->SetResult(EIoResult::Ok); - CompletionThread->Schedule(completionAction); + CompletionThreads->Schedule(completionAction); } void NoopAsyncHackForLogReader(TCompletionAction *completionAction, TReqId /*reqId*/) override { @@ -1056,7 +1117,7 @@ protected: } completionAction->SetResult(EIoResult::Ok); - CompletionThread->ScheduleHackForLogReader(completionAction); + CompletionThreads->ScheduleHackForLogReader(completionAction); } void TrimAsync(ui32 size, ui64 offset, TCompletionAction *completionAction, TReqId reqId) override { @@ -1127,7 +1188,7 @@ protected: if (res.PrevA ^ res.A) { // res.ToggledA() if (IsInitialized) { Y_ABORT_UNLESS(TrimThread); - Y_ABORT_UNLESS(CompletionThread); + Y_ABORT_UNLESS(CompletionThreads); TrimThread->Schedule(nullptr); // Stop the Trim thread if (Flags & TDeviceMode::UseSpdk) { Y_ABORT_UNLESS(SpdkSubmitGetThread); @@ -1145,13 +1206,13 @@ protected: } SharedCallback->Destroy(); TrimThread->Join(); - CompletionThread->Join(); + CompletionThreads->Join(); IsInitialized = false; } else { Y_ABORT_UNLESS(SubmitThread.Get() == nullptr); Y_ABORT_UNLESS(GetEventsThread.Get() == nullptr); Y_ABORT_UNLESS(TrimThread.Get() == nullptr); - Y_ABORT_UNLESS(CompletionThread.Get() == nullptr); + Y_ABORT_UNLESS(CompletionThreads.Get() == nullptr); } if (IsFileOpened) { EIoResult ret = IoContext->Destroy(); @@ -1287,9 +1348,9 @@ class TCachedBlockDevice : public TRealBlockDevice { public: TCachedBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles, ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions, - TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk) + ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk) : TRealBlockDevice(path, mon, reorderingCycles, seekCostNs, deviceInFlight, flags, - maxQueuedCompletionActions, sectorMap) + maxQueuedCompletionActions, completionThreadsCount, sectorMap) , ReadsInFly(0) , PDisk(pdisk) {} @@ -1425,14 +1486,14 @@ public: IBlockDevice* CreateRealBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles, ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions, - TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk) { + ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk) { return new TCachedBlockDevice(path, mon, reorderingCycles, seekCostNs, deviceInFlight, flags, - maxQueuedCompletionActions, sectorMap, pdisk); + maxQueuedCompletionActions, completionThreadsCount, sectorMap, pdisk); } IBlockDevice* CreateRealBlockDeviceWithDefaults(const TString &path, TPDiskMon &mon, TDeviceMode::TFlags flags, TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk) { - IBlockDevice *device = CreateRealBlockDevice(path, mon, 0, 0, 4, flags, 8, sectorMap, pdisk); + IBlockDevice *device = CreateRealBlockDevice(path, mon, 0, 0, 4, flags, 8, 1, sectorMap, pdisk); device->Initialize(std::make_shared<TPDiskCtx>(actorSystem)); return device; } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp index 9068feaa67..c309f90d26 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp @@ -288,11 +288,12 @@ Y_UNIT_TEST_SUITE(TBlockDeviceTest) { THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(buffSize, bufferPoolSize, false, {})); ui64 inFlight = 128; ui32 maxQueuedCompletionActions = bufferPoolSize / 2; + ui32 completionThreadsCount = 1; ui64 diskSize = 32_GB; TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap(diskSize, NSectorMap::DM_NONE); THolder<NPDisk::IBlockDevice> device(CreateRealBlockDevice("", *mon, 0, 0, inFlight, TDeviceMode::None, - maxQueuedCompletionActions, sectorMap)); + maxQueuedCompletionActions, completionThreadsCount, sectorMap)); device->Initialize(std::make_shared<TPDiskCtx>(creator.GetActorSystem())); TAtomic counter = 0; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h index 951dd03390..31dbc6681e 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h @@ -23,6 +23,7 @@ struct TCompletionAction { // to BlockDevice from Exec() and it's more safe to use WhiteList to allow only // LogWrite and ChunkWrite to be executed from GetThread bool ShouldBeExecutedInCompletionThread = true; + bool CanBeExecutedInAdditionalCompletionThread = false; mutable NLWTrace::TOrbit Orbit; protected: diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp index b0a4514d00..4b32f513f9 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp @@ -128,10 +128,16 @@ TCompletionChunkReadPart::TCompletionChunkReadPart(TPDisk *pDisk, TIntrusivePtr< , PayloadReadSize(payloadReadSize) , CommonBufferOffset(commonBufferOffset) , CumulativeCompletion(cumulativeCompletion) + , ChunkNonce(CumulativeCompletion->GetChunkNonce()) , Buffer(PDisk->BufferPool->Pop()) , IsTheLastPart(isTheLastPart) , Span(std::move(span)) { + TCompletionAction::CanBeExecutedInAdditionalCompletionThread = true; + + TBufferWithGaps *commonBuffer = CumulativeCompletion->GetCommonBuffer(); + Destination = commonBuffer->RawDataPtr(CommonBufferOffset, PayloadReadSize); + if (!IsTheLastPart) { CumulativeCompletion->AddPart(); } @@ -166,8 +172,6 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) { Read->Offset + CommonBufferOffset, PayloadReadSize, firstSector, lastSector, sectorOffset); Y_ABORT_UNLESS(isOk); - TBufferWithGaps *commonBuffer = CumulativeCompletion->GetCommonBuffer(); - ui8 *destination = commonBuffer->RawDataPtr(CommonBufferOffset, PayloadReadSize); ui8* source = Buffer->Data(); @@ -183,8 +187,6 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) { sectorOffset = 0; } - ui64 chunkNonce = CumulativeCompletion->GetChunkNonce(); - ui32 beginBadUserOffset = 0xffffffff; ui32 endBadUserOffset = 0xffffffff; ui32 userSectorSize = format.SectorPayloadSize(); @@ -193,7 +195,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) { TSectorRestorator restorator(false, 1, false, format, PDisk->PCtx.get(), &PDisk->Mon, PDisk->BufferPool.Get()); - ui64 lastNonce = Min((ui64)0, chunkNonce - 1); + ui64 lastNonce = Min((ui64)0, ChunkNonce - 1); restorator.Restore(source, format.Offset(Read->ChunkIdx, sectorIdx), format.MagicDataChunk, lastNonce, Read->Owner); @@ -211,7 +213,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) { << " for owner# " << Read->Owner << " beginBadUserOffet# " << beginBadUserOffset << " endBadUserOffset# " << endBadUserOffset << " due to multiple sectors with incorrect hashes. Marker# BPC001"); - commonBuffer->AddGap(beginBadUserOffset, endBadUserOffset); + CumulativeCompletion->AddGap(beginBadUserOffset, endBadUserOffset); beginBadUserOffset = 0xffffffff; endBadUserOffset = 0xffffffff; } @@ -221,16 +223,16 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) { // Decrypt data if (beginBadUserOffset != 0xffffffff) { - memset(destination, 0, sectorPayloadSize); + memset(Destination, 0, sectorPayloadSize); } else { TDataSectorFooter *footer = (TDataSectorFooter*) (source + format.SectorSize - sizeof(TDataSectorFooter)); - if (footer->Nonce != chunkNonce + sectorIdx) { + if (footer->Nonce != ChunkNonce + sectorIdx) { ui32 userOffset = sectorIdx * userSectorSize; LOG_INFO_S(*actorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << Read->ReqId << " Can't read chunk chunkIdx# " << Read->ChunkIdx << " for owner# " << Read->Owner - << " nonce mismatch: expected# " << (ui64)(chunkNonce + sectorIdx) + << " nonce mismatch: expected# " << (ui64)(ChunkNonce + sectorIdx) << ", on-disk# " << (ui64)footer->Nonce << " for userOffset# " << userOffset << " ! Marker# BPC002"); @@ -238,18 +240,18 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) { beginBadUserOffset = userOffset; } endBadUserOffset = beginUserOffset + userSectorSize; - memset(destination, 0, sectorPayloadSize); + memset(Destination, 0, sectorPayloadSize); } else { cypher.StartMessage(footer->Nonce); - if (sectorOffset > 0 || intptr_t(destination) % 32) { + if (sectorOffset > 0 || intptr_t(Destination) % 32) { cypher.InplaceEncrypt(source, sectorOffset + sectorPayloadSize); if (CommonBufferOffset == 0 || !IsTheLastPart) { - memcpy(destination, source + sectorOffset, sectorPayloadSize); + memcpy(Destination, source + sectorOffset, sectorPayloadSize); } else { - memcpy(destination, source, sectorPayloadSize); + memcpy(Destination, source, sectorPayloadSize); } } else { - cypher.Encrypt(destination, source, sectorPayloadSize); + cypher.Encrypt(Destination, source, sectorPayloadSize); } if (CanarySize > 0) { ui32 canaryPosition = sectorOffset + sectorPayloadSize; @@ -259,7 +261,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) { } } } - destination += sectorPayloadSize; + Destination += sectorPayloadSize; source += format.SectorSize; PayloadReadSize -= sectorPayloadSize; sectorPayloadSize = Min(format.SectorPayloadSize(), PayloadReadSize); @@ -273,7 +275,7 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) { << " for owner# " << Read->Owner << " beginBadUserOffet# " << beginBadUserOffset << " endBadUserOffset# " << endBadUserOffset << " due to multiple sectors with incorrect hashes/nonces. Marker# BPC003"); - commonBuffer->AddGap(beginBadUserOffset, endBadUserOffset); + CumulativeCompletion->AddGap(beginBadUserOffset, endBadUserOffset); beginBadUserOffset = 0xffffffff; endBadUserOffset = 0xffffffff; } @@ -407,4 +409,3 @@ void TChunkTrimCompletion::Exec(TActorSystem *actorSystem) { } // NPDisk } // NKikimr - diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h index 5563007383..bd2d99dfe3 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h @@ -169,6 +169,7 @@ class TCompletionChunkRead : public TCompletionAction { TPDisk *PDisk; TIntrusivePtr<TChunkRead> Read; TBufferWithGaps CommonBuffer; + TMutex CommonBufferMutex; // used to protect CommonBuffer when gaps are being add TAtomic PartsPending; TAtomic Deletes; std::function<void()> OnDestroy; @@ -206,6 +207,11 @@ public: return &CommonBuffer; } + void AddGap(ui32 start, ui32 end) { + TGuard<TMutex> g(CommonBufferMutex); + CommonBuffer.AddGap(start, end); + } + ui64 GetChunkNonce() { return ChunkNonce; } @@ -228,12 +234,14 @@ class TCompletionChunkReadPart : public TCompletionAction { ui64 PayloadReadSize; ui64 CommonBufferOffset; TCompletionChunkRead *CumulativeCompletion; + ui64 ChunkNonce; + ui8 *Destination = nullptr; TBuffer::TPtr Buffer; bool IsTheLastPart; NWilson::TSpan Span; public: TCompletionChunkReadPart(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, ui64 rawReadSize, ui64 payloadReadSize, - ui64 commonBufferOffset, TCompletionChunkRead *cumulativeCompletion, bool isTheLastPart, + ui64 commonBufferOffset, TCompletionChunkRead *cumulativeCompletion, bool isTheLastPart, NWilson::TSpan&& span); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h index b33ae5a9a6..c70a322d66 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h @@ -156,6 +156,8 @@ struct TPDiskConfig : public TThrRefBase { NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColorBorder = NKikimrBlobStorage::TPDiskSpaceColor::GREEN; + ui32 CompletionThreadsCount = 1; + bool MetadataOnly = false; TPDiskConfig(ui64 pDiskGuid, ui32 pdiskId, ui64 pDiskCategory) @@ -310,6 +312,7 @@ struct TPDiskConfig : public TThrRefBase { str << " YellowLogChunksMultiplier# " << YellowLogChunksMultiplier << x; str << " MaxMetadataMegabytes# " << MaxMetadataMegabytes << x; str << " SpaceColorBorder# " << SpaceColorBorder << x; + str << " CompletionThreadsCount# " << CompletionThreadsCount << x; str << "}"; return str.Str(); } @@ -394,8 +397,11 @@ struct TPDiskConfig : public TThrRefBase { limit = Max<ui32>(13, limit); ChunkBaseLimit = limit; } + + if (cfg->HasCompletionThreadsCount()) { + CompletionThreadsCount = cfg->GetCompletionThreadsCount(); + } } }; } // NKikimr - diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_driveestimator.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_driveestimator.cpp index 641a76cad4..9b945098e2 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_driveestimator.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_driveestimator.cpp @@ -237,7 +237,7 @@ TDriveEstimator::TDriveEstimator(const TString filename) , ActorSystemCreator(new TActorSystemCreator) , ActorSystem(ActorSystemCreator->GetActorSystem()) , QueueDepth(4) - , Device(CreateRealBlockDevice(filename, PDiskMon, 50, 0, QueueDepth, TDeviceMode::LockFile, 128, nullptr, nullptr)) + , Device(CreateRealBlockDevice(filename, PDiskMon, 50, 0, QueueDepth, TDeviceMode::LockFile, 128, 1, nullptr, nullptr)) , BufferPool(CreateBufferPool(BufferSize, 1, false, {})) , Buffer(BufferPool->Pop()) { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 1a16e1d47f..7381abe3f1 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -47,7 +47,7 @@ TPDisk::TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig , BlockDevice(CreateRealBlockDevice(cfg->GetDevicePath(), Mon, HPCyclesMs(ReorderingMs), DriveModel.SeekTimeNs(), cfg->DeviceInFlight, TDeviceMode::LockFile | (cfg->UseSpdkNvmeDriver ? TDeviceMode::UseSpdk : 0), - cfg->MaxQueuedCompletionActions, cfg->SectorMap, this)) + cfg->MaxQueuedCompletionActions, cfg->CompletionThreadsCount, cfg->SectorMap, this)) , Cfg(cfg) , CreationTime(TInstant::Now()) , ExpectedSlotCount(cfg->ExpectedSlotCount) diff --git a/ydb/core/protos/blobstorage_pdisk_config.proto b/ydb/core/protos/blobstorage_pdisk_config.proto index 92aee89e09..2c2dedfabc 100644 --- a/ydb/core/protos/blobstorage_pdisk_config.proto +++ b/ydb/core/protos/blobstorage_pdisk_config.proto @@ -91,5 +91,6 @@ message TPDiskConfig { optional uint64 ExpectedSlotCount = 2001; // Number of slots to calculate per-vdisk disk space limit. optional uint32 ChunkBaseLimit = 2002; // Free chunk permille that triggers Cyan color (e.g. 100 is 10%). Between 130 (default) and 13. -}; + optional uint32 CompletionThreadsCount = 2003; +}; |