diff options
author | stanly <stanly@yandex-team.com> | 2023-05-04 17:17:39 +0300 |
---|---|---|
committer | stanly <stanly@yandex-team.com> | 2023-05-04 17:17:39 +0300 |
commit | 140757450482b97591047d49221ef0a0f513a32d (patch) | |
tree | c454ef640b18d0ff467c2980092f91b3c9f7b04f | |
parent | df25b3d56ae727879aa8287b6ba1ba123c123cc0 (diff) | |
download | ydb-140757450482b97591047d49221ef0a0f513a32d.tar.gz |
pass loaded records via const ref
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_schema.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 32 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/columns_table.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/db_wrapper.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/db_wrapper.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/granules_table.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_insert_table.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 16 |
11 files changed, 55 insertions, 52 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 8e842e40ed1..5e5d70ca018 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -516,7 +516,7 @@ struct Schema : NIceDb::Schema { } static bool IndexGranules_Load(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine, - std::function<void(TGranuleRecord&&)> callback) { + const std::function<void(const TGranuleRecord&)>& callback) { auto rowset = db.Table<IndexGranules>().Prefix(index).Select(); if (!rowset.IsReady()) return false; @@ -528,8 +528,7 @@ struct Schema : NIceDb::Schema { ui64 planStep = rowset.GetValue<IndexGranules::PlanStep>(); ui64 txId = rowset.GetValue<IndexGranules::TxId>(); - TGranuleRecord row(pathId, granule, {planStep, txId}, engine.DeserializeMark(indexKey)); - callback(std::move(row)); + callback(TGranuleRecord(pathId, granule, {planStep, txId}, engine.DeserializeMark(indexKey))); if (!rowset.Next()) return false; @@ -555,7 +554,7 @@ struct Schema : NIceDb::Schema { } static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index, - std::function<void(TColumnRecord&&)> callback) { + const std::function<void(const TColumnRecord&)>& callback) { auto rowset = db.Table<IndexColumns>().Prefix(index).Select(); if (!rowset.IsReady()) return false; @@ -579,7 +578,7 @@ struct Schema : NIceDb::Schema { TLogoBlobID logoBlobId((const ui64*)strBlobId.data()); row.BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId); - callback(std::move(row)); + callback(row); if (!rowset.Next()) return false; @@ -595,7 +594,7 @@ struct Schema : NIceDb::Schema { ); } - static bool IndexCounters_Load(NIceDb::TNiceDb& db, ui32 index, std::function<void(ui32 id, ui64 value)> callback) { + static bool IndexCounters_Load(NIceDb::TNiceDb& db, ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) { auto rowset = db.Table<IndexCounters>().Prefix(index).Select(); if (!rowset.IsReady()) return false; diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 5ea71878d00..b4c9c16c322 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -66,9 +66,7 @@ struct TMark { return Hash(); } - operator bool () const { - Y_FAIL("unexpected call"); - } + operator bool () const = delete; static TString Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema); static NArrow::TReplaceKey Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index c8bfe987bb1..7c699723c21 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -653,7 +653,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl } bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) { - auto callback = [&](TGranuleRecord&& rec) { + auto callback = [&](const TGranuleRecord& rec) { bool ok = SetGranule(rec, true); Y_VERIFY(ok); }; @@ -662,7 +662,7 @@ bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) { } bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) { - auto callback = [&](TColumnRecord&& row) { + auto callback = [&](const TColumnRecord& row) { lostBlobs.erase(row.BlobRange.BlobId); // We have such a blob in index. It isn't lost. AddColumnRecord(row); }; @@ -1309,24 +1309,27 @@ void TColumnEngineForLogs::FreeLocks(std::shared_ptr<TColumnEngineChanges> index } bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) { - TMark mark(rec.Mark); + const TMark mark(rec.Mark); - if (!apply) { + if (apply) { + // There should be only one granule with (PathId, Mark). + Y_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second); + + // Allocate granule info and ensure that there is no granule with same id inserted before. + Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec)).second); + } else { + // Granule with same id already exists. if (Granules.contains(rec.Granule)) { return false; } + // Granule with same (PathId, Mark) already exists. if (PathGranules.contains(rec.PathId) && PathGranules[rec.PathId].contains(mark)) { return false; } - return true; } - PathGranules[rec.PathId].emplace(mark, rec.Granule); - auto& spg = Granules[rec.Granule]; - Y_VERIFY(!spg); - spg = std::make_shared<TGranuleMeta>(rec); - return true; // It must return true if (apply == true) + return true; } void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& mark) { @@ -1389,17 +1392,18 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap void TColumnEngineForLogs::AddColumnRecord(const TColumnRecord& rec) { Y_VERIFY(rec.Valid()); - auto& spg = Granules[rec.Granule]; + + const auto gi = Granules.find(rec.Granule); #if 0 - if (!spg) { + if (gi == Granules.end()) { LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId); Granules.erase(rec.Granule); return; } #else - Y_VERIFY(spg); + Y_VERIFY(gi != Granules.end()); #endif - auto& portionInfo = spg->Portions[rec.Portion]; + auto& portionInfo = gi->second->Portions[rec.Portion]; portionInfo.AddRecord(IndexInfo, rec); } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 36391221401..c23eedd1f7b 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -17,6 +17,8 @@ class TCountersTable; /// Engine with 2 tables: /// - Granules: PK -> granules (use part of PK) /// - Columns: granule -> blobs +/// +/// @note One instance per tablet. class TColumnEngineForLogs : public IColumnEngine { public: class TMarksGranules { @@ -229,15 +231,15 @@ public: private: struct TGranuleMeta { - TGranuleRecord Record; + const TGranuleRecord Record; THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo - TGranuleMeta(const TGranuleRecord& rec) + explicit TGranuleMeta(const TGranuleRecord& rec) : Record(rec) {} - ui64 PathId() const { return Record.PathId; } - bool Empty() const { return Portions.empty(); } + ui64 PathId() const noexcept { return Record.PathId; } + bool Empty() const noexcept { return Portions.empty(); } }; TIndexInfo IndexInfo; @@ -282,6 +284,8 @@ private: bool ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply); void EraseGranule(ui64 pathId, ui64 granule, const TMark& mark); + + /// Insert granule or check if same granule was already inserted. bool SetGranule(const TGranuleRecord& rec, bool apply); bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true); bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true); diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h index dabf291aca7..951511d9ea5 100644 --- a/ydb/core/tx/columnshard/engines/columns_table.h +++ b/ydb/core/tx/columnshard/engines/columns_table.h @@ -104,7 +104,7 @@ public: db.EraseColumn(IndexId, row); } - bool Load(IDbWrapper& db, std::function<void(TColumnRecord&&)> callback) { + bool Load(IDbWrapper& db, std::function<void(const TColumnRecord&)> callback) { return db.LoadColumns(IndexId, callback); } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index a93981f4f85..e8cef6ca7e1 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -52,7 +52,7 @@ void TDbWrapper::EraseGranule(ui32 index, const IColumnEngine& engine, const TGr NColumnShard::Schema::IndexGranules_Erase(db, index, engine, row); } -bool TDbWrapper::LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) { +bool TDbWrapper::LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) { NIceDb::TNiceDb db(Database); return NColumnShard::Schema::IndexGranules_Load(db, index, engine, callback); } @@ -67,7 +67,7 @@ void TDbWrapper::EraseColumn(ui32 index, const TColumnRecord& row) { NColumnShard::Schema::IndexColumns_Erase(db, index, row); } -bool TDbWrapper::LoadColumns(ui32 index, std::function<void(TColumnRecord&&)> callback) { +bool TDbWrapper::LoadColumns(ui32 index, const std::function<void(const TColumnRecord&)>& callback) { NIceDb::TNiceDb db(Database); return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, index, callback); } @@ -77,7 +77,7 @@ void TDbWrapper::WriteCounter(ui32 index, ui32 counterId, ui64 value) { return NColumnShard::Schema::IndexCounters_Write(db, index, counterId, value); } -bool TDbWrapper::LoadCounters(ui32 index, std::function<void(ui32 id, ui64 value)> callback) { +bool TDbWrapper::LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) { NIceDb::TNiceDb db(Database); return NColumnShard::Schema::IndexCounters_Load(db, index, callback); } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index 632201ea998..b893c584733 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -30,14 +30,14 @@ public: virtual void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0; virtual void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0; - virtual bool LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) = 0; + virtual bool LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) = 0; virtual void WriteColumn(ui32 index, const TColumnRecord& row) = 0; virtual void EraseColumn(ui32 index, const TColumnRecord& row) = 0; - virtual bool LoadColumns(ui32 index, std::function<void(TColumnRecord&&)> callback) = 0; + virtual bool LoadColumns(ui32 index, const std::function<void(const TColumnRecord&)>& callback) = 0; virtual void WriteCounter(ui32 index, ui32 counterId, ui64 value) = 0; - virtual bool LoadCounters(ui32 index, std::function<void(ui32 id, ui64 value)> callback) = 0; + virtual bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) = 0; }; class TDbWrapper : public IDbWrapper { @@ -61,14 +61,14 @@ public: void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override; void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override; - bool LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) override; + bool LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) override; void WriteColumn(ui32 index, const TColumnRecord& row) override; void EraseColumn(ui32 index, const TColumnRecord& row) override; - bool LoadColumns(ui32 index, std::function<void(TColumnRecord&&)> callback) override; + bool LoadColumns(ui32 index, const std::function<void(const TColumnRecord&)>& callback) override; void WriteCounter(ui32 index, ui32 counterId, ui64 value) override; - bool LoadCounters(ui32 index, std::function<void(ui32 id, ui64 value)> callback) override; + bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) override; private: NTable::TDatabase& Database; diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h index e2ce21dbc74..176de66fa33 100644 --- a/ydb/core/tx/columnshard/engines/granules_table.h +++ b/ydb/core/tx/columnshard/engines/granules_table.h @@ -48,7 +48,7 @@ public: db.EraseGranule(IndexId, Engine, row); } - bool Load(IDbWrapper& db, std::function<void(TGranuleRecord&&)> callback) { + bool Load(IDbWrapper& db, const std::function<void(const TGranuleRecord&)>& callback) { return db.LoadGranules(IndexId, Engine, callback); } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 5db5aca2a05..40c220122c1 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -26,7 +26,7 @@ struct TPortionMeta { std::shared_ptr<arrow::Scalar> Min; std::shared_ptr<arrow::Scalar> Max; - bool HasMinMax() const { + bool HasMinMax() const noexcept { return Min.get() && Max.get(); } }; @@ -123,7 +123,7 @@ struct TPortionInfo { } ui64 BlobsBytes() const { - ui32 sum = 0; + ui64 sum = 0; for (auto& rec : Records) { sum += rec.BlobRange.Size; } diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index 72faa6ae759..0255cbc3d29 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -28,14 +28,14 @@ public: void WriteGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {} void EraseGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {} - bool LoadGranules(ui32, const IColumnEngine&, std::function<void(TGranuleRecord&&)>) override { return true; } + bool LoadGranules(ui32, const IColumnEngine&, const std::function<void(const TGranuleRecord&)>&) override { return true; } void WriteColumn(ui32, const TColumnRecord&) override {} void EraseColumn(ui32, const TColumnRecord&) override {} - bool LoadColumns(ui32, std::function<void(TColumnRecord&&)>) override { return true; } + bool LoadColumns(ui32, const std::function<void(const TColumnRecord&)>&) override { return true; } void WriteCounter(ui32, ui32, ui64) override {} - bool LoadCounters(ui32, std::function<void(ui32 id, ui64 value)>) override { return true; } + bool LoadCounters(ui32, const std::function<void(ui32 id, ui64 value)>&) override { return true; } }; } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 4634a912686..740c3766530 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -83,12 +83,11 @@ public: pathGranules.swap(filtered); } - bool LoadGranules(ui32 index, const IColumnEngine&, std::function<void(TGranuleRecord&&)> callback) override { + bool LoadGranules(ui32 index, const IColumnEngine&, const std::function<void(const TGranuleRecord&)>& callback) override { auto& granules = Indices[index].Granules; for (auto& [pathId, vec] : granules) { - for (auto& rec : vec) { - TGranuleRecord tmp = rec; - callback(std::move(rec)); + for (const auto& rec : vec) { + callback(rec); } } return true; @@ -123,12 +122,11 @@ public: columns.swap(filtered); } - bool LoadColumns(ui32 index, std::function<void(TColumnRecord&&)> callback) override { + bool LoadColumns(ui32 index, const std::function<void(const TColumnRecord&)>& callback) override { auto& columns = Indices[index].Columns; for (auto& [granule, vec] : columns) { - for (auto& rec : vec) { - TColumnRecord tmp = rec; - callback(std::move(rec)); + for (const auto& rec : vec) { + callback(rec); } } return true; @@ -139,7 +137,7 @@ public: counters[counterId] = value; } - bool LoadCounters(ui32 index, std::function<void(ui32 id, ui64 value)> callback) override { + bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) override { auto& counters = Indices[index].Counters; for (auto& [id, value] : counters) { callback(id, value); |