aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorstanly <stanly@yandex-team.com>2023-05-04 17:17:39 +0300
committerstanly <stanly@yandex-team.com>2023-05-04 17:17:39 +0300
commit140757450482b97591047d49221ef0a0f513a32d (patch)
treec454ef640b18d0ff467c2980092f91b3c9f7b04f
parentdf25b3d56ae727879aa8287b6ba1ba123c123cc0 (diff)
downloadydb-140757450482b97591047d49221ef0a0f513a32d.tar.gz
pass loaded records via const ref
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h11
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp32
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h12
-rw-r--r--ydb/core/tx/columnshard/engines/columns_table.h2
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.h12
-rw-r--r--ydb/core/tx/columnshard/engines/granules_table.h2
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h4
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp16
11 files changed, 55 insertions, 52 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 8e842e40ed1..5e5d70ca018 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -516,7 +516,7 @@ struct Schema : NIceDb::Schema {
}
static bool IndexGranules_Load(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine,
- std::function<void(TGranuleRecord&&)> callback) {
+ const std::function<void(const TGranuleRecord&)>& callback) {
auto rowset = db.Table<IndexGranules>().Prefix(index).Select();
if (!rowset.IsReady())
return false;
@@ -528,8 +528,7 @@ struct Schema : NIceDb::Schema {
ui64 planStep = rowset.GetValue<IndexGranules::PlanStep>();
ui64 txId = rowset.GetValue<IndexGranules::TxId>();
- TGranuleRecord row(pathId, granule, {planStep, txId}, engine.DeserializeMark(indexKey));
- callback(std::move(row));
+ callback(TGranuleRecord(pathId, granule, {planStep, txId}, engine.DeserializeMark(indexKey)));
if (!rowset.Next())
return false;
@@ -555,7 +554,7 @@ struct Schema : NIceDb::Schema {
}
static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index,
- std::function<void(TColumnRecord&&)> callback) {
+ const std::function<void(const TColumnRecord&)>& callback) {
auto rowset = db.Table<IndexColumns>().Prefix(index).Select();
if (!rowset.IsReady())
return false;
@@ -579,7 +578,7 @@ struct Schema : NIceDb::Schema {
TLogoBlobID logoBlobId((const ui64*)strBlobId.data());
row.BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
- callback(std::move(row));
+ callback(row);
if (!rowset.Next())
return false;
@@ -595,7 +594,7 @@ struct Schema : NIceDb::Schema {
);
}
- static bool IndexCounters_Load(NIceDb::TNiceDb& db, ui32 index, std::function<void(ui32 id, ui64 value)> callback) {
+ static bool IndexCounters_Load(NIceDb::TNiceDb& db, ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) {
auto rowset = db.Table<IndexCounters>().Prefix(index).Select();
if (!rowset.IsReady())
return false;
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 5ea71878d00..b4c9c16c322 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -66,9 +66,7 @@ struct TMark {
return Hash();
}
- operator bool () const {
- Y_FAIL("unexpected call");
- }
+ operator bool () const = delete;
static TString Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema);
static NArrow::TReplaceKey Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index c8bfe987bb1..7c699723c21 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -653,7 +653,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
}
bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) {
- auto callback = [&](TGranuleRecord&& rec) {
+ auto callback = [&](const TGranuleRecord& rec) {
bool ok = SetGranule(rec, true);
Y_VERIFY(ok);
};
@@ -662,7 +662,7 @@ bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) {
}
bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) {
- auto callback = [&](TColumnRecord&& row) {
+ auto callback = [&](const TColumnRecord& row) {
lostBlobs.erase(row.BlobRange.BlobId); // We have such a blob in index. It isn't lost.
AddColumnRecord(row);
};
@@ -1309,24 +1309,27 @@ void TColumnEngineForLogs::FreeLocks(std::shared_ptr<TColumnEngineChanges> index
}
bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) {
- TMark mark(rec.Mark);
+ const TMark mark(rec.Mark);
- if (!apply) {
+ if (apply) {
+ // There should be only one granule with (PathId, Mark).
+ Y_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second);
+
+ // Allocate granule info and ensure that there is no granule with same id inserted before.
+ Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec)).second);
+ } else {
+ // Granule with same id already exists.
if (Granules.contains(rec.Granule)) {
return false;
}
+ // Granule with same (PathId, Mark) already exists.
if (PathGranules.contains(rec.PathId) && PathGranules[rec.PathId].contains(mark)) {
return false;
}
- return true;
}
- PathGranules[rec.PathId].emplace(mark, rec.Granule);
- auto& spg = Granules[rec.Granule];
- Y_VERIFY(!spg);
- spg = std::make_shared<TGranuleMeta>(rec);
- return true; // It must return true if (apply == true)
+ return true;
}
void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& mark) {
@@ -1389,17 +1392,18 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap
void TColumnEngineForLogs::AddColumnRecord(const TColumnRecord& rec) {
Y_VERIFY(rec.Valid());
- auto& spg = Granules[rec.Granule];
+
+ const auto gi = Granules.find(rec.Granule);
#if 0
- if (!spg) {
+ if (gi == Granules.end()) {
LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId);
Granules.erase(rec.Granule);
return;
}
#else
- Y_VERIFY(spg);
+ Y_VERIFY(gi != Granules.end());
#endif
- auto& portionInfo = spg->Portions[rec.Portion];
+ auto& portionInfo = gi->second->Portions[rec.Portion];
portionInfo.AddRecord(IndexInfo, rec);
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 36391221401..c23eedd1f7b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -17,6 +17,8 @@ class TCountersTable;
/// Engine with 2 tables:
/// - Granules: PK -> granules (use part of PK)
/// - Columns: granule -> blobs
+///
+/// @note One instance per tablet.
class TColumnEngineForLogs : public IColumnEngine {
public:
class TMarksGranules {
@@ -229,15 +231,15 @@ public:
private:
struct TGranuleMeta {
- TGranuleRecord Record;
+ const TGranuleRecord Record;
THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo
- TGranuleMeta(const TGranuleRecord& rec)
+ explicit TGranuleMeta(const TGranuleRecord& rec)
: Record(rec)
{}
- ui64 PathId() const { return Record.PathId; }
- bool Empty() const { return Portions.empty(); }
+ ui64 PathId() const noexcept { return Record.PathId; }
+ bool Empty() const noexcept { return Portions.empty(); }
};
TIndexInfo IndexInfo;
@@ -282,6 +284,8 @@ private:
bool ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply);
void EraseGranule(ui64 pathId, ui64 granule, const TMark& mark);
+
+ /// Insert granule or check if same granule was already inserted.
bool SetGranule(const TGranuleRecord& rec, bool apply);
bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true);
bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true);
diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h
index dabf291aca7..951511d9ea5 100644
--- a/ydb/core/tx/columnshard/engines/columns_table.h
+++ b/ydb/core/tx/columnshard/engines/columns_table.h
@@ -104,7 +104,7 @@ public:
db.EraseColumn(IndexId, row);
}
- bool Load(IDbWrapper& db, std::function<void(TColumnRecord&&)> callback) {
+ bool Load(IDbWrapper& db, std::function<void(const TColumnRecord&)> callback) {
return db.LoadColumns(IndexId, callback);
}
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index a93981f4f85..e8cef6ca7e1 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -52,7 +52,7 @@ void TDbWrapper::EraseGranule(ui32 index, const IColumnEngine& engine, const TGr
NColumnShard::Schema::IndexGranules_Erase(db, index, engine, row);
}
-bool TDbWrapper::LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) {
+bool TDbWrapper::LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) {
NIceDb::TNiceDb db(Database);
return NColumnShard::Schema::IndexGranules_Load(db, index, engine, callback);
}
@@ -67,7 +67,7 @@ void TDbWrapper::EraseColumn(ui32 index, const TColumnRecord& row) {
NColumnShard::Schema::IndexColumns_Erase(db, index, row);
}
-bool TDbWrapper::LoadColumns(ui32 index, std::function<void(TColumnRecord&&)> callback) {
+bool TDbWrapper::LoadColumns(ui32 index, const std::function<void(const TColumnRecord&)>& callback) {
NIceDb::TNiceDb db(Database);
return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, index, callback);
}
@@ -77,7 +77,7 @@ void TDbWrapper::WriteCounter(ui32 index, ui32 counterId, ui64 value) {
return NColumnShard::Schema::IndexCounters_Write(db, index, counterId, value);
}
-bool TDbWrapper::LoadCounters(ui32 index, std::function<void(ui32 id, ui64 value)> callback) {
+bool TDbWrapper::LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) {
NIceDb::TNiceDb db(Database);
return NColumnShard::Schema::IndexCounters_Load(db, index, callback);
}
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h
index 632201ea998..b893c584733 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.h
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.h
@@ -30,14 +30,14 @@ public:
virtual void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0;
virtual void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0;
- virtual bool LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) = 0;
+ virtual bool LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) = 0;
virtual void WriteColumn(ui32 index, const TColumnRecord& row) = 0;
virtual void EraseColumn(ui32 index, const TColumnRecord& row) = 0;
- virtual bool LoadColumns(ui32 index, std::function<void(TColumnRecord&&)> callback) = 0;
+ virtual bool LoadColumns(ui32 index, const std::function<void(const TColumnRecord&)>& callback) = 0;
virtual void WriteCounter(ui32 index, ui32 counterId, ui64 value) = 0;
- virtual bool LoadCounters(ui32 index, std::function<void(ui32 id, ui64 value)> callback) = 0;
+ virtual bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) = 0;
};
class TDbWrapper : public IDbWrapper {
@@ -61,14 +61,14 @@ public:
void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override;
void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override;
- bool LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) override;
+ bool LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) override;
void WriteColumn(ui32 index, const TColumnRecord& row) override;
void EraseColumn(ui32 index, const TColumnRecord& row) override;
- bool LoadColumns(ui32 index, std::function<void(TColumnRecord&&)> callback) override;
+ bool LoadColumns(ui32 index, const std::function<void(const TColumnRecord&)>& callback) override;
void WriteCounter(ui32 index, ui32 counterId, ui64 value) override;
- bool LoadCounters(ui32 index, std::function<void(ui32 id, ui64 value)> callback) override;
+ bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) override;
private:
NTable::TDatabase& Database;
diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h
index e2ce21dbc74..176de66fa33 100644
--- a/ydb/core/tx/columnshard/engines/granules_table.h
+++ b/ydb/core/tx/columnshard/engines/granules_table.h
@@ -48,7 +48,7 @@ public:
db.EraseGranule(IndexId, Engine, row);
}
- bool Load(IDbWrapper& db, std::function<void(TGranuleRecord&&)> callback) {
+ bool Load(IDbWrapper& db, const std::function<void(const TGranuleRecord&)>& callback) {
return db.LoadGranules(IndexId, Engine, callback);
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 5db5aca2a05..40c220122c1 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -26,7 +26,7 @@ struct TPortionMeta {
std::shared_ptr<arrow::Scalar> Min;
std::shared_ptr<arrow::Scalar> Max;
- bool HasMinMax() const {
+ bool HasMinMax() const noexcept {
return Min.get() && Max.get();
}
};
@@ -123,7 +123,7 @@ struct TPortionInfo {
}
ui64 BlobsBytes() const {
- ui32 sum = 0;
+ ui64 sum = 0;
for (auto& rec : Records) {
sum += rec.BlobRange.Size;
}
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index 72faa6ae759..0255cbc3d29 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -28,14 +28,14 @@ public:
void WriteGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {}
void EraseGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {}
- bool LoadGranules(ui32, const IColumnEngine&, std::function<void(TGranuleRecord&&)>) override { return true; }
+ bool LoadGranules(ui32, const IColumnEngine&, const std::function<void(const TGranuleRecord&)>&) override { return true; }
void WriteColumn(ui32, const TColumnRecord&) override {}
void EraseColumn(ui32, const TColumnRecord&) override {}
- bool LoadColumns(ui32, std::function<void(TColumnRecord&&)>) override { return true; }
+ bool LoadColumns(ui32, const std::function<void(const TColumnRecord&)>&) override { return true; }
void WriteCounter(ui32, ui32, ui64) override {}
- bool LoadCounters(ui32, std::function<void(ui32 id, ui64 value)>) override { return true; }
+ bool LoadCounters(ui32, const std::function<void(ui32 id, ui64 value)>&) override { return true; }
};
}
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 4634a912686..740c3766530 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -83,12 +83,11 @@ public:
pathGranules.swap(filtered);
}
- bool LoadGranules(ui32 index, const IColumnEngine&, std::function<void(TGranuleRecord&&)> callback) override {
+ bool LoadGranules(ui32 index, const IColumnEngine&, const std::function<void(const TGranuleRecord&)>& callback) override {
auto& granules = Indices[index].Granules;
for (auto& [pathId, vec] : granules) {
- for (auto& rec : vec) {
- TGranuleRecord tmp = rec;
- callback(std::move(rec));
+ for (const auto& rec : vec) {
+ callback(rec);
}
}
return true;
@@ -123,12 +122,11 @@ public:
columns.swap(filtered);
}
- bool LoadColumns(ui32 index, std::function<void(TColumnRecord&&)> callback) override {
+ bool LoadColumns(ui32 index, const std::function<void(const TColumnRecord&)>& callback) override {
auto& columns = Indices[index].Columns;
for (auto& [granule, vec] : columns) {
- for (auto& rec : vec) {
- TColumnRecord tmp = rec;
- callback(std::move(rec));
+ for (const auto& rec : vec) {
+ callback(rec);
}
}
return true;
@@ -139,7 +137,7 @@ public:
counters[counterId] = value;
}
- bool LoadCounters(ui32 index, std::function<void(ui32 id, ui64 value)> callback) override {
+ bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) override {
auto& counters = Indices[index].Counters;
for (auto& [id, value] : counters) {
callback(id, value);