about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Alexander Rutkovsky <alexvru@ydb.tech> 2024-09-24 17:07:31 +0300
committer GitHub <noreply@github.com> 2024-09-24 17:07:31 +0300
commit 18174b04e8714e997619817309f894b524832178 (patch)
tree 5b7df4abc96586156f7e2b5358e93b46915b9fc3
parent ca658e7ad94e3d3c539d09f8ab0c610ed56ce95d (diff)
download ydb-18174b04e8714e997619817309f894b524832178.tar.gz
Prepare to disable blob header by default in VDisk (#9683)
-rw-r--r-- ydb/core/blobstorage/vdisk/common/disk_part.h | 6
-rw-r--r-- ydb/core/blobstorage/vdisk/common/vdisk_events.h | 1
-rw-r--r-- ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h | 2
-rw-r--r-- ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h | 247
-rw-r--r-- ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp | 4
-rw-r--r-- ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.cpp | 8
-rw-r--r-- ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h | 5
-rw-r--r-- ydb/core/blobstorage/vdisk/query/query_readbatch.cpp | 16
8 files changed, 105 insertions, 184 deletions
diff --git a/ydb/core/blobstorage/vdisk/common/disk_part.h b/ydb/core/blobstorage/vdisk/common/disk_part.h
index 28fd4e81b3..25faf131bc 100644
--- a/ydb/core/blobstorage/vdisk/common/disk_part.h
+++ b/ydb/core/blobstorage/vdisk/common/disk_part.h
@@ -131,11 +131,7 @@ namespace NKikimr {
}
ui64 Hash() const {
- ui64 x = 0;
- x |= (ui64)ChunkIdx;
- x <<= 32u;
- x |= (ui64)Offset;
- return x;
+ return MultiHash(ChunkIdx, Offset);
}
inline bool operator <(const TDiskPart &x) const {
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index 10477c1c62..9a6962b9db 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -1252,6 +1252,7 @@ namespace NKikimr {
if (record.GetIndexOnly())
str << " IndexOnly";
if (record.HasMsgQoS()) {
+ str << ' ';
TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str);
}
str << " Notify# " << record.GetNotifyIfNotReady()
diff --git a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h
index 6d90131716..0f1a0085c7 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h
@@ -379,7 +379,7 @@ namespace NKikimr {
template <class TRecordMerger>
void PutToMerger(const TMemRec &memRec, ui64 lsn, TRecordMerger *merger) {
TKey key = It.GetValue().Key;
- if (merger->HaveToMergeData() && memRec.HasData() && memRec.GetType() == TBlobType::MemBlob) {
+ if (merger->HaveToMergeData() && memRec.GetType() == TBlobType::MemBlob) {
const TMemPart p = memRec.GetMemData();
const TRope& rope = Seg->GetLogoBlobData(p);
merger->AddFromFresh(memRec, &rope, key, lsn);
diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h
index 59145f3078..cf6eba18ab 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h
@@ -109,13 +109,11 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
- // TCompactRecordMergerBase
- ////////////////////////////////////////////////////////////////////////////
// Valid call sequence:
// Clear(); Add(); ... Add(); Finish()
// GetMemRec(); GetData();
template <class TKey, class TMemRec>
- class TCompactRecordMergerBase : public TRecordMergerBase<TKey, TMemRec> {
+ class TCompactRecordMerger : public TRecordMergerBase<TKey, TMemRec> {
protected:
using TBase = TRecordMergerBase<TKey, TMemRec>;
using TBase::MemRec;
@@ -132,11 +130,8 @@ namespace NKikimr {
};
public:
- TCompactRecordMergerBase(const TBlobStorageGroupType &gtype, bool addHeader)
+ TCompactRecordMerger(const TBlobStorageGroupType &gtype, bool addHeader)
: TBase(gtype, true)
- , MemRecs()
- , ProducingSmallBlob(false)
- , NeedToLoadData(ELoadData::NotSet)
, AddHeader(addHeader)
{}
@@ -144,6 +139,7 @@ namespace NKikimr {
TBase::Clear();
MemRecs.clear();
ProducingSmallBlob = false;
+ ProducingHugeBlob = false;
NeedToLoadData = ELoadData::NotSet;
DataMerger.Clear();
}
@@ -156,51 +152,51 @@ namespace NKikimr {
}
void AddFromSegment(const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) {
- Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet);
- AddBasic(memRec, key);
- switch (memRec.GetType()) {
- case TBlobType::DiskBlob: {
- if (memRec.HasData() && NeedToLoadData == ELoadData::LoadData) {
- MemRecs.push_back(memRec);
- ProducingSmallBlob = true;
- }
- break;
- }
- case TBlobType::HugeBlob:
- case TBlobType::ManyHugeBlobs: {
- TDiskDataExtractor extr;
- memRec.GetDiskData(&extr, outbound);
- const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
- DataMerger.AddHugeBlob(extr.Begin, extr.End, v, circaLsn);
- break;
- }
- default:
- Y_ABORT("Impossible case");
- }
- VerifyConsistency(memRec, outbound);
+ Add(memRec, nullptr, outbound, key, circaLsn);
}
void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
+ Add(memRec, data, nullptr, key, lsn);
+ }
+
+ void Add(const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) {
Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet);
AddBasic(memRec, key);
- if (memRec.HasData()) {
- if (data) {
- Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob || memRec.GetType() == TBlobType::DiskBlob);
- if (NeedToLoadData == ELoadData::LoadData) {
- DataMerger.AddBlob(TDiskBlob(data, memRec.GetLocalParts(GType), GType, key.LogoBlobID()));
- ProducingSmallBlob = true;
- } else {
- // intentionally do nothing: don't add any data to DataMerger, because we don't need it
- }
- } else {
- Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob);
- TDiskDataExtractor extr;
- memRec.GetDiskData(&extr, nullptr);
- const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
- DataMerger.AddHugeBlob(extr.Begin, extr.End, v, lsn);
+ if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) {
+ TDiskDataExtractor extr;
+ switch (memRec.GetType()) {
+ case TBlobType::MemBlob:
+ case TBlobType::DiskBlob:
+ if (NeedToLoadData == ELoadData::LoadData) {
+ if (data) {
+ // we have some data in-memory
+ DataMerger.AddBlob(TDiskBlob(data, local, GType, key.LogoBlobID()));
+ }
+ if (memRec.GetType() == TBlobType::DiskBlob) {
+ if (memRec.HasData()) { // there is something to read from the disk
+ MemRecs.push_back(memRec);
+ } else { // headerless metadata stored
+ static TRope emptyRope;
+ DataMerger.AddBlob(TDiskBlob(&emptyRope, local, GType, key.LogoBlobID()));
+ }
+ }
+ Y_DEBUG_ABORT_UNLESS(!ProducingHugeBlob);
+ ProducingSmallBlob = true;
+ }
+ break;
+
+ case TBlobType::ManyHugeBlobs:
+ Y_ABORT_UNLESS(outbound);
+ [[fallthrough]];
+ case TBlobType::HugeBlob:
+ memRec.GetDiskData(&extr, outbound);
+ DataMerger.AddHugeBlob(extr.Begin, extr.End, local, lsn);
+ Y_DEBUG_ABORT_UNLESS(!ProducingSmallBlob);
+ ProducingHugeBlob = true;
+ break;
}
}
- VerifyConsistency(memRec, nullptr);
+ VerifyConsistency(memRec, outbound);
}
void VerifyConsistency(const TMemRec& memRec, const TDiskPart *outbound) {
@@ -239,6 +235,13 @@ namespace NKikimr {
}
void Finish() {
+ if (NeedToLoadData == ELoadData::DontLoadData) {
+ Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger
+ // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
+ // TBlobType::ManyHugeBlobs a few lines below
+ MemRec.SetNoBlob();
+ }
+
Y_DEBUG_ABORT_UNLESS(!Empty());
VerifyConsistency();
@@ -263,118 +266,22 @@ namespace NKikimr {
return &DataMerger;
}
- protected:
- TStackVec<TMemRec, 16> MemRecs;
- bool ProducingSmallBlob;
- ELoadData NeedToLoadData;
- TDataMerger DataMerger;
- const bool AddHeader;
- };
-
- ////////////////////////////////////////////////////////////////////////////
- // TCompactRecordMergerIndexPass
- ////////////////////////////////////////////////////////////////////////////
- template<typename TKey, typename TMemRec>
- class TCompactRecordMergerIndexPass : public TCompactRecordMergerBase<TKey, TMemRec> {
- using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
-
- using ELoadData = typename TBase::ELoadData;
-
- using TBase::MemRecs;
- using TBase::ProducingSmallBlob;
- using TBase::NeedToLoadData;
- using TBase::DataMerger;
- using TBase::MemRec;
-
- public:
- TCompactRecordMergerIndexPass(const TBlobStorageGroupType &gtype, bool addHeader)
- : TBase(gtype, addHeader)
- {}
-
- void Finish() {
- if (NeedToLoadData == ELoadData::DontLoadData) {
- Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger
- // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
- // TBlobType::ManyHugeBlobs a few lines below
- MemRec.SetNoBlob();
- }
-
- TBase::Finish();
- }
-
template<typename TCallback>
void ForEachSmallDiskBlob(TCallback&& callback) {
for (const auto& memRec : MemRecs) {
callback(memRec);
}
}
- };
-
- ////////////////////////////////////////////////////////////////////////////
- // TCompactRecordMergerDataPass
- ////////////////////////////////////////////////////////////////////////////
- template<typename TKey, typename TMemRec>
- class TCompactRecordMergerDataPass : public TCompactRecordMergerBase<TKey, TMemRec> {
- using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
-
- using TBase::ProducingSmallBlob;
- using TBase::MemRecs;
- using TBase::MemRec;
- using TBase::DataMerger;
- using TBase::GType;
- using TBase::SetLoadDataMode;
-
- public:
- TCompactRecordMergerDataPass(const TBlobStorageGroupType &gtype, bool addHeader)
- : TBase(gtype, addHeader)
- {
- SetLoadDataMode(true);
- }
- void Clear() {
- TBase::Clear();
- ReadSmallBlobs.clear();
- SetLoadDataMode(true);
- }
-
- // add read small blob content; they should come in order as returned from GetSmallBlobDiskParts by index merger
- void AddReadSmallBlob(TString data) {
- Y_ABORT_UNLESS(ProducingSmallBlob);
- ReadSmallBlobs.push_back(std::move(data));
- }
-
- void Finish() {
- // ensure we are producing small blobs; otherwise this merger should never be created
- Y_ABORT_UNLESS(ProducingSmallBlob);
-
- // add all read small blobs into blob merger
- const size_t count = ReadSmallBlobs.size();
- Y_ABORT_UNLESS(count == +MemRecs, "count# %zu +MemRecs# %zu", count, +MemRecs);
- for (size_t i = 0; i < count; ++i) {
- const TMemRec& memRec = MemRecs[i]->GetMemRec();
- const TString& buffer = ReadSmallBlobs[i];
- Y_ABORT_UNLESS(buffer.size() == memRec.DataSize());
- DataMerger.AddBlob(TDiskBlob(buffer.data(), buffer.size(), memRec.GetLocalParts(GType)));
- }
-
-
- // ensure that data merger has small blob
- Y_ABORT_UNLESS(DataMerger.HasSmallBlobs());
-
- // finalize base class logic; it also generates blob record
- TBase::Finish();
-
- // ensure that we have generated correct DiskBlob with full set of declared parts
- const TDiskBlob& blob = DataMerger.GetDiskBlobMerger().GetDiskBlob();
- Y_ABORT_UNLESS(blob.GetParts() == MemRec.GetLocalParts(GType));
- Y_ABORT_UNLESS(MemRec.GetType() == TBlobType::DiskBlob);
- }
-
- private:
- TVector<TString> ReadSmallBlobs;
+ protected:
+ TStackVec<TMemRec, 16> MemRecs;
+ bool ProducingSmallBlob = false;
+ bool ProducingHugeBlob = false;
+ ELoadData NeedToLoadData = ELoadData::NotSet;
+ TDataMerger DataMerger;
+ const bool AddHeader;
};
-
////////////////////////////////////////////////////////////////////////////
// TRecordMergerCallback
////////////////////////////////////////////////////////////////////////////
@@ -412,9 +319,8 @@ namespace NKikimr {
case 1: {
if (memRec.GetType() == TBlobType::DiskBlob) {
// don't deduplicate inplaced data
- const TDiskPart &data = extr.SwearOne();
- if (data.ChunkIdx && data.Size) {
- (*Callback)(data, v);
+ if (!v.Empty()) {
+ (*Callback)(extr.SwearOne(), v);
}
} else if (memRec.GetType() == TBlobType::HugeBlob) {
Y_ABORT_UNLESS(v.CountBits() == 1u);
@@ -446,20 +352,31 @@ namespace NKikimr {
void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
AddBasic(memRec, key);
- if (memRec.HasData()) {
- const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
- if (data) {
- Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob);
- // we have in-memory data in a rope, it always wins among other data,
- // so we call Callback immediately and remove any data for this local part
- // from LastWriteWinsMerger
- (*Callback)(TDiskBlob(data, v, GType, key.LogoBlobID()));
- } else {
- Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob && v.CountBits() == 1u);
- TDiskDataExtractor extr;
- memRec.GetDiskData(&extr, nullptr);
- // deduplicate huge blob
- LastWriteWinsMerger.Add(extr.SwearOne(), v, lsn);
+ if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) {
+ TDiskDataExtractor extr;
+ static TRope rope;
+ switch (memRec.GetType()) {
+ case TBlobType::MemBlob:
+ // we have in-memory data in a rope, it always wins among other data,
+ // so we call Callback immediately and remove any data for this local part
+ // from LastWriteWinsMerger
+ Y_ABORT_UNLESS(data); // HaveToMergeData is true, so data must be present
+ (*Callback)(TDiskBlob(data, local, GType, key.LogoBlobID()));
+ break;
+
+ case TBlobType::DiskBlob:
+ Y_ABORT_UNLESS(!memRec.HasData());
+ (*Callback)(TDiskBlob(&rope, local, GType, key.LogoBlobID())); // pure metadata parts only
+ break;
+
+ case TBlobType::HugeBlob:
+ Y_ABORT_UNLESS(local.CountBits() == 1);
+ memRec.GetDiskData(&extr, nullptr);
+ LastWriteWinsMerger.Add(extr.SwearOne(), local, lsn);
+ break;
+
+ case TBlobType::ManyHugeBlobs:
+ Y_ABORT("unexpected case");
}
}
}
diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp
index 3660b27fcb..385996f473 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp
@@ -20,11 +20,11 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////
typedef TLevelSegment<TKeyLogoBlob, TMemRecLogoBlob> TSstLogoBlob;
typedef TSstLogoBlob::TWriter TWriterLogoBlob;
- typedef TCompactRecordMergerIndexPass<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger;
+ typedef TCompactRecordMerger<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger;
typedef TLevelSegment<TKeyBlock, TMemRecBlock> TSstBlock;
typedef TSstBlock::TWriter TWriterBlock;
- typedef TCompactRecordMergerIndexPass<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger;
+ typedef TCompactRecordMerger<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger;
TTestContexts TestCtx;
diff --git a/ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.cpp b/ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.cpp
index c6e072950c..d2eb83b751 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.cpp
+++ b/ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.cpp
@@ -32,7 +32,13 @@ namespace NKikimr {
"Db# LogoBlobs action# add_data mode# %s id# %s lsn# %" PRIu64 " bufSize# %" PRIu32,
OpMode2Str(mode), id.ToString().data(), lsn, ui32(buffer.GetSize())));
- HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), partId, ingress, std::move(buffer));
+ if (buffer) {
+ HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), partId, ingress, std::move(buffer));
+ } else {
+ const TBlobStorageGroupType gtype = HullDs->HullCtx->VCtx->Top->GType;
+ Y_DEBUG_ABORT_UNLESS(!gtype.PartSize(TLogoBlobID(id, partId)));
+ HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), TMemRecLogoBlob(ingress));
+ }
}
void THullDbRecovery::ReplayAddLogoBlobCmd(
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h
index 7ebfb73f63..b2bb2e1eec 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h
@@ -27,8 +27,7 @@ namespace NKikimr {
using TGcMapIterator = typename TGcMap::TIterator;
// compaction record merger
- using TCompactRecordMergerIndexPass = NKikimr::TCompactRecordMergerIndexPass<TKey, TMemRec>;
- using TCompactRecordMergerDataPass = NKikimr::TCompactRecordMergerDataPass<TKey, TMemRec>;
+ using TCompactRecordMerger = NKikimr::TCompactRecordMerger<TKey, TMemRec>;
// level segment
using TLevelSegment = NKikimr::TLevelSegment<TKey, TMemRec>;
@@ -131,7 +130,7 @@ namespace NKikimr {
TDeque<TChunkIdx> AllocatedChunks;
// record merger for compaction
- TCompactRecordMergerIndexPass IndexMerger;
+ TCompactRecordMerger IndexMerger;
// current handoff-transformed item
const TTransformedItem *TransformedItem = nullptr;
diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
index 3db0d88877..7aff723af2 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
@@ -64,17 +64,19 @@ namespace NKikimr {
if (QueryPartId == 0 || QueryPartId == i + 1) {
FoundAnything = true;
auto& tmpItem = TmpItems[i];
- if ((QueryShift >= partSize && partSize) || QuerySize > partSize - QueryShift) {
+ if ((partSize ? QueryShift >= partSize : QueryShift) || QuerySize > partSize - QueryShift) {
tmpItem.UpdateWithError(partId, Cookie);
+ } else if (!partSize) {
+ tmpItem.UpdateWithMemItem(partId, Cookie, TRope());
} else if (tmpItem.ShouldUpdateWithDisk()) {
const ui32 size = QuerySize ? QuerySize : partSize - QueryShift;
- if (!size) { // for metadata reads
- tmpItem.UpdateWithMemItem(partId, Cookie, TRope());
- } else {
- tmpItem.UpdateWithDiskItem(partId, Cookie, TDiskPart(data.ChunkIdx, partOffs + QueryShift, size));
- }
+ Y_DEBUG_ABORT_UNLESS(size);
+ tmpItem.UpdateWithDiskItem(partId, Cookie, TDiskPart(data.ChunkIdx, partOffs + QueryShift, size));
}
}
+ if (QueryPartId && QueryPartId <= i + 1) {
+ break;
+ }
partOffs += partSize;
}
}
@@ -97,7 +99,7 @@ namespace NKikimr {
if (QueryPartId == 0 || QueryPartId == partId) {
FoundAnything = true;
auto& item = TmpItems[partId - 1];
- if ((QueryShift >= partSize && partSize) || QuerySize > partSize - QueryShift) {
+ if ((partSize ? QueryShift >= partSize : QueryShift) || QuerySize > partSize - QueryShift) {
item.UpdateWithError(blobId, Cookie);
} else if (item.ShouldUpdateWithMem()) {
const ui32 size = QuerySize ? QuerySize : partSize - QueryShift;