aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-12-28 16:20:55 +0300
committeralexvru <alexvru@ydb.tech>2022-12-28 16:20:55 +0300
commit7685775b0004801ef7e304eb195881a4831f4ec5 (patch)
treeb2d1bcbc406c170dfb2f57d00b93c5d706cc8a53
parent2612e0181f336d66c1d20040283a2bc333a6bba2 (diff)
downloadydb-7685775b0004801ef7e304eb195881a4831f4ec5.tar.gz
Fix loading bugs
-rw-r--r--ydb/core/blob_depot/data.cpp2
-rw-r--r--ydb/core/blob_depot/data_load.cpp162
-rw-r--r--ydb/core/blob_depot/data_trash.cpp1
-rw-r--r--ydb/core/blob_depot/op_load.cpp22
-rw-r--r--ydb/core/blob_depot/schema.h8
5 files changed, 76 insertions, 119 deletions
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 424d5dfceea..3bc3899d79a 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -544,7 +544,7 @@ namespace NKikimr::NBlobDepot {
}
void TData::TRecordsPerChannelGroup::CollectIfPossible(TData *self) {
- if (!CollectGarbageRequestInFlight && !Trash.empty()) {
+ if (!CollectGarbageRequestInFlight && !Trash.empty() && self->Loaded) {
self->HandleTrash(*this);
}
}
diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp
index d07f27f78a0..019cd93a16f 100644
--- a/ydb/core/blob_depot/data_load.cpp
+++ b/ydb/core/blob_depot/data_load.cpp
@@ -7,47 +7,23 @@ namespace NKikimr::NBlobDepot {
using TData = TBlobDepot::TData;
class TData::TTxDataLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- template<typename TTable_, typename TInProgressState_, typename TNextState_, bool Initial_>
- struct TItem {
- using TTable = TTable_;
- using TInProgressState = TInProgressState_;
- using TNextState = TNextState_;
- static constexpr bool Initial = Initial_;
- typename TTable::TKey::KeyValuesType Key;
- };
-
- struct TLoadFinished {};
- struct TLoadGcInProgress : TItem<Schema::GC, TLoadGcInProgress, TLoadFinished, false> {};
- struct TLoadGcBegin : TItem<Schema::GC, TLoadGcInProgress, TLoadFinished, true> {};
- struct TLoadTrashInProgress : TItem<Schema::Trash, TLoadTrashInProgress, TLoadGcBegin, false> {};
- struct TLoadTrashBegin : TItem<Schema::Trash, TLoadTrashInProgress, TLoadGcBegin, true> {};
- struct TLoadDataInProgress : TItem<Schema::Data, TLoadDataInProgress, TLoadTrashBegin, false> {};
- struct TLoadDataBegin : TItem<Schema::Data, TLoadDataInProgress, TLoadTrashBegin, true> {};
-
- using TLoadState = std::variant<
- TLoadDataBegin,
- TLoadDataInProgress,
- TLoadTrashBegin,
- TLoadTrashInProgress,
- TLoadGcBegin,
- TLoadGcInProgress,
- TLoadFinished
- >;
-
- TLoadState LoadState;
- bool SuccessorTx = false;
+ std::optional<TString> LastTrashKey;
+ std::optional<TString> LastDataKey;
+ bool TrashLoaded = false;
+ bool SuccessorTx = true;
public:
TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_DATA_LOAD; }
- TTxDataLoad(TBlobDepot *self, TLoadState loadState = TLoadDataBegin{})
+ TTxDataLoad(TBlobDepot *self)
: TTransactionBase(self)
- , LoadState(std::move(loadState))
{}
TTxDataLoad(TTxDataLoad& predecessor)
: TTransactionBase(predecessor.Self)
- , LoadState(std::move(predecessor.LoadState))
+ , LastTrashKey(std::move(predecessor.LastTrashKey))
+ , LastDataKey(std::move(predecessor.LastDataKey))
+ , TrashLoaded(predecessor.TrashLoaded)
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
@@ -55,95 +31,60 @@ namespace NKikimr::NBlobDepot {
NIceDb::TNiceDb db(txc.DB);
bool progress = false;
- auto visitor = [&](auto& item) { return ExecuteLoad(db, item, progress); };
- if (std::visit(visitor, LoadState)) {
- // load finished
- return true;
- } else if (progress) {
- // something was read, but not all
- SuccessorTx = true;
- return true;
- } else {
- // didn't read anything this time
- return false;
- }
- }
-
- template<typename T>
- bool ExecuteLoad(NIceDb::TNiceDb& db, T& item, bool& progress) {
- if constexpr (T::Initial) {
- return LoadTable(db, db.Table<typename T::TTable>().All(), item, progress);
- } else {
- auto greaterOrEqual = [&](auto&&... key) { return db.Table<typename T::TTable>().GreaterOrEqual(key...); };
- return LoadTable(db, std::apply(greaterOrEqual, item.Key), item, progress);
- }
- }
-
- bool ExecuteLoad(NIceDb::TNiceDb&, TLoadFinished&, bool&) {
- return true;
- }
-
- template<typename T, typename TItem>
- bool LoadTable(NIceDb::TNiceDb& db, T&& table, TItem& item, bool& progress) {
- if (!table.Precharge(TItem::TTable::PrechargeRows, TItem::TTable::PrechargeBytes)) {
- return false;
- }
-
- for (auto rowset = table.Select();; rowset.Next()) {
- if (!rowset.IsReady()) {
+ auto load = [&](auto t, auto& lastKey, auto callback) {
+ auto table = t.GreaterOrEqual(lastKey.value_or(TString()));
+ static constexpr ui64 PrechargeRows = 10'000;
+ static constexpr ui64 PrechargeBytes = 1'000'000;
+ if (!table.Precharge(PrechargeRows, PrechargeBytes)) {
return false;
- } else if (!rowset.IsValid()) {
- break;
}
-
- typename TItem::TTable::TKey::KeyValuesType key(rowset.GetKey());
- bool processRow = true;
- if constexpr (!TItem::Initial) {
- processRow = item.Key < key;
- Y_VERIFY_DEBUG(processRow || item.Key == key);
+ auto rows = table.Select();
+ if (!rows.IsReady()) {
+ return false;
}
- if (processRow) {
- progress = true;
- typename TItem::TInProgressState state;
- state.Key = std::move(key);
- LoadState = std::move(state);
- ProcessRow(rowset, static_cast<typename TItem::TTable*>(nullptr));
+ while (rows.IsValid()) {
+ if (auto key = rows.GetKey(); key != lastKey) {
+ callback(key, rows);
+ lastKey.emplace(std::move(key));
+ progress = true;
+ }
+ if (!rows.Next()) {
+ return false;
+ }
}
+ lastKey.reset();
+ return true;
+ };
+
+ if (!TrashLoaded) {
+ auto addTrash = [this](const auto& key, const auto& /*rows*/) {
+ Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(key));
+ };
+ if (!load(db.Table<Schema::Trash>(), LastTrashKey, addTrash)) {
+ return progress;
+ }
+ TrashLoaded = true;
}
- // table was read completely, advance to next state
- LoadState = typename TItem::TNextState{};
- return ExecuteLoad(db, std::get<typename TItem::TNextState>(LoadState), progress);
- }
-
- template<typename T>
- void ProcessRow(T&& row, Schema::Data*) {
- auto key = TData::TKey::FromBinaryKey(row.template GetValue<Schema::Data::Key>(), Self->Config);
- Self->Data->AddDataOnLoad(key, row.template GetValue<Schema::Data::Value>(),
- row.template GetValueOrDefault<Schema::Data::UncertainWrite>(), false);
- Y_VERIFY(!Self->Data->LastLoadedKey || *Self->Data->LastLoadedKey < key);
- Self->Data->LastLoadedKey = std::move(key);
- }
-
- template<typename T>
- void ProcessRow(T&& row, Schema::Trash*) {
- Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(row.template GetValue<Schema::Trash::BlobId>()));
- }
+ auto addData = [this](const auto& key, const auto& rows) {
+ auto k = TData::TKey::FromBinaryKey(key, Self->Config);
+ Self->Data->AddDataOnLoad(k, rows.template GetValue<Schema::Data::Value>(),
+ rows.template GetValueOrDefault<Schema::Data::UncertainWrite>(), false);
+ Y_VERIFY(!Self->Data->LastLoadedKey || *Self->Data->LastLoadedKey < k);
+ Self->Data->LastLoadedKey = std::move(k);
+ };
+ if (!load(db.Table<Schema::Data>(), LastDataKey, addData)) {
+ return progress;
+ }
- template<typename T>
- void ProcessRow(T&& row, Schema::GC*) {
- Self->Data->AddGenStepOnLoad(
- row.template GetValue<Schema::GC::Channel>(),
- row.template GetValue<Schema::GC::GroupId>(),
- TGenStep(row.template GetValueOrDefault<Schema::GC::IssuedGenStep>()),
- TGenStep(row.template GetValueOrDefault<Schema::GC::ConfirmedGenStep>())
- );
+ SuccessorTx = false; // everything loaded
+ return true;
}
void Complete(const TActorContext&) override {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "TData::TTxDataLoad::Complete", (Id, Self->GetLogId()),
- (SuccessorTx, SuccessorTx), (LoadState.index, LoadState.index()));
+ (TrashLoaded, TrashLoaded), (SuccessorTx, SuccessorTx));
if (SuccessorTx) {
Self->Execute(std::make_unique<TTxDataLoad>(*this));
@@ -161,6 +102,9 @@ namespace NKikimr::NBlobDepot {
Loaded = true;
LoadSkip.clear();
Self->OnDataLoadComplete();
+ for (auto& [key, record] : RecordsPerChannelGroup) {
+ record.CollectIfPossible(this);
+ }
}
void TBlobDepot::StartDataLoad() {
diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp
index 3fa045297e4..1e41c0df710 100644
--- a/ydb/core/blob_depot/data_trash.cpp
+++ b/ydb/core/blob_depot/data_trash.cpp
@@ -27,6 +27,7 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(!record.CollectGarbageRequestInFlight);
Y_VERIFY(!record.Trash.empty());
+ Y_VERIFY(Loaded); // we must have correct Trash and Used values
Y_VERIFY(record.Channel < Self->Channels.size());
auto& channel = Self->Channels[record.Channel];
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
index 14d238b5421..5c96c38ad11 100644
--- a/ydb/core/blob_depot/op_load.cpp
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -80,6 +80,25 @@ namespace NKikimr::NBlobDepot {
}
}
+ // GC
+ {
+ auto table = db.Table<Schema::GC>().Select();
+ if (!table.IsReady()) {
+ return false;
+ }
+ while (table.IsValid()) {
+ Self->Data->AddGenStepOnLoad(
+ table.GetValue<Schema::GC::Channel>(),
+ table.GetValue<Schema::GC::GroupId>(),
+ TGenStep(table.GetValueOrDefault<Schema::GC::IssuedGenStep>()),
+ TGenStep(table.GetValueOrDefault<Schema::GC::ConfirmedGenStep>())
+ );
+ if (!table.Next()) {
+ return false;
+ }
+ }
+ }
+
return true;
}
@@ -88,7 +107,8 @@ namespace NKikimr::NBlobDepot {
#pragma clang diagnostic ignored "-Wbitwise-instead-of-logical"
return db.Table<Schema::Config>().Precharge()
& db.Table<Schema::Blocks>().Precharge()
- & db.Table<Schema::Barriers>().Precharge();
+ & db.Table<Schema::Barriers>().Precharge()
+ & db.Table<Schema::GC>().Precharge();
#pragma clang diagnostic pop
}
diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h
index 22d55619f96..c19fe1a636e 100644
--- a/ydb/core/blob_depot/schema.h
+++ b/ydb/core/blob_depot/schema.h
@@ -84,8 +84,6 @@ namespace NKikimr::NBlobDepot {
UncertainWrite
>;
- static constexpr ui64 PrechargeRows = 10'000;
- static constexpr ui64 PrechargeBytes = 1'000'000;
using Precharge = NoAutoPrecharge;
};
@@ -95,8 +93,6 @@ namespace NKikimr::NBlobDepot {
using TKey = TableKey<BlobId>;
using TColumns = TableColumns<BlobId>;
- static constexpr ui64 PrechargeRows = 10'000;
- static constexpr ui64 PrechargeBytes = 1'000'000;
using Precharge = NoAutoPrecharge;
};
@@ -108,10 +104,6 @@ namespace NKikimr::NBlobDepot {
using TKey = TableKey<Channel, GroupId>;
using TColumns = TableColumns<Channel, GroupId, IssuedGenStep, ConfirmedGenStep>;
-
- static constexpr ui64 PrechargeRows = 10'000;
- static constexpr ui64 PrechargeBytes = 1'000'000;
- using Precharge = NoAutoPrecharge;
};
using TTables = SchemaTables<