diff options
author | alexvru <alexvru@ydb.tech> | 2022-07-25 13:40:32 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-07-25 13:40:32 +0300 |
commit | d79a471fa6bf139887fd7087a16b8f3d3ec1c998 (patch) | |
tree | 9a93c169204a67109c86e5b32de1a188b0d761a7 | |
parent | 6e02ac55fbb46b2dbcf36c2a76fe6bf28a031a3c (diff) | |
download | ydb-d79a471fa6bf139887fd7087a16b8f3d3ec1c998.tar.gz |
BlobDepot work in progress
23 files changed, 619 insertions, 267 deletions
diff --git a/ydb/core/base/defs.h b/ydb/core/base/defs.h index bb4eb556895..e089d844a84 100644 --- a/ydb/core/base/defs.h +++ b/ydb/core/base/defs.h @@ -8,6 +8,7 @@ #include <ydb/core/debug/valgrind_check.h> #include <util/generic/array_ref.h> #include <util/generic/string.h> +#include <util/system/byteorder.h> namespace NKikimr { // actorlib is organic part of kikimr so we emulate global import by this directive diff --git a/ydb/core/base/logoblob.h b/ydb/core/base/logoblob.h index ddcecd3b065..7d06bd5592a 100644 --- a/ydb/core/base/logoblob.h +++ b/ydb/core/base/logoblob.h @@ -14,6 +14,8 @@ namespace NKikimr { static const ui32 MaxPartId = 15ul; static const ui32 MaxCrcMode = 3ul; + static constexpr size_t BinarySize = 3 * sizeof(ui64); + TLogoBlobID() { Set(0, 0, 0, 0, 0, 0, 0, 0); @@ -100,8 +102,28 @@ namespace NKikimr { const ui64* GetRaw() const { return Raw.X; } - TStringBuf AsBinaryString() const { - return {reinterpret_cast<const char*>(GetRaw()), 3 * sizeof(ui64)}; + void ToBinary(void *data) const { + ui64 *x = static_cast<ui64*>(data); + x[0] = HostToInet(Raw.X[0]); + x[1] = HostToInet(Raw.X[1]); + x[2] = HostToInet(Raw.X[2]); + } + + TString AsBinaryString() const { + std::array<char, BinarySize> data; + ToBinary(data.data()); + return TString(data.data(), data.size()); + } + + static TLogoBlobID FromBinary(const void *data) { + const ui64 *x = static_cast<const ui64*>(data); + ui64 arr[3] = {InetToHost(x[0]), InetToHost(x[1]), InetToHost(x[2])}; + return TLogoBlobID(arr); + } + + static TLogoBlobID FromBinary(const TString& data) { + Y_VERIFY(data.size() == BinarySize); + return FromBinary(data.data()); } TString ToString() const; diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt index 236ee49e557..9a868b3e026 100644 --- a/ydb/core/blob_depot/CMakeLists.txt +++ b/ydb/core/blob_depot/CMakeLists.txt @@ -19,12 +19,14 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_resolve.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/garbage_collection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/given_id_range.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/mon_main.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_apply_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_init_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_load.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_resolve.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_commit_blob_seq.cpp ) diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 7f1a4c9dd44..0ec66d59100 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -49,9 +49,9 @@ namespace NKikimr::NBlobDepot { NKikimrBlobDepot::TEvResolve resolve; auto *item = resolve.AddItems(); - item->SetBeginningKey(from.GetRaw(), 3 * sizeof(ui64)); + item->SetBeginningKey(from.AsBinaryString()); item->SetIncludeBeginning(true); - item->SetEndingKey(to.GetRaw(), 3 * sizeof(ui64)); + item->SetEndingKey(to.AsBinaryString()); item->SetIncludeEnding(true); item->SetMaxKeys(1); item->SetReverse(true); @@ -103,10 +103,7 @@ namespace NKikimr::NBlobDepot { if (status == NKikimrProto::OK) { for (const auto& item : msg.Record.GetResolvedKeys()) { - const TString& id = item.GetKey(); - Y_VERIFY(id.size() == 3 * sizeof(ui64)); - Y_VERIFY(!Id); - Id = TLogoBlobID(reinterpret_cast<const ui64*>(id.data())); + Id = TLogoBlobID::FromBinary(item.GetKey()); Y_VERIFY(item.ValueChainSize() == 1); if (ReadBody) { TString error; diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index 13bc7ef8e94..f710bbfe0f7 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -35,7 +35,7 @@ namespace NKikimr::NBlobDepot { response.Shift = query.Shift; response.RequestedSize = query.Size; - TString blobId(reinterpret_cast<const char*>(query.Id.GetRaw()), 3 * sizeof(ui64)); + TString blobId = query.Id.AsBinaryString(); if (const TValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this, std::make_shared<TResolveKeyContext>(i))) { if (!ProcessSingleResult(i, value)) { diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 325cb58d40a..c0bf7f2a084 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -142,10 +142,9 @@ namespace NKikimr::NBlobDepot { NKikimrBlobDepot::TEvCommitBlobSeq request; auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); - const TStringBuf key = msg.Id.AsBinaryString(); auto *item = request.AddItems(); - item->SetKey(key.data(), key.size()); + item->SetKey(msg.Id.AsBinaryString()); auto *locator = item->MutableBlobLocator(); locator->SetGroupId(GroupId); BlobSeqId.ToProto(locator->MutableBlobSeqId()); diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 57c5de67b9c..af054706490 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -31,8 +31,8 @@ namespace NKikimr::NBlobDepot { void IssueResolve() { auto& msg = *Event->Get<TEvBlobStorage::TEvRange>(); - TStringBuf from = msg.From.AsBinaryString(); - TStringBuf to = msg.To.AsBinaryString(); + TString from = msg.From.AsBinaryString(); + TString to = msg.To.AsBinaryString(); const bool reverse = msg.To < msg.From; if (reverse) { std::swap(from, to); @@ -40,9 +40,9 @@ namespace NKikimr::NBlobDepot { NKikimrBlobDepot::TEvResolve resolve; auto *item = resolve.AddItems(); - item->SetBeginningKey(from.data(), from.size()); + item->SetBeginningKey(from); item->SetIncludeBeginning(true); - item->SetEndingKey(to.data(), to.size()); + item->SetEndingKey(to); item->SetIncludeEnding(true); item->SetReverse(reverse); @@ -53,9 +53,9 @@ namespace NKikimr::NBlobDepot { void IssueResolve(TLogoBlobID id, size_t index) { NKikimrBlobDepot::TEvResolve resolve; auto *item = resolve.AddItems(); - item->SetBeginningKey(id.GetRaw(), 3 * sizeof(ui64)); + item->SetBeginningKey(id.AsBinaryString()); item->SetIncludeBeginning(true); - item->SetEndingKey(id.GetRaw(), 3 * sizeof(ui64)); + item->SetEndingKey(id.AsBinaryString()); item->SetIncludeEnding(true); Agent.Issue(std::move(resolve), this, std::make_shared<TExtraResolveContext>(index)); @@ -85,8 +85,7 @@ namespace NKikimr::NBlobDepot { for (const auto& key : msg.GetResolvedKeys()) { const TString& blobId = key.GetKey(); - Y_VERIFY(blobId.size() == 3 * sizeof(ui64)); - TLogoBlobID id(reinterpret_cast<const ui64*>(blobId.data())); + auto id = TLogoBlobID::FromBinary(blobId); const size_t index = context ? context->Obtain<TExtraResolveContext>().Index diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index 260caa7647a..bba9917e1a0 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -26,7 +26,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobDepot::TEvRegisterAgent, Handle); hFunc(TEvBlobDepot::TEvAllocateIds, Handle); hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle); - hFunc(TEvBlobDepot::TEvResolve, Handle); + hFunc(TEvBlobDepot::TEvResolve, Data->Handle); hFunc(TEvBlobDepot::TEvBlock, BlocksManager->Handle); hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager->Handle); diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index b465995502d..05372c9bde7 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -167,7 +167,6 @@ namespace NKikimr::NBlobDepot { std::unique_ptr<TData> Data; void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev); - void Handle(TEvBlobDepot::TEvResolve::TPtr ev); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Monitoring diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index c5750dc3ebc..aa43ca596ab 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -1,81 +1,9 @@ #include "data.h" -#include "schema.h" namespace NKikimr::NBlobDepot { using TData = TBlobDepot::TData; - class TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - const ui8 Channel; - const ui32 GroupId; - const TGenStep IssuedGenStep; - std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> CollectGarbage; - - public: - TTxIssueGC(TBlobDepot *self, ui8 channel, ui32 groupId, TGenStep issuedGenStep, - std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage) - : TTransactionBase(self) - , Channel(channel) - , GroupId(groupId) - , IssuedGenStep(issuedGenStep) - , CollectGarbage(std::move(collectGarbage)) - {} - - bool Execute(TTransactionContext& txc, const TActorContext&) override { - NIceDb::TNiceDb db(txc.DB); - db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::IssuedGenStep>(ui64(IssuedGenStep)); - return true; - } - - void Complete(const TActorContext&) override { - SendToBSProxy(Self->SelfId(), GroupId, CollectGarbage.release(), GroupId); - } - }; - - class TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - const ui8 Channel; - const ui32 GroupId; - std::vector<TLogoBlobID> TrashDeleted; - const TGenStep ConfirmedGenStep; - - static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; - - public: - TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep) - : TTransactionBase(self) - , Channel(channel) - , GroupId(groupId) - , TrashDeleted(std::move(trashDeleted)) - , ConfirmedGenStep(confirmedGenStep) - {} - - bool Execute(TTransactionContext& txc, const TActorContext&) override { - NIceDb::TNiceDb db(txc.DB); - - for (ui32 i = 0; i < TrashDeleted.size() && i < MaxKeysToProcessAtOnce; ++i) { - db.Table<Schema::Trash>().Key(TKey(TrashDeleted[i]).MakeBinaryKey()).Delete(); - } - if (TrashDeleted.size() <= MaxKeysToProcessAtOnce) { - TrashDeleted.clear(); - db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::ConfirmedGenStep>(ui64(ConfirmedGenStep)); - } else { - std::vector<TLogoBlobID> temp; - temp.insert(temp.end(), TrashDeleted.begin() + MaxKeysToProcessAtOnce, TrashDeleted.end()); - temp.swap(TrashDeleted); - } - - return true; - } - - void Complete(const TActorContext&) override { - if (TrashDeleted.empty()) { - Self->Data->OnCommitConfirmedGC(Channel, GroupId); - } else { // resume transaction - Self->Execute(std::make_unique<TTxConfirmGC>(Self, Channel, GroupId, std::move(TrashDeleted), ConfirmedGenStep)); - } - } - }; - std::optional<TData::TValue> TData::FindKey(const TKey& key) { const auto it = Data.find(key); return it != Data.end() ? std::make_optional(it->second) : std::nullopt; @@ -91,6 +19,10 @@ namespace NKikimr::NBlobDepot { } void TData::AddDataOnLoad(TKey key, TString value) { + if (Data.contains(key)) { + return; // we are racing with the offline load procedure -- skip this key, it is already new in memory + } + NKikimrBlobDepot::TValue proto; const bool success = proto.ParseFromString(value); Y_VERIFY(success); @@ -118,8 +50,7 @@ namespace NKikimr::NBlobDepot { } void TData::PutKey(TKey key, TValue&& data) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)), - (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState))); + ui64 referencedBytes = 0; EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id) { if (!RefCount[id]++) { @@ -129,8 +60,13 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(inserted); AccountBlob(id, 1); } + referencedBytes += id.BlobSize(); }); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (TabletId, Self->TabletID()), (Key, key), + (ValueChain.size, data.ValueChain.size()), (ReferencedBytes, referencedBytes), + (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState))); + Data[std::move(key)] = std::move(data); } @@ -292,8 +228,7 @@ namespace NKikimr::NBlobDepot { (TrashInFlight.size, record.TrashInFlight.size())); if (collect) { - Self->Execute(std::make_unique<TTxIssueGC>(Self, record.Channel, record.GroupId, record.IssuedGenStep, - std::move(ev))); + ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev)); } else { SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), record.GroupId); } @@ -323,8 +258,8 @@ namespace NKikimr::NBlobDepot { if (ev->Get()->Status == NKikimrProto::OK) { Y_VERIFY(record.CollectGarbageRequestInFlight); record.OnSuccessfulCollect(this); - Self->Execute(std::make_unique<TTxConfirmGC>(Self, record.Channel, record.GroupId, - std::exchange(record.TrashInFlight, {}), record.LastConfirmedGenStep)); + ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), + record.LastConfirmedGenStep); } else { record.ClearInFlight(this); HandleTrash(); diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index f9e4d6f340e..4816dced3ae 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -116,29 +116,28 @@ namespace NKikimr::NBlobDepot { TString MakeBinaryKey() const { if (Data.Type == BlobIdType) { - return TString(GetBlobId().AsBinaryString()); + return GetBlobId().AsBinaryString(); } else { return TString(GetStringBuf()); } } - static TKey FromBinaryKey(const TStringBuf& key, const NKikimrBlobDepot::TBlobDepotConfig& config) { + static TKey FromBinaryKey(const TString& key, const NKikimrBlobDepot::TBlobDepotConfig& config) { if (config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) { - Y_VERIFY(key.size() == 3 * sizeof(ui64)); - return TKey(TLogoBlobID(reinterpret_cast<const ui64*>(key.data()))); + return TKey(TLogoBlobID::FromBinary(key)); } else { return TKey(key); } } - TString ToString(const NKikimrBlobDepot::TBlobDepotConfig& config) const { + TString ToString() const { TStringStream s; - Output(s, config); + Output(s); return s.Str(); } - void Output(IOutputStream& s, const NKikimrBlobDepot::TBlobDepotConfig& config) const { - if (config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) { + void Output(IOutputStream& s) const { + if (Data.Type == BlobIdType) { s << GetBlobId(); } else { s << EscapeC(GetStringBuf()); @@ -255,16 +254,23 @@ namespace NKikimr::NBlobDepot { void EnqueueForCollectionIfPossible(TData *self); }; + bool Loaded = false; std::map<TKey, TValue> Data; THashMap<TLogoBlobID, ui32> RefCount; THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup; TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash; + std::optional<TKey> LastLoadedKey; // keys are being loaded in ascending order THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed class TTxIssueGC; class TTxConfirmGC; + class TTxLoad; + + class TTxLoadSpecificKeys; + class TTxResolve; + public: TData(TBlobDepot *self) : Self(self) @@ -342,6 +348,18 @@ namespace NKikimr::NBlobDepot { } } } + + void StartLoad(); + void OnLoadComplete(); + bool IsLoaded() const { return Loaded; } + + void Handle(TEvBlobDepot::TEvResolve::TPtr ev); + + private: + void ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep, + std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage); + + void ExecuteConfirmGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep); }; Y_DECLARE_OPERATORS_FOR_FLAGS(TBlobDepot::TData::TScanFlags) diff --git a/ydb/core/blob_depot/data_gc.cpp b/ydb/core/blob_depot/data_gc.cpp new file mode 100644 index 00000000000..4eff9208f76 --- /dev/null +++ b/ydb/core/blob_depot/data_gc.cpp @@ -0,0 +1,88 @@ +#include "data.h" +#include "schema.h" + +namespace NKikimr::NBlobDepot { + + using TData = TBlobDepot::TData; + + class TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui8 Channel; + const ui32 GroupId; + const TGenStep IssuedGenStep; + std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> CollectGarbage; + + public: + TTxIssueGC(TBlobDepot *self, ui8 channel, ui32 groupId, TGenStep issuedGenStep, + std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage) + : TTransactionBase(self) + , Channel(channel) + , GroupId(groupId) + , IssuedGenStep(issuedGenStep) + , CollectGarbage(std::move(collectGarbage)) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::IssuedGenStep>(ui64(IssuedGenStep)); + return true; + } + + void Complete(const TActorContext&) override { + SendToBSProxy(Self->SelfId(), GroupId, CollectGarbage.release(), GroupId); + } + }; + + void TData::ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep, + std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage) { + Self->Execute(std::make_unique<TTxIssueGC>(Self, channel, groupId, issuedGenStep, std::move(collectGarbage))); + } + + class TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui8 Channel; + const ui32 GroupId; + std::vector<TLogoBlobID> TrashDeleted; + const TGenStep ConfirmedGenStep; + + static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; + + public: + TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep) + : TTransactionBase(self) + , Channel(channel) + , GroupId(groupId) + , TrashDeleted(std::move(trashDeleted)) + , ConfirmedGenStep(confirmedGenStep) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + + for (ui32 i = 0; i < TrashDeleted.size() && i < MaxKeysToProcessAtOnce; ++i) { + db.Table<Schema::Trash>().Key(TKey(TrashDeleted[i]).MakeBinaryKey()).Delete(); + } + if (TrashDeleted.size() <= MaxKeysToProcessAtOnce) { + TrashDeleted.clear(); + db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::ConfirmedGenStep>(ui64(ConfirmedGenStep)); + } else { + std::vector<TLogoBlobID> temp; + temp.insert(temp.end(), TrashDeleted.begin() + MaxKeysToProcessAtOnce, TrashDeleted.end()); + temp.swap(TrashDeleted); + } + + return true; + } + + void Complete(const TActorContext&) override { + if (TrashDeleted.empty()) { + Self->Data->OnCommitConfirmedGC(Channel, GroupId); + } else { // resume transaction + Self->Data->ExecuteConfirmGC(Channel, GroupId, std::move(TrashDeleted), ConfirmedGenStep); + } + } + }; + + void TData::ExecuteConfirmGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep) { + Self->Execute(std::make_unique<TTxConfirmGC>(Self, channel, groupId, std::move(trashDeleted), confirmedGenStep)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp new file mode 100644 index 00000000000..820573e9ce6 --- /dev/null +++ b/ydb/core/blob_depot/data_load.cpp @@ -0,0 +1,162 @@ +#include "data.h" +#include "schema.h" +#include "garbage_collection.h" + +namespace NKikimr::NBlobDepot { + + using TData = TBlobDepot::TData; + + class TData::TTxLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + template<typename TTable_, typename TInProgressState_, typename TNextState_, bool Initial_> + struct TItem { + using TTable = TTable_; + using TInProgressState = TInProgressState_; + using TNextState = TNextState_; + static constexpr bool Initial = Initial_; + typename TTable::TKey::KeyValuesType Key; + }; + + struct TLoadFinished {}; + struct TLoadGcInProgress : TItem<Schema::GC, TLoadGcInProgress, TLoadFinished, false> {}; + struct TLoadGcBegin : TItem<Schema::GC, TLoadGcInProgress, TLoadFinished, true> {}; + struct TLoadTrashInProgress : TItem<Schema::Trash, TLoadTrashInProgress, TLoadGcBegin, false> {}; + struct TLoadTrashBegin : TItem<Schema::Trash, TLoadTrashInProgress, TLoadGcBegin, true> {}; + struct TLoadDataInProgress : TItem<Schema::Data, TLoadDataInProgress, TLoadTrashBegin, false> {}; + struct TLoadDataBegin : TItem<Schema::Data, TLoadDataInProgress, TLoadTrashBegin, true> {}; + + using TLoadState = std::variant< + TLoadDataBegin, + TLoadDataInProgress, + TLoadTrashBegin, + TLoadTrashInProgress, + TLoadGcBegin, + TLoadGcInProgress, + TLoadFinished + >; + + TLoadState LoadState; + std::unique_ptr<TTxLoad> SuccessorTx; + + public: + TTxLoad(TBlobDepot *self, TLoadState loadState = TLoadDataBegin{}) + : TTransactionBase(self) + , LoadState(std::move(loadState)) + {} + + TTxLoad(TTxLoad& predecessor) + : TTransactionBase(predecessor.Self) + , LoadState(std::move(predecessor.LoadState)) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT28, "TData::TTxLoad::Execute", (TabletId, Self->TabletID())); + + NIceDb::TNiceDb db(txc.DB); + bool progress = false; + auto visitor = [&](auto& item) { return ExecuteLoad(db, item, progress); }; + + if (std::visit(visitor, LoadState)) { + // load finished + return true; + } else if (progress) { + // something was read, but not all + SuccessorTx = std::make_unique<TTxLoad>(*this); + return true; + } else { + // didn't read anything this time + return false; + } + } + + template<typename T> + bool ExecuteLoad(NIceDb::TNiceDb& db, T& item, bool& progress) { + if constexpr (T::Initial) { + return LoadTable(db, db.Table<typename T::TTable>().All(), item, progress); + } else { + auto greaterOrEqual = [&](auto&&... key) { return db.Table<typename T::TTable>().GreaterOrEqual(key...); }; + return LoadTable(db, std::apply(greaterOrEqual, item.Key), item, progress); + } + } + + bool ExecuteLoad(NIceDb::TNiceDb&, TLoadFinished&, bool&) { + return true; + } + + template<typename T, typename TItem> + bool LoadTable(NIceDb::TNiceDb& db, T&& table, TItem& item, bool& progress) { + if (!table.Precharge(TItem::TTable::PrechargeRows, TItem::TTable::PrechargeBytes)) { + return false; + } + + for (auto rowset = table.Select();; rowset.Next()) { + if (!rowset.IsReady()) { + return false; + } else if (!rowset.IsValid()) { + break; + } + + typename TItem::TTable::TKey::KeyValuesType key(rowset.GetKey()); + bool processRow = true; + if constexpr (!TItem::Initial) { + processRow = item.Key < key; + } + if (processRow) { + progress = true; + typename TItem::TInProgressState state; + state.Key = std::move(key); + LoadState = std::move(state); + ProcessRow(rowset, static_cast<typename TItem::TTable*>(nullptr)); + } + } + + // table was read completely, advance to next state + LoadState = typename TItem::TNextState{}; + return ExecuteLoad(db, std::get<typename TItem::TNextState>(LoadState), progress); + } + + template<typename T> + void ProcessRow(T&& row, Schema::Data*) { + auto key = TData::TKey::FromBinaryKey(row.template GetValue<Schema::Data::Key>(), Self->Config); + Self->Data->AddDataOnLoad(key, row.template GetValue<Schema::Data::Value>()); + Y_VERIFY(!Self->Data->LastLoadedKey || *Self->Data->LastLoadedKey < key); + Self->Data->LastLoadedKey = std::move(key); + } + + template<typename T> + void ProcessRow(T&& row, Schema::Trash*) { + Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(row.template GetValue<Schema::Trash::BlobId>())); + } + + template<typename T> + void ProcessRow(T&& row, Schema::GC*) { + Self->Data->AddGenStepOnLoad( + row.template GetValue<Schema::GC::Channel>(), + row.template GetValue<Schema::GC::GroupId>(), + TGenStep(row.template GetValueOrDefault<Schema::GC::IssuedGenStep>()), + TGenStep(row.template GetValueOrDefault<Schema::GC::ConfirmedGenStep>()) + ); + } + + void Complete(const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "TData::TTxLoad::Complete", (TabletId, Self->TabletID()), + (SuccessorTx, bool(SuccessorTx)), (LoadState.index, LoadState.index())); + + if (SuccessorTx) { + Self->Execute(std::move(SuccessorTx)); + } else { + Self->Data->OnLoadComplete(); + } + } + }; + + void TData::StartLoad() { + Self->Execute(std::make_unique<TTxLoad>(Self)); + } + + void TData::OnLoadComplete() { + Loaded = true; + Self->BarrierServer->OnDataLoaded(); + HandleTrash(); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp new file mode 100644 index 00000000000..bdf96bb06b8 --- /dev/null +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -0,0 +1,252 @@ +#include "data.h" +#include "schema.h" + +namespace NKikimr::NBlobDepot { + + using TData = TBlobDepot::TData; + + class TData::TTxResolve : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + std::unique_ptr<TEvBlobDepot::TEvResolve::THandle> Request; + int ItemIndex = 0; + std::optional<TKey> LastScannedKey; + ui32 NumKeysRead = 0; // number of keys already read for this item + + // final state + std::deque<std::unique_ptr<TEvBlobDepot::TEvResolveResult>> Outbox; + std::unique_ptr<TTxResolve> SuccessorTx; + + public: + TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request) + : TTransactionBase(self) + , Request(request.Release()) + {} + + TTxResolve(TTxResolve& predecessor) + : TTransactionBase(predecessor.Self) + , Request(std::move(predecessor.Request)) + , ItemIndex(predecessor.ItemIndex) + , LastScannedKey(std::move(predecessor.LastScannedKey)) + , NumKeysRead(predecessor.NumKeysRead) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "TTxResolve::Execute", (TabletId, Self->TabletID()), + (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), + (LastScannedKey, LastScannedKey)); + + if (Self->Data->Loaded) { + GenerateResponse(); + return true; + } + + bool progress = false; // have we made some progress during scan? + const auto& record = Request->Get()->Record; + const auto& items = record.GetItems(); + for (; ItemIndex < items.size(); ++ItemIndex, LastScannedKey.reset(), NumKeysRead = 0) { + const auto& item = items[ItemIndex]; + + std::optional<TKey> begin = item.HasBeginningKey() + ? std::make_optional(TKey::FromBinaryKey(item.GetBeginningKey(), Self->Config)) + : std::nullopt; + + std::optional<TKey> end = item.HasEndingKey() + ? std::make_optional(TKey::FromBinaryKey(item.GetEndingKey(), Self->Config)) + : std::nullopt; + + TScanFlags flags; + if (item.GetIncludeBeginning()) { + flags |= EScanFlags::INCLUDE_BEGIN; + } + if (item.GetIncludeEnding()) { + flags |= EScanFlags::INCLUDE_END; + } + if (item.GetReverse()) { + flags |= EScanFlags::REVERSE; + } + + // adjust range according to actually generated data + if (LastScannedKey) { + if (flags & EScanFlags::REVERSE) { // reverse scan + end = *LastScannedKey; + flags &= ~EScanFlags::INCLUDE_END; + } else { // direct scan + begin = *LastScannedKey; + flags &= ~EScanFlags::INCLUDE_BEGIN; + } + } + + if (end && Self->Data->LastLoadedKey && *end <= Self->Data->LastLoadedKey) { + // we have everything we need contained in memory, skip this item + continue; + } else if (Self->Data->LastLoadedKey && (!begin || *begin <= Self->Data->LastLoadedKey)) { + // we can scan only some part from memory -- do it + auto callback = [&](const TKey& key, const TValue&) { + LastScannedKey = key; + return ++NumKeysRead != item.GetMaxKeys(); + }; + Self->Data->ScanRange(begin ? &begin.value() : nullptr, &Self->Data->LastLoadedKey.value(), + flags | EScanFlags::INCLUDE_END, callback); + + // adjust range beginning + begin = Self->Data->LastLoadedKey; + flags &= ~EScanFlags::INCLUDE_BEGIN; + + // check if we have read all the keys requested + if (NumKeysRead == item.GetMaxKeys()) { + continue; + } + } + + auto processRange = [&](auto&& table) { + for (auto rowset = table.Select();; rowset.Next()) { + if (!rowset.IsReady()) { + return false; + } else if (!rowset.IsValid()) { + // no more keys in our direction + return true; + } + auto key = TKey::FromBinaryKey(rowset.template GetValue<Schema::Data::Key>(), Self->Config); + if (key != LastScannedKey) { + LastScannedKey = key; + progress = true; + Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>()); + + const bool matchBegin = !begin || (flags & EScanFlags::INCLUDE_BEGIN ? *begin <= key : *begin < key); + const bool matchEnd = !end || (flags & EScanFlags::INCLUDE_END ? key <= *end : key < *end); + if (matchBegin && matchEnd && ++NumKeysRead == item.GetMaxKeys()) { + // we have hit the MaxItems limit, exit + return true; + } else if (flags & EScanFlags::REVERSE ? !matchBegin : !matchEnd) { + // we have exceeded the opposite boundary, exit + return true; + } + } + } + }; + + auto applyEnd = [&](auto&& x) { + return end + ? processRange(x.LessOrEqual(end->MakeBinaryKey())) + : processRange(std::forward<std::decay_t<decltype(x)>>(x)); + }; + auto applyBegin = [&](auto&& x) { + return begin + ? applyEnd(x.GreaterOrEqual(begin->MakeBinaryKey())) + : applyEnd(std::forward<std::decay_t<decltype(x)>>(x)); + }; + auto applyReverse = [&](auto&& x) { + return item.GetReverse() + ? applyBegin(x.Reverse()) + : applyBegin(std::forward<std::decay_t<decltype(x)>>(x)); + }; + if (applyReverse(db.Table<Schema::Data>())) { + continue; // all work done for this item + } else if (progress) { + // we have already done something, so let's finish this transaction and start a new one, continuing + // the job + SuccessorTx = std::make_unique<TTxResolve>(*this); + return true; + } else { + return false; // we'll have to restart this transaction to fetch some data + } + } + + GenerateResponse(); + return true; + } + + void Complete(const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT30, "TTxResolve::Complete", (TabletId, Self->TabletID()), + (Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, bool(SuccessorTx)), + (Outbox.size, Outbox.size())); + + if (SuccessorTx) { + Self->Execute(std::move(SuccessorTx)); + } else { + if (Outbox.empty()) { + Outbox.push_back(std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt)); + } + for (auto& ev : Outbox) { + auto handle = std::make_unique<IEventHandle>(Request->Sender, Self->SelfId(), ev.release(), 0, Request->Cookie); + if (Request->InterconnectSession) { + handle->Rewrite(TEvInterconnect::EvForward, Request->InterconnectSession); + } + TActivationContext::Send(handle.release()); + } + } + } + + void GenerateResponse() { + size_t lastResponseSize; + + for (const auto& item : Request->Get()->Record.GetItems()) { + std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt; + + std::optional<TKey> begin = item.HasBeginningKey() + ? std::make_optional(TKey::FromBinaryKey(item.GetBeginningKey(), Self->Config)) + : std::nullopt; + + std::optional<TKey> end = item.HasEndingKey() + ? std::make_optional(TKey::FromBinaryKey(item.GetEndingKey(), Self->Config)) + : std::nullopt; + + TScanFlags flags; + if (item.GetIncludeBeginning()) { + flags |= EScanFlags::INCLUDE_BEGIN; + } + if (item.GetIncludeEnding()) { + flags |= EScanFlags::INCLUDE_END; + } + if (item.GetReverse()) { + flags |= EScanFlags::REVERSE; + } + + ui64 count = item.GetMaxKeys(); + + auto callback = [&](const TKey& key, const TValue& value) { + IssueResponseItem(cookie, key, value, lastResponseSize); + return --count != 0; + }; + + Self->Data->ScanRange(begin ? &begin.value() : nullptr, end ? &end.value() : nullptr, flags, callback); + } + } + + void IssueResponseItem(std::optional<ui64> cookie, const TKey& key, const TValue& value, size_t& lastResponseSize) { + NKikimrBlobDepot::TEvResolveResult::TResolvedKey item; + + if (cookie) { + item.SetCookie(*cookie); + } + item.SetKey(key.MakeBinaryKey()); + item.MutableValueChain()->CopyFrom(value.ValueChain); + if (value.Meta) { + item.SetMeta(value.Meta.data(), value.Meta.size()); + } + + size_t itemSize = item.ByteSizeLong(); + if (Outbox.empty() || lastResponseSize + itemSize > EventMaxByteSize) { + if (!Outbox.empty()) { + auto& lastEvent = Outbox.back(); + lastEvent->Record.SetStatus(NKikimrProto::OVERRUN); + } + auto ev = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt); + lastResponseSize = ev->CalculateSerializedSize(); + Outbox.push_back(std::move(ev)); + } + + auto& lastEvent = Outbox.back(); + item.Swap(lastEvent->Record.AddResolvedKeys()); + lastResponseSize += itemSize; + } + }; + + void TData::Handle(TEvBlobDepot::TEvResolve::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "TEvResolve", (TabletId, Self->TabletID()), (Msg, ev->Get()->ToString()), + (Sender, ev->Sender), (Cookie, ev->Cookie)); + Self->Execute(std::make_unique<TTxResolve>(Self, ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 6715a61b239..ec44bbcf8ef 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -30,6 +30,7 @@ namespace NKikimr::NBlobDepot { {} bool Execute(TTransactionContext& txc, const TActorContext&) override { + Y_VERIFY(Self->Data->IsLoaded()); Validate(); if (!Error) { auto& record = Request->Get()->Record; @@ -124,7 +125,7 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "DeleteKey", (TabletId, Self->TabletID()), (BlobId, id)); db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Delete(); auto updateTrash = [&](TLogoBlobID id) { - db.Table<Schema::Trash>().Key(TString(id.AsBinaryString())).Update(); + db.Table<Schema::Trash>().Key(id.AsBinaryString()).Update(); }; Self->Data->DeleteKey(key, updateTrash, this); ++NumKeysProcessed; @@ -185,7 +186,7 @@ namespace NKikimr::NBlobDepot { const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); auto& barrier = Barriers[key]; barrier.ProcessingQ.emplace_back(ev.Release()); - if (barrier.ProcessingQ.size() == 1) { + if (Self->Data->IsLoaded() && barrier.ProcessingQ.size() == 1) { Self->Execute(std::make_unique<TTxCollectGarbage>(Self, record.GetTabletId(), record.GetChannel())); } } @@ -202,4 +203,12 @@ namespace NKikimr::NBlobDepot { *underHard = it == Barriers.end() ? false : genStep <= it->second.Hard; } + void TBlobDepot::TBarrierServer::OnDataLoaded() { + for (auto& [key, barrier] : Barriers) { + if (!barrier.ProcessingQ.empty()) { + Self->Execute(std::make_unique<TTxCollectGarbage>(Self, key.first, key.second)); + } + } + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h index 3164828fda3..e398fcd9f11 100644 --- a/ydb/core/blob_depot/garbage_collection.h +++ b/ydb/core/blob_depot/garbage_collection.h @@ -31,6 +31,7 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev); bool CheckBlobForBarrier(TLogoBlobID id) const; void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const; + void OnDataLoaded(); template<typename TCallback> void Enumerate(TCallback&& callback) { diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp index 2eb4163dcc3..eb6fd40b165 100644 --- a/ydb/core/blob_depot/mon_main.cpp +++ b/ydb/core/blob_depot/mon_main.cpp @@ -114,7 +114,7 @@ namespace NKikimr::NBlobDepot { Self->Data->ScanRange(nullptr, nullptr, 0, [&](const TData::TKey& key, const TData::TValue& value) { TABLER() { TABLED() { - key.Output(Stream, Self->Config); + key.Output(Stream); } TABLED() { bool first = true; diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index 469bd04b49d..0f1d3154123 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -58,23 +58,23 @@ namespace NKikimr::NBlobDepot { if (!CheckKeyAgainstBarrier(key)) { responseItem->SetStatus(NKikimrProto::ERROR); - responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString(Self->Config) + responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString() << " is being put beyond the barrier"); continue; } + TString valueData; + const bool success = value.SerializeToString(&valueData); + Y_VERIFY(success); + + db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData); + Self->Data->PutKey(std::move(key), { .Meta = value.GetMeta(), .ValueChain = std::move(*value.MutableValueChain()), .KeepState = value.GetKeepState(), .Public = value.GetPublic(), }); - - TString valueData; - const bool success = value.SerializeToString(&valueData); - Y_VERIFY(success); - - db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData); } return true; diff --git a/ydb/core/blob_depot/op_init_schema.cpp b/ydb/core/blob_depot/op_init_schema.cpp index 6e2ba92d639..cf67bebb3f6 100644 --- a/ydb/core/blob_depot/op_init_schema.cpp +++ b/ydb/core/blob_depot/op_init_schema.cpp @@ -1,4 +1,5 @@ #include "blob_depot_tablet.h" +#include "data.h" #include "schema.h" namespace NKikimr::NBlobDepot { diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index 6cf66a9f37d..e166186398e 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -76,68 +76,13 @@ namespace NKikimr::NBlobDepot { } } - // Data - { - auto table = db.Table<Schema::Data>().Select(); - if (!table.IsReady()) { - return false; - } - while (table.IsValid()) { - Self->Data->AddDataOnLoad( - TData::TKey::FromBinaryKey(table.GetValue<Schema::Data::Key>(), Self->Config), - table.GetValue<Schema::Data::Value>() - ); - if (!table.Next()) { - return false; - } - } - } - - // Trash - { - auto table = db.Table<Schema::Trash>().Select(); - if (!table.IsReady()) { - return false; - } - while (table.IsValid()) { - const TString& blobId = table.GetValue<Schema::Trash::BlobId>(); - Self->Data->AddTrashOnLoad(TLogoBlobID(reinterpret_cast<const ui64*>(blobId.data()))); - if (!table.Next()) { - return false; - } - } - } - - // GC - { - using T = Schema::GC; - auto table = db.Table<T>().Select(); - if (!table.IsReady()) { - return false; - } - while (table.IsValid()) { - Self->Data->AddGenStepOnLoad(table.GetValue<T::Channel>(), - table.GetValue<T::GroupId>(), - TGenStep(table.GetValueOrDefault<T::IssuedGenStep>()), - TGenStep(table.GetValueOrDefault<T::ConfirmedGenStep>())); - if (!table.Next()) { - return false; - } - } - } - return true; } bool Precharge(NIceDb::TNiceDb& db) { - auto config = db.Table<Schema::Config>().Select(); - auto blocks = db.Table<Schema::Blocks>().Select(); - auto barriers = db.Table<Schema::Barriers>().Select(); - auto data = db.Table<Schema::Data>().Select(); - auto trash = db.Table<Schema::Trash>().Select(); - auto confirmedGC = db.Table<Schema::GC>().Select(); - return config.IsReady() && blocks.IsReady() && barriers.IsReady() && data.IsReady() && trash.IsReady() && - confirmedGC.IsReady(); + return db.Table<Schema::Config>().Precharge() + & db.Table<Schema::Blocks>().Precharge() + & db.Table<Schema::Barriers>().Precharge(); } void Complete(const TActorContext&) override { @@ -146,10 +91,10 @@ namespace NKikimr::NBlobDepot { if (Configured) { Self->InitChannelKinds(); - Self->Data->HandleTrash(); } Self->OnLoadFinished(); + Self->Data->StartLoad(); // we need at least Config to start correct loading of data } }; diff --git a/ydb/core/blob_depot/op_resolve.cpp b/ydb/core/blob_depot/op_resolve.cpp deleted file mode 100644 index d60b0522c2d..00000000000 --- a/ydb/core/blob_depot/op_resolve.cpp +++ /dev/null @@ -1,88 +0,0 @@ -#include "blob_depot_tablet.h" -#include "data.h" - -namespace NKikimr::NBlobDepot { - - void TBlobDepot::Handle(TEvBlobDepot::TEvResolve::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()), - (Sender, ev->Sender), (Recipient, ev->Recipient), (Cookie, ev->Cookie)); - - // collect records if needed - - auto response = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt); - ui32 messageSize = response->CalculateSerializedSize(); - auto sendMessage = [&](bool more) { - if (more) { - response->Record.SetStatus(NKikimrProto::OVERRUN); - } - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record)); - - auto handle = std::make_unique<IEventHandle>(ev->Sender, SelfId(), response.release(), 0, ev->Cookie); - if (ev->InterconnectSession) { - handle->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); - } - TActivationContext::Send(handle.release()); - - if (more) { - response = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt); - messageSize = response->CalculateSerializedSize(); - } - }; - - ui32 itemIndex = 0; - for (const auto& item : ev->Get()->Record.GetItems()) { - TData::TKey begin; - TData::TKey end; - - if (item.HasBeginningKey()) { - begin = TData::TKey::FromBinaryKey(item.GetBeginningKey(), Config); - } - if (item.HasEndingKey()) { - end = TData::TKey::FromBinaryKey(item.GetEndingKey(), Config); - } - - ui32 numItems = 0; - auto addKey = [&](const TData::TKey& key, const TData::TValue& value) { - NKikimrBlobDepot::TEvResolveResult::TResolvedKey resolvedKey; - resolvedKey.SetItemIndex(itemIndex); - resolvedKey.SetKey(key.MakeBinaryKey()); - resolvedKey.MutableValueChain()->CopyFrom(value.ValueChain); - - if (value.Meta) { - resolvedKey.SetMeta(value.Meta.data(), value.Meta.size()); - } - - const ui32 keySize = resolvedKey.ByteSizeLong(); - if (messageSize + keySize > EventMaxByteSize) { - sendMessage(true); - } - - // put resolved key into the result - resolvedKey.Swap(response->Record.AddResolvedKeys()); - messageSize += keySize; - ++numItems; - - return !item.HasMaxKeys() || numItems != item.GetMaxKeys(); - }; - - TData::TScanFlags flags; - if (item.GetIncludeBeginning()) { - flags |= TData::EScanFlags::INCLUDE_BEGIN; - } - if (item.GetIncludeEnding()) { - flags |= TData::EScanFlags::INCLUDE_END; - } - if (item.GetReverse()) { - flags |= TData::EScanFlags::REVERSE; - } - - Data->ScanRange(item.HasBeginningKey() ? &begin : nullptr, item.HasEndingKey() ? &end : nullptr, flags, addKey); - - ++itemIndex; - } - - sendMessage(false); - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h index 52a3f157503..1f168e96a37 100644 --- a/ydb/core/blob_depot/schema.h +++ b/ydb/core/blob_depot/schema.h @@ -68,6 +68,9 @@ namespace NKikimr::NBlobDepot { Key, Value >; + + static constexpr ui64 PrechargeRows = 10'000; + static constexpr ui64 PrechargeBytes = 1'000'000; }; struct Trash : Table<5> { @@ -75,6 +78,9 @@ namespace NKikimr::NBlobDepot { using TKey = TableKey<BlobId>; using TColumns = TableColumns<BlobId>; + + static constexpr ui64 PrechargeRows = 10'000; + static constexpr ui64 PrechargeBytes = 1'000'000; }; struct GC : Table<6> { @@ -85,6 +91,9 @@ namespace NKikimr::NBlobDepot { using TKey = TableKey<Channel, GroupId>; using TColumns = TableColumns<Channel, GroupId, IssuedGenStep, ConfirmedGenStep>; + + static constexpr ui64 PrechargeRows = 10'000; + static constexpr ui64 PrechargeBytes = 1'000'000; }; using TTables = SchemaTables< diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 74b9f9d7dbc..cd49115f4ce 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -174,10 +174,11 @@ message TEvResolve { optional bool IncludeBeginning = 2 [default = true]; optional bytes EndingKey = 3; // end with the key beyond the last one (if not set) optional bool IncludeEnding = 4 [default = false]; - optional uint32 MaxKeys = 5 [default = 0]; + optional uint32 MaxKeys = 5 [default = 0]; // zero or unset value means infinite amount optional bool ReturnMeta = 6 [default = false]; optional bool ReturnOwners = 7 [default = false]; optional bool Reverse = 8 [default = false]; // reverse output + optional uint64 Cookie = 9; // request cookie to match response item } repeated TItem Items = 1; @@ -185,7 +186,7 @@ message TEvResolve { message TEvResolveResult { message TResolvedKey { - optional uint32 ItemIndex = 1; + optional uint64 Cookie = 1; optional bytes Key = 2; repeated TValueChain ValueChain = 3; optional bytes Meta = 4; |