diff options
author | Alexander Rutkovsky <alexvru@ydb.tech> | 2024-09-24 17:07:31 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-24 17:07:31 +0300 |
commit | 18174b04e8714e997619817309f894b524832178 (patch) | |
tree | 5b7df4abc96586156f7e2b5358e93b46915b9fc3 | |
parent | ca658e7ad94e3d3c539d09f8ab0c610ed56ce95d (diff) | |
download | ydb-18174b04e8714e997619817309f894b524832178.tar.gz |
Prepare to disable blob header by default in VDisk (#9683)
8 files changed, 105 insertions, 184 deletions
diff --git a/ydb/core/blobstorage/vdisk/common/disk_part.h b/ydb/core/blobstorage/vdisk/common/disk_part.h index 28fd4e81b3..25faf131bc 100644 --- a/ydb/core/blobstorage/vdisk/common/disk_part.h +++ b/ydb/core/blobstorage/vdisk/common/disk_part.h @@ -131,11 +131,7 @@ namespace NKikimr { } ui64 Hash() const { - ui64 x = 0; - x |= (ui64)ChunkIdx; - x <<= 32u; - x |= (ui64)Offset; - return x; + return MultiHash(ChunkIdx, Offset); } inline bool operator <(const TDiskPart &x) const { diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index 10477c1c62..9a6962b9db 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -1252,6 +1252,7 @@ namespace NKikimr { if (record.GetIndexOnly()) str << " IndexOnly"; if (record.HasMsgQoS()) { + str << ' '; TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str); } str << " Notify# " << record.GetNotifyIfNotReady() diff --git a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h index 6d90131716..0f1a0085c7 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h +++ b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h @@ -379,7 +379,7 @@ namespace NKikimr { template <class TRecordMerger> void PutToMerger(const TMemRec &memRec, ui64 lsn, TRecordMerger *merger) { TKey key = It.GetValue().Key; - if (merger->HaveToMergeData() && memRec.HasData() && memRec.GetType() == TBlobType::MemBlob) { + if (merger->HaveToMergeData() && memRec.GetType() == TBlobType::MemBlob) { const TMemPart p = memRec.GetMemData(); const TRope& rope = Seg->GetLogoBlobData(p); merger->AddFromFresh(memRec, &rope, key, lsn); diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h index 59145f3078..cf6eba18ab 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h @@ -109,13 +109,11 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////// - // TCompactRecordMergerBase - //////////////////////////////////////////////////////////////////////////// // Valid call sequence: // Clear(); Add(); ... Add(); Finish() // GetMemRec(); GetData(); template <class TKey, class TMemRec> - class TCompactRecordMergerBase : public TRecordMergerBase<TKey, TMemRec> { + class TCompactRecordMerger : public TRecordMergerBase<TKey, TMemRec> { protected: using TBase = TRecordMergerBase<TKey, TMemRec>; using TBase::MemRec; @@ -132,11 +130,8 @@ namespace NKikimr { }; public: - TCompactRecordMergerBase(const TBlobStorageGroupType >ype, bool addHeader) + TCompactRecordMerger(const TBlobStorageGroupType >ype, bool addHeader) : TBase(gtype, true) - , MemRecs() - , ProducingSmallBlob(false) - , NeedToLoadData(ELoadData::NotSet) , AddHeader(addHeader) {} @@ -144,6 +139,7 @@ namespace NKikimr { TBase::Clear(); MemRecs.clear(); ProducingSmallBlob = false; + ProducingHugeBlob = false; NeedToLoadData = ELoadData::NotSet; DataMerger.Clear(); } @@ -156,51 +152,51 @@ namespace NKikimr { } void AddFromSegment(const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) { - Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet); - AddBasic(memRec, key); - switch (memRec.GetType()) { - case TBlobType::DiskBlob: { - if (memRec.HasData() && NeedToLoadData == ELoadData::LoadData) { - MemRecs.push_back(memRec); - ProducingSmallBlob = true; - } - break; - } - case TBlobType::HugeBlob: - case TBlobType::ManyHugeBlobs: { - TDiskDataExtractor extr; - memRec.GetDiskData(&extr, outbound); - const NMatrix::TVectorType v = memRec.GetLocalParts(GType); - DataMerger.AddHugeBlob(extr.Begin, extr.End, v, circaLsn); - break; - } - default: - Y_ABORT("Impossible case"); - } - VerifyConsistency(memRec, outbound); + Add(memRec, nullptr, outbound, key, circaLsn); } void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) { + Add(memRec, data, nullptr, key, lsn); + } + + void Add(const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) { Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet); AddBasic(memRec, key); - if (memRec.HasData()) { - if (data) { - Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob || memRec.GetType() == TBlobType::DiskBlob); - if (NeedToLoadData == ELoadData::LoadData) { - DataMerger.AddBlob(TDiskBlob(data, memRec.GetLocalParts(GType), GType, key.LogoBlobID())); - ProducingSmallBlob = true; - } else { - // intentionally do nothing: don't add any data to DataMerger, because we don't need it - } - } else { - Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob); - TDiskDataExtractor extr; - memRec.GetDiskData(&extr, nullptr); - const NMatrix::TVectorType v = memRec.GetLocalParts(GType); - DataMerger.AddHugeBlob(extr.Begin, extr.End, v, lsn); + if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) { + TDiskDataExtractor extr; + switch (memRec.GetType()) { + case TBlobType::MemBlob: + case TBlobType::DiskBlob: + if (NeedToLoadData == ELoadData::LoadData) { + if (data) { + // we have some data in-memory + DataMerger.AddBlob(TDiskBlob(data, local, GType, key.LogoBlobID())); + } + if (memRec.GetType() == TBlobType::DiskBlob) { + if (memRec.HasData()) { // there is something to read from the disk + MemRecs.push_back(memRec); + } else { // headerless metadata stored + static TRope emptyRope; + DataMerger.AddBlob(TDiskBlob(&emptyRope, local, GType, key.LogoBlobID())); + } + } + Y_DEBUG_ABORT_UNLESS(!ProducingHugeBlob); + ProducingSmallBlob = true; + } + break; + + case TBlobType::ManyHugeBlobs: + Y_ABORT_UNLESS(outbound); + [[fallthrough]]; + case TBlobType::HugeBlob: + memRec.GetDiskData(&extr, outbound); + DataMerger.AddHugeBlob(extr.Begin, extr.End, local, lsn); + Y_DEBUG_ABORT_UNLESS(!ProducingSmallBlob); + ProducingHugeBlob = true; + break; } } - VerifyConsistency(memRec, nullptr); + VerifyConsistency(memRec, outbound); } void VerifyConsistency(const TMemRec& memRec, const TDiskPart *outbound) { @@ -239,6 +235,13 @@ namespace NKikimr { } void Finish() { + if (NeedToLoadData == ELoadData::DontLoadData) { + Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger + // if we have huge blobs for the record, than we set TBlobType::HugeBlob or + // TBlobType::ManyHugeBlobs a few lines below + MemRec.SetNoBlob(); + } + Y_DEBUG_ABORT_UNLESS(!Empty()); VerifyConsistency(); @@ -263,118 +266,22 @@ namespace NKikimr { return &DataMerger; } - protected: - TStackVec<TMemRec, 16> MemRecs; - bool ProducingSmallBlob; - ELoadData NeedToLoadData; - TDataMerger DataMerger; - const bool AddHeader; - }; - - //////////////////////////////////////////////////////////////////////////// - // TCompactRecordMergerIndexPass - //////////////////////////////////////////////////////////////////////////// - template<typename TKey, typename TMemRec> - class TCompactRecordMergerIndexPass : public TCompactRecordMergerBase<TKey, TMemRec> { - using TBase = TCompactRecordMergerBase<TKey, TMemRec>; - - using ELoadData = typename TBase::ELoadData; - - using TBase::MemRecs; - using TBase::ProducingSmallBlob; - using TBase::NeedToLoadData; - using TBase::DataMerger; - using TBase::MemRec; - - public: - TCompactRecordMergerIndexPass(const TBlobStorageGroupType >ype, bool addHeader) - : TBase(gtype, addHeader) - {} - - void Finish() { - if (NeedToLoadData == ELoadData::DontLoadData) { - Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger - // if we have huge blobs for the record, than we set TBlobType::HugeBlob or - // TBlobType::ManyHugeBlobs a few lines below - MemRec.SetNoBlob(); - } - - TBase::Finish(); - } - template<typename TCallback> void ForEachSmallDiskBlob(TCallback&& callback) { for (const auto& memRec : MemRecs) { callback(memRec); } } - }; - - //////////////////////////////////////////////////////////////////////////// - // TCompactRecordMergerDataPass - //////////////////////////////////////////////////////////////////////////// - template<typename TKey, typename TMemRec> - class TCompactRecordMergerDataPass : public TCompactRecordMergerBase<TKey, TMemRec> { - using TBase = TCompactRecordMergerBase<TKey, TMemRec>; - - using TBase::ProducingSmallBlob; - using TBase::MemRecs; - using TBase::MemRec; - using TBase::DataMerger; - using TBase::GType; - using TBase::SetLoadDataMode; - - public: - TCompactRecordMergerDataPass(const TBlobStorageGroupType >ype, bool addHeader) - : TBase(gtype, addHeader) - { - SetLoadDataMode(true); - } - void Clear() { - TBase::Clear(); - ReadSmallBlobs.clear(); - SetLoadDataMode(true); - } - - // add read small blob content; they should come in order as returned from GetSmallBlobDiskParts by index merger - void AddReadSmallBlob(TString data) { - Y_ABORT_UNLESS(ProducingSmallBlob); - ReadSmallBlobs.push_back(std::move(data)); - } - - void Finish() { - // ensure we are producing small blobs; otherwise this merger should never be created - Y_ABORT_UNLESS(ProducingSmallBlob); - - // add all read small blobs into blob merger - const size_t count = ReadSmallBlobs.size(); - Y_ABORT_UNLESS(count == +MemRecs, "count# %zu +MemRecs# %zu", count, +MemRecs); - for (size_t i = 0; i < count; ++i) { - const TMemRec& memRec = MemRecs[i]->GetMemRec(); - const TString& buffer = ReadSmallBlobs[i]; - Y_ABORT_UNLESS(buffer.size() == memRec.DataSize()); - DataMerger.AddBlob(TDiskBlob(buffer.data(), buffer.size(), memRec.GetLocalParts(GType))); - } - - - // ensure that data merger has small blob - Y_ABORT_UNLESS(DataMerger.HasSmallBlobs()); - - // finalize base class logic; it also generates blob record - TBase::Finish(); - - // ensure that we have generated correct DiskBlob with full set of declared parts - const TDiskBlob& blob = DataMerger.GetDiskBlobMerger().GetDiskBlob(); - Y_ABORT_UNLESS(blob.GetParts() == MemRec.GetLocalParts(GType)); - Y_ABORT_UNLESS(MemRec.GetType() == TBlobType::DiskBlob); - } - - private: - TVector<TString> ReadSmallBlobs; + protected: + TStackVec<TMemRec, 16> MemRecs; + bool ProducingSmallBlob = false; + bool ProducingHugeBlob = false; + ELoadData NeedToLoadData = ELoadData::NotSet; + TDataMerger DataMerger; + const bool AddHeader; }; - //////////////////////////////////////////////////////////////////////////// // TRecordMergerCallback //////////////////////////////////////////////////////////////////////////// @@ -412,9 +319,8 @@ namespace NKikimr { case 1: { if (memRec.GetType() == TBlobType::DiskBlob) { // don't deduplicate inplaced data - const TDiskPart &data = extr.SwearOne(); - if (data.ChunkIdx && data.Size) { - (*Callback)(data, v); + if (!v.Empty()) { + (*Callback)(extr.SwearOne(), v); } } else if (memRec.GetType() == TBlobType::HugeBlob) { Y_ABORT_UNLESS(v.CountBits() == 1u); @@ -446,20 +352,31 @@ namespace NKikimr { void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) { AddBasic(memRec, key); - if (memRec.HasData()) { - const NMatrix::TVectorType v = memRec.GetLocalParts(GType); - if (data) { - Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob); - // we have in-memory data in a rope, it always wins among other data, - // so we call Callback immediately and remove any data for this local part - // from LastWriteWinsMerger - (*Callback)(TDiskBlob(data, v, GType, key.LogoBlobID())); - } else { - Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob && v.CountBits() == 1u); - TDiskDataExtractor extr; - memRec.GetDiskData(&extr, nullptr); - // deduplicate huge blob - LastWriteWinsMerger.Add(extr.SwearOne(), v, lsn); + if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) { + TDiskDataExtractor extr; + static TRope rope; + switch (memRec.GetType()) { + case TBlobType::MemBlob: + // we have in-memory data in a rope, it always wins among other data, + // so we call Callback immediately and remove any data for this local part + // from LastWriteWinsMerger + Y_ABORT_UNLESS(data); // HaveToMergeData is true, so data must be present + (*Callback)(TDiskBlob(data, local, GType, key.LogoBlobID())); + break; + + case TBlobType::DiskBlob: + Y_ABORT_UNLESS(!memRec.HasData()); + (*Callback)(TDiskBlob(&rope, local, GType, key.LogoBlobID())); // pure metadata parts only + break; + + case TBlobType::HugeBlob: + Y_ABORT_UNLESS(local.CountBits() == 1); + memRec.GetDiskData(&extr, nullptr); + LastWriteWinsMerger.Add(extr.SwearOne(), local, lsn); + break; + + case TBlobType::ManyHugeBlobs: + Y_ABORT("unexpected case"); } } } diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp index 3660b27fcb..385996f473 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp @@ -20,11 +20,11 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////////////////// typedef TLevelSegment<TKeyLogoBlob, TMemRecLogoBlob> TSstLogoBlob; typedef TSstLogoBlob::TWriter TWriterLogoBlob; - typedef TCompactRecordMergerIndexPass<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger; + typedef TCompactRecordMerger<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger; typedef TLevelSegment<TKeyBlock, TMemRecBlock> TSstBlock; typedef TSstBlock::TWriter TWriterBlock; - typedef TCompactRecordMergerIndexPass<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger; + typedef TCompactRecordMerger<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger; TTestContexts TestCtx; diff --git a/ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.cpp b/ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.cpp index c6e072950c..d2eb83b751 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.cpp +++ b/ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.cpp @@ -32,7 +32,13 @@ namespace NKikimr { "Db# LogoBlobs action# add_data mode# %s id# %s lsn# %" PRIu64 " bufSize# %" PRIu32, OpMode2Str(mode), id.ToString().data(), lsn, ui32(buffer.GetSize()))); - HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), partId, ingress, std::move(buffer)); + if (buffer) { + HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), partId, ingress, std::move(buffer)); + } else { + const TBlobStorageGroupType gtype = HullDs->HullCtx->VCtx->Top->GType; + Y_DEBUG_ABORT_UNLESS(!gtype.PartSize(TLogoBlobID(id, partId))); + HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), TMemRecLogoBlob(ingress)); + } } void THullDbRecovery::ReplayAddLogoBlobCmd( diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h index 7ebfb73f63..b2bb2e1eec 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h @@ -27,8 +27,7 @@ namespace NKikimr { using TGcMapIterator = typename TGcMap::TIterator; // compaction record merger - using TCompactRecordMergerIndexPass = NKikimr::TCompactRecordMergerIndexPass<TKey, TMemRec>; - using TCompactRecordMergerDataPass = NKikimr::TCompactRecordMergerDataPass<TKey, TMemRec>; + using TCompactRecordMerger = NKikimr::TCompactRecordMerger<TKey, TMemRec>; // level segment using TLevelSegment = NKikimr::TLevelSegment<TKey, TMemRec>; @@ -131,7 +130,7 @@ namespace NKikimr { TDeque<TChunkIdx> AllocatedChunks; // record merger for compaction - TCompactRecordMergerIndexPass IndexMerger; + TCompactRecordMerger IndexMerger; // current handoff-transformed item const TTransformedItem *TransformedItem = nullptr; diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp index 3db0d88877..7aff723af2 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp @@ -64,17 +64,19 @@ namespace NKikimr { if (QueryPartId == 0 || QueryPartId == i + 1) { FoundAnything = true; auto& tmpItem = TmpItems[i]; - if ((QueryShift >= partSize && partSize) || QuerySize > partSize - QueryShift) { + if ((partSize ? QueryShift >= partSize : QueryShift) || QuerySize > partSize - QueryShift) { tmpItem.UpdateWithError(partId, Cookie); + } else if (!partSize) { + tmpItem.UpdateWithMemItem(partId, Cookie, TRope()); } else if (tmpItem.ShouldUpdateWithDisk()) { const ui32 size = QuerySize ? QuerySize : partSize - QueryShift; - if (!size) { // for metadata reads - tmpItem.UpdateWithMemItem(partId, Cookie, TRope()); - } else { - tmpItem.UpdateWithDiskItem(partId, Cookie, TDiskPart(data.ChunkIdx, partOffs + QueryShift, size)); - } + Y_DEBUG_ABORT_UNLESS(size); + tmpItem.UpdateWithDiskItem(partId, Cookie, TDiskPart(data.ChunkIdx, partOffs + QueryShift, size)); } } + if (QueryPartId && QueryPartId <= i + 1) { + break; + } partOffs += partSize; } } @@ -97,7 +99,7 @@ namespace NKikimr { if (QueryPartId == 0 || QueryPartId == partId) { FoundAnything = true; auto& item = TmpItems[partId - 1]; - if ((QueryShift >= partSize && partSize) || QuerySize > partSize - QueryShift) { + if ((partSize ? QueryShift >= partSize : QueryShift) || QuerySize > partSize - QueryShift) { item.UpdateWithError(blobId, Cookie); } else if (item.ShouldUpdateWithMem()) { const ui32 size = QuerySize ? QuerySize : partSize - QueryShift; |