author     Vlad Kuznetsov <va-kuznecov@ydb.tech>  2024-09-23 15:50:07 +0200
committer  GitHub <noreply@github.com>            2024-09-23 15:50:07 +0200
commit     9918d3c8872725654382e964dd58dce272022195 (patch)
tree       7344911d0d1d5faec4ca0825c52c65512750a704
parent     a3bbcb929783d1eb27f8015aeb112e340e0315bf (diff)
Refactor chunk reads and writes (#8893)
Co-authored-by: Vlad Kuznecov <va-kuznecov@nebius.com>
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp    2
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp               186
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h                    3
3 files changed, 76 insertions(+), 115 deletions(-)
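
In short, the commit drops the "several small jobs plus one large job" splitting of chunk reads and writes in favor of uniform pieces capped by ForsetiOpPieceSize, and tightens that control so a piece always fits into one buffer-pool buffer. A minimal sketch of the new splitting loop, using plain C++ stand-ins for the codebase's ui32/Min/Y_VERIFY_S and hypothetical sizes:

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    int main() {
        uint32_t jobSizeLimit = 512 * 1024;          // hypothetical per-piece cap derived from ForsetiOpPieceSize
        uint32_t totalSize = 5 * 1024 * 1024 + 100;  // hypothetical chunk-write size
        uint32_t jobCount = (totalSize + jobSizeLimit - 1) / jobSizeLimit;  // ceil division
        uint32_t remaining = totalSize;
        for (uint32_t idx = 0; idx < jobCount; ++idx) {
            uint32_t jobSize = std::min(remaining, jobSizeLimit);  // only the last piece can be short
            // a real TChunkWritePiece would be scheduled at offset idx * jobSizeLimit with length jobSize
            remaining -= jobSize;
        }
        assert(remaining == 0);  // every byte is covered exactly once
    }
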
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
index 75d4453337..2731863137 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
@@ -438,6 +438,8 @@ class TRealBlockDevice : public IBlockDevice {
Device.Mon.DeviceWriteDuration.Increment(duration);
LWPROBE(PDiskDeviceWriteDuration, Device.GetPDiskId(), duration, opSize);
}
+ P_LOG(PRI_TRACE, BPD01, "iop is done", (Type, op->GetType()), (Duration, duration),
+ (Offset, op->GetOffset()), (Size, opSize));
if (completionAction->FlushAction) {
ui64 idx = completionAction->FlushAction->OperationIdx;
Y_ABORT_UNLESS(WaitingNoops[idx % MaxWaitingNoops] == nullptr);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
index 7bf8b65e05..2595a4fdc3 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
@@ -60,8 +60,8 @@ TPDisk::TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig
0, 64 * 1024 * 1024);
ForsetiMaxLogBatchNs = TControlWrapper((PDiskCategory.IsSolidState() ? 50'000ll : 500'000ll), 0, 100'000'000ll);
ForsetiMaxLogBatchNsCached = ForsetiMaxLogBatchNs;
- ForsetiOpPieceSizeSsd = TControlWrapper(64 * 1024, 1, 64 * 1024 * 1024);
- ForsetiOpPieceSizeRot = TControlWrapper(2 * 1024 * 1024, 1, 64 * 1024 * 1024);
+ ForsetiOpPieceSizeSsd = TControlWrapper(64 * 1024, 1, 512 * 1024);
+ ForsetiOpPieceSizeRot = TControlWrapper(512 * 1024, 1, 512 * 1024);
ForsetiOpPieceSizeCached = PDiskCategory.IsSolidState() ? ForsetiOpPieceSizeSsd : ForsetiOpPieceSizeRot;
if (Cfg->SectorMap) {
@@ -809,10 +809,8 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
<< " evChunkWrite->TotalSize# " << evChunkWrite->TotalSize);
ui32 chunkIdx = evChunkWrite->ChunkIdx;
-
Y_ABORT_UNLESS(chunkIdx != 0);
-
ui64 desiredSectorIdx = 0;
ui64 sectorOffset = 0;
ui64 lastSectorIdx;
@@ -937,7 +935,7 @@ void TPDisk::SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringSt
}
TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector,
- ui64 pieceSizeLimit, ui64 *reallyReadDiskBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit) {
+ ui64 pieceSizeLimit, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit) {
if (read->IsReplied) {
return ReadPieceResultOk;
}
@@ -950,13 +948,12 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &
sectorsToRead = pieceSizeLimit / Format.SectorSize;
bytesToRead = sectorsToRead * Format.SectorSize;
}
+ Y_VERIFY_S(bytesToRead == pieceSizeLimit, bytesToRead << " " << pieceSizeLimit);
+ Y_VERIFY_S(sectorsToRead == pieceSizeLimit / Format.SectorSize, sectorsToRead << " " << pieceSizeLimit);
+ Y_VERIFY_S(pieceSizeLimit % Format.SectorSize == 0, pieceSizeLimit);
Y_ABORT_UNLESS(sectorsToRead);
- if (reallyReadDiskBytes) {
- *reallyReadDiskBytes = bytesToRead;
- }
-
ui64 firstSector;
ui64 lastSector;
ui64 sectorOffset;
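
The Y_VERIFY_S checks added above encode the new contract that callers pass a sector-aligned pieceSizeLimit, so the sector arithmetic is exact and the old reallyReadDiskBytes out-parameter becomes redundant. A hedged illustration of that invariant with hypothetical numbers:

    #include <cassert>
    #include <cstdint>

    int main() {
        uint64_t sectorSize = 4096;            // hypothetical Format.SectorSize
        uint64_t pieceSizeLimit = 512 * 1024;  // must be a whole number of sectors
        assert(pieceSizeLimit % sectorSize == 0);
        uint64_t sectorsToRead = pieceSizeLimit / sectorSize;
        uint64_t bytesToRead = sectorsToRead * sectorSize;
        assert(bytesToRead == pieceSizeLimit);  // aligned limit: the truncating division loses nothing
    }
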
@@ -2229,9 +2226,14 @@ void TPDisk::ProcessChunkWriteQueue() {
case ERequestType::RequestChunkWritePiece:
{
TChunkWritePiece *piece = static_cast<TChunkWritePiece*>(req);
- if (ChunkWritePiece(piece->ChunkWrite.Get(), piece->PieceShift, piece->PieceSize)) {
- Mon.IncrementQueueTime(piece->ChunkWrite->PriorityClass,
- piece->ChunkWrite->LifeDurationMs(now));
+ P_LOG(PRI_NOTICE, BPD01, "ChunkWritePiece",
+ (ChunkIdx, piece->ChunkWrite->ChunkIdx),
+ (Offset, piece->PieceShift),
+ (Size, piece->PieceSize)
+ );
+ bool lastPart = ChunkWritePiece(piece->ChunkWrite.Get(), piece->PieceShift, piece->PieceSize);
+ if (lastPart) {
+ Mon.IncrementQueueTime(piece->ChunkWrite->PriorityClass, piece->ChunkWrite->LifeDurationMs(now));
}
delete piece;
break;
@@ -2250,49 +2252,38 @@ void TPDisk::ProcessChunkReadQueue() {
ui64 bufferSize = BufferPool->GetBufferSize() / Format.SectorSize * Format.SectorSize;
for (auto& req : JointChunkReads) {
-
req->SpanStack.PopOk();
req->SpanStack.Push(TWilson::PDiskDetailed, "PDisk.InBlockDevice", NWilson::EFlags::AUTO_END);
- switch (req->GetType()) {
- case ERequestType::RequestChunkReadPiece:
- {
- TChunkReadPiece *piece = static_cast<TChunkReadPiece*>(req.Get());
- Y_ABORT_UNLESS(!piece->SelfPointer);
- TIntrusivePtr<TChunkRead> &read = piece->ChunkRead;
- TReqId reqId = read->ReqId;
- ui32 chunkIdx = read->ChunkIdx;
- bool isComplete = false;
- ui8 priorityClass = read->PriorityClass;
- NHPTimer::STime creationTime = read->CreationTime;
- if (!read->IsReplied) {
- P_LOG(PRI_DEBUG, BPD36, "Performing TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx));
-
- ui32 size = 0;
- while (!isComplete && size < piece->PieceSizeLimit) {
- ui64 currentLimit = Min(bufferSize, piece->PieceSizeLimit - size);
- ui64 reallyReadDiskBytes;
- EChunkReadPieceResult result = ChunkReadPiece(read, piece->PieceCurrentSector + size / Format.SectorSize,
- currentLimit, &reallyReadDiskBytes, piece->SpanStack.GetTraceId(), std::move(piece->Orbit));
- isComplete = (result != ReadPieceResultInProgress);
- // Read pieces is sliced previously and it is expected that ChunkReadPiece will read exactly
- // currentLimit bytes
- Y_VERIFY_S(reallyReadDiskBytes == currentLimit, reallyReadDiskBytes << " != " << currentLimit);
- size += currentLimit;
- }
- }
- piece->OnSuccessfulDestroy(PCtx->ActorSystem);
- if (isComplete) {
- //
- // WARNING: Don't access "read" after this point.
- // Don't add code before the warning!
- //
- Mon.IncrementQueueTime(priorityClass, HPMilliSeconds(now - creationTime));
- P_LOG(PRI_DEBUG, BPD37, "enqueued all TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx));
- }
- break;
- }
- default:
- Y_FAIL_S("Unexpected request type# " << ui64(req->GetType()) << " in JointChunkReads");
+
+ Y_VERIFY_S(req->GetType() == ERequestType::RequestChunkReadPiece, "Unexpected request type# " << ui64(req->GetType()) << " in JointChunkReads");
+ TChunkReadPiece *piece = static_cast<TChunkReadPiece*>(req.Get());
+ Y_ABORT_UNLESS(!piece->SelfPointer);
+ TIntrusivePtr<TChunkRead> &read = piece->ChunkRead;
+ TReqId reqId = read->ReqId;
+ ui32 chunkIdx = read->ChunkIdx;
+ ui8 priorityClass = read->PriorityClass;
+ NHPTimer::STime creationTime = read->CreationTime;
+ Y_VERIFY(!read->IsReplied);
+ P_LOG(PRI_NOTICE, BPD36, "Performing TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx),
+ (PieceCurrentSector, piece->PieceCurrentSector),
+ (PieceSizeLimit, piece->PieceSizeLimit),
+ (IsTheLastPiece, piece->IsTheLastPiece),
+ (BufferSize, bufferSize)
+ );
+
+ ui64 currentLimit = Min(bufferSize, piece->PieceSizeLimit);
+ EChunkReadPieceResult result = ChunkReadPiece(read, piece->PieceCurrentSector, currentLimit,
+ piece->SpanStack.GetTraceId(), std::move(piece->Orbit));
+ bool isComplete = (result != ReadPieceResultInProgress);
+ Y_VERIFY_S(isComplete || currentLimit >= piece->PieceSizeLimit, isComplete << " " << currentLimit << " " << piece->PieceSizeLimit);
+ piece->OnSuccessfulDestroy(PCtx->ActorSystem);
+ if (isComplete) {
+ //
+ // WARNING: Don't access "read" after this point.
+ // Don't add code before the warning!
+ //
+ Mon.IncrementQueueTime(priorityClass, HPMilliSeconds(now - creationTime));
+ P_LOG(PRI_NOTICE, BPD37, "enqueued all TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx));
}
}
LWTRACK(PDiskProcessChunkReadQueue, UpdateCycleOrbit, PCtx->PDiskId, JointChunkReads.size());
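
Because every piece is now sliced to at most one buffer (see the ForsetiOpPieceSizeCached clamp in Update() further down), the old inner while-loop over reallyReadDiskBytes collapses into the single ChunkReadPiece call above, and the Y_VERIFY_S asserts that if the read is not yet complete, the piece was nonetheless issued in full. A small sketch of why one call per piece suffices, with hypothetical sizes:

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    int main() {
        uint64_t bufferSize = 512 * 1024;      // hypothetical buffer-pool buffer size
        uint64_t pieceSizeLimit = 256 * 1024;  // pieces are pre-clamped to <= bufferSize
        uint64_t currentLimit = std::min(bufferSize, pieceSizeLimit);
        // The whole piece fits in one buffer, so a single read covers PieceSizeLimit.
        assert(currentLimit == pieceSizeLimit);
    }
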
@@ -3150,65 +3141,46 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
if (!isAdded) {
if (request->GetType() == ERequestType::RequestChunkWrite) {
TIntrusivePtr<TChunkWrite> whole(static_cast<TChunkWrite*>(request));
- ui32 smallJobSize = 0;
- ui32 smallJobCount = 0;
- ui32 largeJobSize = 0;
- SplitChunkJobSize(whole->TotalSize, &smallJobSize, &largeJobSize, &smallJobCount);
- for (ui32 idx = 0; idx < smallJobCount; ++idx) {
- // Schedule small job.
+
+ const ui32 jobSizeLimit = ui64(ForsetiOpPieceSizeCached) * Format.SectorPayloadSize() / Format.SectorSize;
+ const ui32 jobCount = (whole->TotalSize + jobSizeLimit - 1) / jobSizeLimit;
+
+ ui32 remainingSize = whole->TotalSize;
+ for (ui32 idx = 0; idx < jobCount; ++idx) {
auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END);
span.Attribute("small_job_idx", idx)
- .Attribute("is_last_piece", false);
- TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * smallJobSize, smallJobSize, std::move(span));
+ .Attribute("is_last_piece", idx == jobCount - 1);
+ ui32 jobSize = Min(remainingSize, jobSizeLimit);
+ TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * jobSizeLimit, jobSize, std::move(span));
piece->EstimateCost(DriveModel);
AddJobToForseti(cbs, piece, request->JobKind);
+ remainingSize -= jobSize;
}
- // Schedule large job (there always is one)
- auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END);
- span.Attribute("is_last_piece", true);
- TChunkWritePiece *piece = new TChunkWritePiece(whole, smallJobCount * smallJobSize, largeJobSize, std::move(span));
- piece->EstimateCost(DriveModel);
- AddJobToForseti(cbs, piece, request->JobKind);
- LWTRACK(PDiskChunkWriteAddToScheduler, request->Orbit, PCtx->PDiskId, request->ReqId.Id,
- HPSecondsFloat(HPNow() - request->CreationTime), request->Owner, request->IsFast,
- request->PriorityClass, whole->TotalSize);
+ Y_VERIFY_S(remainingSize == 0, remainingSize);
} else if (request->GetType() == ERequestType::RequestChunkRead) {
TIntrusivePtr<TChunkRead> read = std::move(static_cast<TChunkRead*>(request)->SelfPointer);
ui32 totalSectors = read->LastSector - read->FirstSector + 1;
- ui32 smallJobSize = (ForsetiOpPieceSizeCached + Format.SectorSize - 1) / Format.SectorSize;
- ui32 smallJobCount = totalSectors / smallJobSize;
- if (smallJobCount) {
- smallJobCount--;
- }
- ui32 largeJobSize = totalSectors - smallJobSize * smallJobCount;
-
- for (ui32 idx = 0; idx < smallJobCount; ++idx) {
+ Y_DEBUG_ABORT_UNLESS(ForsetiOpPieceSizeCached % Format.SectorSize == 0);
+ const ui32 jobSizeLimit = ForsetiOpPieceSizeCached / Format.SectorSize;
+ const ui32 jobCount = (totalSectors + jobSizeLimit - 1) / jobSizeLimit;
+ for (ui32 idx = 0; idx < jobCount; ++idx) {
auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkReadPiece", NWilson::EFlags::AUTO_END);
+ bool isLast = idx == jobCount - 1;
span.Attribute("small_job_idx", idx)
- .Attribute("is_last_piece", false);
- // Schedule small job.
- auto piece = new TChunkReadPiece(read, idx * smallJobSize,
- smallJobSize * Format.SectorSize, false, std::move(span));
+ .Attribute("is_last_piece", isLast);
+
+ ui32 jobSize = Min(totalSectors, jobSizeLimit);
+ auto piece = new TChunkReadPiece(read, idx * jobSizeLimit, jobSize * Format.SectorSize, isLast, std::move(span));
read->Orbit.Fork(piece->Orbit);
- LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PCtx->PDiskId, idx, idx * smallJobSize * Format.SectorSize,
- smallJobSize * Format.SectorSize);
+ LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PCtx->PDiskId, idx, idx * jobSizeLimit * Format.SectorSize,
+ jobSizeLimit * Format.SectorSize);
piece->EstimateCost(DriveModel);
piece->SelfPointer = piece;
AddJobToForseti(cbs, piece, request->JobKind);
+ totalSectors -= jobSize;
}
- // Schedule large job (there always is one)
- auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkReadPiece");
- span.Attribute("is_last_piece", true);
- auto piece = new TChunkReadPiece(read, smallJobCount * smallJobSize,
- largeJobSize * Format.SectorSize, true, std::move(span));
- read->Orbit.Fork(piece->Orbit);
- LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PCtx->PDiskId, smallJobCount,
- smallJobCount * smallJobSize * Format.SectorSize, largeJobSize * Format.SectorSize);
- piece->EstimateCost(DriveModel);
- piece->SelfPointer = piece;
- AddJobToForseti(cbs, piece, request->JobKind);
-
+ Y_VERIFY_S(totalSectors == 0, totalSectors);
} else {
AddJobToForseti(cbs, request, request->JobKind);
}
@@ -3221,17 +3193,6 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
}
}
-// Always produces a large job and sometimes produces some small jobs and a large job.
-void TPDisk::SplitChunkJobSize(ui32 totalSize, ui32 *outSmallJobSize, ui32 *outLargeJobSize, ui32 *outSmallJobCount) {
- const ui64 sectorPayloadSize = Format.SectorPayloadSize();
- *outSmallJobSize = (ForsetiOpPieceSizeCached + sectorPayloadSize - 1) / sectorPayloadSize * sectorPayloadSize;
- *outSmallJobCount = totalSize / *outSmallJobSize;
- if (*outSmallJobCount) {
- (*outSmallJobCount)--;
- }
- *outLargeJobSize = totalSize - *outSmallJobSize * *outSmallJobCount;
-}
-
void TPDisk::AddJobToForseti(NSchLab::TCbs *cbs, TRequestBase *request, NSchLab::EJobKind jobKind) {
LWTRACK(PDiskAddToScheduler, request->Orbit, PCtx->PDiskId, request->ReqId.Id, HPSecondsFloat(request->CreationTime),
request->Owner, request->IsFast, request->PriorityClass);
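
For comparison, the removed SplitChunkJobSize produced equal small jobs plus one trailing large job between one and just under two small-job sizes long, whereas the new code cuts uniform pieces with only the final one possibly short. A sketch reproducing both computations with hypothetical numbers (the old rounding of the small-job size to the sector payload size is omitted):

    #include <cassert>
    #include <cstdint>

    int main() {
        uint32_t totalSize = 5 * 1024 * 1024;  // hypothetical chunk-write size
        uint32_t pieceSize = 2 * 1024 * 1024;  // stands in for the ForsetiOpPieceSize-derived cap

        // Old scheme (removed SplitChunkJobSize): small jobs plus one large job.
        uint32_t smallJobCount = totalSize / pieceSize;                 // 2
        if (smallJobCount) { smallJobCount--; }                         // 1
        uint32_t largeJobSize = totalSize - pieceSize * smallJobCount;  // 3 MiB

        // New scheme: uniform ceil-division pieces, last one possibly short.
        uint32_t jobCount = (totalSize + pieceSize - 1) / pieceSize;    // 3 pieces: 2, 2, 1 MiB

        assert(smallJobCount * pieceSize + largeJobSize == totalSize);
        assert(jobCount == 3 && largeJobSize == 3 * 1024 * 1024);
    }
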
@@ -3458,18 +3419,17 @@ void TPDisk::Update() {
Mon.UpdateDurationTracker.UpdateStarted();
LWTRACK(PDiskUpdateStarted, UpdateCycleOrbit, PCtx->PDiskId);
- // ui32 userSectorSize = 0;
-
- // Make input queue empty
{
TGuard<TMutex> guard(StateMutex);
ForsetiMaxLogBatchNsCached = ForsetiMaxLogBatchNs;
ForsetiOpPieceSizeCached = PDiskCategory.IsSolidState() ? ForsetiOpPieceSizeSsd : ForsetiOpPieceSizeRot;
- EnqueueAll();
- /*userSectorSize = */Format.SectorPayloadSize();
-
+ ForsetiOpPieceSizeCached = Min<i64>(ForsetiOpPieceSizeCached, Cfg->BufferPoolBufferSizeBytes);
+ ForsetiOpPieceSizeCached = AlignDown<i64>(ForsetiOpPieceSizeCached, Format.SectorSize);
// Switch the scheduler when possible
ForsetiScheduler.SetIsBinLogEnabled(EnableForsetiBinLog);
+
+ // Make input queue empty
+ EnqueueAll();
}
// Make token injection to correct drive model underestimations and avoid disk underutilization
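
The clamping added to Update() above is what guarantees the invariant the read path relies on: the cached piece size never exceeds one buffer-pool buffer and is always a whole number of sectors. A minimal sketch of that computation, with plain C++ in place of the codebase's Min/AlignDown and hypothetical values:

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    int main() {
        int64_t pieceSize = 2 * 1024 * 1024;        // hypothetical ForsetiOpPieceSize control value
        int64_t bufferPoolBufferSize = 512 * 1024;  // hypothetical Cfg->BufferPoolBufferSizeBytes
        int64_t sectorSize = 4096;                  // hypothetical Format.SectorSize
        pieceSize = std::min(pieceSize, bufferPoolBufferSize);  // never exceed one buffer
        pieceSize = pieceSize / sectorSize * sectorSize;        // AlignDown to a whole sector
        assert(pieceSize % sectorSize == 0 && pieceSize <= bufferPoolBufferSize);
    }
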
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
index 3f789bae54..e571605489 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
@@ -294,8 +294,7 @@ public:
void SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringStream& errorReason,
NKikimrProto::EReplyStatus status);
EChunkReadPieceResult ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector, ui64 pieceSizeLimit,
- ui64 *reallyReadBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit);
- void SplitChunkJobSize(ui32 totalSize, ui32 *outSmallJobSize, ui32 *outLargeJObSize, ui32 *outSmallJobCount);
+ NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Chunk locking
TVector<TChunkIdx> LockChunksForOwner(TOwner owner, const ui32 count, TString &errorReason);