aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-05-05 21:03:28 +0300
committernsofya <nsofya@yandex-team.com>2023-05-05 21:03:28 +0300
commitde77cf349f6cbd1e0b7c70fa28234999233112fa (patch)
tree71cacbc61ee32b9066eedaf037fea1de9600e504
parent3b5a0620ae4bf7b16da5934b17f2ae4b6851865c (diff)
downloadydb-de77cf349f6cbd1e0b7c70fa28234999233112fa.tar.gz
Use UpdateDefaultSchema for index setup
Use UpdateDefaultSchema for index setup
-rw-r--r--ydb/core/tx/columnshard/columnshard_costs.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard_costs.h1
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp33
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h9
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp42
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h6
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp24
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp7
8 files changed, 74 insertions, 54 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_costs.cpp b/ydb/core/tx/columnshard/columnshard_costs.cpp
index f9447ff905..42a71f5928 100644
--- a/ydb/core/tx/columnshard/columnshard_costs.cpp
+++ b/ydb/core/tx/columnshard/columnshard_costs.cpp
@@ -67,10 +67,8 @@ bool TKeyRanges::DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCost
return true;
}
-TKeyRangesBuilder::TKeyRangesBuilder(const TIndexInfo& indexInfo)
- : IndexInfo(indexInfo)
-{
- Constructor.InitColumns(NArrow::MakeArrowSchema(IndexInfo.GetPrimaryKey()));
+TKeyRangesBuilder::TKeyRangesBuilder(const TIndexInfo& indexInfo) {
+ Constructor.InitColumns(NArrow::MakeArrowSchema(indexInfo.GetPrimaryKey()));
}
NKikimr::NOlap::NCosts::TKeyRanges TKeyRangesBuilder::Build() {
diff --git a/ydb/core/tx/columnshard/columnshard_costs.h b/ydb/core/tx/columnshard/columnshard_costs.h
index bd35ab29d1..cde8f1a220 100644
--- a/ydb/core/tx/columnshard/columnshard_costs.h
+++ b/ydb/core/tx/columnshard/columnshard_costs.h
@@ -95,7 +95,6 @@ private:
NArrow::TRecordBatchConstructor Constructor;
TVector<TMarkRangeFeatures> Features;
- const TIndexInfo& IndexInfo;
bool AddMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p);
void AddMarkFromGranule(const TGranuleRecord& record);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index b691a939fb..9eeea89556 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -208,23 +208,12 @@ TColumnEngineForLogs::TMarksGranules::SliceIntoGranules(const std::shared_ptr<ar
}
-TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits)
- : IndexInfo(std::move(info))
- , Limits(limits)
+TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits)
+ : Limits(limits)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0)
{
- /// @note Setting replace and sorting key to PK we are able to:
- /// * apply REPLACE by MergeSort
- /// * apply PK predicate before REPLACE
- IndexInfo.SetAllKeys();
- MarkSchema = IndexInfo.GetEffectiveKey();
-
- ui32 indexId = IndexInfo.GetId();
- GranulesTable = std::make_shared<TGranulesTable>(*this, indexId);
- ColumnsTable = std::make_shared<TColumnsTable>(indexId);
- CountersTable = std::make_shared<TCountersTable>(indexId);
}
ui64 TColumnEngineForLogs::MemoryUsage() const {
@@ -352,11 +341,15 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
}
void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) {
- // TODO(chertus): use step/txId for keeping older schema versions for older snapshots
Y_UNUSED(snapshot);
+ if (!GranulesTable) {
+ MarkSchema = info.GetEffectiveKey();
+ ui32 indexId = info.GetId();
+ GranulesTable = std::make_shared<TGranulesTable>(*this, indexId);
+ ColumnsTable = std::make_shared<TColumnsTable>(indexId);
+ CountersTable = std::make_shared<TCountersTable>(indexId);
+ }
IndexInfo = std::move(info);
- // copied from constructor above
- IndexInfo.SetAllKeys();
}
bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop) {
@@ -440,12 +433,13 @@ bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) {
bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) {
return ColumnsTable->Load(db, [&](const TColumnRecord& rec) {
+ auto& indexInfo = GetIndexInfo();
Y_VERIFY(rec.Valid());
// Do not count the blob as lost since it exists in the index.
lostBlobs.erase(rec.BlobRange.BlobId);
// Locate granule and append the record.
if (const auto gi = Granules.find(rec.Granule); gi != Granules.end()) {
- gi->second->Portions[rec.Portion].AddRecord(IndexInfo, rec);
+ gi->second->Portions[rec.Portion].AddRecord(indexInfo, rec);
} else {
#if 0
LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId);
@@ -671,6 +665,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
ui64 dropBlobs = 0;
bool allowDrop = true;
+ auto& indexInfo = GetIndexInfo();
for (const auto& [pathId, ttl] : pathEviction) {
if (!PathGranules.contains(pathId)) {
continue; // It's not an error: allow TTL over multiple shards with different pathIds presented
@@ -681,7 +676,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
auto ttlColumnNames = ttl.GetTtlColumns();
Y_VERIFY(ttlColumnNames.size() == 1); // TODO: support different ttl columns
- ui32 ttlColumnId = IndexInfo.GetColumnId(*ttlColumnNames.begin());
+ ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin());
for (const auto& [ts, granule] : PathGranules[pathId]) {
auto spg = Granules[granule];
@@ -704,7 +699,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
TString tierName;
for (auto& tierRef : ttl.GetOrderedTiers()) { // TODO: lower/upper_bound + move into TEviction
auto& tierInfo = tierRef.Get();
- if (!IndexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) {
+ if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) {
continue; // Ignore tiers with bad ttl column
}
if (NArrow::ScalarLess(tierInfo.EvictScalar(schema), max)) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index d5f99fc98b..d63634bf9b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -170,9 +170,12 @@ public:
EVICT
};
- TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits = {});
+ TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits = {});
- const TIndexInfo& GetIndexInfo() const override { return IndexInfo; }
+ const TIndexInfo& GetIndexInfo() const override {
+ Y_VERIFY(IndexInfo);
+ return *IndexInfo;
+ }
const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override {
if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) {
@@ -232,7 +235,7 @@ private:
bool Empty() const noexcept { return Portions.empty(); }
};
- TIndexInfo IndexInfo;
+ std::optional<TIndexInfo> IndexInfo;
TCompactionLimits Limits;
ui64 TabletId;
std::shared_ptr<arrow::Schema> MarkSchema;
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 340e0c3a88..299fd63c45 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -237,22 +237,29 @@ std::shared_ptr<arrow::Schema> TIndexInfo::AddColumns(
return std::make_shared<arrow::Schema>(std::move(fields));
}
-std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const TVector<ui32>& columnIds) const {
- return MakeArrowSchema(Columns, columnIds);
+std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const TVector<ui32>& columnIds, bool withSpecials) const {
+ return MakeArrowSchema(Columns, columnIds, withSpecials);
}
-std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const TVector<TString>& names) const {
+TVector<ui32> TIndexInfo::GetColumnIds(const TVector<TString>& columnNames) const {
TVector<ui32> ids;
- ids.reserve(names.size());
- for (auto& name : names) {
- auto it = ColumnNames.find(name);
- if (it == ColumnNames.end()) {
+ ids.reserve(columnNames.size());
+ for (auto& name : columnNames) {
+ auto columnId = GetColumnIdOptional(name);
+ if (!columnId) {
return {};
}
- ids.emplace_back(it->second);
+ ids.emplace_back(*columnId);
}
+ return ids;
+}
- return MakeArrowSchema(Columns, ids);
+std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const TVector<TString>& names) const {
+ auto columnIds = GetColumnIds(names);
+ if (columnIds.empty()) {
+ return {};
+ }
+ return MakeArrowSchema(Columns, columnIds);
}
std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const {
@@ -260,6 +267,9 @@ std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const
}
void TIndexInfo::SetAllKeys() {
+ /// @note Setting replace and sorting key to PK we are able to:
+ /// * apply REPLACE by MergeSort
+ /// * apply PK predicate before REPLACE
const auto& primaryKeyNames = NamesOnly(GetPrimaryKey());
// Update set of required columns with names from primary key.
for (const auto& name: primaryKeyNames) {
@@ -320,21 +330,27 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const {
return MinMaxIdxColumnsIds.contains(it->second);
}
-std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids) {
+std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids, bool withSpecials) {
std::vector<std::shared_ptr<arrow::Field>> fields;
- fields.reserve(ids.size());
+ fields.reserve(withSpecials ? ids.size() + 2 : ids.size());
+
+ if (withSpecials) {
+ // Place special fields at the beginning of the schema.
+ fields.push_back(arrow::field(TIndexInfo::SPEC_COL_PLAN_STEP, arrow::uint64()));
+ fields.push_back(arrow::field(TIndexInfo::SPEC_COL_TX_ID, arrow::uint64()));
+ }
for (const ui32 id: ids) {
auto it = columns.find(id);
if (it == columns.end()) {
- return {};
+ continue;
}
const auto& column = it->second;
std::string colName(column.Name.data(), column.Name.size());
fields.emplace_back(std::make_shared<arrow::Field>(colName, NArrow::GetArrowType(column.PType)));
}
-
+
return std::make_shared<arrow::Schema>(std::move(fields));
}
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 6c17099052..ec28d2dc63 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -113,12 +113,14 @@ public:
Y_VERIFY(MinMaxIdxColumnsIds.contains(GetColumnId(ttlColumn)));
}
+ TVector<ui32> GetColumnIds(const TVector<TString>& columnNames) const;
+
std::shared_ptr<arrow::Schema> ArrowSchema() const;
std::shared_ptr<arrow::Schema> ArrowSchemaWithSpecials() const;
std::shared_ptr<arrow::Schema> AddColumns(const std::shared_ptr<arrow::Schema>& schema,
const TVector<TString>& columns) const;
- std::shared_ptr<arrow::Schema> ArrowSchema(const TVector<ui32>& columnIds) const;
+ std::shared_ptr<arrow::Schema> ArrowSchema(const TVector<ui32>& columnIds, bool withSpecials = false) const;
std::shared_ptr<arrow::Schema> ArrowSchema(const TVector<TString>& columnNames) const;
std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const;
std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const TString& metadata,
@@ -160,7 +162,7 @@ private:
TCompression DefaultCompression;
};
-std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids);
+std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids, bool withSpecials = false);
/// Extracts columns with the specific ids from the schema.
TVector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const TVector<ui32>& ids);
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index c371f446aa..bd956646bc 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -291,7 +291,8 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
bool Insert(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap,
TVector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) {
- TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
+ TColumnEngineForLogs engine(0, TestLimits());
+ engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -328,7 +329,8 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange,
TString>&& blobs, ui32& step, const TExpected& expected) {
- TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
+ TColumnEngineForLogs engine(0, TestLimits());
+ engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
return Compact(engine, db, snap, std::move(blobs), step, expected);
@@ -410,7 +412,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// load
- TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0);
+ TColumnEngineForLogs engine(0);
+ engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -509,7 +512,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// load
- TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
+ TColumnEngineForLogs engine(0, TestLimits());
+ engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -579,7 +583,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// inserts
ui64 planStep = 1;
- TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
+ TColumnEngineForLogs engine(0, TestLimits());
+ engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -610,7 +615,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
UNIT_ASSERT(overload);
{ // check it's overloaded after reload
- TColumnEngineForLogs tmpEngine(TIndexInfo(tableInfo), 0, TestLimits());
+ TColumnEngineForLogs tmpEngine(0, TestLimits());
+ tmpEngine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
tmpEngine.Load(db, lostBlobs);
UNIT_ASSERT(tmpEngine.GetOverloadedGranules(pathId));
}
@@ -641,7 +647,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
{ // check it's not overloaded after reload
- TColumnEngineForLogs tmpEngine(TIndexInfo(tableInfo), 0, TestLimits());
+ TColumnEngineForLogs tmpEngine(0, TestLimits());
+ tmpEngine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
tmpEngine.Load(db, lostBlobs);
UNIT_ASSERT(!tmpEngine.GetOverloadedGranules(pathId));
}
@@ -682,7 +689,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// load
- TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
+ TColumnEngineForLogs engine(0, TestLimits());
+ engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index c96c4c3d9e..f0cc7d8fcc 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -272,12 +272,11 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi
void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema) {
NOlap::TSnapshot snapshot{version.Step, version.TxId};
NOlap::TIndexInfo indexInfo = ConvertSchema(schema);
-
+ indexInfo.SetAllKeys();
if (!PrimaryIndex) {
- PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletId);
- } else {
- PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo));
+ PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId);
}
+ PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo));
for (auto& columnName : Ttl.TtlColumns()) {
PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName);