diff options
author | Vlad Kuznetsov <va-kuznecov@ydb.tech> | 2024-09-23 15:50:07 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-23 15:50:07 +0200 |
commit | 9918d3c8872725654382e964dd58dce272022195 (patch) | |
tree | 7344911d0d1d5faec4ca0825c52c65512750a704 | |
parent | a3bbcb929783d1eb27f8015aeb112e340e0315bf (diff) | |
download | ydb-9918d3c8872725654382e964dd58dce272022195.tar.gz |
Refactor chunk reads and writes (#8893)
Co-authored-by: Vlad Kuznecov <va-kuznecov@nebius.com>
3 files changed, 76 insertions, 115 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp index 75d4453337..2731863137 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp @@ -438,6 +438,8 @@ class TRealBlockDevice : public IBlockDevice { Device.Mon.DeviceWriteDuration.Increment(duration); LWPROBE(PDiskDeviceWriteDuration, Device.GetPDiskId(), duration, opSize); } + P_LOG(PRI_TRACE, BPD01, "iop is done", (Type, op->GetType()), (Duration, duration), + (Offset, op->GetOffset()), (Size, opSize)); if (completionAction->FlushAction) { ui64 idx = completionAction->FlushAction->OperationIdx; Y_ABORT_UNLESS(WaitingNoops[idx % MaxWaitingNoops] == nullptr); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 7bf8b65e05..2595a4fdc3 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -60,8 +60,8 @@ TPDisk::TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig 0, 64 * 1024 * 1024); ForsetiMaxLogBatchNs = TControlWrapper((PDiskCategory.IsSolidState() ? 50'000ll : 500'000ll), 0, 100'000'000ll); ForsetiMaxLogBatchNsCached = ForsetiMaxLogBatchNs; - ForsetiOpPieceSizeSsd = TControlWrapper(64 * 1024, 1, 64 * 1024 * 1024); - ForsetiOpPieceSizeRot = TControlWrapper(2 * 1024 * 1024, 1, 64 * 1024 * 1024); + ForsetiOpPieceSizeSsd = TControlWrapper(64 * 1024, 1, 512 * 1024); + ForsetiOpPieceSizeRot = TControlWrapper(512 * 1024, 1, 512 * 1024); ForsetiOpPieceSizeCached = PDiskCategory.IsSolidState() ? ForsetiOpPieceSizeSsd : ForsetiOpPieceSizeRot; if (Cfg->SectorMap) { @@ -809,10 +809,8 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi << " evChunkWrite->TotalSize# " << evChunkWrite->TotalSize); ui32 chunkIdx = evChunkWrite->ChunkIdx; - Y_ABORT_UNLESS(chunkIdx != 0); - ui64 desiredSectorIdx = 0; ui64 sectorOffset = 0; ui64 lastSectorIdx; @@ -937,7 +935,7 @@ void TPDisk::SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringSt } TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector, - ui64 pieceSizeLimit, ui64 *reallyReadDiskBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit) { + ui64 pieceSizeLimit, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit) { if (read->IsReplied) { return ReadPieceResultOk; } @@ -950,13 +948,12 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> & sectorsToRead = pieceSizeLimit / Format.SectorSize; bytesToRead = sectorsToRead * Format.SectorSize; } + Y_VERIFY_S(bytesToRead == pieceSizeLimit, bytesToRead << " " << pieceSizeLimit); + Y_VERIFY_S(sectorsToRead == pieceSizeLimit / Format.SectorSize, sectorsToRead << " " << pieceSizeLimit); + Y_VERIFY_S(pieceSizeLimit % Format.SectorSize == 0, pieceSizeLimit); Y_ABORT_UNLESS(sectorsToRead); - if (reallyReadDiskBytes) { - *reallyReadDiskBytes = bytesToRead; - } - ui64 firstSector; ui64 lastSector; ui64 sectorOffset; @@ -2229,9 +2226,14 @@ void TPDisk::ProcessChunkWriteQueue() { case ERequestType::RequestChunkWritePiece: { TChunkWritePiece *piece = static_cast<TChunkWritePiece*>(req); - if (ChunkWritePiece(piece->ChunkWrite.Get(), piece->PieceShift, piece->PieceSize)) { - Mon.IncrementQueueTime(piece->ChunkWrite->PriorityClass, - piece->ChunkWrite->LifeDurationMs(now)); + P_LOG(PRI_NOTICE, BPD01, "ChunkWritePiece", + (ChunkIdx, piece->ChunkWrite->ChunkIdx), + (Offset, piece->PieceShift), + (Size, piece->PieceSize) + ); + bool lastPart = ChunkWritePiece(piece->ChunkWrite.Get(), piece->PieceShift, piece->PieceSize); + if (lastPart) { + Mon.IncrementQueueTime(piece->ChunkWrite->PriorityClass, piece->ChunkWrite->LifeDurationMs(now)); } delete piece; break; @@ -2250,49 +2252,38 @@ void TPDisk::ProcessChunkReadQueue() { ui64 bufferSize = BufferPool->GetBufferSize() / Format.SectorSize * Format.SectorSize; for (auto& req : JointChunkReads) { - req->SpanStack.PopOk(); req->SpanStack.Push(TWilson::PDiskDetailed, "PDisk.InBlockDevice", NWilson::EFlags::AUTO_END); - switch (req->GetType()) { - case ERequestType::RequestChunkReadPiece: - { - TChunkReadPiece *piece = static_cast<TChunkReadPiece*>(req.Get()); - Y_ABORT_UNLESS(!piece->SelfPointer); - TIntrusivePtr<TChunkRead> &read = piece->ChunkRead; - TReqId reqId = read->ReqId; - ui32 chunkIdx = read->ChunkIdx; - bool isComplete = false; - ui8 priorityClass = read->PriorityClass; - NHPTimer::STime creationTime = read->CreationTime; - if (!read->IsReplied) { - P_LOG(PRI_DEBUG, BPD36, "Performing TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx)); - - ui32 size = 0; - while (!isComplete && size < piece->PieceSizeLimit) { - ui64 currentLimit = Min(bufferSize, piece->PieceSizeLimit - size); - ui64 reallyReadDiskBytes; - EChunkReadPieceResult result = ChunkReadPiece(read, piece->PieceCurrentSector + size / Format.SectorSize, - currentLimit, &reallyReadDiskBytes, piece->SpanStack.GetTraceId(), std::move(piece->Orbit)); - isComplete = (result != ReadPieceResultInProgress); - // Read pieces is sliced previously and it is expected that ChunkReadPiece will read exactly - // currentLimit bytes - Y_VERIFY_S(reallyReadDiskBytes == currentLimit, reallyReadDiskBytes << " != " << currentLimit); - size += currentLimit; - } - } - piece->OnSuccessfulDestroy(PCtx->ActorSystem); - if (isComplete) { - // - // WARNING: Don't access "read" after this point. - // Don't add code before the warning! - // - Mon.IncrementQueueTime(priorityClass, HPMilliSeconds(now - creationTime)); - P_LOG(PRI_DEBUG, BPD37, "enqueued all TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx)); - } - break; - } - default: - Y_FAIL_S("Unexpected request type# " << ui64(req->GetType()) << " in JointChunkReads"); + + Y_VERIFY_S(req->GetType() == ERequestType::RequestChunkReadPiece, "Unexpected request type# " << ui64(req->GetType()) << " in JointChunkReads"); + TChunkReadPiece *piece = static_cast<TChunkReadPiece*>(req.Get()); + Y_ABORT_UNLESS(!piece->SelfPointer); + TIntrusivePtr<TChunkRead> &read = piece->ChunkRead; + TReqId reqId = read->ReqId; + ui32 chunkIdx = read->ChunkIdx; + ui8 priorityClass = read->PriorityClass; + NHPTimer::STime creationTime = read->CreationTime; + Y_VERIFY(!read->IsReplied); + P_LOG(PRI_NOTICE, BPD36, "Performing TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx), + (PieceCurrentSector, piece->PieceCurrentSector), + (PieceSizeLimit, piece->PieceSizeLimit), + (IsTheLastPiece, piece->IsTheLastPiece), + (BufferSize, bufferSize) + ); + + ui64 currentLimit = Min(bufferSize, piece->PieceSizeLimit); + EChunkReadPieceResult result = ChunkReadPiece(read, piece->PieceCurrentSector, currentLimit, + piece->SpanStack.GetTraceId(), std::move(piece->Orbit)); + bool isComplete = (result != ReadPieceResultInProgress); + Y_VERIFY_S(isComplete || currentLimit >= piece->PieceSizeLimit, isComplete << " " << currentLimit << " " << piece->PieceSizeLimit); + piece->OnSuccessfulDestroy(PCtx->ActorSystem); + if (isComplete) { + // + // WARNING: Don't access "read" after this point. + // Don't add code before the warning! + // + Mon.IncrementQueueTime(priorityClass, HPMilliSeconds(now - creationTime)); + P_LOG(PRI_NOTICE, BPD37, "enqueued all TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx)); } } LWTRACK(PDiskProcessChunkReadQueue, UpdateCycleOrbit, PCtx->PDiskId, JointChunkReads.size()); @@ -3150,65 +3141,46 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) { if (!isAdded) { if (request->GetType() == ERequestType::RequestChunkWrite) { TIntrusivePtr<TChunkWrite> whole(static_cast<TChunkWrite*>(request)); - ui32 smallJobSize = 0; - ui32 smallJobCount = 0; - ui32 largeJobSize = 0; - SplitChunkJobSize(whole->TotalSize, &smallJobSize, &largeJobSize, &smallJobCount); - for (ui32 idx = 0; idx < smallJobCount; ++idx) { - // Schedule small job. + + const ui32 jobSizeLimit = ui64(ForsetiOpPieceSizeCached) * Format.SectorPayloadSize() / Format.SectorSize; + const ui32 jobCount = (whole->TotalSize + jobSizeLimit - 1) / jobSizeLimit; + + ui32 remainingSize = whole->TotalSize; + for (ui32 idx = 0; idx < jobCount; ++idx) { auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END); span.Attribute("small_job_idx", idx) - .Attribute("is_last_piece", false); - TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * smallJobSize, smallJobSize, std::move(span)); + .Attribute("is_last_piece", idx == jobCount - 1); + ui32 jobSize = Min(remainingSize, jobSizeLimit); + TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * jobSizeLimit, jobSize, std::move(span)); piece->EstimateCost(DriveModel); AddJobToForseti(cbs, piece, request->JobKind); + remainingSize -= jobSize; } - // Schedule large job (there always is one) - auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END); - span.Attribute("is_last_piece", true); - TChunkWritePiece *piece = new TChunkWritePiece(whole, smallJobCount * smallJobSize, largeJobSize, std::move(span)); - piece->EstimateCost(DriveModel); - AddJobToForseti(cbs, piece, request->JobKind); - LWTRACK(PDiskChunkWriteAddToScheduler, request->Orbit, PCtx->PDiskId, request->ReqId.Id, - HPSecondsFloat(HPNow() - request->CreationTime), request->Owner, request->IsFast, - request->PriorityClass, whole->TotalSize); + Y_VERIFY_S(remainingSize == 0, remainingSize); } else if (request->GetType() == ERequestType::RequestChunkRead) { TIntrusivePtr<TChunkRead> read = std::move(static_cast<TChunkRead*>(request)->SelfPointer); ui32 totalSectors = read->LastSector - read->FirstSector + 1; - ui32 smallJobSize = (ForsetiOpPieceSizeCached + Format.SectorSize - 1) / Format.SectorSize; - ui32 smallJobCount = totalSectors / smallJobSize; - if (smallJobCount) { - smallJobCount--; - } - ui32 largeJobSize = totalSectors - smallJobSize * smallJobCount; - - for (ui32 idx = 0; idx < smallJobCount; ++idx) { + Y_DEBUG_ABORT_UNLESS(ForsetiOpPieceSizeCached % Format.SectorSize == 0); + const ui32 jobSizeLimit = ForsetiOpPieceSizeCached / Format.SectorSize; + const ui32 jobCount = (totalSectors + jobSizeLimit - 1) / jobSizeLimit; + for (ui32 idx = 0; idx < jobCount; ++idx) { auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkReadPiece", NWilson::EFlags::AUTO_END); + bool isLast = idx == jobCount - 1; span.Attribute("small_job_idx", idx) - .Attribute("is_last_piece", false); - // Schedule small job. - auto piece = new TChunkReadPiece(read, idx * smallJobSize, - smallJobSize * Format.SectorSize, false, std::move(span)); + .Attribute("is_last_piece", isLast); + + ui32 jobSize = Min(totalSectors, jobSizeLimit); + auto piece = new TChunkReadPiece(read, idx * jobSizeLimit, jobSize * Format.SectorSize, isLast, std::move(span)); read->Orbit.Fork(piece->Orbit); - LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PCtx->PDiskId, idx, idx * smallJobSize * Format.SectorSize, - smallJobSize * Format.SectorSize); + LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PCtx->PDiskId, idx, idx * jobSizeLimit * Format.SectorSize, + jobSizeLimit * Format.SectorSize); piece->EstimateCost(DriveModel); piece->SelfPointer = piece; AddJobToForseti(cbs, piece, request->JobKind); + totalSectors -= jobSize; } - // Schedule large job (there always is one) - auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkReadPiece"); - span.Attribute("is_last_piece", true); - auto piece = new TChunkReadPiece(read, smallJobCount * smallJobSize, - largeJobSize * Format.SectorSize, true, std::move(span)); - read->Orbit.Fork(piece->Orbit); - LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PCtx->PDiskId, smallJobCount, - smallJobCount * smallJobSize * Format.SectorSize, largeJobSize * Format.SectorSize); - piece->EstimateCost(DriveModel); - piece->SelfPointer = piece; - AddJobToForseti(cbs, piece, request->JobKind); - + Y_VERIFY_S(totalSectors == 0, totalSectors); } else { AddJobToForseti(cbs, request, request->JobKind); } @@ -3221,17 +3193,6 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) { } } -// Always produces a large job and sometimes produces some small jobs and a large job. -void TPDisk::SplitChunkJobSize(ui32 totalSize, ui32 *outSmallJobSize, ui32 *outLargeJobSize, ui32 *outSmallJobCount) { - const ui64 sectorPayloadSize = Format.SectorPayloadSize(); - *outSmallJobSize = (ForsetiOpPieceSizeCached + sectorPayloadSize - 1) / sectorPayloadSize * sectorPayloadSize; - *outSmallJobCount = totalSize / *outSmallJobSize; - if (*outSmallJobCount) { - (*outSmallJobCount)--; - } - *outLargeJobSize = totalSize - *outSmallJobSize * *outSmallJobCount; -} - void TPDisk::AddJobToForseti(NSchLab::TCbs *cbs, TRequestBase *request, NSchLab::EJobKind jobKind) { LWTRACK(PDiskAddToScheduler, request->Orbit, PCtx->PDiskId, request->ReqId.Id, HPSecondsFloat(request->CreationTime), request->Owner, request->IsFast, request->PriorityClass); @@ -3458,18 +3419,17 @@ void TPDisk::Update() { Mon.UpdateDurationTracker.UpdateStarted(); LWTRACK(PDiskUpdateStarted, UpdateCycleOrbit, PCtx->PDiskId); - // ui32 userSectorSize = 0; - - // Make input queue empty { TGuard<TMutex> guard(StateMutex); ForsetiMaxLogBatchNsCached = ForsetiMaxLogBatchNs; ForsetiOpPieceSizeCached = PDiskCategory.IsSolidState() ? ForsetiOpPieceSizeSsd : ForsetiOpPieceSizeRot; - EnqueueAll(); - /*userSectorSize = */Format.SectorPayloadSize(); - + ForsetiOpPieceSizeCached = Min<i64>(ForsetiOpPieceSizeCached, Cfg->BufferPoolBufferSizeBytes); + ForsetiOpPieceSizeCached = AlignDown<i64>(ForsetiOpPieceSizeCached, Format.SectorSize); // Switch the scheduler when possible ForsetiScheduler.SetIsBinLogEnabled(EnableForsetiBinLog); + + // Make input queue empty + EnqueueAll(); } // Make token injection to correct drive model underestimations and avoid disk underutilization diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index 3f789bae54..e571605489 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -294,8 +294,7 @@ public: void SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringStream& errorReason, NKikimrProto::EReplyStatus status); EChunkReadPieceResult ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector, ui64 pieceSizeLimit, - ui64 *reallyReadBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit); - void SplitChunkJobSize(ui32 totalSize, ui32 *outSmallJobSize, ui32 *outLargeJObSize, ui32 *outSmallJobCount); + NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Chunk locking TVector<TChunkIdx> LockChunksForOwner(TOwner owner, const ui32 count, TString &errorReason); |