diff options
author | senya0x5f <senya0x5f@yandex-team.com> | 2023-08-11 11:42:23 +0300 |
---|---|---|
committer | senya0x5f <senya0x5f@yandex-team.com> | 2023-08-11 12:42:54 +0300 |
commit | 1eb895279c52b0d2505a31b79ad326b56d0b2681 (patch) | |
tree | 2b33bc30ea8b506d4c2b77c1badc8d5f8b3c4e9d | |
parent | 4359573464bd608d0dc6e7318e2b35879095b54e (diff) | |
download | ydb-1eb895279c52b0d2505a31b79ad326b56d0b2681.tar.gz |
KIKIMR-18909 Rework log cache usage
11 files changed, 177 insertions, 109 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp index c3e0138607..f37a388830 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp @@ -1271,19 +1271,17 @@ public: ui64 chunkIdx = offset / PDisk->Format.ChunkSize; Y_VERIFY(chunkIdx < PDisk->ChunkState.size()); - if (TChunkState::DATA_COMMITTED == PDisk->ChunkState[chunkIdx].CommitState) { - if ((offset % PDisk->Format.ChunkSize) + completion->GetSize() > PDisk->Format.ChunkSize) { - // TODO: split buffer if crossing chunk boundary instead of completely discarding it - LOG_INFO_S( - *ActorSystem, NKikimrServices::BS_DEVICE, - "Skip caching log read due to chunk boundary crossing"); - } else { - if (Cache.Size() >= MaxCount) { - Cache.Pop(); - } + + if ((offset % PDisk->Format.ChunkSize) + completion->GetSize() > PDisk->Format.ChunkSize) { + // TODO: split buffer if crossing chunk boundary instead of completely discarding it + LOG_INFO_S( + *ActorSystem, NKikimrServices::BS_DEVICE, + "Skip caching log read due to chunk boundary crossing"); + } else { + if (Cache.Size() < MaxCount) { const char* dataPtr = static_cast<const char*>(completion->GetData()); - Cache.Insert(dataPtr, completion->GetOffset(), completion->GetSize(), completion->GetBadOffsets()); + Cache.Insert(dataPtr, completion->GetOffset(), completion->GetSize(), completion->GetBadOffsets()); } } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h index ebf8b8f71b..7749b958fb 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h @@ -329,7 +329,7 @@ struct TSysLogRecord { TChunkIdx LogHeadChunkIdx; ui32 Reserved1; ui64 LogHeadChunkPreviousNonce; - TVDiskID OwnerVDisks[256]; + TVDiskID OwnerVDisks[OwnerCount]; TSysLogRecord() : Version(PDISK_SYS_LOG_RECORD_VERSION_7) @@ -337,7 +337,7 @@ struct TSysLogRecord { , Reserved1(0) , LogHeadChunkPreviousNonce((ui64)-1) { - for (size_t i = 0; i < 256; ++i) { + for (size_t i = 0; i < OwnerCount; ++i) { OwnerVDisks[i] = TVDiskID::InvalidId; } } @@ -354,7 +354,7 @@ struct TSysLogRecord { str << " NonceSet# " << Nonces.ToString(isMultiline) << x; str << " LogHeadChunkIdx# " << LogHeadChunkIdx << x; str << " LogHeadChunkPreviousNonce# " << LogHeadChunkPreviousNonce << x; - for (ui32 i = 0; i < 256; ++i) { + for (ui32 i = 0; i < OwnerCount; ++i) { if (OwnerVDisks[i] != TVDiskID::InvalidId) { str << " Owner[" << i << "]# " << OwnerVDisks[i].ToString() << x; } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index 75e271996a..800f8c2a18 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -132,6 +132,7 @@ public: bool TrimInFly = false; // TChunkTrim request is present somewhere in pdisk TAtomic ChunkBeingTrimmed = 0; TAtomic TrimOffset = 0; + TLogRecoveryState LogRecoveryState; // Recovery state: log chunk readers and log chunks order. TList<TLogChunkInfo> LogChunks; // Log chunk list + log-specific information bool IsLogChunksReleaseInflight = false; ui64 InsaneLogChunks = 0; // Set when pdisk sees insanely large log, to give vdisks a chance to cut it @@ -215,6 +216,13 @@ public: bool InitCommonLogger(); bool LogNonceJump(ui64 previousNonce); void GetStartingPoints(TOwner owner, TMap<TLogSignature, TLogRecord> &outStartingPoints); + + /** + * Notifies that log chunk was read by a VDisk. + * @param chunkIdx Chunk's index. + * @param reader VDisk that read the chunk. + */ + void NotifyLogChunkRead(ui32 chunkIdx, TOwner reader); TString StartupOwnerInfo(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Destruction diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp index abdb0e0db0..d962b430b8 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp @@ -44,7 +44,7 @@ bool TPDisk::InitCommonLogger() { } CommonLogger->SwitchToNewChunk(TReqId(TReqId::InitCommonLoggerSwitchToNewChunk, 0), nullptr); - // Log chunk can be collected as soon as noone needs it + // Log chunk can be collected as soon as no one needs it ChunkState[chunkIdx].CommitState = TChunkState::DATA_COMMITTED; } bool isOk = LogNonceJump(InitialPreviousNonce); @@ -200,6 +200,7 @@ bool TPDisk::ProcessChunk0(const NPDisk::TEvReadLogResult &readLogResult, TStrin << " Marker# BPD48"); return false; } + TSysLogRecord *sysLogRecord = (TSysLogRecord*)(lastSysLogRecord.data()); if (sysLogRecord->Version < PDISK_SYS_LOG_RECORD_INCOMPATIBLE_VERSION_1000) { @@ -481,10 +482,9 @@ TRcBuf TPDisk::ProcessReadSysLogResult(ui64 &outWritePosition, ui64 &outLsn, } void TPDisk::ReadAndParseMainLog(const TActorId &pDiskActor) { - TVector<TLogChunkItem> chunksToRead; TIntrusivePtr<TLogReaderBase> logReader(new TLogReader(true, this, ActorSystem, pDiskActor, 0, TLogPosition{0, 0}, EOwnerGroupType::Static, TLogPosition{0, 0}, (ui64)-1, SysLogRecord.LogHeadChunkPreviousNonce, 0, 0, - TReqId(TReqId::ReadAndParseMainLog, 0), std::move(chunksToRead), 0, 0, TVDiskID::InvalidId)); + TReqId(TReqId::ReadAndParseMainLog, 0), TVector<TLogChunkItem>(), 0, 0, TVDiskID::InvalidId)); TVector<ui64> badOffsets; // Emits subrequests TCompletionLogReadPart which contains TIntrusivePtr to logReader logReader->Exec(0, badOffsets, ActorSystem); @@ -524,6 +524,7 @@ void TPDisk::ProcessLogReadQueue() { } ui32 endLogChunkIdx = CommonLogger->ChunkIdx; ui64 endLogSectorIdx = CommonLogger->SectorIdx; + ownerData.LogReader = new TLogReader(false, this, ActorSystem, logRead.Sender, logRead.Owner, logStartPosition, logRead.OwnerGroupType,logRead.Position, @@ -1200,7 +1201,7 @@ void TPDisk::OnLogCommitDone(TLogCommitDone &req) { if (isChunkReleased) { THolder<TCompletionEventSender> completion(new TCompletionEventSender(this)); if (ReleaseUnusedLogChunks(completion.Get())) { - WriteSysLogRestorePoint(completion.Release(), req.ReqId, {}); // FIXME: wilson + WriteSysLogRestorePoint(completion.Release(), req.ReqId, {}); } } TryTrimChunk(false, 0); @@ -1247,6 +1248,28 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) { } } +void TPDisk::NotifyLogChunkRead(ui32 chunkIdx, TOwner reader) { + TGuard<TMutex> guard(StateMutex); + auto iter = LogRecoveryState.Readers.find(chunkIdx); + + if (iter == LogRecoveryState.Readers.end()) { + return; + } + + auto &readers = iter->second; + + if (readers.any()) { + // If there's at least one registered reader. + readers.set(reader, false); + + if (readers.none()) { + LogRecoveryState.Readers.erase(iter); + + BlockDevice->EraseCacheRange(Format.Offset(chunkIdx, 0), Format.Offset(chunkIdx + 1, 0)); + } + } +} + // Schedules EvReadLogResult event for the system log void TPDisk::InitiateReadSysLog(const TActorId &pDiskActor) { Y_VERIFY_S(PDiskThread.Running(), "expect PDiskThread to be running"); @@ -1294,6 +1317,7 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul switch (InitPhase) { case EInitPhase::ReadingSysLog: { + // Finished reading sys log. TString errorReason; bool success = ProcessChunk0(evReadLogResult, errorReason); @@ -1315,6 +1339,7 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul } case EInitPhase::ReadingLog: { + // Finished reading main log. InitialLogPosition = evReadLogResult.NextPosition; if (InitialLogPosition == TLogPosition{0, 0}) { *Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialCommonLogParseError; @@ -1403,6 +1428,17 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul ActorSystem->Send(pDiskActor, new TEvLogInitResult(false, errorReason)); return; } + + // For every log chunk build a list of readers. These readers will be reading this chunk. + for (auto it = LogChunks.begin(); it != LogChunks.end(); ++it) { + auto &readers = LogRecoveryState.Readers[it->ChunkIdx]; + + for (ui32 owner = 0; owner < it->OwnerLsnRange.size(); ++owner) { + if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) { + readers.set(owner, true); + } + } + } } // Increase Nonces to prevent collisions diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp index 448eb8480a..48de6dfcfc 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp @@ -15,14 +15,6 @@ TLogCache::TCacheRecord::TCacheRecord(TCacheRecord&& other) , BadOffsets(std::move(other.BadOffsets)) {} -TLogCache::TItem::TItem(TItem&& other) - : Value(std::move(other.Value)) -{} - -TLogCache::TItem::TItem(TCacheRecord&& value) - : Value(std::move(value)) -{} - size_t TLogCache::Size() const { return Index.size(); } @@ -52,15 +44,7 @@ FindKeyLessEqual(C& c, const typename C::key_type& key) { } bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func) { - return Find(offset, size, buffer, func, true); -} - -bool TLogCache::FindWithoutPromote(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func) { - return Find(offset, size, buffer, func, false); -} - -bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, std::function<void(const std::vector<ui64>&)> func, bool promote) { - TVector<TItem*> res; + TVector<TCacheRecord*> res; auto indexIt = FindKeyLessEqual(Index, offset); @@ -73,7 +57,7 @@ bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, std::function<void(co while (indexIt != Index.end() && cur < end) { ui64 recStart = indexIt->first; - ui64 recEnd = recStart + indexIt->second.Value.Data.Size(); + ui64 recEnd = recStart + indexIt->second.Data.Size(); if (cur >= recStart && cur < recEnd) { res.push_back(&indexIt->second); @@ -90,9 +74,7 @@ bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, std::function<void(co return false; } - for (auto item : res) { - auto cacheRecord = &item->Value; - + for (auto cacheRecord : res) { ui64 recStart = cacheRecord->Offset; ui64 recEnd = recStart + cacheRecord->Data.Size(); @@ -109,24 +91,11 @@ bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, std::function<void(co // Notify callee of bad offsets. func(cacheRecord->BadOffsets); - - if (promote) { - List.PushFront(item); - } } return true; } -bool TLogCache::Pop() { - if (Index.empty()) - return false; - - TItem* item = List.PopBack(); - Index.erase(item->Value.Offset); - return true; -} - std::pair<i64, i64> TLogCache::PrepareInsertion(ui64 start, ui32 size) { ui64 end = start + size; ui32 leftPadding = 0; @@ -136,7 +105,7 @@ std::pair<i64, i64> TLogCache::PrepareInsertion(ui64 start, ui32 size) { auto it1 = FindKeyLessEqual(Index, start); if (it1 != Index.end()) { ui64 maybeStart = it1->first; - ui64 maybeEnd = maybeStart + it1->second.Value.Data.Size(); + ui64 maybeEnd = maybeStart + it1->second.Data.Size(); if (start < maybeEnd) { if (end <= maybeEnd) { @@ -146,24 +115,36 @@ std::pair<i64, i64> TLogCache::PrepareInsertion(ui64 start, ui32 size) { } } + ui64 offsetStart = start + leftPadding; + // Check if there is a block that overlaps with the new insertion's end. auto it2 = FindKeyLess(Index, end); if (it2 != Index.end()) { + ui64 dataSize = it2->second.Data.Size(); + ui64 maybeStart = it2->first; - ui64 maybeEnd = maybeStart + it2->second.Value.Data.Size(); + ui64 maybeEnd = maybeStart + dataSize; + + if (offsetStart == maybeStart) { + // There is an overlapping block; return {-1, -1} to indicate it. + if (end <= maybeEnd) { + return {-1, -1}; + } + + leftPadding += dataSize; + } if (end < maybeEnd) { rightPadding = end - maybeStart; } } - - // Remove any blocks that are completely covered by the new insertion. - ui64 offsetStart = start + leftPadding; + ui64 offsetEnd = start + (size - rightPadding); + // Remove any blocks that are completely covered by the new insertion. auto it = Index.upper_bound(offsetStart); while (it != Index.end()) { - ui64 blockEnd = it->first + it->second.Value.Data.Size(); + ui64 blockEnd = it->first + it->second.Data.Size(); if (blockEnd < offsetEnd) { it = Index.erase(it); } else { @@ -194,8 +175,6 @@ bool TLogCache::Insert(const char* dataPtr, ui64 offset, ui32 size, const TVecto Y_VERIFY_DEBUG(inserted); - List.PushFront(&it->second); - return true; } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h index 79bc068ff4..bf46ab0149 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h @@ -8,8 +8,9 @@ namespace NKikimr { namespace NPDisk { /** - * Key-value LRU cache without automatic eviction, but able to erase range of keys. - **/ + * Key-value cache without automatic eviction, but able to erase range of keys. + * Entries do not intersect with each other. + */ class TLogCache { private: struct TCacheRecord { @@ -22,20 +23,7 @@ private: TCacheRecord(ui64 offset, TRcBuf data, TVector<ui64> badOffsets); }; - /** - * Nested class representing a cache entry in the doubly linked list. - * Inherits from TIntrusiveListItem to maintain the LRU order. - */ - struct TItem : public TIntrusiveListItem<TItem> { - TCacheRecord Value; - - // custom constructors ignoring TIntrusiveListItem - TItem(TItem&& other); - explicit TItem(TCacheRecord&& value); - }; - - using TListType = TIntrusiveList<TItem>; - using TIndex = TMap<ui64, TItem>; + using TIndex = TMap<ui64, TCacheRecord>; public: using TBadOffsetsHandler = std::function<void(const std::vector<ui64>&)>; @@ -46,17 +34,6 @@ public: size_t Size() const; /** - * Finds a cache record by its offset and a specified size, copies the data to the buffer, - * and promotes the record to the front of the cache list. - * @param offset The offset key to search for. - * @param size The size of data to copy. - * @param buffer The buffer to store the copied data. - * @param func Optional custom function to handle bad offsets. - * @return True if the cache record is found and data is copied; otherwise, false. - */ - bool Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func = [](const std::vector<ui64>&) {}); - - /** * Finds a cache record by its offset and a specified size, copies the data to the buffer. * @param offset The offset key to search for. * @param size The size of data to copy. @@ -64,13 +41,7 @@ public: * @param func Optional custom function to handle bad offsets. * @return True if the cache record is found and data is copied; otherwise, false. */ - bool FindWithoutPromote(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func = [](const std::vector<ui64>&) {}); - - /** - * Removes the least recently used cache record from the cache. - * @return True if a cache record was removed; otherwise, false (cache is empty). - */ - bool Pop(); + bool Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func = [](const std::vector<ui64>&) {}); /** * Inserts a new cache record into the cache. @@ -96,7 +67,6 @@ public: void Clear(); private: - TListType List; TIndex Index; /** @@ -107,8 +77,6 @@ private: * @return A pair of i64 values representing left and right data paddings. */ std::pair<i64, i64> PrepareInsertion(ui64 offset, ui32 size); - - bool Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func, bool promote); }; } // NPDisk diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp index 0e268399a2..dd072ab91e 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp @@ -17,13 +17,13 @@ Y_UNIT_TEST_SUITE(TLogCache) { UNIT_ASSERT_STRINGS_EQUAL(buf, "a"); UNIT_ASSERT(cache.Insert("c", 3, 1)); - UNIT_ASSERT(cache.Pop()); // 2 must be evicted + UNIT_ASSERT_EQUAL(1, cache.EraseRange(2, 3)); // 2 was removed UNIT_ASSERT_EQUAL(cache.Size(), 2); UNIT_ASSERT(!cache.Find(2, 1, buf)); UNIT_ASSERT(cache.Find(3, 1, buf)); UNIT_ASSERT_STRINGS_EQUAL(buf, "c"); - UNIT_ASSERT(cache.Pop()); // 1 must be evicted + UNIT_ASSERT_EQUAL(1, cache.EraseRange(1, 2)); // 1 was removed UNIT_ASSERT(cache.Insert("d", 4, 1)); UNIT_ASSERT_EQUAL(cache.Size(), 2); @@ -31,16 +31,14 @@ Y_UNIT_TEST_SUITE(TLogCache) { UNIT_ASSERT(cache.Find(4, 1, buf)); UNIT_ASSERT_STRINGS_EQUAL(buf, "d"); - UNIT_ASSERT(cache.Pop()); // 3 must be evicted + UNIT_ASSERT_EQUAL(1, cache.EraseRange(3, 4)); // 3 was removed UNIT_ASSERT_EQUAL(cache.Size(), 1); UNIT_ASSERT(!cache.Find(3, 1, buf)); UNIT_ASSERT(cache.Find(4, 1, buf)); UNIT_ASSERT_STRINGS_EQUAL(buf, "d"); - UNIT_ASSERT_EQUAL(1, cache.EraseRange(3, 5)); - UNIT_ASSERT_EQUAL(cache.Size(), 0); - UNIT_ASSERT(!cache.Pop()); + UNIT_ASSERT_EQUAL(1, cache.EraseRange(4, 5)); UNIT_ASSERT_EQUAL(cache.Size(), 0); } @@ -189,6 +187,33 @@ Y_UNIT_TEST_SUITE(TLogCache) { UNIT_ASSERT(!cache.Find(3, 10, buf3)); UNIT_ASSERT_STRINGS_EQUAL(buf3, ""); } + + { + TLogCache cache; + + UNIT_ASSERT(cache.Insert("abcdefghij", 0, 10)); + UNIT_ASSERT_EQUAL(1, cache.Size()); + + UNIT_ASSERT(cache.Insert("klmno", 10, 5)); + UNIT_ASSERT_EQUAL(2, cache.Size()); + + UNIT_ASSERT(!cache.Insert("fghijklmno", 5, 10)); + + UNIT_ASSERT_EQUAL(2, cache.Size()); + } + + { + TLogCache cache; + + UNIT_ASSERT(cache.Insert("abcdefghij", 0, 10)); + UNIT_ASSERT_EQUAL(1, cache.Size()); + + UNIT_ASSERT(cache.Insert("klmno", 10, 5)); + UNIT_ASSERT_EQUAL(2, cache.Size()); + + UNIT_ASSERT(cache.Insert("fghijklmnopq", 5, 12)); + UNIT_ASSERT_EQUAL(3, cache.Size()); + } } TLogCache SetupCache(const TVector<std::pair<ui64, TString>>& content = {{5, "x"}, {1, "y"}, {10, "z"}}) { @@ -207,15 +232,15 @@ Y_UNIT_TEST_SUITE(TLogCache) { char buf[2] = {}; for (auto pair : content) { - UNIT_ASSERT(cache.FindWithoutPromote(pair.first, 1, buf)); + UNIT_ASSERT(cache.Find(pair.first, 1, buf)); UNIT_ASSERT_STRINGS_EQUAL(pair.second, buf); } for (auto pair : content) { - UNIT_ASSERT(cache.Pop()); + cache.EraseRange(pair.first, pair.first + 1); - UNIT_ASSERT(!cache.FindWithoutPromote(pair.first, 1, buf)); + UNIT_ASSERT(!cache.Find(pair.first, 1, buf)); } } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp index add4a820ac..374827cfaa 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp @@ -1148,13 +1148,22 @@ void TLogReader::Reply() { PDisk->ProcessChunkOwnerMap(*ChunkOwnerMap.Get()); ChunkOwnerMap.Destroy(); + // Remove invalid part of the last log chunk. PDisk->BlockDevice->EraseCacheRange( - PDisk->Format.Offset(ChunkIdx, 0), + PDisk->Format.Offset(ChunkIdx, SectorIdx) + OffsetInSector, PDisk->Format.Offset(ChunkIdx + 1, 0) ); } LOG_DEBUG(*PDisk->ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " To ownerId# %" PRIu32 " %s", (ui32)PDisk->PDiskId, (ui32)Owner, Result->ToString().c_str()); + + if (!IsInitial && Result->IsEndOfLog) { + ui32 prevChunkIdx = ChunkIdx; + + // Finished reading owner's whole log. + PDisk->NotifyLogChunkRead(prevChunkIdx, Owner); + } + ActorSystem->Send(ReplyTo, Result.Release()); if (!IsInitial) { PDisk->Mon.LogRead.CountResponse(ResultSize); @@ -1296,6 +1305,13 @@ void TLogReader::UpdateNewChunkInfo(ui32 currChunk, const TMaybe<ui32> prevChunk } void TLogReader::SwitchToChunk(ui32 chunkIdx) { + if (!IsInitial) { + ui32 prevChunkIdx = ChunkIdx; + + // Finished reading log chunk. + PDisk->NotifyLogChunkRead(prevChunkIdx, Owner); + } + ChunkIdx = chunkIdx; SectorIdx = 0; OffsetInSector = 0; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h index 9318239f65..8c2568941b 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h @@ -35,7 +35,7 @@ class TPDisk; struct TLogChunkItem { TChunkIdx ChunkIdx; bool IsPreviousChunkDropped; - bool IsPreviousChunkCut;; + bool IsPreviousChunkCut; TLogChunkItem(TChunkIdx chunkIdx, bool isPreviousChunkDropped, bool isPreviousChunkCut) : ChunkIdx(chunkIdx) diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h index a8ccc2d741..f0c95dae26 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h @@ -23,7 +23,7 @@ enum class EInitPhase { }; enum EOwner { - OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, mens "for dynamic" in requests + OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, means "for dynamic" in requests OwnerUnallocated = 1, // Unallocated chunks, Trim scheduling, Slay commands OwnerBeginUser = 2, OwnerEndUser = 241, @@ -338,6 +338,14 @@ struct TLogChunkInfo { } }; +struct TLogRecoveryState { + TMap<TChunkIdx, std::bitset<OwnerCount>> Readers; // Per-chunk information about future readers. + + void Clear() { + Readers.clear(); + } +}; + } // NPDisk } // NKikimr diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp index 4d2f648eea..1f8ce61edb 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp @@ -466,6 +466,36 @@ Y_UNIT_TEST_SUITE(TPDiskTest) { vdisk.SendEvLogSync(); } + Y_UNIT_TEST(PDiskRestartUsesLogRecoveryState) { + TActorTestContext testCtx({ + .IsBad = false, + .ChunkSize = 4 * (1 << 20) + }); + TVDiskMock vdisk(&testCtx); + vdisk.InitFull(); + + // Fill one log chunk so that on restart it is fully read. + for (int i = 0; i < 128; i++) { + vdisk.SendEvLogSync(32768); + } + + testCtx.RestartPDiskSync(); + + vdisk.Init(); + + // Assert recovery state with readers 1. + testCtx.SafeRunOnPDisk([](NPDisk::TPDisk* disk) { + UNIT_ASSERT_EQUAL(1, disk->LogRecoveryState.Readers.size()); + }); + + vdisk.ReadLog(); + + // Assert recovery state with readers 0. + testCtx.SafeRunOnPDisk([](NPDisk::TPDisk* disk) { + UNIT_ASSERT_EQUAL(0, disk->LogRecoveryState.Readers.size()); + }); + } + Y_UNIT_TEST(PDiskRestartManyLogWrites) { TActorTestContext testCtx({ false }); testCtx.TestCtx.SectorMap->ImitateIoErrorProbability = 1e-4; |