about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author alexvru <alexvru@ydb.tech> 2022-09-10 13:11:30 +0300
committer alexvru <alexvru@ydb.tech> 2022-09-10 13:11:30 +0300
commit efecd0b9a7144d4972bdd0a36962f21efaeeb774 (patch)
tree 4e2fbb484ea9f316b4a5b34faa94773c3de59fcf
parent af65c83ed13fcc8d90e74fe3566ca57e2203c04d (diff)
download ydb-efecd0b9a7144d4972bdd0a36962f21efaeeb774.tar.gz
Support blobs without footers in BlobDepot
-rw-r--r-- ydb/core/blob_depot/agent/storage_put.cpp 67
-rw-r--r-- ydb/core/blob_depot/types.h 6
-rw-r--r-- ydb/core/util/lz4_data_generator.h 2
3 files changed, 36 insertions, 39 deletions
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 04d6a0af6c7..875af36a284 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -8,9 +8,9 @@ namespace NKikimr::NBlobDepot {
class TPutQuery : public TQuery {
ui32 BlockChecksRemain = 3;
ui32 PutsInFlight = 0;
+ const bool SuppressFooter = true;
+ NKikimrBlobDepot::TEvCommitBlobSeq CommitBlobSeq;
TBlobSeqId BlobSeqId;
- ui32 GroupId;
- ui64 TotalDataLen;
public:
using TQuery::TQuery;
@@ -23,6 +23,9 @@ namespace NKikimr::NBlobDepot {
void Initiate() override {
auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
+ if (msg.Buffer.size() > MaxBlobSize) {
+ return EndWithError(NKikimrProto::ERROR, "blob is way too big");
+ }
// zero step -- for decommission blobs just issue put immediately
if (msg.Decommission) {
@@ -41,6 +44,8 @@ namespace NKikimr::NBlobDepot {
}
void IssuePuts() {
+ auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
+
const auto it = Agent.ChannelKinds.find(NKikimrBlobDepot::TChannelKind::Data);
if (it == Agent.ChannelKinds.end()) {
return EndWithError(NKikimrProto::ERROR, "no Data channels");
@@ -53,46 +58,48 @@ namespace NKikimr::NBlobDepot {
if (!blobSeqId) {
return kind.EnqueueQueryWaitingForId(this);
}
-
BlobSeqId = *blobSeqId;
kind.WritesInFlight.insert(BlobSeqId);
- auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
- const ui32 size = msg.Id.BlobSize();
- TotalDataLen = size;
+ Y_VERIFY(CommitBlobSeq.ItemsSize() == 0);
+ auto *commitItem = CommitBlobSeq.AddItems();
+ commitItem->SetKey(msg.Id.AsBinaryString());
+ auto *locator = commitItem->MutableBlobLocator();
+ BlobSeqId.ToProto(locator->MutableBlobSeqId());
+ //locator->SetChecksum(Crc32c(msg.Buffer.data(), msg.Buffer.size()));
+ locator->SetTotalDataLen(msg.Buffer.size());
+ if (!SuppressFooter) {
+ locator->SetFooterLen(sizeof(TVirtualGroupBlobFooter));
+ }
TVirtualGroupBlobFooter footer;
memset(&footer, 0, sizeof(footer));
footer.StoredBlobId = msg.Id;
-
- TLogoBlobID id;
TStringBuf footerData(reinterpret_cast<const char*>(&footer), sizeof(footer));
- auto sendPut = [&](TLogoBlobID id, const TString& buffer) {
+ auto put = [&](EBlobType type, const TString& buffer) {
+ const auto& [id, groupId] = kind.MakeBlobId(Agent, BlobSeqId, type, 0, buffer.size());
+ Y_VERIFY(!locator->HasGroupId() || locator->GetGroupId() == groupId);
+ locator->SetGroupId(groupId);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, msg.Deadline, msg.HandleClass, msg.Tactic);
ev->ExtraBlockChecks = msg.ExtraBlockChecks;
if (!msg.Decommission) { // do not check original blob against blocks when writing decommission copy
ev->ExtraBlockChecks.emplace_back(msg.Id.TabletID(), msg.Id.Generation());
}
- Agent.SendToProxy(GroupId, std::move(ev), this, nullptr);
+ Agent.SendToProxy(groupId, std::move(ev), this, nullptr);
+ ++PutsInFlight;
};
- if (size + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) {
+ if (SuppressFooter) {
+ // write the blob as is, we don't need footer for this kind
+ put(EBlobType::VG_DATA_BLOB, msg.Buffer);
+ } else if (msg.Buffer.size() + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) {
// write single blob with footer
- TString buffer = msg.Buffer;
- buffer.append(footerData);
- std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_COMPOSITE_BLOB, 0, buffer.size());
- sendPut(id, buffer);
- ++PutsInFlight;
+ put(EBlobType::VG_COMPOSITE_BLOB, msg.Buffer + footerData);
} else {
// write data blob and blob with footer
- std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_DATA_BLOB, 0, msg.Buffer.size());
- sendPut(id, msg.Buffer);
-
- std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_FOOTER_BLOB, 0, footerData.size());
- sendPut(id, TString(footerData));
-
- PutsInFlight += 2;
+ put(EBlobType::VG_DATA_BLOB, msg.Buffer);
+ put(EBlobType::VG_FOOTER_BLOB, TString(footerData));
}
}
@@ -146,19 +153,7 @@ namespace NKikimr::NBlobDepot {
const size_t numErased = kind.WritesInFlight.erase(BlobSeqId);
Y_VERIFY(numErased);
- NKikimrBlobDepot::TEvCommitBlobSeq request;
-
- auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
-
- auto *item = request.AddItems();
- item->SetKey(msg.Id.AsBinaryString());
- auto *locator = item->MutableBlobLocator();
- locator->SetGroupId(GroupId);
- BlobSeqId.ToProto(locator->MutableBlobSeqId());
- locator->SetChecksum(0);
- locator->SetTotalDataLen(TotalDataLen);
- locator->SetFooterLen(sizeof(TVirtualGroupBlobFooter));
- Agent.Issue(std::move(request), this, nullptr);
+ Agent.Issue(std::move(CommitBlobSeq), this, nullptr);
}
}
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 3eb87d2b054..1b926f5f875 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -21,7 +21,7 @@ namespace NKikimr::NBlobDepot {
enum class EBlobType : ui32 {
VG_COMPOSITE_BLOB = 0, // data + footer
- VG_DATA_BLOB = 1, // just data, footer aside
+ VG_DATA_BLOB = 1, // just data, footer aside (optional)
VG_FOOTER_BLOB = 2, // footer only
VG_GC_BLOB = 3, // garbage collection command
};
@@ -132,7 +132,9 @@ namespace NKikimr::NBlobDepot {
for (const auto& item : valueChain) {
const auto& locator = item.GetLocator();
const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId());
- if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) {
+ if (locator.GetFooterLen() == 0) {
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen());
+ } else if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) {
callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen());
callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen()), 0, 0);
} else {
diff --git a/ydb/core/util/lz4_data_generator.h b/ydb/core/util/lz4_data_generator.h
index 1c979bb9bd2..04ba9508f5a 100644
--- a/ydb/core/util/lz4_data_generator.h
+++ b/ydb/core/util/lz4_data_generator.h
@@ -23,7 +23,7 @@ inline TString FastGenDataForLZ4(size_t size, ui64 seed) {
constexpr size_t minRunLen = 32;
constexpr size_t maxRunLen = 64;
- const size_t runLen = minRunLen + sizeof(ui32) * rng() % ((maxRunLen - minRunLen) / sizeof(ui32) + 1);
+ const size_t runLen = minRunLen + sizeof(ui32) * (rng() % ((maxRunLen - minRunLen) / sizeof(ui32) + 1));
char run[maxRunLen];
ui32 i;