author      senya0x5f <senya0x5f@yandex-team.com>  2023-08-23 20:47:53 +0300
committer   senya0x5f <senya0x5f@yandex-team.com>  2023-08-23 21:01:24 +0300
commit      ff9687dd6ba1e81aa0e53a9a0967128ccd5a7fa5 (patch)
tree        c734e5328cda155c3057d6e0a9171eb9f1ac8e51
parent      d326ef1ecaa6f9550e0049ff24a23ca1eded8df8 (diff)
download    ydb-ff9687dd6ba1e81aa0e53a9a0967128ccd5a7fa5.tar.gz
Revert KIKIMR-18858, KIKIMR-18909, KIKIMR-19060
Revert "KIKIMR-18858 Rework log cache" This reverts commit b58028fa3ca554bba9abed1629f1039da89112a7, reversing changes made to cbf32038a86630264f72c9a8119eacd21ad201b7. Revert "KIKIMR-18909 Rework log cache usage" This reverts commit 8f4f06d976833371d513f9c07994450210137955, reversing changes made to 935e0105f7284510e823565a26a371743c8301d9. Revert "KIKIMR-19060 Only write chunks with state LOG_COMMITTED to cache" This reverts commit 1d5c0b6ce22c8daae3093dbe9dc963e0704cd8f0, reversing changes made to b423abe9686de538512f110a49f0693733a632ae.
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp   38
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h                   6
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h                   8
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp            46
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp          173
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h             71
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp       239
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp           18
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h              2
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h                 10
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp                  30
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_helpers.cpp           9
-rw-r--r--  ydb/library/pdisk_io/wcache.cpp                                        2
13 files changed, 115 insertions, 537 deletions
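
For orientation, a condensed sketch of the TLogCache interface that this revert restores, abridged from the blobstorage_pdisk_log_cache.h diff further below (private members, constructors and namespaces omitted). The reverted commits had replaced this offset-keyed LRU with an interval-merging cache keyed by byte ranges.

    // Abridged from ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h as restored by this revert.
    class TLogCache {
    public:
        struct TCacheRecord {
            ui64 Offset = 0;          // exact log offset the record was read from
            TRcBuf Data;              // cached bytes
            TVector<ui64> BadOffsets; // bad offsets observed while reading this record
        };

        size_t Size() const;
        const TCacheRecord* Find(ui64 offset);                      // exact-offset hit; promotes the entry to the LRU front
        const TCacheRecord* FindWithoutPromote(ui64 offset) const;  // exact-offset hit; recency order unchanged
        bool Insert(TCacheRecord&& value);                          // no automatic eviction; callers invoke Pop() themselves
        bool Pop();                                                 // evicts the least recently used record
        size_t Erase(ui64 offset);
        size_t EraseRange(ui64 begin, ui64 end);                    // erases keys in [begin, end)
        void Clear();
    };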
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
index e93ae143fc..e8e3976206 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
@@ -1214,19 +1214,18 @@ class TCachedBlockDevice : public TRealBlockDevice {
for (auto it = ReadsForOffset.begin(); it != ReadsForOffset.end(); it = nextIt) {
nextIt++;
TRead &read = it->second;
-
- bool foundInCache = Cache.Find(read.Offset, read.Size, static_cast<char*>(read.Data), [compAction=read.CompletionAction](auto badOffsets) {
- for (size_t i = 0; i < badOffsets.size(); ++i) {
- compAction->RegisterBadOffset(badOffsets[i]);
+ const TLogCache::TCacheRecord* cached = Cache.Find(read.Offset);
+ if (cached) {
+ if (read.Size <= cached->Data.Size()) {
+ memcpy(read.Data, cached->Data.GetData(), read.Size);
+ Mon.DeviceReadCacheHits->Inc();
+ Y_VERIFY(read.CompletionAction);
+ for (size_t i = 0; i < cached->BadOffsets.size(); ++i) {
+ read.CompletionAction->RegisterBadOffset(cached->BadOffsets[i]);
+ }
+ NoopAsyncHackForLogReader(read.CompletionAction, read.ReqId);
+ ReadsForOffset.erase(it);
}
- });
-
- if (foundInCache) {
- Mon.DeviceReadCacheHits->Inc();
- Y_VERIFY(read.CompletionAction);
-
- NoopAsyncHackForLogReader(read.CompletionAction, read.ReqId);
- ReadsForOffset.erase(it);
}
}
if (ReadsInFly >= MaxReadsInFly) {
@@ -1271,19 +1270,22 @@ public:
ui64 chunkIdx = offset / PDisk->Format.ChunkSize;
Y_VERIFY(chunkIdx < PDisk->ChunkState.size());
-
- if (TChunkState::LOG_COMMITTED == PDisk->ChunkState[chunkIdx].CommitState) {
+ if (TChunkState::DATA_COMMITTED == PDisk->ChunkState[chunkIdx].CommitState) {
if ((offset % PDisk->Format.ChunkSize) + completion->GetSize() > PDisk->Format.ChunkSize) {
// TODO: split buffer if crossing chunk boundary instead of completely discarding it
LOG_INFO_S(
*ActorSystem, NKikimrServices::BS_DEVICE,
"Skip caching log read due to chunk boundary crossing");
} else {
- if (Cache.Size() < MaxCount) {
- const char* dataPtr = static_cast<const char*>(completion->GetData());
-
- Cache.Insert(dataPtr, completion->GetOffset(), completion->GetSize(), completion->GetBadOffsets());
+ if (Cache.Size() >= MaxCount) {
+ Cache.Pop();
}
+ const char* dataPtr = static_cast<const char*>(completion->GetData());
+ Cache.Insert(
+ TLogCache::TCacheRecord(
+ completion->GetOffset(),
+ TRcBuf(TString(dataPtr, dataPtr + completion->GetSize())),
+ completion->GetBadOffsets()));
}
}
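
The two hunks above restore the exact-offset hit path and caller-driven eviction. Condensed, the flow is roughly as follows; names are taken from the hunks, while monitoring counters, request bookkeeping and the DATA_COMMITTED / chunk-boundary checks are omitted.

    // Read path: a hit is taken only if a cached record starts at exactly read.Offset and is
    // large enough; otherwise the read stays queued and is served by the real device.
    if (const TLogCache::TCacheRecord* cached = Cache.Find(read.Offset)) {
        if (read.Size <= cached->Data.Size()) {
            memcpy(read.Data, cached->Data.GetData(), read.Size);
            for (ui64 badOffset : cached->BadOffsets) {
                read.CompletionAction->RegisterBadOffset(badOffset);
            }
            // ... complete the read and erase it from ReadsForOffset
        }
    }

    // Write-back path: evict the least recently used record first so the cache never grows
    // past MaxCount, then insert the freshly read buffer keyed by its offset.
    if (Cache.Size() >= MaxCount) {
        Cache.Pop();
    }
    Cache.Insert(TLogCache::TCacheRecord(
        completion->GetOffset(),
        TRcBuf(TString(dataPtr, dataPtr + completion->GetSize())),
        completion->GetBadOffsets()));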
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h
index 7749b958fb..ebf8b8f71b 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h
@@ -329,7 +329,7 @@ struct TSysLogRecord {
TChunkIdx LogHeadChunkIdx;
ui32 Reserved1;
ui64 LogHeadChunkPreviousNonce;
- TVDiskID OwnerVDisks[OwnerCount];
+ TVDiskID OwnerVDisks[256];
TSysLogRecord()
: Version(PDISK_SYS_LOG_RECORD_VERSION_7)
@@ -337,7 +337,7 @@ struct TSysLogRecord {
, Reserved1(0)
, LogHeadChunkPreviousNonce((ui64)-1)
{
- for (size_t i = 0; i < OwnerCount; ++i) {
+ for (size_t i = 0; i < 256; ++i) {
OwnerVDisks[i] = TVDiskID::InvalidId;
}
}
@@ -354,7 +354,7 @@ struct TSysLogRecord {
str << " NonceSet# " << Nonces.ToString(isMultiline) << x;
str << " LogHeadChunkIdx# " << LogHeadChunkIdx << x;
str << " LogHeadChunkPreviousNonce# " << LogHeadChunkPreviousNonce << x;
- for (ui32 i = 0; i < OwnerCount; ++i) {
+ for (ui32 i = 0; i < 256; ++i) {
if (OwnerVDisks[i] != TVDiskID::InvalidId) {
str << " Owner[" << i << "]# " << OwnerVDisks[i].ToString() << x;
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
index a3feee5bfd..61a42556ef 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
@@ -135,7 +135,6 @@ public:
bool TrimInFly = false; // TChunkTrim request is present somewhere in pdisk
TAtomic ChunkBeingTrimmed = 0;
TAtomic TrimOffset = 0;
- TLogRecoveryState LogRecoveryState; // Recovery state: log chunk readers and log chunks order.
TList<TLogChunkInfo> LogChunks; // Log chunk list + log-specific information
bool IsLogChunksReleaseInflight = false;
ui64 InsaneLogChunks = 0; // Set when pdisk sees insanely large log, to give vdisks a chance to cut it
@@ -219,13 +218,6 @@ public:
bool InitCommonLogger();
bool LogNonceJump(ui64 previousNonce);
void GetStartingPoints(TOwner owner, TMap<TLogSignature, TLogRecord> &outStartingPoints);
-
- /**
- * Notifies that log chunk was read by a VDisk.
- * @param chunkIdx Chunk's index.
- * @param reader VDisk that read the chunk.
- */
- void NotifyLogChunkRead(ui32 chunkIdx, TOwner reader);
TString StartupOwnerInfo();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Destruction
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
index 0822196fe5..abdb0e0db0 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
@@ -44,7 +44,7 @@ bool TPDisk::InitCommonLogger() {
}
CommonLogger->SwitchToNewChunk(TReqId(TReqId::InitCommonLoggerSwitchToNewChunk, 0), nullptr);
- // Log chunk can be collected as soon as no one needs it
+ // Log chunk can be collected as soon as noone needs it
ChunkState[chunkIdx].CommitState = TChunkState::DATA_COMMITTED;
}
bool isOk = LogNonceJump(InitialPreviousNonce);
@@ -200,7 +200,6 @@ bool TPDisk::ProcessChunk0(const NPDisk::TEvReadLogResult &readLogResult, TStrin
<< " Marker# BPD48");
return false;
}
-
TSysLogRecord *sysLogRecord = (TSysLogRecord*)(lastSysLogRecord.data());
if (sysLogRecord->Version < PDISK_SYS_LOG_RECORD_INCOMPATIBLE_VERSION_1000) {
@@ -482,9 +481,10 @@ TRcBuf TPDisk::ProcessReadSysLogResult(ui64 &outWritePosition, ui64 &outLsn,
}
void TPDisk::ReadAndParseMainLog(const TActorId &pDiskActor) {
+ TVector<TLogChunkItem> chunksToRead;
TIntrusivePtr<TLogReaderBase> logReader(new TLogReader(true, this, ActorSystem, pDiskActor, 0, TLogPosition{0, 0},
EOwnerGroupType::Static, TLogPosition{0, 0}, (ui64)-1, SysLogRecord.LogHeadChunkPreviousNonce, 0, 0,
- TReqId(TReqId::ReadAndParseMainLog, 0), TVector<TLogChunkItem>(), 0, 0, TVDiskID::InvalidId));
+ TReqId(TReqId::ReadAndParseMainLog, 0), std::move(chunksToRead), 0, 0, TVDiskID::InvalidId));
TVector<ui64> badOffsets;
// Emits subrequests TCompletionLogReadPart which contains TIntrusivePtr to logReader
logReader->Exec(0, badOffsets, ActorSystem);
@@ -524,7 +524,6 @@ void TPDisk::ProcessLogReadQueue() {
}
ui32 endLogChunkIdx = CommonLogger->ChunkIdx;
ui64 endLogSectorIdx = CommonLogger->SectorIdx;
-
ownerData.LogReader = new TLogReader(false,
this, ActorSystem, logRead.Sender, logRead.Owner, logStartPosition,
logRead.OwnerGroupType,logRead.Position,
@@ -1201,7 +1200,7 @@ void TPDisk::OnLogCommitDone(TLogCommitDone &req) {
if (isChunkReleased) {
THolder<TCompletionEventSender> completion(new TCompletionEventSender(this));
if (ReleaseUnusedLogChunks(completion.Get())) {
- WriteSysLogRestorePoint(completion.Release(), req.ReqId, {});
+ WriteSysLogRestorePoint(completion.Release(), req.ReqId, {}); // FIXME: wilson
}
}
TryTrimChunk(false, 0);
@@ -1211,8 +1210,6 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) {
TGuard<TMutex> guard(StateMutex);
for (const auto& chunkIdx : req.ChunksToRelease) {
- LogRecoveryState.Readers.erase(chunkIdx);
-
BlockDevice->EraseCacheRange(
Format.Offset(chunkIdx, 0),
Format.Offset(chunkIdx + 1, 0));
@@ -1250,28 +1247,6 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) {
}
}
-void TPDisk::NotifyLogChunkRead(ui32 chunkIdx, TOwner reader) {
- TGuard<TMutex> guard(StateMutex);
- auto iter = LogRecoveryState.Readers.find(chunkIdx);
-
- if (iter == LogRecoveryState.Readers.end()) {
- return;
- }
-
- auto &readers = iter->second;
-
- if (readers.any()) {
- // If there's at least one registered reader.
- readers.set(reader, false);
-
- if (readers.none()) {
- LogRecoveryState.Readers.erase(iter);
-
- BlockDevice->EraseCacheRange(Format.Offset(chunkIdx, 0), Format.Offset(chunkIdx + 1, 0));
- }
- }
-}
-
// Schedules EvReadLogResult event for the system log
void TPDisk::InitiateReadSysLog(const TActorId &pDiskActor) {
Y_VERIFY_S(PDiskThread.Running(), "expect PDiskThread to be running");
@@ -1319,7 +1294,6 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
switch (InitPhase) {
case EInitPhase::ReadingSysLog:
{
- // Finished reading sys log.
TString errorReason;
bool success = ProcessChunk0(evReadLogResult, errorReason);
@@ -1341,7 +1315,6 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
}
case EInitPhase::ReadingLog:
{
- // Finished reading main log.
InitialLogPosition = evReadLogResult.NextPosition;
if (InitialLogPosition == TLogPosition{0, 0}) {
*Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialCommonLogParseError;
@@ -1430,17 +1403,6 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
ActorSystem->Send(pDiskActor, new TEvLogInitResult(false, errorReason));
return;
}
-
- // For every log chunk build a list of readers. These readers will be reading this chunk.
- for (auto it = LogChunks.begin(); it != LogChunks.end(); ++it) {
- auto &readers = LogRecoveryState.Readers[it->ChunkIdx];
-
- for (ui32 owner = 0; owner < it->OwnerLsnRange.size(); ++owner) {
- if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) {
- readers.set(owner, true);
- }
- }
- }
}
// Increase Nonces to prevent collisions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp
index 48de6dfcfc..7d225c2efa 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.cpp
@@ -15,167 +15,54 @@ TLogCache::TCacheRecord::TCacheRecord(TCacheRecord&& other)
, BadOffsets(std::move(other.BadOffsets))
{}
-size_t TLogCache::Size() const {
- return Index.size();
-}
+TLogCache::TItem::TItem(TItem&& other)
+ : Value(std::move(other.Value))
+{}
-template <typename C>
-typename C::iterator
-FindKeyLess(C& c, const typename C::key_type& key) {
- auto iter = c.lower_bound(key);
-
- if (iter == c.begin()) {
- return c.end();
- }
+TLogCache::TItem::TItem(TCacheRecord&& value)
+ : Value(std::move(value))
+{}
- return --iter;
+size_t TLogCache::Size() const {
+ return Index.size();
}
-template <typename C>
-typename C::iterator
-FindKeyLessEqual(C& c, const typename C::key_type& key) {
- auto iter = c.upper_bound(key);
-
- if (iter == c.begin()) {
- return c.end();
+const TLogCache::TCacheRecord* TLogCache::Find(ui64 offset) {
+ auto indexIt = Index.find(offset);
+ if (indexIt == Index.end()) {
+ return nullptr;
}
- return --iter;
+ List.PushFront(&indexIt->second);
+ return &indexIt->second.Value;
}
-bool TLogCache::Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func) {
- TVector<TCacheRecord*> res;
-
- auto indexIt = FindKeyLessEqual(Index, offset);
-
+const TLogCache::TCacheRecord* TLogCache::FindWithoutPromote(ui64 offset) const {
+ auto indexIt = Index.find(offset);
if (indexIt == Index.end()) {
- return false;
- }
-
- ui64 cur = offset;
- ui64 end = offset + size;
-
- while (indexIt != Index.end() && cur < end) {
- ui64 recStart = indexIt->first;
- ui64 recEnd = recStart + indexIt->second.Data.Size();
-
- if (cur >= recStart && cur < recEnd) {
- res.push_back(&indexIt->second);
- } else {
- return false;
- }
-
- cur = recEnd;
-
- indexIt++;
+ return nullptr;
}
+
+ return &indexIt->second.Value;
+}
- if (cur < end) {
+bool TLogCache::Pop() {
+ if (Index.empty())
return false;
- }
-
- for (auto cacheRecord : res) {
- ui64 recStart = cacheRecord->Offset;
- ui64 recEnd = recStart + cacheRecord->Data.Size();
-
- // Determine the buffer's chunk start and end absolute offsets.
- ui64 chunkStartOffset = std::max(recStart, offset);
- ui64 chunkEndOffset = std::min(recEnd, offset + size);
- ui64 chunkSize = chunkEndOffset - chunkStartOffset;
-
- // Calculate the chunk's position within the buffer to start copying.
- ui64 chunkOffset = chunkStartOffset - recStart;
-
- // Copy the chunk data to the buffer.
- std::memcpy(buffer + (chunkStartOffset - offset), cacheRecord->Data.Data() + chunkOffset, chunkSize);
-
- // Notify callee of bad offsets.
- func(cacheRecord->BadOffsets);
- }
+ TItem* item = List.PopBack();
+ Index.erase(item->Value.Offset);
return true;
}
-std::pair<i64, i64> TLogCache::PrepareInsertion(ui64 start, ui32 size) {
- ui64 end = start + size;
- ui32 leftPadding = 0;
- ui32 rightPadding = 0;
-
- // Check if there is a block that overlaps with the new insertion's start.
- auto it1 = FindKeyLessEqual(Index, start);
- if (it1 != Index.end()) {
- ui64 maybeStart = it1->first;
- ui64 maybeEnd = maybeStart + it1->second.Data.Size();
-
- if (start < maybeEnd) {
- if (end <= maybeEnd) {
- return {-1, -1}; // There is an overlapping block; return {-1, -1} to indicate it.
- }
- leftPadding = maybeEnd - start;
- }
- }
-
- ui64 offsetStart = start + leftPadding;
-
- // Check if there is a block that overlaps with the new insertion's end.
- auto it2 = FindKeyLess(Index, end);
- if (it2 != Index.end()) {
- ui64 dataSize = it2->second.Data.Size();
-
- ui64 maybeStart = it2->first;
- ui64 maybeEnd = maybeStart + dataSize;
-
- if (offsetStart == maybeStart) {
- // There is an overlapping block; return {-1, -1} to indicate it.
- if (end <= maybeEnd) {
- return {-1, -1};
- }
-
- leftPadding += dataSize;
- }
-
- if (end < maybeEnd) {
- rightPadding = end - maybeStart;
- }
- }
-
- ui64 offsetEnd = start + (size - rightPadding);
-
- // Remove any blocks that are completely covered by the new insertion.
- auto it = Index.upper_bound(offsetStart);
- while (it != Index.end()) {
- ui64 blockEnd = it->first + it->second.Data.Size();
- if (blockEnd < offsetEnd) {
- it = Index.erase(it);
- } else {
- break;
- }
- }
-
- return {leftPadding, rightPadding};
+bool TLogCache::Insert(TCacheRecord&& value) {
+ auto [it, inserted] = Index.try_emplace(value.Offset, std::move(value));
+ List.PushFront(&it->second);
+ return inserted;
}
-bool TLogCache::Insert(const char* dataPtr, ui64 offset, ui32 size, const TVector<ui64>& badOffsets) {
- auto [leftPadding, rightPadding] = PrepareInsertion(offset, size);
-
- if (leftPadding == -1 && rightPadding == -1) {
- return false;
- }
-
- auto dataStart = dataPtr + leftPadding;
- auto dataEnd = dataPtr + (size - rightPadding);
-
- Y_VERIFY_DEBUG(dataStart < dataEnd);
-
- auto [it, inserted] = Index.try_emplace(offset + leftPadding, std::move(TLogCache::TCacheRecord(
- offset + leftPadding,
- TRcBuf(TString(dataStart, dataEnd)),
- badOffsets)
- ));
-
- Y_VERIFY_DEBUG(inserted);
-
- return true;
+size_t TLogCache::Erase(ui64 offset) {
+ return Index.erase(offset);
}
size_t TLogCache::EraseRange(ui64 begin, ui64 end) {
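
The restored implementation keeps every record in a TMap keyed by offset and threads the same TItem objects through an intrusive list for recency: Find() re-links a hit to the front, Pop() takes the back item and erases its map entry. A small behavioural sketch of the promote semantics, in the style of the unit tests below and assuming the interface shown earlier:

    // Promote-on-Find: after Find(1), offset 1 is most recently used, so Pop() evicts offset 2.
    TLogCache cache;
    cache.Insert(TLogCache::TCacheRecord(1, TRcBuf(TString("a")), {}));
    cache.Insert(TLogCache::TCacheRecord(2, TRcBuf(TString("b")), {}));
    cache.Find(1);                                    // re-links offset 1 to the front of the LRU list
    cache.Pop();                                      // evicts the least recently used entry, i.e. offset 2
    Y_VERIFY(cache.FindWithoutPromote(1) != nullptr);
    Y_VERIFY(cache.FindWithoutPromote(2) == nullptr);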
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h
index bf46ab0149..c3d91dcdd2 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache.h
@@ -8,11 +8,10 @@ namespace NKikimr {
namespace NPDisk {
/**
- * Key-value cache without automatic eviction, but able to erase range of keys.
- * Entries do not intersect with each other.
- */
+ * Key-value LRU cache without automatic eviction, but able to erase range of keys.
+ **/
class TLogCache {
-private:
+public:
struct TCacheRecord {
ui64 Offset = 0;
TRcBuf Data;
@@ -23,60 +22,32 @@ private:
TCacheRecord(ui64 offset, TRcBuf data, TVector<ui64> badOffsets);
};
- using TIndex = TMap<ui64, TCacheRecord>;
-
-public:
- using TBadOffsetsHandler = std::function<void(const std::vector<ui64>&)>;
-
- /**
- * Gets the current size of the cache.
- */
- size_t Size() const;
+private:
+ struct TItem : public TIntrusiveListItem<TItem> {
+ TCacheRecord Value;
- /**
- * Finds a cache record by its offset and a specified size, copies the data to the buffer.
- * @param offset The offset key to search for.
- * @param size The size of data to copy.
- * @param buffer The buffer to store the copied data.
- * @param func Optional custom function to handle bad offsets.
- * @return True if the cache record is found and data is copied; otherwise, false.
- */
- bool Find(ui64 offset, ui32 size, char* buffer, TBadOffsetsHandler func = [](const std::vector<ui64>&) {});
+ // custom constructors ignoring TIntrusiveListItem
+ TItem(TItem&& other);
+ explicit TItem(TCacheRecord&& value);
+ };
- /**
- * Inserts a new cache record into the cache.
- * @param dataPtr Pointer to the data to be inserted.
- * @param offset The offset key for the new cache record.
- * @param size The size of the data.
- * @param badOffsets Optional vector of bad offsets associated with the cache record.
- * @return True if the insertion was successful; otherwise, false (e.g., due to data being already cached).
- */
- bool Insert(const char* dataPtr, ui64 offset, ui32 size, const TVector<ui64>& badOffsets = {});
+ using TListType = TIntrusiveList<TItem>;
+ using TIndex = TMap<ui64, TItem>;
- /**
- * Erases a range of cache records from the cache.
- * @param begin The beginning of the range (inclusive).
- * @param end The end of the range (exclusive).
- * @return The number of cache records erased.
- */
- size_t EraseRange(ui64 begin, ui64 end);
+public:
+ size_t Size() const;
+ const TCacheRecord* Find(ui64 offset);
+ const TCacheRecord* FindWithoutPromote(ui64 offset) const;
- /**
- * Clears the entire cache, removing all cache records.
- */
+ bool Pop();
+ bool Insert(TCacheRecord&& value);
+ size_t Erase(ui64 offset);
+ size_t EraseRange(ui64 begin, ui64 end); // erases range [begin, end)
void Clear();
private:
+ TListType List;
TIndex Index;
-
- /**
- * Prepares for insertion of a new cache record and calculates the left and right paddings for the data being inserted if parts of the data
- * is already in the cache.
- * @param offset The offset key for the new cache record.
- * @param size The size of the data.
- * @return A pair of i64 values representing left and right data paddings.
- */
- std::pair<i64, i64> PrepareInsertion(ui64 offset, ui32 size);
};
} // NPDisk
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp
index dd072ab91e..ba0d99e641 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_log_cache_ut.cpp
@@ -6,241 +6,64 @@ namespace NKikimr {
namespace NPDisk {
Y_UNIT_TEST_SUITE(TLogCache) {
+ TLogCache::TCacheRecord MakeRecord(ui64 offset, TString str) {
+ return TLogCache::TCacheRecord(
+ offset,
+ TRcBuf(TString(str.c_str(), str.c_str() + str.size() + 1)),
+ {});
+ }
+
Y_UNIT_TEST(Simple) {
TLogCache cache;
- char buf[2] = {};
- UNIT_ASSERT(cache.Insert("a", 1, 1));
- UNIT_ASSERT(cache.Insert("b", 2, 1));
+ UNIT_ASSERT(cache.Insert(MakeRecord(1, "a")));
+ UNIT_ASSERT(cache.Insert(MakeRecord(2, "b")));
UNIT_ASSERT_EQUAL(cache.Size(), 2);
- UNIT_ASSERT(cache.Find(1, 1, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "a");
+ UNIT_ASSERT_STRINGS_EQUAL(cache.Find(1)->Data.GetData(), "a");
- UNIT_ASSERT(cache.Insert("c", 3, 1));
- UNIT_ASSERT_EQUAL(1, cache.EraseRange(2, 3)); // 2 was removed
+ UNIT_ASSERT(cache.Insert(MakeRecord(3, "c")));
+ UNIT_ASSERT(cache.Pop()); // 2 must be evicted
UNIT_ASSERT_EQUAL(cache.Size(), 2);
- UNIT_ASSERT(!cache.Find(2, 1, buf));
- UNIT_ASSERT(cache.Find(3, 1, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "c");
+ UNIT_ASSERT_EQUAL(nullptr, cache.Find(2));
+ UNIT_ASSERT_STRINGS_EQUAL(cache.Find(3)->Data.GetData(), "c");
- UNIT_ASSERT_EQUAL(1, cache.EraseRange(1, 2)); // 1 was removed
- UNIT_ASSERT(cache.Insert("d", 4, 1));
+ UNIT_ASSERT(cache.Pop()); // 1 must be evicted
+ UNIT_ASSERT(cache.Insert(MakeRecord(4, "d")));
UNIT_ASSERT_EQUAL(cache.Size(), 2);
- UNIT_ASSERT(!cache.Find(1, 1, buf));
- UNIT_ASSERT(cache.Find(4, 1, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "d");
+ UNIT_ASSERT_EQUAL(nullptr, cache.Find(1));
+ UNIT_ASSERT_STRINGS_EQUAL(cache.Find(4)->Data.GetData(), "d");
- UNIT_ASSERT_EQUAL(1, cache.EraseRange(3, 4)); // 3 was removed
+ UNIT_ASSERT(cache.Pop()); // 3 must be evicted
UNIT_ASSERT_EQUAL(cache.Size(), 1);
- UNIT_ASSERT(!cache.Find(3, 1, buf));
- UNIT_ASSERT(cache.Find(4, 1, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "d");
+ UNIT_ASSERT_EQUAL(nullptr, cache.Find(3));
+ UNIT_ASSERT_STRINGS_EQUAL(cache.Find(4)->Data.GetData(), "d");
-
- UNIT_ASSERT_EQUAL(1, cache.EraseRange(4, 5));
+ UNIT_ASSERT_EQUAL(0, cache.Erase(3));
+ UNIT_ASSERT_EQUAL(1, cache.Erase(4));
+ UNIT_ASSERT_EQUAL(cache.Size(), 0);
+ UNIT_ASSERT(!cache.Pop());
UNIT_ASSERT_EQUAL(cache.Size(), 0);
- }
-
- Y_UNIT_TEST(DoubleInsertion) {
- TLogCache cache;
-
- char buf[5] = {};
-
- auto checkFn = [&]() {
- UNIT_ASSERT_EQUAL(25, cache.Size());
-
- for (int i = 0; i < 100; i += 4) {
- UNIT_ASSERT(cache.Find(i, 4, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "abcd");
- }
- };
-
- for (int i = 0; i < 100; i += 4) {
- UNIT_ASSERT(cache.Insert("abcd", i, 4));
- }
-
- checkFn();
-
- for (int i = 0; i < 100; i += 4) {
- UNIT_ASSERT(!cache.Insert("abcd", i, 4));
- }
-
- checkFn();
- }
-
- Y_UNIT_TEST(FullyOverlapping) {
- TLogCache cache;
-
- cache.Insert("abcd", 0, 4);
- UNIT_ASSERT_EQUAL(1, cache.Size());
-
- UNIT_ASSERT(!cache.Insert("bc", 1, 2));
- UNIT_ASSERT_EQUAL(1, cache.Size());
-
- char buf[2] = {};
- UNIT_ASSERT(cache.Find(3, 1, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "d");
- }
-
- Y_UNIT_TEST(BetweenTwoEntries) {
- {
- TLogCache cache;
-
- UNIT_ASSERT(cache.Insert("abcd", 0, 4));
- UNIT_ASSERT_EQUAL(1, cache.Size());
- UNIT_ASSERT(cache.Insert("ijkl", 8, 4));
- UNIT_ASSERT_EQUAL(2, cache.Size());
- UNIT_ASSERT(cache.Insert("efgh", 4, 4));
- UNIT_ASSERT_EQUAL(3, cache.Size());
-
- char buf[5] = {};
- UNIT_ASSERT(cache.Find(4, 4, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "efgh");
- }
-
- {
- TLogCache cache;
-
- UNIT_ASSERT(cache.Insert("abcd", 0, 4));
- UNIT_ASSERT_EQUAL(1, cache.Size());
- UNIT_ASSERT(cache.Insert("ijkl", 8, 4));
- UNIT_ASSERT_EQUAL(2, cache.Size());
- UNIT_ASSERT(cache.Insert("defghi", 3, 6));
- UNIT_ASSERT_EQUAL(3, cache.Size());
-
- char buf[5] = {};
- UNIT_ASSERT(cache.Find(4, 4, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "efgh");
- }
-
- {
- TLogCache cache;
-
- UNIT_ASSERT(cache.Insert("abcd", 0, 4));
- UNIT_ASSERT_EQUAL(1, cache.Size());
- UNIT_ASSERT(cache.Insert("ijkl", 8, 4));
- UNIT_ASSERT_EQUAL(2, cache.Size());
- UNIT_ASSERT(cache.Insert("efgh", 4, 4));
- UNIT_ASSERT_EQUAL(3, cache.Size());
-
- char buf[7] = {};
- UNIT_ASSERT(cache.Find(3, 6, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "defghi");
- }
-
- {
- TLogCache cache;
-
- UNIT_ASSERT(cache.Insert("abcd", 0, 4));
- UNIT_ASSERT_EQUAL(1, cache.Size());
- UNIT_ASSERT(cache.Insert("ijkl", 8, 4));
- UNIT_ASSERT_EQUAL(2, cache.Size());
- UNIT_ASSERT(cache.Insert("defghi", 3, 6));
- UNIT_ASSERT_EQUAL(3, cache.Size());
-
- char buf[7] = {};
- UNIT_ASSERT(cache.Find(3, 6, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "defghi");
- }
- }
-
- Y_UNIT_TEST(NoDuplicates) {
- {
- TLogCache cache;
-
- UNIT_ASSERT(cache.Insert("abcd", 0, 4));
- UNIT_ASSERT_EQUAL(1, cache.Size());
- UNIT_ASSERT(cache.Insert("def", 3, 3));
- UNIT_ASSERT_EQUAL(2, cache.Size());
-
- char buf[2] = {};
- UNIT_ASSERT(cache.Find(3, 1, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "d");
-
- char buf2[3] = {};
- UNIT_ASSERT(cache.Find(3, 2, buf2));
- UNIT_ASSERT_STRINGS_EQUAL(buf2, "de");
-
- char buf3[11] = {};
- UNIT_ASSERT(!cache.Find(3, 10, buf3));
- UNIT_ASSERT_STRINGS_EQUAL(buf3, "");
- }
-
- {
- TLogCache cache;
-
- UNIT_ASSERT(cache.Insert("def", 3, 3));
- UNIT_ASSERT_EQUAL(1, cache.Size());
- UNIT_ASSERT(cache.Insert("abcd", 0, 4));
- UNIT_ASSERT_EQUAL(2, cache.Size());
-
- char buf[2] = {};
- UNIT_ASSERT(cache.Find(3, 1, buf));
- UNIT_ASSERT_STRINGS_EQUAL(buf, "d");
-
- char buf2[5] = {};
- UNIT_ASSERT(cache.Find(0, 4, buf2));
- UNIT_ASSERT_STRINGS_EQUAL(buf2, "abcd");
-
- char buf3[11] = {};
- UNIT_ASSERT(!cache.Find(3, 10, buf3));
- UNIT_ASSERT_STRINGS_EQUAL(buf3, "");
- }
-
- {
- TLogCache cache;
-
- UNIT_ASSERT(cache.Insert("abcdefghij", 0, 10));
- UNIT_ASSERT_EQUAL(1, cache.Size());
-
- UNIT_ASSERT(cache.Insert("klmno", 10, 5));
- UNIT_ASSERT_EQUAL(2, cache.Size());
-
- UNIT_ASSERT(!cache.Insert("fghijklmno", 5, 10));
-
- UNIT_ASSERT_EQUAL(2, cache.Size());
- }
-
- {
- TLogCache cache;
-
- UNIT_ASSERT(cache.Insert("abcdefghij", 0, 10));
- UNIT_ASSERT_EQUAL(1, cache.Size());
-
- UNIT_ASSERT(cache.Insert("klmno", 10, 5));
- UNIT_ASSERT_EQUAL(2, cache.Size());
-
- UNIT_ASSERT(cache.Insert("fghijklmnopq", 5, 12));
- UNIT_ASSERT_EQUAL(3, cache.Size());
- }
}
TLogCache SetupCache(const TVector<std::pair<ui64, TString>>& content = {{5, "x"}, {1, "y"}, {10, "z"}}) {
TLogCache cache;
for (auto pair : content) {
- auto& data = pair.second;
-
- cache.Insert(data.c_str(), pair.first, data.Size());
+ cache.Insert(MakeRecord(pair.first, pair.second));
}
return cache;
};
void AssertCacheContains(TLogCache& cache, const TVector<std::pair<ui64, TString>>& content = {{5, "x"}, {1, "y"}, {10, "z"}}) {
UNIT_ASSERT_VALUES_EQUAL(content.size(), cache.Size());
-
- char buf[2] = {};
-
for (auto pair : content) {
- UNIT_ASSERT(cache.Find(pair.first, 1, buf));
-
- UNIT_ASSERT_STRINGS_EQUAL(pair.second, buf);
+ UNIT_ASSERT_STRINGS_EQUAL(
+ pair.second,
+ cache.FindWithoutPromote(pair.first)->Data.GetData());
}
-
for (auto pair : content) {
- cache.EraseRange(pair.first, pair.first + 1);
-
- UNIT_ASSERT(!cache.Find(pair.first, 1, buf));
+ UNIT_ASSERT(cache.Pop());
+ UNIT_ASSERT_EQUAL(nullptr, cache.FindWithoutPromote(pair.first));
}
}
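
The helpers above rely on FindWithoutPromote(), which differs from Find() only in that a hit leaves the recency order untouched. A complementary sketch under the same assumptions as the tests:

    // FindWithoutPromote does not touch the LRU order: offset 1 stays least recently used
    // and is the entry evicted by Pop(), unlike in the Find() case.
    TLogCache cache;
    cache.Insert(TLogCache::TCacheRecord(1, TRcBuf(TString("a")), {}));
    cache.Insert(TLogCache::TCacheRecord(2, TRcBuf(TString("b")), {}));
    cache.FindWithoutPromote(1);                      // hit, but recency order is unchanged
    cache.Pop();                                      // evicts offset 1, still the least recently used
    Y_VERIFY(cache.FindWithoutPromote(1) == nullptr);
    Y_VERIFY(cache.FindWithoutPromote(2) != nullptr);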
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
index 374827cfaa..add4a820ac 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
@@ -1148,22 +1148,13 @@ void TLogReader::Reply() {
PDisk->ProcessChunkOwnerMap(*ChunkOwnerMap.Get());
ChunkOwnerMap.Destroy();
- // Remove invalid part of the last log chunk.
PDisk->BlockDevice->EraseCacheRange(
- PDisk->Format.Offset(ChunkIdx, SectorIdx) + OffsetInSector,
+ PDisk->Format.Offset(ChunkIdx, 0),
PDisk->Format.Offset(ChunkIdx + 1, 0)
);
}
LOG_DEBUG(*PDisk->ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " To ownerId# %" PRIu32 " %s",
(ui32)PDisk->PDiskId, (ui32)Owner, Result->ToString().c_str());
-
- if (!IsInitial && Result->IsEndOfLog) {
- ui32 prevChunkIdx = ChunkIdx;
-
- // Finished reading owner's whole log.
- PDisk->NotifyLogChunkRead(prevChunkIdx, Owner);
- }
-
ActorSystem->Send(ReplyTo, Result.Release());
if (!IsInitial) {
PDisk->Mon.LogRead.CountResponse(ResultSize);
@@ -1305,13 +1296,6 @@ void TLogReader::UpdateNewChunkInfo(ui32 currChunk, const TMaybe<ui32> prevChunk
}
void TLogReader::SwitchToChunk(ui32 chunkIdx) {
- if (!IsInitial) {
- ui32 prevChunkIdx = ChunkIdx;
-
- // Finished reading log chunk.
- PDisk->NotifyLogChunkRead(prevChunkIdx, Owner);
- }
-
ChunkIdx = chunkIdx;
SectorIdx = 0;
OffsetInSector = 0;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h
index 8c2568941b..9318239f65 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.h
@@ -35,7 +35,7 @@ class TPDisk;
struct TLogChunkItem {
TChunkIdx ChunkIdx;
bool IsPreviousChunkDropped;
- bool IsPreviousChunkCut;
+ bool IsPreviousChunkCut;
TLogChunkItem(TChunkIdx chunkIdx, bool isPreviousChunkDropped, bool isPreviousChunkCut)
: ChunkIdx(chunkIdx)
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
index f0c95dae26..a8ccc2d741 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
@@ -23,7 +23,7 @@ enum class EInitPhase {
};
enum EOwner {
- OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, means "for dynamic" in requests
+ OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, means "for dynamic" in requests
OwnerUnallocated = 1, // Unallocated chunks, Trim scheduling, Slay commands
OwnerBeginUser = 2,
OwnerEndUser = 241,
@@ -338,14 +338,6 @@ struct TLogChunkInfo {
}
};
-struct TLogRecoveryState {
- TMap<TChunkIdx, std::bitset<OwnerCount>> Readers; // Per-chunk information about future readers.
-
- void Clear() {
- Readers.clear();
- }
-};
-
} // NPDisk
} // NKikimr
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
index 19c5d32d32..661adf2fa4 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
@@ -466,36 +466,6 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
vdisk.SendEvLogSync();
}
- Y_UNIT_TEST(PDiskRestartUsesLogRecoveryState) {
- TActorTestContext testCtx({
- .IsBad = false,
- .ChunkSize = 4 * (1 << 20)
- });
- TVDiskMock vdisk(&testCtx);
- vdisk.InitFull();
-
- // Fill log so that one chunk has state LOG_COMMITED on restart.
- for (int i = 0; i < 256; i++) {
- vdisk.SendEvLogSync(32768);
- }
-
- testCtx.RestartPDiskSync();
-
- vdisk.Init();
-
- // Assert recovery state has 2 chunks to be read by vdisk.
- testCtx.SafeRunOnPDisk([](NPDisk::TPDisk* disk) {
- UNIT_ASSERT_EQUAL(2, disk->LogRecoveryState.Readers.size());
- });
-
- vdisk.ReadLog();
-
- // Assert recovery state is empty after vdisk read its chunks.
- testCtx.SafeRunOnPDisk([](NPDisk::TPDisk* disk) {
- UNIT_ASSERT_EQUAL(0, disk->LogRecoveryState.Readers.size());
- });
- }
-
Y_UNIT_TEST(PDiskRestartManyLogWrites) {
TActorTestContext testCtx({ false });
testCtx.TestCtx.SectorMap->ImitateIoErrorProbability = 1e-4;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_helpers.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_helpers.cpp
index 7c574e26d9..4a79eaedfe 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_helpers.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_helpers.cpp
@@ -12,16 +12,11 @@
namespace NKikimr {
TString PrepareData(ui32 size, ui32 flavor) {
- TString str = TString::Uninitialized(size);
-
- // Using char* enables possibility to vectorize the following loop.
- char* data = str.Detach();
-
+ TString data = TString::Uninitialized(size);
for (ui32 i = 0; i < size; ++i) {
data[i] = '0' + (i + size + flavor) % 8;
}
-
- return str;
+ return data;
}
TString StatusToString(const NKikimrProto::EReplyStatus status) {
diff --git a/ydb/library/pdisk_io/wcache.cpp b/ydb/library/pdisk_io/wcache.cpp
index 1877a3d09f..0fc52e79af 100644
--- a/ydb/library/pdisk_io/wcache.cpp
+++ b/ydb/library/pdisk_io/wcache.cpp
@@ -412,7 +412,7 @@ struct TIdentifyData {
static const ui32 IdentifySizeBytes = 512;
bool IsGathered = false;
ui8 Data[IdentifySizeBytes];
- // Offset in words, description, size bytes for strings
+ // Offset in words, description, size bytes for strings
// 10, serial number, 20 ASCII
// 23, firmware revision, 8 ASCII
// 27, Model number, 40 ASCII