diff options
author | nsofya <nsofya@yandex-team.com> | 2023-05-05 21:03:28 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-05-05 21:03:28 +0300 |
commit | de77cf349f6cbd1e0b7c70fa28234999233112fa (patch) | |
tree | 71cacbc61ee32b9066eedaf037fea1de9600e504 | |
parent | 3b5a0620ae4bf7b16da5934b17f2ae4b6851865c (diff) | |
download | ydb-de77cf349f6cbd1e0b7c70fa28234999233112fa.tar.gz |
Use UpdateDefaultSchema for index setup
Use UpdateDefaultSchema for index setup
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_costs.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_costs.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 33 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.cpp | 42 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/tables_manager.cpp | 7 |
8 files changed, 74 insertions, 54 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_costs.cpp b/ydb/core/tx/columnshard/columnshard_costs.cpp index f9447ff905..42a71f5928 100644 --- a/ydb/core/tx/columnshard/columnshard_costs.cpp +++ b/ydb/core/tx/columnshard/columnshard_costs.cpp @@ -67,10 +67,8 @@ bool TKeyRanges::DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCost return true; } -TKeyRangesBuilder::TKeyRangesBuilder(const TIndexInfo& indexInfo) - : IndexInfo(indexInfo) -{ - Constructor.InitColumns(NArrow::MakeArrowSchema(IndexInfo.GetPrimaryKey())); +TKeyRangesBuilder::TKeyRangesBuilder(const TIndexInfo& indexInfo) { + Constructor.InitColumns(NArrow::MakeArrowSchema(indexInfo.GetPrimaryKey())); } NKikimr::NOlap::NCosts::TKeyRanges TKeyRangesBuilder::Build() { diff --git a/ydb/core/tx/columnshard/columnshard_costs.h b/ydb/core/tx/columnshard/columnshard_costs.h index bd35ab29d1..cde8f1a220 100644 --- a/ydb/core/tx/columnshard/columnshard_costs.h +++ b/ydb/core/tx/columnshard/columnshard_costs.h @@ -95,7 +95,6 @@ private: NArrow::TRecordBatchConstructor Constructor; TVector<TMarkRangeFeatures> Features; - const TIndexInfo& IndexInfo; bool AddMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p); void AddMarkFromGranule(const TGranuleRecord& record); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index b691a939fb..9eeea89556 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -208,23 +208,12 @@ TColumnEngineForLogs::TMarksGranules::SliceIntoGranules(const std::shared_ptr<ar } -TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits) - : IndexInfo(std::move(info)) - , Limits(limits) +TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits) + : Limits(limits) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) { - /// @note Setting replace and sorting key to PK we are able to: - /// * apply REPLACE by MergeSort - /// * apply PK predicate before REPLACE - IndexInfo.SetAllKeys(); - MarkSchema = IndexInfo.GetEffectiveKey(); - - ui32 indexId = IndexInfo.GetId(); - GranulesTable = std::make_shared<TGranulesTable>(*this, indexId); - ColumnsTable = std::make_shared<TColumnsTable>(indexId); - CountersTable = std::make_shared<TCountersTable>(indexId); } ui64 TColumnEngineForLogs::MemoryUsage() const { @@ -352,11 +341,15 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c } void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) { - // TODO(chertus): use step/txId for keeping older schema versions for older snapshots Y_UNUSED(snapshot); + if (!GranulesTable) { + MarkSchema = info.GetEffectiveKey(); + ui32 indexId = info.GetId(); + GranulesTable = std::make_shared<TGranulesTable>(*this, indexId); + ColumnsTable = std::make_shared<TColumnsTable>(indexId); + CountersTable = std::make_shared<TCountersTable>(indexId); + } IndexInfo = std::move(info); - // copied from constructor above - IndexInfo.SetAllKeys(); } bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop) { @@ -440,12 +433,13 @@ bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) { bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) { return ColumnsTable->Load(db, [&](const TColumnRecord& rec) { + auto& indexInfo = GetIndexInfo(); Y_VERIFY(rec.Valid()); // Do not count the blob as lost since it exists in the index. lostBlobs.erase(rec.BlobRange.BlobId); // Locate granule and append the record. if (const auto gi = Granules.find(rec.Granule); gi != Granules.end()) { - gi->second->Portions[rec.Portion].AddRecord(IndexInfo, rec); + gi->second->Portions[rec.Portion].AddRecord(indexInfo, rec); } else { #if 0 LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId); @@ -671,6 +665,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash ui64 dropBlobs = 0; bool allowDrop = true; + auto& indexInfo = GetIndexInfo(); for (const auto& [pathId, ttl] : pathEviction) { if (!PathGranules.contains(pathId)) { continue; // It's not an error: allow TTL over multiple shards with different pathIds presented @@ -681,7 +676,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash auto ttlColumnNames = ttl.GetTtlColumns(); Y_VERIFY(ttlColumnNames.size() == 1); // TODO: support different ttl columns - ui32 ttlColumnId = IndexInfo.GetColumnId(*ttlColumnNames.begin()); + ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin()); for (const auto& [ts, granule] : PathGranules[pathId]) { auto spg = Granules[granule]; @@ -704,7 +699,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash TString tierName; for (auto& tierRef : ttl.GetOrderedTiers()) { // TODO: lower/upper_bound + move into TEviction auto& tierInfo = tierRef.Get(); - if (!IndexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { + if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { continue; // Ignore tiers with bad ttl column } if (NArrow::ScalarLess(tierInfo.EvictScalar(schema), max)) { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index d5f99fc98b..d63634bf9b 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -170,9 +170,12 @@ public: EVICT }; - TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits = {}); + TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits = {}); - const TIndexInfo& GetIndexInfo() const override { return IndexInfo; } + const TIndexInfo& GetIndexInfo() const override { + Y_VERIFY(IndexInfo); + return *IndexInfo; + } const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override { if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) { @@ -232,7 +235,7 @@ private: bool Empty() const noexcept { return Portions.empty(); } }; - TIndexInfo IndexInfo; + std::optional<TIndexInfo> IndexInfo; TCompactionLimits Limits; ui64 TabletId; std::shared_ptr<arrow::Schema> MarkSchema; diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 340e0c3a88..299fd63c45 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -237,22 +237,29 @@ std::shared_ptr<arrow::Schema> TIndexInfo::AddColumns( return std::make_shared<arrow::Schema>(std::move(fields)); } -std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const TVector<ui32>& columnIds) const { - return MakeArrowSchema(Columns, columnIds); +std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const TVector<ui32>& columnIds, bool withSpecials) const { + return MakeArrowSchema(Columns, columnIds, withSpecials); } -std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const TVector<TString>& names) const { +TVector<ui32> TIndexInfo::GetColumnIds(const TVector<TString>& columnNames) const { TVector<ui32> ids; - ids.reserve(names.size()); - for (auto& name : names) { - auto it = ColumnNames.find(name); - if (it == ColumnNames.end()) { + ids.reserve(columnNames.size()); + for (auto& name : columnNames) { + auto columnId = GetColumnIdOptional(name); + if (!columnId) { return {}; } - ids.emplace_back(it->second); + ids.emplace_back(*columnId); } + return ids; +} - return MakeArrowSchema(Columns, ids); +std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const TVector<TString>& names) const { + auto columnIds = GetColumnIds(names); + if (columnIds.empty()) { + return {}; + } + return MakeArrowSchema(Columns, columnIds); } std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const { @@ -260,6 +267,9 @@ std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const } void TIndexInfo::SetAllKeys() { + /// @note Setting replace and sorting key to PK we are able to: + /// * apply REPLACE by MergeSort + /// * apply PK predicate before REPLACE const auto& primaryKeyNames = NamesOnly(GetPrimaryKey()); // Update set of required columns with names from primary key. for (const auto& name: primaryKeyNames) { @@ -320,21 +330,27 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { return MinMaxIdxColumnsIds.contains(it->second); } -std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids) { +std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids, bool withSpecials) { std::vector<std::shared_ptr<arrow::Field>> fields; - fields.reserve(ids.size()); + fields.reserve(withSpecials ? ids.size() + 2 : ids.size()); + + if (withSpecials) { + // Place special fields at the beginning of the schema. + fields.push_back(arrow::field(TIndexInfo::SPEC_COL_PLAN_STEP, arrow::uint64())); + fields.push_back(arrow::field(TIndexInfo::SPEC_COL_TX_ID, arrow::uint64())); + } for (const ui32 id: ids) { auto it = columns.find(id); if (it == columns.end()) { - return {}; + continue; } const auto& column = it->second; std::string colName(column.Name.data(), column.Name.size()); fields.emplace_back(std::make_shared<arrow::Field>(colName, NArrow::GetArrowType(column.PType))); } - + return std::make_shared<arrow::Schema>(std::move(fields)); } diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 6c17099052..ec28d2dc63 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -113,12 +113,14 @@ public: Y_VERIFY(MinMaxIdxColumnsIds.contains(GetColumnId(ttlColumn))); } + TVector<ui32> GetColumnIds(const TVector<TString>& columnNames) const; + std::shared_ptr<arrow::Schema> ArrowSchema() const; std::shared_ptr<arrow::Schema> ArrowSchemaWithSpecials() const; std::shared_ptr<arrow::Schema> AddColumns(const std::shared_ptr<arrow::Schema>& schema, const TVector<TString>& columns) const; - std::shared_ptr<arrow::Schema> ArrowSchema(const TVector<ui32>& columnIds) const; + std::shared_ptr<arrow::Schema> ArrowSchema(const TVector<ui32>& columnIds, bool withSpecials = false) const; std::shared_ptr<arrow::Schema> ArrowSchema(const TVector<TString>& columnNames) const; std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const; std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const TString& metadata, @@ -160,7 +162,7 @@ private: TCompression DefaultCompression; }; -std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids); +std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids, bool withSpecials = false); /// Extracts columns with the specific ids from the schema. TVector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const TVector<ui32>& ids); diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index c371f446aa..bd956646bc 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -291,7 +291,8 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, bool Insert(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, TVector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) { - TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); + TColumnEngineForLogs engine(0, TestLimits()); + engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -328,7 +329,8 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, TString>&& blobs, ui32& step, const TExpected& expected) { - TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); + TColumnEngineForLogs engine(0, TestLimits()); + engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); return Compact(engine, db, snap, std::move(blobs), step, expected); @@ -410,7 +412,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load - TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0); + TColumnEngineForLogs engine(0); + engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -509,7 +512,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load - TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); + TColumnEngineForLogs engine(0, TestLimits()); + engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -579,7 +583,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // inserts ui64 planStep = 1; - TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); + TColumnEngineForLogs engine(0, TestLimits()); + engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -610,7 +615,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { UNIT_ASSERT(overload); { // check it's overloaded after reload - TColumnEngineForLogs tmpEngine(TIndexInfo(tableInfo), 0, TestLimits()); + TColumnEngineForLogs tmpEngine(0, TestLimits()); + tmpEngine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); tmpEngine.Load(db, lostBlobs); UNIT_ASSERT(tmpEngine.GetOverloadedGranules(pathId)); } @@ -641,7 +647,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } { // check it's not overloaded after reload - TColumnEngineForLogs tmpEngine(TIndexInfo(tableInfo), 0, TestLimits()); + TColumnEngineForLogs tmpEngine(0, TestLimits()); + tmpEngine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); tmpEngine.Load(db, lostBlobs); UNIT_ASSERT(!tmpEngine.GetOverloadedGranules(pathId)); } @@ -682,7 +689,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load - TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); + TColumnEngineForLogs engine(0, TestLimits()); + engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index c96c4c3d9e..f0cc7d8fcc 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -272,12 +272,11 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema) { NOlap::TSnapshot snapshot{version.Step, version.TxId}; NOlap::TIndexInfo indexInfo = ConvertSchema(schema); - + indexInfo.SetAllKeys(); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletId); - } else { - PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo)); + PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId); } + PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo)); for (auto& columnName : Ttl.TtlColumns()) { PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName); |