diff options
author | alexvru <alexvru@ydb.tech> | 2022-12-28 16:20:55 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-12-28 16:20:55 +0300 |
commit | 7685775b0004801ef7e304eb195881a4831f4ec5 (patch) | |
tree | b2d1bcbc406c170dfb2f57d00b93c5d706cc8a53 | |
parent | 2612e0181f336d66c1d20040283a2bc333a6bba2 (diff) | |
download | ydb-7685775b0004801ef7e304eb195881a4831f4ec5.tar.gz |
Fix loading bugs
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_load.cpp | 162 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_trash.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_load.cpp | 22 | ||||
-rw-r--r-- | ydb/core/blob_depot/schema.h | 8 |
5 files changed, 76 insertions, 119 deletions
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 424d5dfceea..3bc3899d79a 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -544,7 +544,7 @@ namespace NKikimr::NBlobDepot { } void TData::TRecordsPerChannelGroup::CollectIfPossible(TData *self) { - if (!CollectGarbageRequestInFlight && !Trash.empty()) { + if (!CollectGarbageRequestInFlight && !Trash.empty() && self->Loaded) { self->HandleTrash(*this); } } diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp index d07f27f78a0..019cd93a16f 100644 --- a/ydb/core/blob_depot/data_load.cpp +++ b/ydb/core/blob_depot/data_load.cpp @@ -7,47 +7,23 @@ namespace NKikimr::NBlobDepot { using TData = TBlobDepot::TData; class TData::TTxDataLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - template<typename TTable_, typename TInProgressState_, typename TNextState_, bool Initial_> - struct TItem { - using TTable = TTable_; - using TInProgressState = TInProgressState_; - using TNextState = TNextState_; - static constexpr bool Initial = Initial_; - typename TTable::TKey::KeyValuesType Key; - }; - - struct TLoadFinished {}; - struct TLoadGcInProgress : TItem<Schema::GC, TLoadGcInProgress, TLoadFinished, false> {}; - struct TLoadGcBegin : TItem<Schema::GC, TLoadGcInProgress, TLoadFinished, true> {}; - struct TLoadTrashInProgress : TItem<Schema::Trash, TLoadTrashInProgress, TLoadGcBegin, false> {}; - struct TLoadTrashBegin : TItem<Schema::Trash, TLoadTrashInProgress, TLoadGcBegin, true> {}; - struct TLoadDataInProgress : TItem<Schema::Data, TLoadDataInProgress, TLoadTrashBegin, false> {}; - struct TLoadDataBegin : TItem<Schema::Data, TLoadDataInProgress, TLoadTrashBegin, true> {}; - - using TLoadState = std::variant< - TLoadDataBegin, - TLoadDataInProgress, - TLoadTrashBegin, - TLoadTrashInProgress, - TLoadGcBegin, - TLoadGcInProgress, - TLoadFinished - >; - - TLoadState LoadState; - bool SuccessorTx = false; + std::optional<TString> LastTrashKey; + std::optional<TString> LastDataKey; + bool TrashLoaded = false; + bool SuccessorTx = true; public: TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_DATA_LOAD; } - TTxDataLoad(TBlobDepot *self, TLoadState loadState = TLoadDataBegin{}) + TTxDataLoad(TBlobDepot *self) : TTransactionBase(self) - , LoadState(std::move(loadState)) {} TTxDataLoad(TTxDataLoad& predecessor) : TTransactionBase(predecessor.Self) - , LoadState(std::move(predecessor.LoadState)) + , LastTrashKey(std::move(predecessor.LastTrashKey)) + , LastDataKey(std::move(predecessor.LastDataKey)) + , TrashLoaded(predecessor.TrashLoaded) {} bool Execute(TTransactionContext& txc, const TActorContext&) override { @@ -55,95 +31,60 @@ namespace NKikimr::NBlobDepot { NIceDb::TNiceDb db(txc.DB); bool progress = false; - auto visitor = [&](auto& item) { return ExecuteLoad(db, item, progress); }; - if (std::visit(visitor, LoadState)) { - // load finished - return true; - } else if (progress) { - // something was read, but not all - SuccessorTx = true; - return true; - } else { - // didn't read anything this time - return false; - } - } - - template<typename T> - bool ExecuteLoad(NIceDb::TNiceDb& db, T& item, bool& progress) { - if constexpr (T::Initial) { - return LoadTable(db, db.Table<typename T::TTable>().All(), item, progress); - } else { - auto greaterOrEqual = [&](auto&&... key) { return db.Table<typename T::TTable>().GreaterOrEqual(key...); }; - return LoadTable(db, std::apply(greaterOrEqual, item.Key), item, progress); - } - } - - bool ExecuteLoad(NIceDb::TNiceDb&, TLoadFinished&, bool&) { - return true; - } - - template<typename T, typename TItem> - bool LoadTable(NIceDb::TNiceDb& db, T&& table, TItem& item, bool& progress) { - if (!table.Precharge(TItem::TTable::PrechargeRows, TItem::TTable::PrechargeBytes)) { - return false; - } - - for (auto rowset = table.Select();; rowset.Next()) { - if (!rowset.IsReady()) { + auto load = [&](auto t, auto& lastKey, auto callback) { + auto table = t.GreaterOrEqual(lastKey.value_or(TString())); + static constexpr ui64 PrechargeRows = 10'000; + static constexpr ui64 PrechargeBytes = 1'000'000; + if (!table.Precharge(PrechargeRows, PrechargeBytes)) { return false; - } else if (!rowset.IsValid()) { - break; } - - typename TItem::TTable::TKey::KeyValuesType key(rowset.GetKey()); - bool processRow = true; - if constexpr (!TItem::Initial) { - processRow = item.Key < key; - Y_VERIFY_DEBUG(processRow || item.Key == key); + auto rows = table.Select(); + if (!rows.IsReady()) { + return false; } - if (processRow) { - progress = true; - typename TItem::TInProgressState state; - state.Key = std::move(key); - LoadState = std::move(state); - ProcessRow(rowset, static_cast<typename TItem::TTable*>(nullptr)); + while (rows.IsValid()) { + if (auto key = rows.GetKey(); key != lastKey) { + callback(key, rows); + lastKey.emplace(std::move(key)); + progress = true; + } + if (!rows.Next()) { + return false; + } } + lastKey.reset(); + return true; + }; + + if (!TrashLoaded) { + auto addTrash = [this](const auto& key, const auto& /*rows*/) { + Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(key)); + }; + if (!load(db.Table<Schema::Trash>(), LastTrashKey, addTrash)) { + return progress; + } + TrashLoaded = true; } - // table was read completely, advance to next state - LoadState = typename TItem::TNextState{}; - return ExecuteLoad(db, std::get<typename TItem::TNextState>(LoadState), progress); - } - - template<typename T> - void ProcessRow(T&& row, Schema::Data*) { - auto key = TData::TKey::FromBinaryKey(row.template GetValue<Schema::Data::Key>(), Self->Config); - Self->Data->AddDataOnLoad(key, row.template GetValue<Schema::Data::Value>(), - row.template GetValueOrDefault<Schema::Data::UncertainWrite>(), false); - Y_VERIFY(!Self->Data->LastLoadedKey || *Self->Data->LastLoadedKey < key); - Self->Data->LastLoadedKey = std::move(key); - } - - template<typename T> - void ProcessRow(T&& row, Schema::Trash*) { - Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(row.template GetValue<Schema::Trash::BlobId>())); - } + auto addData = [this](const auto& key, const auto& rows) { + auto k = TData::TKey::FromBinaryKey(key, Self->Config); + Self->Data->AddDataOnLoad(k, rows.template GetValue<Schema::Data::Value>(), + rows.template GetValueOrDefault<Schema::Data::UncertainWrite>(), false); + Y_VERIFY(!Self->Data->LastLoadedKey || *Self->Data->LastLoadedKey < k); + Self->Data->LastLoadedKey = std::move(k); + }; + if (!load(db.Table<Schema::Data>(), LastDataKey, addData)) { + return progress; + } - template<typename T> - void ProcessRow(T&& row, Schema::GC*) { - Self->Data->AddGenStepOnLoad( - row.template GetValue<Schema::GC::Channel>(), - row.template GetValue<Schema::GC::GroupId>(), - TGenStep(row.template GetValueOrDefault<Schema::GC::IssuedGenStep>()), - TGenStep(row.template GetValueOrDefault<Schema::GC::ConfirmedGenStep>()) - ); + SuccessorTx = false; // everything loaded + return true; } void Complete(const TActorContext&) override { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "TData::TTxDataLoad::Complete", (Id, Self->GetLogId()), - (SuccessorTx, SuccessorTx), (LoadState.index, LoadState.index())); + (TrashLoaded, TrashLoaded), (SuccessorTx, SuccessorTx)); if (SuccessorTx) { Self->Execute(std::make_unique<TTxDataLoad>(*this)); @@ -161,6 +102,9 @@ namespace NKikimr::NBlobDepot { Loaded = true; LoadSkip.clear(); Self->OnDataLoadComplete(); + for (auto& [key, record] : RecordsPerChannelGroup) { + record.CollectIfPossible(this); + } } void TBlobDepot::StartDataLoad() { diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp index 3fa045297e4..1e41c0df710 100644 --- a/ydb/core/blob_depot/data_trash.cpp +++ b/ydb/core/blob_depot/data_trash.cpp @@ -27,6 +27,7 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(!record.CollectGarbageRequestInFlight); Y_VERIFY(!record.Trash.empty()); + Y_VERIFY(Loaded); // we must have correct Trash and Used values Y_VERIFY(record.Channel < Self->Channels.size()); auto& channel = Self->Channels[record.Channel]; diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index 14d238b5421..5c96c38ad11 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -80,6 +80,25 @@ namespace NKikimr::NBlobDepot { } } + // GC + { + auto table = db.Table<Schema::GC>().Select(); + if (!table.IsReady()) { + return false; + } + while (table.IsValid()) { + Self->Data->AddGenStepOnLoad( + table.GetValue<Schema::GC::Channel>(), + table.GetValue<Schema::GC::GroupId>(), + TGenStep(table.GetValueOrDefault<Schema::GC::IssuedGenStep>()), + TGenStep(table.GetValueOrDefault<Schema::GC::ConfirmedGenStep>()) + ); + if (!table.Next()) { + return false; + } + } + } + return true; } @@ -88,7 +107,8 @@ namespace NKikimr::NBlobDepot { #pragma clang diagnostic ignored "-Wbitwise-instead-of-logical" return db.Table<Schema::Config>().Precharge() & db.Table<Schema::Blocks>().Precharge() - & db.Table<Schema::Barriers>().Precharge(); + & db.Table<Schema::Barriers>().Precharge() + & db.Table<Schema::GC>().Precharge(); #pragma clang diagnostic pop } diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h index 22d55619f96..c19fe1a636e 100644 --- a/ydb/core/blob_depot/schema.h +++ b/ydb/core/blob_depot/schema.h @@ -84,8 +84,6 @@ namespace NKikimr::NBlobDepot { UncertainWrite >; - static constexpr ui64 PrechargeRows = 10'000; - static constexpr ui64 PrechargeBytes = 1'000'000; using Precharge = NoAutoPrecharge; }; @@ -95,8 +93,6 @@ namespace NKikimr::NBlobDepot { using TKey = TableKey<BlobId>; using TColumns = TableColumns<BlobId>; - static constexpr ui64 PrechargeRows = 10'000; - static constexpr ui64 PrechargeBytes = 1'000'000; using Precharge = NoAutoPrecharge; }; @@ -108,10 +104,6 @@ namespace NKikimr::NBlobDepot { using TKey = TableKey<Channel, GroupId>; using TColumns = TableColumns<Channel, GroupId, IssuedGenStep, ConfirmedGenStep>; - - static constexpr ui64 PrechargeRows = 10'000; - static constexpr ui64 PrechargeBytes = 1'000'000; - using Precharge = NoAutoPrecharge; }; using TTables = SchemaTables< |