aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsenya0x5f <senya0x5f@yandex-team.com>2023-08-11 11:42:23 +0300
committersenya0x5f <senya0x5f@yandex-team.com>2023-08-11 12:42:54 +0300
commit1eb895279c52b0d2505a31b79ad326b56d0b2681 (patch)
tree2b33bc30ea8b506d4c2b77c1badc8d5f8b3c4e9d
parent4359573464bd608d0dc6e7318e2b35879095b54e (diff)
downloadydb-1eb895279c52b0d2505a31b79ad326b56d0b2681.tar.gz
KIKIMR-18909 Rework log cache usage
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp20
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h6
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h8
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp44
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp63
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h42
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp43
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp18
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h10
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp30
11 files changed, 177 insertions, 109 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
index c3e0138607..f37a388830 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
@@ -1271,19 +1271,17 @@ public:
ui64 chunkIdx = offset / PDisk->Format.ChunkSize;
Y_VERIFY(chunkIdx < PDisk->ChunkState.size());
- if (TChunkState::DATA_COMMITTED == PDisk->ChunkState[chunkIdx].CommitState) {
- if ((offset % PDisk->Format.ChunkSize) + completion->GetSize() > PDisk->Format.ChunkSize) {
- // TODO: split buffer if crossing chunk boundary instead of completely discarding it
- LOG_INFO_S(
- *ActorSystem, NKikimrServices::BS_DEVICE,
- "Skip caching log read due to chunk boundary crossing");
- } else {
- if (Cache.Size() >= MaxCount) {
- Cache.Pop();
- }
+
+ if ((offset % PDisk->Format.ChunkSize) + completion->GetSize() > PDisk->Format.ChunkSize) {
+ // TODO: split buffer if crossing chunk boundary instead of completely discarding it
+ LOG_INFO_S(
+ *ActorSystem, NKikimrServices::BS_DEVICE,
+ "Skip caching log read due to chunk boundary crossing");
+ } else {
+ if (Cache.Size() < MaxCount) {
const char* dataPtr = static_cast<const char*>(completion->GetData());
- Cache.Insert(dataPtr, completion->GetOffset(), completion->GetSize(), completion->GetBadOffsets());
+ Cache.Insert(dataPtr, completion->GetOffset(), completion->GetSize(), completion->GetBadOffsets());
}
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h
index ebf8b8f71b..7749b958fb 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h
@@ -329,7 +329,7 @@ struct TSysLogRecord {
TChunkIdx LogHeadChunkIdx;
ui32 Reserved1;
ui64 LogHeadChunkPreviousNonce;
- TVDiskID OwnerVDisks[256];
+ TVDiskID OwnerVDisks[OwnerCount];
TSysLogRecord()
: Version(PDISK_SYS_LOG_RECORD_VERSION_7)
@@ -337,7 +337,7 @@ struct TSysLogRecord {
, Reserved1(0)
, LogHeadChunkPreviousNonce((ui64)-1)
{
- for (size_t i = 0; i < 256; ++i) {
+ for (size_t i = 0; i < OwnerCount; ++i) {
OwnerVDisks[i] = TVDiskID::InvalidId;
}
}
@@ -354,7 +354,7 @@ struct TSysLogRecord {
str << " NonceSet# " << Nonces.ToString(isMultiline) << x;
str << " LogHeadChunkIdx# " << LogHeadChunkIdx << x;
str << " LogHeadChunkPreviousNonce# " << LogHeadChunkPreviousNonce << x;
- for (ui32 i = 0; i < 256; ++i) {
+ for (ui32 i = 0; i < OwnerCount; ++i) {
if (OwnerVDisks[i] != TVDiskID::InvalidId) {
str << " Owner[" << i << "]# " << OwnerVDisks[i].ToString() << x;
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
index 75e271996a..800f8c2a18 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
@@ -132,6 +132,7 @@ public:
bool TrimInFly = false; // TChunkTrim request is present somewhere in pdisk
TAtomic ChunkBeingTrimmed = 0;
TAtomic TrimOffset = 0;
+ TLogRecoveryState LogRecoveryState; // Recovery state: log chunk readers and log chunks order.
TList<TLogChunkInfo> LogChunks; // Log chunk list + log-specific information
bool IsLogChunksReleaseInflight = false;
ui64 InsaneLogChunks = 0; // Set when pdisk sees insanely large log, to give vdisks a chance to cut it
@@ -215,6 +216,13 @@ public:
bool InitCommonLogger();
bool LogNonceJump(ui64 previousNonce);
void GetStartingPoints(TOwner owner, TMap<TLogSignature, TLogRecord> &outStartingPoints);
+
+ /**
+ * Notifies that log chunk was read by a VDisk.
+ * @param chunkIdx Chunk's index.
+ * @param reader VDisk that read the chunk.
+ */
+ void NotifyLogChunkRead(ui32 chunkIdx, TOwner reader);
TString StartupOwnerInfo();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Destruction
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
index abdb0e0db0..d962b430b8 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
@@ -44,7 +44,7 @@ bool TPDisk::InitCommonLogger() {
}
CommonLogger->SwitchToNewChunk(TReqId(TReqId::InitCommonLoggerSwitchToNewChunk, 0), nullptr);
- // Log chunk can be collected as soon as noone needs it
+ // Log chunk can be collected as soon as no one needs it
ChunkState[chunkIdx].CommitState = TChunkState::DATA_COMMITTED;
}
bool isOk = LogNonceJump(InitialPreviousNonce);
@@ -200,6 +200,7 @@ bool TPDisk::ProcessChunk0(const NPDisk::TEvReadLogResult &readLogResult, TStrin
<< " Marker# BPD48");
return false;
}
+
TSysLogRecord *sysLogRecord = (TSysLogRecord*)(lastSysLogRecord.data());
if (sysLogRecord->Version < PDISK_SYS_LOG_RECORD_INCOMPATIBLE_VERSION_1000) {
@@ -481,10 +482,9 @@ TRcBuf TPDisk::ProcessReadSysLogResult(ui64 &outWritePosition, ui64 &outLsn,
}
void TPDisk::ReadAndParseMainLog(const TActorId &pDiskActor) {
- TVector<TLogChunkItem> chunksToRead;
TIntrusivePtr<TLogReaderBase> logReader(new TLogReader(true, this, ActorSystem, pDiskActor, 0, TLogPosition{0, 0},
EOwnerGroupType::Static, TLogPosition{0, 0}, (ui64)-1, SysLogRecord.LogHeadChunkPreviousNonce, 0, 0,
- TReqId(TReqId::ReadAndParseMainLog, 0), std::move(chunksToRead), 0, 0, TVDiskID::InvalidId));
+ TReqId(TReqId::ReadAndParseMainLog, 0), TVector<TLogChunkItem>(), 0, 0, TVDiskID::InvalidId));
TVector<ui64> badOffsets;
// Emits subrequests TCompletionLogReadPart which contains TIntrusivePtr to logReader
logReader->Exec(0, badOffsets, ActorSystem);
@@ -524,6 +524,7 @@ void TPDisk::ProcessLogReadQueue() {
}
ui32 endLogChunkIdx = CommonLogger->ChunkIdx;
ui64 endLogSectorIdx = CommonLogger->SectorIdx;
+
ownerData.LogReader = new TLogReader(false,
this, ActorSystem, logRead.Sender, logRead.Owner, logStartPosition,
logRead.OwnerGroupType,logRead.Position,
@@ -1200,7 +1201,7 @@ void TPDisk::OnLogCommitDone(TLogCommitDone &req) {
if (isChunkReleased) {
THolder<TCompletionEventSender> completion(new TCompletionEventSender(this));
if (ReleaseUnusedLogChunks(completion.Get())) {
- WriteSysLogRestorePoint(completion.Release(), req.ReqId, {}); // FIXME: wilson
+ WriteSysLogRestorePoint(completion.Release(), req.ReqId, {});
}
}
TryTrimChunk(false, 0);
@@ -1247,6 +1248,28 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) {
}
}
+void TPDisk::NotifyLogChunkRead(ui32 chunkIdx, TOwner reader) {
+ TGuard<TMutex> guard(StateMutex);
+ auto iter = LogRecoveryState.Readers.find(chunkIdx);
+
+ if (iter == LogRecoveryState.Readers.end()) {
+ return;
+ }
+
+ auto &readers = iter->second;
+
+ if (readers.any()) {
+ // If there's at least one registered reader.
+ readers.set(reader, false);
+
+ if (readers.none()) {
+ LogRecoveryState.Readers.erase(iter);
+
+ BlockDevice->EraseCacheRange(Format.Offset(chunkIdx, 0), Format.Offset(chunkIdx + 1, 0));
+ }
+ }
+}
+
// Schedules EvReadLogResult event for the system log
void TPDisk::InitiateReadSysLog(const TActorId &pDiskActor) {
Y_VERIFY_S(PDiskThread.Running(), "expect PDiskThread to be running");
@@ -1294,6 +1317,7 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
switch (InitPhase) {
case EInitPhase::ReadingSysLog:
{
+ // Finished reading sys log.
TString errorReason;
bool success = ProcessChunk0(evReadLogResult, errorReason);
@@ -1315,6 +1339,7 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
}
case EInitPhase::ReadingLog:
{
+ // Finished reading main log.
InitialLogPosition = evReadLogResult.NextPosition;
if (InitialLogPosition == TLogPosition{0, 0}) {
*Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialCommonLogParseError;
@@ -1403,6 +1428,17 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
ActorSystem->Send(pDiskActor, new TEvLogInitResult(false, errorReason));
return;
}
+
+ // For every log chunk build a list of readers. These readers will be reading this chunk.
+ for (auto it = LogChunks.begin(); it != LogChunks.end(); ++it) {
+ auto &readers = LogRecoveryState.Readers[it->ChunkIdx];
+
+ for (ui32 owner = 0; owner < it->OwnerLsnRange.size(); ++owner) {
+ if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) {
+ readers.set(owner, true);
+ }
+ }
+ }
}
// Increase Nonces to prevent collisions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp
index 448eb8480a..48de6dfcfc 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp
@@ -15,14 +15,6 @@ TLogCache::TCacheRecord::TCacheRecord(TCacheRecord&& other)
, BadOffsets(std::move(other.BadOffsets))
{}
-TLogCache::TItem::TItem(TItem&& other)
- : Value(std::move(other.Value))
-{}
-
-TLogCache::TItem::TItem(TCacheRecord&& value)
- : Value(std::move(value))
-{}
-
size_t TLogCache::Size() const {
return Index.size();
}
@@ -52,15 +44,7 @@ FindKeyLessEqual(C& c, const typename C::key_type& key) {
}
bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func) {
- return Find(offset, size, buffer, func, true);
-}
-
-bool TLogCache::FindWithoutPromote(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func) {
- return Find(offset, size, buffer, func, false);
-}
-
-bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, std::function<void(const std::vector<ui64>&)> func, bool promote) {
- TVector<TItem*> res;
+ TVector<TCacheRecord*> res;
auto indexIt = FindKeyLessEqual(Index, offset);
@@ -73,7 +57,7 @@ bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, std::function<void(co
while (indexIt != Index.end() && cur < end) {
ui64 recStart = indexIt->first;
- ui64 recEnd = recStart + indexIt->second.Value.Data.Size();
+ ui64 recEnd = recStart + indexIt->second.Data.Size();
if (cur >= recStart && cur < recEnd) {
res.push_back(&indexIt->second);
@@ -90,9 +74,7 @@ bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, std::function<void(co
return false;
}
- for (auto item : res) {
- auto cacheRecord = &item->Value;
-
+ for (auto cacheRecord : res) {
ui64 recStart = cacheRecord->Offset;
ui64 recEnd = recStart + cacheRecord->Data.Size();
@@ -109,24 +91,11 @@ bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, std::function<void(co
// Notify callee of bad offsets.
func(cacheRecord->BadOffsets);
-
- if (promote) {
- List.PushFront(item);
- }
}
return true;
}
-bool TLogCache::Pop() {
- if (Index.empty())
- return false;
-
- TItem* item = List.PopBack();
- Index.erase(item->Value.Offset);
- return true;
-}
-
std::pair<i64, i64> TLogCache::PrepareInsertion(ui64 start, ui32 size) {
ui64 end = start + size;
ui32 leftPadding = 0;
@@ -136,7 +105,7 @@ std::pair<i64, i64> TLogCache::PrepareInsertion(ui64 start, ui32 size) {
auto it1 = FindKeyLessEqual(Index, start);
if (it1 != Index.end()) {
ui64 maybeStart = it1->first;
- ui64 maybeEnd = maybeStart + it1->second.Value.Data.Size();
+ ui64 maybeEnd = maybeStart + it1->second.Data.Size();
if (start < maybeEnd) {
if (end <= maybeEnd) {
@@ -146,24 +115,36 @@ std::pair<i64, i64> TLogCache::PrepareInsertion(ui64 start, ui32 size) {
}
}
+ ui64 offsetStart = start + leftPadding;
+
// Check if there is a block that overlaps with the new insertion's end.
auto it2 = FindKeyLess(Index, end);
if (it2 != Index.end()) {
+ ui64 dataSize = it2->second.Data.Size();
+
ui64 maybeStart = it2->first;
- ui64 maybeEnd = maybeStart + it2->second.Value.Data.Size();
+ ui64 maybeEnd = maybeStart + dataSize;
+
+ if (offsetStart == maybeStart) {
+ // There is an overlapping block; return {-1, -1} to indicate it.
+ if (end <= maybeEnd) {
+ return {-1, -1};
+ }
+
+ leftPadding += dataSize;
+ }
if (end < maybeEnd) {
rightPadding = end - maybeStart;
}
}
-
- // Remove any blocks that are completely covered by the new insertion.
- ui64 offsetStart = start + leftPadding;
+
ui64 offsetEnd = start + (size - rightPadding);
+ // Remove any blocks that are completely covered by the new insertion.
auto it = Index.upper_bound(offsetStart);
while (it != Index.end()) {
- ui64 blockEnd = it->first + it->second.Value.Data.Size();
+ ui64 blockEnd = it->first + it->second.Data.Size();
if (blockEnd < offsetEnd) {
it = Index.erase(it);
} else {
@@ -194,8 +175,6 @@ bool TLogCache::Insert(const char* dataPtr, ui64 offset, ui32 size, const TVecto
Y_VERIFY_DEBUG(inserted);
- List.PushFront(&it->second);
-
return true;
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h
index 79bc068ff4..bf46ab0149 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h
@@ -8,8 +8,9 @@ namespace NKikimr {
namespace NPDisk {
/**
- * Key-value LRU cache without automatic eviction, but able to erase range of keys.
- **/
+ * Key-value cache without automatic eviction, but able to erase range of keys.
+ * Entries do not intersect with each other.
+ */
class TLogCache {
private:
struct TCacheRecord {
@@ -22,20 +23,7 @@ private:
TCacheRecord(ui64 offset, TRcBuf data, TVector<ui64> badOffsets);
};
- /**
- * Nested class representing a cache entry in the doubly linked list.
- * Inherits from TIntrusiveListItem to maintain the LRU order.
- */
- struct TItem : public TIntrusiveListItem<TItem> {
- TCacheRecord Value;
-
- // custom constructors ignoring TIntrusiveListItem
- TItem(TItem&& other);
- explicit TItem(TCacheRecord&& value);
- };
-
- using TListType = TIntrusiveList<TItem>;
- using TIndex = TMap<ui64, TItem>;
+ using TIndex = TMap<ui64, TCacheRecord>;
public:
using TBadOffsetsHandler = std::function<void(const std::vector<ui64>&)>;
@@ -46,17 +34,6 @@ public:
size_t Size() const;
/**
- * Finds a cache record by its offset and a specified size, copies the data to the buffer,
- * and promotes the record to the front of the cache list.
- * @param offset The offset key to search for.
- * @param size The size of data to copy.
- * @param buffer The buffer to store the copied data.
- * @param func Optional custom function to handle bad offsets.
- * @return True if the cache record is found and data is copied; otherwise, false.
- */
- bool Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func = [](const std::vector<ui64>&) {});
-
- /**
* Finds a cache record by its offset and a specified size, copies the data to the buffer.
* @param offset The offset key to search for.
* @param size The size of data to copy.
@@ -64,13 +41,7 @@ public:
* @param func Optional custom function to handle bad offsets.
* @return True if the cache record is found and data is copied; otherwise, false.
*/
- bool FindWithoutPromote(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func = [](const std::vector<ui64>&) {});
-
- /**
- * Removes the least recently used cache record from the cache.
- * @return True if a cache record was removed; otherwise, false (cache is empty).
- */
- bool Pop();
+ bool Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func = [](const std::vector<ui64>&) {});
/**
* Inserts a new cache record into the cache.
@@ -96,7 +67,6 @@ public:
void Clear();
private:
- TListType List;
TIndex Index;
/**
@@ -107,8 +77,6 @@ private:
* @return A pair of i64 values representing left and right data paddings.
*/
std::pair<i64, i64> PrepareInsertion(ui64 offset, ui32 size);
-
- bool Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func, bool promote);
};
} // NPDisk
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp
index 0e268399a2..dd072ab91e 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp
@@ -17,13 +17,13 @@ Y_UNIT_TEST_SUITE(TLogCache) {
UNIT_ASSERT_STRINGS_EQUAL(buf, "a");
UNIT_ASSERT(cache.Insert("c", 3, 1));
- UNIT_ASSERT(cache.Pop()); // 2 must be evicted
+ UNIT_ASSERT_EQUAL(1, cache.EraseRange(2, 3)); // 2 was removed
UNIT_ASSERT_EQUAL(cache.Size(), 2);
UNIT_ASSERT(!cache.Find(2, 1, buf));
UNIT_ASSERT(cache.Find(3, 1, buf));
UNIT_ASSERT_STRINGS_EQUAL(buf, "c");
- UNIT_ASSERT(cache.Pop()); // 1 must be evicted
+ UNIT_ASSERT_EQUAL(1, cache.EraseRange(1, 2)); // 1 was removed
UNIT_ASSERT(cache.Insert("d", 4, 1));
UNIT_ASSERT_EQUAL(cache.Size(), 2);
@@ -31,16 +31,14 @@ Y_UNIT_TEST_SUITE(TLogCache) {
UNIT_ASSERT(cache.Find(4, 1, buf));
UNIT_ASSERT_STRINGS_EQUAL(buf, "d");
- UNIT_ASSERT(cache.Pop()); // 3 must be evicted
+ UNIT_ASSERT_EQUAL(1, cache.EraseRange(3, 4)); // 3 was removed
UNIT_ASSERT_EQUAL(cache.Size(), 1);
UNIT_ASSERT(!cache.Find(3, 1, buf));
UNIT_ASSERT(cache.Find(4, 1, buf));
UNIT_ASSERT_STRINGS_EQUAL(buf, "d");
- UNIT_ASSERT_EQUAL(1, cache.EraseRange(3, 5));
- UNIT_ASSERT_EQUAL(cache.Size(), 0);
- UNIT_ASSERT(!cache.Pop());
+ UNIT_ASSERT_EQUAL(1, cache.EraseRange(4, 5));
UNIT_ASSERT_EQUAL(cache.Size(), 0);
}
@@ -189,6 +187,33 @@ Y_UNIT_TEST_SUITE(TLogCache) {
UNIT_ASSERT(!cache.Find(3, 10, buf3));
UNIT_ASSERT_STRINGS_EQUAL(buf3, "");
}
+
+ {
+ TLogCache cache;
+
+ UNIT_ASSERT(cache.Insert("abcdefghij", 0, 10));
+ UNIT_ASSERT_EQUAL(1, cache.Size());
+
+ UNIT_ASSERT(cache.Insert("klmno", 10, 5));
+ UNIT_ASSERT_EQUAL(2, cache.Size());
+
+ UNIT_ASSERT(!cache.Insert("fghijklmno", 5, 10));
+
+ UNIT_ASSERT_EQUAL(2, cache.Size());
+ }
+
+ {
+ TLogCache cache;
+
+ UNIT_ASSERT(cache.Insert("abcdefghij", 0, 10));
+ UNIT_ASSERT_EQUAL(1, cache.Size());
+
+ UNIT_ASSERT(cache.Insert("klmno", 10, 5));
+ UNIT_ASSERT_EQUAL(2, cache.Size());
+
+ UNIT_ASSERT(cache.Insert("fghijklmnopq", 5, 12));
+ UNIT_ASSERT_EQUAL(3, cache.Size());
+ }
}
TLogCache SetupCache(const TVector<std::pair<ui64, TString>>& content = {{5, "x"}, {1, "y"}, {10, "z"}}) {
@@ -207,15 +232,15 @@ Y_UNIT_TEST_SUITE(TLogCache) {
char buf[2] = {};
for (auto pair : content) {
- UNIT_ASSERT(cache.FindWithoutPromote(pair.first, 1, buf));
+ UNIT_ASSERT(cache.Find(pair.first, 1, buf));
UNIT_ASSERT_STRINGS_EQUAL(pair.second, buf);
}
for (auto pair : content) {
- UNIT_ASSERT(cache.Pop());
+ cache.EraseRange(pair.first, pair.first + 1);
- UNIT_ASSERT(!cache.FindWithoutPromote(pair.first, 1, buf));
+ UNIT_ASSERT(!cache.Find(pair.first, 1, buf));
}
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
index add4a820ac..374827cfaa 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
@@ -1148,13 +1148,22 @@ void TLogReader::Reply() {
PDisk->ProcessChunkOwnerMap(*ChunkOwnerMap.Get());
ChunkOwnerMap.Destroy();
+ // Remove invalid part of the last log chunk.
PDisk->BlockDevice->EraseCacheRange(
- PDisk->Format.Offset(ChunkIdx, 0),
+ PDisk->Format.Offset(ChunkIdx, SectorIdx) + OffsetInSector,
PDisk->Format.Offset(ChunkIdx + 1, 0)
);
}
LOG_DEBUG(*PDisk->ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " To ownerId# %" PRIu32 " %s",
(ui32)PDisk->PDiskId, (ui32)Owner, Result->ToString().c_str());
+
+ if (!IsInitial && Result->IsEndOfLog) {
+ ui32 prevChunkIdx = ChunkIdx;
+
+ // Finished reading owner's whole log.
+ PDisk->NotifyLogChunkRead(prevChunkIdx, Owner);
+ }
+
ActorSystem->Send(ReplyTo, Result.Release());
if (!IsInitial) {
PDisk->Mon.LogRead.CountResponse(ResultSize);
@@ -1296,6 +1305,13 @@ void TLogReader::UpdateNewChunkInfo(ui32 currChunk, const TMaybe<ui32> prevChunk
}
void TLogReader::SwitchToChunk(ui32 chunkIdx) {
+ if (!IsInitial) {
+ ui32 prevChunkIdx = ChunkIdx;
+
+ // Finished reading log chunk.
+ PDisk->NotifyLogChunkRead(prevChunkIdx, Owner);
+ }
+
ChunkIdx = chunkIdx;
SectorIdx = 0;
OffsetInSector = 0;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h
index 9318239f65..8c2568941b 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h
@@ -35,7 +35,7 @@ class TPDisk;
struct TLogChunkItem {
TChunkIdx ChunkIdx;
bool IsPreviousChunkDropped;
- bool IsPreviousChunkCut;;
+ bool IsPreviousChunkCut;
TLogChunkItem(TChunkIdx chunkIdx, bool isPreviousChunkDropped, bool isPreviousChunkCut)
: ChunkIdx(chunkIdx)
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
index a8ccc2d741..f0c95dae26 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
@@ -23,7 +23,7 @@ enum class EInitPhase {
};
enum EOwner {
- OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, mens "for dynamic" in requests
+ OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, means "for dynamic" in requests
OwnerUnallocated = 1, // Unallocated chunks, Trim scheduling, Slay commands
OwnerBeginUser = 2,
OwnerEndUser = 241,
@@ -338,6 +338,14 @@ struct TLogChunkInfo {
}
};
+struct TLogRecoveryState {
+ TMap<TChunkIdx, std::bitset<OwnerCount>> Readers; // Per-chunk information about future readers.
+
+ void Clear() {
+ Readers.clear();
+ }
+};
+
} // NPDisk
} // NKikimr
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
index 4d2f648eea..1f8ce61edb 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
@@ -466,6 +466,36 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
vdisk.SendEvLogSync();
}
+ Y_UNIT_TEST(PDiskRestartUsesLogRecoveryState) {
+ TActorTestContext testCtx({
+ .IsBad = false,
+ .ChunkSize = 4 * (1 << 20)
+ });
+ TVDiskMock vdisk(&testCtx);
+ vdisk.InitFull();
+
+ // Fill one log chunk so that on restart it is fully read.
+ for (int i = 0; i < 128; i++) {
+ vdisk.SendEvLogSync(32768);
+ }
+
+ testCtx.RestartPDiskSync();
+
+ vdisk.Init();
+
+ // Assert recovery state with readers 1.
+ testCtx.SafeRunOnPDisk([](NPDisk::TPDisk* disk) {
+ UNIT_ASSERT_EQUAL(1, disk->LogRecoveryState.Readers.size());
+ });
+
+ vdisk.ReadLog();
+
+ // Assert recovery state with readers 0.
+ testCtx.SafeRunOnPDisk([](NPDisk::TPDisk* disk) {
+ UNIT_ASSERT_EQUAL(0, disk->LogRecoveryState.Readers.size());
+ });
+ }
+
Y_UNIT_TEST(PDiskRestartManyLogWrites) {
TActorTestContext testCtx({ false });
testCtx.TestCtx.SectorMap->ImitateIoErrorProbability = 1e-4;