diff options
author | alexvru <alexvru@ydb.tech> | 2022-09-10 13:11:30 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-09-10 13:11:30 +0300 |
commit | efecd0b9a7144d4972bdd0a36962f21efaeeb774 (patch) | |
tree | 4e2fbb484ea9f316b4a5b34faa94773c3de59fcf | |
parent | af65c83ed13fcc8d90e74fe3566ca57e2203c04d (diff) | |
download | ydb-efecd0b9a7144d4972bdd0a36962f21efaeeb774.tar.gz |
Support blobs without footers in BlobDepot
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 67 | ||||
-rw-r--r-- | ydb/core/blob_depot/types.h | 6 | ||||
-rw-r--r-- | ydb/core/util/lz4_data_generator.h | 2 |
3 files changed, 36 insertions, 39 deletions
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 04d6a0af6c7..875af36a284 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -8,9 +8,9 @@ namespace NKikimr::NBlobDepot { class TPutQuery : public TQuery { ui32 BlockChecksRemain = 3; ui32 PutsInFlight = 0; + const bool SuppressFooter = true; + NKikimrBlobDepot::TEvCommitBlobSeq CommitBlobSeq; TBlobSeqId BlobSeqId; - ui32 GroupId; - ui64 TotalDataLen; public: using TQuery::TQuery; @@ -23,6 +23,9 @@ namespace NKikimr::NBlobDepot { void Initiate() override { auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); + if (msg.Buffer.size() > MaxBlobSize) { + return EndWithError(NKikimrProto::ERROR, "blob is way too big"); + } // zero step -- for decommission blobs just issue put immediately if (msg.Decommission) { @@ -41,6 +44,8 @@ namespace NKikimr::NBlobDepot { } void IssuePuts() { + auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); + const auto it = Agent.ChannelKinds.find(NKikimrBlobDepot::TChannelKind::Data); if (it == Agent.ChannelKinds.end()) { return EndWithError(NKikimrProto::ERROR, "no Data channels"); @@ -53,46 +58,48 @@ namespace NKikimr::NBlobDepot { if (!blobSeqId) { return kind.EnqueueQueryWaitingForId(this); } - BlobSeqId = *blobSeqId; kind.WritesInFlight.insert(BlobSeqId); - auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); - const ui32 size = msg.Id.BlobSize(); - TotalDataLen = size; + Y_VERIFY(CommitBlobSeq.ItemsSize() == 0); + auto *commitItem = CommitBlobSeq.AddItems(); + commitItem->SetKey(msg.Id.AsBinaryString()); + auto *locator = commitItem->MutableBlobLocator(); + BlobSeqId.ToProto(locator->MutableBlobSeqId()); + //locator->SetChecksum(Crc32c(msg.Buffer.data(), msg.Buffer.size())); + locator->SetTotalDataLen(msg.Buffer.size()); + if (!SuppressFooter) { + locator->SetFooterLen(sizeof(TVirtualGroupBlobFooter)); + } TVirtualGroupBlobFooter footer; memset(&footer, 0, sizeof(footer)); footer.StoredBlobId = msg.Id; - - TLogoBlobID id; TStringBuf footerData(reinterpret_cast<const char*>(&footer), sizeof(footer)); - auto sendPut = [&](TLogoBlobID id, const TString& buffer) { + auto put = [&](EBlobType type, const TString& buffer) { + const auto& [id, groupId] = kind.MakeBlobId(Agent, BlobSeqId, type, 0, buffer.size()); + Y_VERIFY(!locator->HasGroupId() || locator->GetGroupId() == groupId); + locator->SetGroupId(groupId); auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, msg.Deadline, msg.HandleClass, msg.Tactic); ev->ExtraBlockChecks = msg.ExtraBlockChecks; if (!msg.Decommission) { // do not check original blob against blocks when writing decommission copy ev->ExtraBlockChecks.emplace_back(msg.Id.TabletID(), msg.Id.Generation()); } - Agent.SendToProxy(GroupId, std::move(ev), this, nullptr); + Agent.SendToProxy(groupId, std::move(ev), this, nullptr); + ++PutsInFlight; }; - if (size + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) { + if (SuppressFooter) { + // write the blob as is, we don't need footer for this kind + put(EBlobType::VG_DATA_BLOB, msg.Buffer); + } else if (msg.Buffer.size() + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) { // write single blob with footer - TString buffer = msg.Buffer; - buffer.append(footerData); - std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_COMPOSITE_BLOB, 0, buffer.size()); - sendPut(id, buffer); - ++PutsInFlight; + put(EBlobType::VG_COMPOSITE_BLOB, msg.Buffer + footerData); } else { // write data blob and blob with footer - std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_DATA_BLOB, 0, msg.Buffer.size()); - sendPut(id, msg.Buffer); - - std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_FOOTER_BLOB, 0, footerData.size()); - sendPut(id, TString(footerData)); - - PutsInFlight += 2; + put(EBlobType::VG_DATA_BLOB, msg.Buffer); + put(EBlobType::VG_FOOTER_BLOB, TString(footerData)); } } @@ -146,19 +153,7 @@ namespace NKikimr::NBlobDepot { const size_t numErased = kind.WritesInFlight.erase(BlobSeqId); Y_VERIFY(numErased); - NKikimrBlobDepot::TEvCommitBlobSeq request; - - auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); - - auto *item = request.AddItems(); - item->SetKey(msg.Id.AsBinaryString()); - auto *locator = item->MutableBlobLocator(); - locator->SetGroupId(GroupId); - BlobSeqId.ToProto(locator->MutableBlobSeqId()); - locator->SetChecksum(0); - locator->SetTotalDataLen(TotalDataLen); - locator->SetFooterLen(sizeof(TVirtualGroupBlobFooter)); - Agent.Issue(std::move(request), this, nullptr); + Agent.Issue(std::move(CommitBlobSeq), this, nullptr); } } diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 3eb87d2b054..1b926f5f875 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -21,7 +21,7 @@ namespace NKikimr::NBlobDepot { enum class EBlobType : ui32 { VG_COMPOSITE_BLOB = 0, // data + footer - VG_DATA_BLOB = 1, // just data, footer aside + VG_DATA_BLOB = 1, // just data, footer aside (optional) VG_FOOTER_BLOB = 2, // footer only VG_GC_BLOB = 3, // garbage collection command }; @@ -132,7 +132,9 @@ namespace NKikimr::NBlobDepot { for (const auto& item : valueChain) { const auto& locator = item.GetLocator(); const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId()); - if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) { + if (locator.GetFooterLen() == 0) { + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen()); + } else if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) { callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen()); callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen()), 0, 0); } else { diff --git a/ydb/core/util/lz4_data_generator.h b/ydb/core/util/lz4_data_generator.h index 1c979bb9bd2..04ba9508f5a 100644 --- a/ydb/core/util/lz4_data_generator.h +++ b/ydb/core/util/lz4_data_generator.h @@ -23,7 +23,7 @@ inline TString FastGenDataForLZ4(size_t size, ui64 seed) { constexpr size_t minRunLen = 32; constexpr size_t maxRunLen = 64; - const size_t runLen = minRunLen + sizeof(ui32) * rng() % ((maxRunLen - minRunLen) / sizeof(ui32) + 1); + const size_t runLen = minRunLen + sizeof(ui32) * (rng() % ((maxRunLen - minRunLen) / sizeof(ui32) + 1)); char run[maxRunLen]; ui32 i; |