diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-22 16:08:15 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-22 16:25:15 +0300 |
commit | d40a08d163c0ec4b3cf546ce187466c8fc8af0b7 (patch) | |
tree | 68f5dce9cc173ee67e58f301d8457a8152161c70 | |
parent | 12d1c39c9c20ca46bd4a788422bb653ebdfdef58 (diff) | |
download | ydb-d40a08d163c0ec4b3cf546ce187466c8fc8af0b7.tar.gz |
KIKIMR-19091: remove granules. currently granule == path in shard
46 files changed, 343 insertions, 985 deletions
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h index c5f9de0fddc..6fef5323074 100644 --- a/ydb/core/formats/arrow/reader/read_filter_merger.h +++ b/ydb/core/formats/arrow/reader/read_filter_merger.h @@ -103,31 +103,6 @@ public: } }; - template <class TContainer> - class TAssociatedContainerIterator { - private: - typename TContainer::const_iterator Current; - typename TContainer::const_iterator End; - public: - TAssociatedContainerIterator(const TContainer& container) - : Current(container.begin()) - , End(container.end()) - { - } - - bool IsValid() const { - return Current != End; - } - - void Next() { - ++Current; - } - - const auto& CurrentPosition() const { - return Current->first; - } - }; - // (-inf, it1), [it1, it2), [it2, it3), ..., [itLast, +inf) template <class TBordersIterator> static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBorders(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, TBordersIterator& it) { @@ -174,11 +149,65 @@ public: } template <class TContainer> + class TAssociatedContainerIterator { + private: + typename TContainer::const_iterator Current; + typename TContainer::const_iterator End; + public: + TAssociatedContainerIterator(const TContainer& container) + : Current(container.begin()) + , End(container.end()) { + } + + bool IsValid() const { + return Current != End; + } + + void Next() { + ++Current; + } + + const auto& CurrentPosition() const { + return Current->first; + } + }; + + template <class TContainer> static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInAssociativeContainer(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, const TContainer& container) { TAssociatedContainerIterator<TContainer> it(container); return SplitByBorders(batch, columnNames, it); } + template <class TContainer> + class TSequentialContainerIterator { + private: + typename TContainer::const_iterator Current; + typename TContainer::const_iterator End; + public: + TSequentialContainerIterator(const TContainer& container) + : Current(container.begin()) + , End(container.end()) { + } + + bool IsValid() const { + return Current != End; + } + + void Next() { + ++Current; + } + + const auto& CurrentPosition() const { + return *Current; + } + }; + + template <class TContainer> + static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInSequentialContainer(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, const TContainer& container) { + TSequentialContainerIterator<TContainer> it(container); + return SplitByBorders(batch, columnNames, it); + } + static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition); TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound); diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index a14d2817421..365d8e6d024 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -204,8 +204,6 @@ void TColumnShard::UpdateIndexCounters() { auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats(); SetCounter(COUNTER_INDEX_TABLES, stats.Tables); - SetCounter(COUNTER_INDEX_GRANULES, stats.Granules); - SetCounter(COUNTER_INDEX_EMPTY_GRANULES, stats.EmptyGranules); SetCounter(COUNTER_INDEX_COLUMN_RECORDS, stats.ColumnRecords); SetCounter(COUNTER_INDEX_COLUMN_METADATA_BYTES, stats.ColumnMetadataBytes); SetCounter(COUNTER_INSERTED_PORTIONS, stats.GetInsertedStats().Portions); @@ -235,7 +233,6 @@ void TColumnShard::UpdateIndexCounters() { SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.GetEvictedStats().RawBytes); LOG_S_DEBUG("Index: tables " << stats.Tables - << " granules " << stats.Granules << " (empty " << stats.EmptyGranules << ")" << " inserted " << stats.GetInsertedStats().DebugString() << " compacted " << stats.GetCompactedStats().DebugString() << " s-compacted " << stats.GetSplitCompactedStats().DebugString() diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index fb7e6705b8a..d7fc92abe3b 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -41,7 +41,6 @@ void TTxInit::SetDefaults() { Self->OwnerPath.clear(); Self->AltersInFlight.clear(); Self->CommitsInFlight.clear(); - Self->TablesManager.Clear(); Self->LongTxWrites.clear(); Self->LongTxWritesByUniqueId.clear(); } @@ -131,13 +130,12 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) } { ACFL_INFO("step", "TTablesManager::Load_Start"); - TTablesManager tManagerLocal(Self->StoragesManager); - THashSet<TUnifiedBlobId> lostEvictions; - if (!tManagerLocal.InitFromDB(db, Self->TabletID())) { + TTablesManager tManagerLocal(Self->StoragesManager, Self->TabletID()); + if (!tManagerLocal.InitFromDB(db)) { ACFL_ERROR("step", "TTablesManager::InitFromDB_Fails"); return false; } - if (!tManagerLocal.LoadIndex(dbTable, lostEvictions)) { + if (!tManagerLocal.LoadIndex(dbTable)) { ACFL_ERROR("step", "TTablesManager::Load_Fails"); return false; } diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index a88639a7543..c2816e27e84 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -133,7 +133,6 @@ void TTxRead::Complete(const TActorContext& ctx) { std::static_pointer_cast<const NOlap::TReadMetadataBase>(ReadMetadata)); auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta(); - Self->IncCounter(COUNTER_READ_INDEX_GRANULES, statsDelta.Granules); Self->IncCounter(COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions); Self->IncCounter(COUNTER_READ_INDEX_BLOBS, statsDelta.Blobs); Self->IncCounter(COUNTER_READ_INDEX_ROWS, statsDelta.Rows); diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 59b6d22c169..40ad858f28d 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -959,7 +959,6 @@ void TTxScan::Complete(const TActorContext& ctx) { ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRanges); auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta(); - Self->IncCounter(COUNTER_READ_INDEX_GRANULES, statsDelta.Granules); Self->IncCounter(COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions); Self->IncCounter(COUNTER_READ_INDEX_BLOBS, statsDelta.Blobs); Self->IncCounter(COUNTER_READ_INDEX_ROWS, statsDelta.Rows); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index da061fff0c7..e30f6e32db7 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -158,7 +158,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , ProgressTxController(std::make_unique<TTxController>(*this)) , StoragesManager(std::make_shared<TStoragesManager>(*this)) , InFlightReadsTracker(StoragesManager) - , TablesManager(StoragesManager) + , TablesManager(StoragesManager, info->TabletID) , PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig())) , InsertTable(std::make_unique<NOlap::TInsertTable>()) , SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>()) diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index 8959782e98b..34ffa4062a7 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -9,7 +9,7 @@ bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* ds while (!rowset.EndOfSet()) { NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty(); - portion.SetGranule(rowset.GetValue<IndexColumns::Granule>()); + portion.SetPathId(rowset.GetValue<IndexColumns::PathId>()); portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>()); portion.SetPortion(rowset.GetValue<IndexColumns::Portion>()); diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 209ab2c275f..f92ecc667d4 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -6,7 +6,6 @@ #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/protos/tx_columnshard.pb.h> #include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h> -#include <ydb/core/tx/columnshard/engines/granules_table.h> #include <ydb/core/tx/columnshard/engines/columns_table.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> #include <ydb/core/tx/columnshard/operations/write.h> @@ -32,12 +31,10 @@ struct Schema : NIceDb::Schema { using TSettings = SchemaSettings<EmptySettings>; using TInsertedData = NOlap::TInsertedData; - using TGranuleRecord = NOlap::TGranuleRecord; using TColumnRecord = NOlap::TColumnRecord; enum EIndexTables : ui32 { InsertTableId = 255, - GranulesTableId, ColumnsTableId, CountersTableId, OperationsTableId, @@ -218,22 +215,9 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion>; }; - struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> { - struct Index : Column<1, NScheme::NTypeIds::Uint32> {}; - struct PathId : Column<2, NScheme::NTypeIds::Uint64> {}; // Logical table (if many) - struct IndexKey : Column<3, NScheme::NTypeIds::String> {}; // Effective part of PK (serialized) - struct Granule : Column<4, NScheme::NTypeIds::Uint64> {}; // FK: {Index, Granule} -> TIndexColumns - struct PlanStep : Column<5, NScheme::NTypeIds::Uint64> {}; - struct TxId : Column<6, NScheme::NTypeIds::Uint64> {}; - struct Metadata : Column<7, NScheme::NTypeIds::String> {}; // NKikimrTxColumnShard.TIndexGranuleMeta - - using TKey = TableKey<Index, PathId, IndexKey>; - using TColumns = TableColumns<Index, PathId, IndexKey, Granule, PlanStep, TxId, Metadata>; - }; - struct IndexColumns : NIceDb::Schema::Table<ColumnsTableId> { struct Index : Column<1, NScheme::NTypeIds::Uint32> {}; - struct Granule : Column<2, NScheme::NTypeIds::Uint64> {}; + struct PathId : Column<2, NScheme::NTypeIds::Uint64> {}; struct ColumnIdx : Column<3, NScheme::NTypeIds::Uint32> {}; struct PlanStep : Column<4, NScheme::NTypeIds::Uint64> {}; struct TxId : Column<5, NScheme::NTypeIds::Uint64> {}; @@ -246,8 +230,8 @@ struct Schema : NIceDb::Schema { struct Offset : Column<12, NScheme::NTypeIds::Uint32> {}; struct Size : Column<13, NScheme::NTypeIds::Uint32> {}; - using TKey = TableKey<Index, Granule, ColumnIdx, PlanStep, TxId, Portion, Chunk>; - using TColumns = TableColumns<Index, Granule, ColumnIdx, PlanStep, TxId, Portion, Chunk, + using TKey = TableKey<Index, PathId, ColumnIdx, PlanStep, TxId, Portion, Chunk>; + using TColumns = TableColumns<Index, PathId, ColumnIdx, PlanStep, TxId, Portion, Chunk, XPlanStep, XTxId, Blob, Metadata, Offset, Size>; }; @@ -301,7 +285,6 @@ struct Schema : NIceDb::Schema { BlobsToKeep, BlobsToDelete, InsertTable, - IndexGranules, IndexColumns, IndexCounters, SmallBlobs, @@ -504,63 +487,6 @@ struct Schema : NIceDb::Schema { NOlap::TInsertTableAccessor& insertTable, const TInstant& loadTime); - // IndexGranules activities - - static void IndexGranules_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine, - const TGranuleRecord& row) { - TString metaStr; - const auto& indexInfo = engine.GetVersionedIndex().GetLastSchema()->GetIndexInfo(); - - NKikimrTxColumnShard::TIndexGranuleMeta meta; - Y_ABORT_UNLESS(indexInfo.GetIndexKey()); - meta.SetMarkSize(indexInfo.GetIndexKey()->num_fields()); - Y_ABORT_UNLESS(meta.SerializeToString(&metaStr)); - - db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Update( - NIceDb::TUpdate<IndexGranules::Granule>(row.Granule), - NIceDb::TUpdate<IndexGranules::PlanStep>(row.GetCreatedAt().GetPlanStep()), - NIceDb::TUpdate<IndexGranules::TxId>(row.GetCreatedAt().GetTxId()), - NIceDb::TUpdate<IndexGranules::Metadata>(metaStr) - ); - } - - static void IndexGranules_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine, - const TGranuleRecord& row) { - db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Delete(); - } - - static bool IndexGranules_Load(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine, - const std::function<void(const TGranuleRecord&)>& callback) { - auto rowset = db.Table<IndexGranules>().Prefix(index).Select(); - if (!rowset.IsReady()) - return false; - - while (!rowset.EndOfSet()) { - ui64 pathId = rowset.GetValue<IndexGranules::PathId>(); - TString indexKey = rowset.GetValue<IndexGranules::IndexKey>(); - ui64 granule = rowset.GetValue<IndexGranules::Granule>(); - ui64 planStep = rowset.GetValue<IndexGranules::PlanStep>(); - ui64 txId = rowset.GetValue<IndexGranules::TxId>(); - TString metaStr = rowset.GetValue<IndexGranules::Metadata>(); - - std::optional<ui32> markNumKeys; - if (metaStr.size()) { - NKikimrTxColumnShard::TIndexGranuleMeta meta; - Y_ABORT_UNLESS(meta.ParseFromString(metaStr)); - if (meta.HasMarkSize()) { - markNumKeys = meta.GetMarkSize(); - } - } - - callback(TGranuleRecord(pathId, granule, NOlap::TSnapshot(planStep, txId), - engine.DeserializeMark(indexKey, markNumKeys))); - - if (!rowset.Next()) - return false; - } - return true; - } - // IndexColumns activities static void IndexColumns_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { @@ -569,7 +495,7 @@ struct Schema : NIceDb::Schema { if (proto) { *rowProto.MutablePortionMeta() = std::move(*proto); } - db.Table<IndexColumns>().Key(index, portion.GetGranule(), row.ColumnId, + db.Table<IndexColumns>().Key(index, portion.GetPathId(), row.ColumnId, portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update( NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()), NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()), @@ -581,7 +507,7 @@ struct Schema : NIceDb::Schema { } static void IndexColumns_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { - db.Table<IndexColumns>().Key(index, portion.GetGranule(), row.ColumnId, + db.Table<IndexColumns>().Key(index, portion.GetPathId(), row.ColumnId, portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete(); } diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt index bab0224d428..8cc56b5bc31 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt @@ -30,7 +30,6 @@ target_sources(columnshard-engines-changes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp ) diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt index 96d8d37c049..9c190e512ec 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt @@ -31,7 +31,6 @@ target_sources(columnshard-engines-changes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp ) diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt index 96d8d37c049..9c190e512ec 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt @@ -31,7 +31,6 @@ target_sources(columnshard-engines-changes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp ) diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt index bab0224d428..8cc56b5bc31 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt @@ -30,7 +30,6 @@ target_sources(columnshard-engines-changes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp ) diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/compaction_info.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/compaction_info.cpp index 213d088db1b..e3e2a73955d 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/compaction_info.cpp +++ b/ydb/core/tx/columnshard/engines/changes/abstract/compaction_info.cpp @@ -1,5 +1,4 @@ #include "compaction_info.h" -#include <ydb/core/tx/columnshard/engines/storage/granule.h> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp index 18952183ed2..542943c9ca4 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp +++ b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp @@ -2,6 +2,7 @@ #include <ydb/core/tx/columnshard/columnshard_impl.h> #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> #include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> +#include <ydb/core/tx/columnshard/columnshard_schema.h> namespace NKikimr::NOlap { @@ -14,16 +15,28 @@ void TCleanupColumnEngineChanges::DoDebugString(TStringOutput& out) const { } } -void TCleanupColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& /*context*/) { +void TCleanupColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { self.IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size()); THashSet<TUnifiedBlobId> blobIds; + THashSet<ui64> pathIds; for (auto&& p : PortionsToDrop) { auto removing = BlobsAction.GetRemoving(p); for (auto&& r : p.Records) { removing->DeclareRemove(r.BlobRange.BlobId); } + pathIds.emplace(p.GetPathId()); self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.RawBytesSum()); } + for (auto&& p: pathIds) { + auto itDrop = self.TablesManager.GetPathsToDrop().find(p); + if (itDrop != self.TablesManager.GetPathsToDrop().end()) { + if (!self.TablesManager.GetPrimaryIndexSafe().HasDataInPathId(p)) { + self.TablesManager.MutablePathsToDrop().erase(itDrop); + NIceDb::TNiceDb db(context.Txc.DB); + NColumnShard::Schema::EraseTableInfo(db, p); + } + } + } } bool TCleanupColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 2335394095a..8c23b15af2a 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -1,5 +1,4 @@ #include "indexation.h" -#include "mark_granules.h" #include <ydb/core/tx/columnshard/blob_cache.h> #include <ydb/core/protos/counters_columnshard.pb.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> @@ -25,19 +24,6 @@ void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, } } -std::optional<ui64> TInsertColumnEngineChanges::AddPathIfNotExists(ui64 pathId) { - if (PathToGranule.contains(pathId)) { - return {}; - } - - Y_ABORT_UNLESS(FirstGranuleId); - ui64 granule = FirstGranuleId; - ++FirstGranuleId; - - NewGranules.emplace(granule, std::make_pair(pathId, DefaultMark)); - return granule; -} - void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { TBase::DoStart(self); Y_ABORT_UNLESS(DataToIndex.size()); @@ -111,7 +97,6 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont Y_ABORT_UNLESS(Blobs.empty()); const std::vector<std::string> comparableColumns = resultSchema->GetIndexInfo().GetReplaceKey()->field_names(); for (auto& [pathId, batches] : pathBatches) { - auto newGranuleId = AddPathIfNotExists(pathId); NIndexedReader::TMergePartialStream stream(resultSchema->GetIndexInfo().GetReplaceKey(), resultSchema->GetIndexInfo().ArrowSchemaWithSpecials(), false); THashMap<std::string, ui64> fieldSizes; ui64 rowsCount = 0; @@ -127,37 +112,22 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont stream.SetPossibleSameVersion(true); stream.DrainAll(builder); - THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> batchChunks; - if (PathToGranule[pathId].empty()) { - AFL_VERIFY(newGranuleId); - batchChunks[*newGranuleId].emplace_back(builder.Finalize()); - } else { - auto& markers = PathToGranule[pathId]; - std::vector<std::shared_ptr<arrow::RecordBatch>> result = NIndexedReader::TSortableBatchPosition::SplitByBordersInAssociativeContainer(builder.Finalize(), comparableColumns, markers); - AFL_VERIFY(result.size() == markers.size() + 1)("result", result.size())("markers", markers.size()); - ui32 idx = 0; - for (auto&& i : markers) { - auto chunk = result[++idx]; - if (chunk) { - batchChunks[i.second].emplace_back(chunk); - } + auto itGranule = PathToGranule.find(pathId); + AFL_VERIFY(itGranule != PathToGranule.end()); + std::vector<std::shared_ptr<arrow::RecordBatch>> result = NIndexedReader::TSortableBatchPosition::SplitByBordersInSequentialContainer(builder.Finalize(), comparableColumns, itGranule->second); + for (auto&& b : result) { + if (!b) { + continue; } - if (result.front()) { - batchChunks[markers.begin()->second].emplace_back(result.front()); + if (b->num_rows() < 100) { + SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::UNCOMPRESSED)); + } else { + SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::LZ4_FRAME)); } - } - for (auto&& g : batchChunks) { - for (auto&& b : g.second) { - if (b->num_rows() < 100) { - SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::UNCOMPRESSED)); - } else { - SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::LZ4_FRAME)); - } - auto portions = MakeAppendedPortions(b, g.first, maxSnapshot, nullptr, context); - Y_ABORT_UNLESS(portions.size()); - for (auto& portion : portions) { - AppendedPortions.emplace_back(std::move(portion)); - } + auto portions = MakeAppendedPortions(b, pathId, maxSnapshot, nullptr, context); + Y_ABORT_UNLESS(portions.size()); + for (auto& portion : portions) { + AppendedPortions.emplace_back(std::move(portion)); } } } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h index f28ddfdd77e..0a89d493034 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.h +++ b/ydb/core/tx/columnshard/engines/changes/indexation.h @@ -23,7 +23,7 @@ protected: virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override; public: const TMark DefaultMark; - THashMap<ui64, std::map<NIndexedReader::TSortableBatchPosition, ui64>> PathToGranule; // pathId -> {pos, granule} + THashMap<ui64, std::vector<NIndexedReader::TSortableBatchPosition>> PathToGranule; // pathId -> positions (sorted by pk) public: TInsertColumnEngineChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& dataToIndex, const TSplitSettings& splitSettings, const TSaverContext& saverContext) : TBase(splitSettings, saverContext, StaticTypeName()) diff --git a/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp b/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp deleted file mode 100644 index 55b8534441a..00000000000 --- a/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp +++ /dev/null @@ -1,105 +0,0 @@ -#include "mark_granules.h" - -namespace NKikimr::NOlap { -TMarksGranules::TMarksGranules(std::vector<TPair>&& marks) noexcept - : Marks(std::move(marks)) { - Y_DEBUG_ABORT_UNLESS(std::is_sorted(Marks.begin(), Marks.end())); -} - -TMarksGranules::TMarksGranules(std::vector<TMark>&& points) { - std::sort(points.begin(), points.end()); - - Marks.reserve(points.size()); - - for (size_t i = 0, end = points.size(); i != end; ++i) { - Marks.emplace_back(std::move(points[i]), i + 1); - } -} - -TMarksGranules::TMarksGranules(const TSelectInfo& selectInfo) { - Marks.reserve(selectInfo.Granules.size()); - - for (const auto& rec : selectInfo.Granules) { - Marks.emplace_back(std::make_pair(rec.Mark, rec.Granule)); - } - - std::sort(Marks.begin(), Marks.end(), [](const TPair& a, const TPair& b) { - return a.first < b.first; - }); -} - -bool TMarksGranules::MakePrecedingMark(const TIndexInfo& indexInfo) { - ui64 minGranule = 0; - TMark minMark(TMark::MinBorder(indexInfo.GetIndexKey())); - if (Marks.empty()) { - Marks.emplace_back(std::move(minMark), minGranule); - return true; - } - - if (minMark < Marks[0].first) { - std::vector<TPair> copy; - copy.reserve(Marks.size() + 1); - copy.emplace_back(std::move(minMark), minGranule); - for (auto&& [mark, granule] : Marks) { - copy.emplace_back(std::move(mark), granule); - } - Marks.swap(copy); - return true; - } - return false; -} - -THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> TMarksGranules::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::map<NArrow::TReplaceKey, ui64>& granules, const TIndexInfo& indexInfo) { - Y_ABORT_UNLESS(batch); - if (batch->num_rows() == 0) { - return {}; - } - THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> out; - - if (granules.size() == 1) { - out[granules.begin()->second].emplace_back(batch); - } else { - const auto effKey = GetEffectiveKey(batch, indexInfo); - Y_ABORT_UNLESS(effKey->num_columns() && effKey->num_rows()); - - std::vector<NArrow::TRawReplaceKey> keys; - { - const auto& columns = effKey->columns(); - keys.reserve(effKey->num_rows()); - for (i64 i = 0; i < effKey->num_rows(); ++i) { - keys.emplace_back(NArrow::TRawReplaceKey(&columns, i)); - } - } - - i64 offset = 0; - auto itNext = granules.begin(); - ++itNext; - for (auto it = granules.begin(); it != granules.end() && offset < effKey->num_rows(); ++it) { - const i64 end = (itNext == granules.end()) - // Just take the number of elements in the key column for the last granule. - ? effKey->num_rows() - // Locate position of the next granule in the key. - : NArrow::TReplaceKeyHelper::LowerBound(keys, itNext->first, offset); - - if (const i64 size = end - offset) { - out[it->second].emplace_back(batch->Slice(offset, size)); - } - - offset = end; - if (itNext != granules.end()) { - ++itNext; - } - } - } - return out; -} - -std::shared_ptr<arrow::RecordBatch> TMarksGranules::GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch, const TIndexInfo& indexInfo) { - const auto& key = indexInfo.GetIndexKey(); - auto resBatch = NArrow::ExtractColumns(batch, key); - Y_VERIFY_S(resBatch, "Cannot extract effective key " << key->ToString() - << " from batch " << batch->schema()->ToString()); - return resBatch; -} - -} diff --git a/ydb/core/tx/columnshard/engines/changes/mark_granules.h b/ydb/core/tx/columnshard/engines/changes/mark_granules.h deleted file mode 100644 index b1026bf5776..00000000000 --- a/ydb/core/tx/columnshard/engines/changes/mark_granules.h +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once -#include "abstract/mark.h" -#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> -#include <ydb/core/tx/columnshard/engines/column_engine.h> - -namespace NKikimr::NOlap { - -class TMarksGranules { -public: - using TPair = std::pair<TMark, ui64>; - - TMarksGranules() = default; - TMarksGranules(std::vector<TPair>&& marks) noexcept; - TMarksGranules(std::vector<TMark>&& points); - TMarksGranules(const TSelectInfo& selectInfo); - - const std::vector<TPair>& GetOrderedMarks() const noexcept { - return Marks; - } - - bool Empty() const noexcept { - return Marks.empty(); - } - - bool MakePrecedingMark(const TIndexInfo& indexInfo); - - static THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, - const std::map<NArrow::TReplaceKey, ui64>& granules, - const TIndexInfo& indexInfo); - - static std::shared_ptr<arrow::RecordBatch> GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch, - const TIndexInfo& indexInfo); - -private: - std::vector<TPair> Marks; -}; - -} diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 33d8e6ba65a..5d28cc9b8ae 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -55,36 +55,7 @@ void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIn } bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) { - // Save new granules - std::map<ui64, ui64> remapGranules; - for (auto& [granule, p] : NewGranules) { - ui64 pathId = p.first; - TMark mark = p.second; - TGranuleRecord rec(pathId, granule, context.Snapshot, mark.GetBorder()); - auto oldGranuleId = self.NewGranule(rec); - if (!oldGranuleId) { - self.GranulesTable->Write(context.DB, rec); - } else { - remapGranules.emplace(rec.Granule, *oldGranuleId); - } - } // Save new portions (their column records) - - THashMap<ui64, std::shared_ptr<TGranuleMeta>> granules; - for (auto& portionInfoWithBlobs : AppendedPortions) { - auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); - Y_ABORT_UNLESS(!portionInfo.Empty()); - auto it = remapGranules.find(portionInfo.GetGranule()); - if (it != remapGranules.end()) { - granules.emplace(it->second, self.GetGranulePtrVerified(it->second)); - } else { - granules.emplace(portionInfo.GetGranule(), self.GetGranulePtrVerified(portionInfo.GetGranule())); - } - } - for (auto& [_, portionInfo] : PortionsToRemove) { - granules.emplace(portionInfo.GetGranule(), self.GetGranulePtrVerified(portionInfo.GetGranule())); - } - NJson::TJsonValue sbJson = NJson::JSON_MAP; { auto g = self.GranulesStorage->StartPackModification(); @@ -92,10 +63,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange Y_ABORT_UNLESS(!portionInfo.Empty()); Y_ABORT_UNLESS(portionInfo.HasRemoveSnapshot()); - const ui64 granule = portionInfo.GetGranule(); - const ui64 portion = portionInfo.GetPortion(); - - const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion); + const TPortionInfo& oldInfo = self.GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortion()); self.UpsertPortion(portionInfo, &oldInfo); @@ -106,10 +74,6 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange for (auto& portionInfoWithBlobs : AppendedPortions) { auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); Y_ABORT_UNLESS(!portionInfo.Empty()); - auto it = remapGranules.find(portionInfo.GetGranule()); - if (it != remapGranules.end()) { - portionInfo.SetGranule(it->second); - } self.UpsertPortion(portionInfo); for (auto& record : portionInfo.Records) { self.ColumnsTable->Write(context.DB, portionInfo, record); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index 912ca123e79..bfc9ec09b4a 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -46,8 +46,6 @@ public: THashMap<TPortionAddress, TPortionInfo> PortionsToRemove; std::vector<TPortionInfoWithBlobs> AppendedPortions; - THashMap<ui64, std::pair<ui64, TMark>> NewGranules; - ui64 FirstGranuleId = 0; virtual ui32 GetWritePortionsCount() const override { return AppendedPortions.size(); } diff --git a/ydb/core/tx/columnshard/engines/changes/ya.make b/ydb/core/tx/columnshard/engines/changes/ya.make index fb399e43a85..faf74a7c05b 100644 --- a/ydb/core/tx/columnshard/engines/changes/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/ya.make @@ -5,7 +5,6 @@ SRCS( ttl.cpp indexation.cpp cleanup.cpp - mark_granules.cpp with_appended.cpp general_compaction.cpp ) diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 2c856247e2a..faad27bcb60 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -1,5 +1,5 @@ #pragma once -#include "granules_table.h" +#include "db_wrapper.h" #include "portions/portion_info.h" #include "scheme/snapshot_scheme.h" #include "predicate/filter.h" @@ -21,7 +21,6 @@ class TCleanupColumnEngineChanges; struct TSelectInfo { struct TStats { - size_t Granules{}; size_t Portions{}; size_t Records{}; size_t Blobs{}; @@ -29,7 +28,6 @@ struct TSelectInfo { size_t Bytes{}; const TStats& operator += (const TStats& stats) { - Granules += stats.Granules; Portions += stats.Portions; Records += stats.Records; Blobs += stats.Blobs; @@ -39,17 +37,12 @@ struct TSelectInfo { } }; - std::vector<TGranuleRecord> Granules; // ordered by key (ascending) std::vector<std::shared_ptr<TPortionInfo>> PortionsOrderedPK; NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>> GetPortionsOrdered(const bool reverse) const { return NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>>(PortionsOrderedPK, reverse); } - NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>> GetGranulesOrdered(const bool reverse) const { - return NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>>(Granules, reverse); - } - size_t NumChunks() const { size_t records = 0; for (auto& portionInfo : PortionsOrderedPK) { @@ -60,7 +53,6 @@ struct TSelectInfo { TStats Stats() const { TStats out; - out.Granules = Granules.size(); out.Portions = PortionsOrderedPK.size(); THashSet<TUnifiedBlobId> uniqBlob; @@ -79,13 +71,6 @@ struct TSelectInfo { } friend IOutputStream& operator << (IOutputStream& out, const TSelectInfo& info) { - if (info.Granules.size()) { - out << "granules:"; - for (auto& rec : info.Granules) { - out << " " << rec; - } - out << "; "; - } if (info.PortionsOrderedPK.size()) { out << "portions:"; for (auto& portionInfo : info.PortionsOrderedPK) { @@ -170,8 +155,6 @@ public: }; i64 Tables{}; - i64 Granules{}; - i64 EmptyGranules{}; i64 ColumnRecords{}; i64 ColumnMetadataBytes{}; THashMap<TPortionMeta::EProduced, TPortionsStats> StatsByType; @@ -362,6 +345,8 @@ public: class IColumnEngine { +protected: + virtual void DoRegisterTable(const ui64 pathId) = 0; public: virtual ~IColumnEngine() = default; @@ -373,10 +358,13 @@ public: virtual TString SerializeMark(const NArrow::TReplaceKey& key) const = 0; virtual NArrow::TReplaceKey DeserializeMark(const TString& key, std::optional<ui32> markNumKeys) const = 0; - virtual bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) = 0; - + virtual bool HasDataInPathId(const ui64 pathId) const = 0; + virtual bool Load(IDbWrapper& db) = 0; + void RegisterTable(const ui64 pathId) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "RegisterTable")("path_id", pathId); + return DoRegisterTable(pathId); + } virtual std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, - const THashSet<ui32>& columnIds, const TPKRangesFilter& pkRangesFilter) const = 0; virtual std::shared_ptr<TInsertColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept = 0; virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 1cbf1132704..ccfe9715e5e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -29,7 +29,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimit ui64 TColumnEngineForLogs::MemoryUsage() const { auto numPortions = Counters.GetPortionsCount(); - return Counters.Granules * (sizeof(TGranuleMeta) + sizeof(ui64)) + + return Counters.Tables * (sizeof(TGranuleMeta) + sizeof(ui64)) + numPortions * (sizeof(TPortionInfo) + sizeof(ui64)) + Counters.ColumnRecords * sizeof(TColumnRecord) + Counters.ColumnMetadataBytes; @@ -40,9 +40,7 @@ const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& TColumnEngineForLogs::Get } const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() { - Counters.Tables = PathGranules.size(); - Counters.Granules = Granules.size(); - Counters.EmptyGranules = EmptyGranules.size(); + Counters.Tables = Tables.size(); return Counters; } @@ -51,10 +49,7 @@ void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, E const TPortionInfo* exPortionInfo) { UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo); - const ui64 granule = portionInfo.GetGranule(); - Y_ABORT_UNLESS(granule); - Y_ABORT_UNLESS(Granules.contains(granule)); - ui64 pathId = Granules[granule]->PathId(); + const ui64 pathId = portionInfo.GetGranule(); Y_ABORT_UNLESS(pathId); if (!PathStats.contains(pathId)) { auto& stats = PathStats[pathId]; @@ -132,24 +127,21 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c } void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) { - if (!GranulesTable) { + if (!ColumnsTable) { ui32 indexId = info.GetId(); - GranulesTable = std::make_shared<TGranulesTable>(*this, indexId); ColumnsTable = std::make_shared<TColumnsTable>(indexId); CountersTable = std::make_shared<TCountersTable>(indexId); } VersionedIndex.AddIndex(snapshot, std::move(info)); } -bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop) { +bool TColumnEngineForLogs::Load(IDbWrapper& db) { Y_ABORT_UNLESS(!Loaded); Loaded = true; + THashMap<ui64, ui64> granuleToPathIdDecoder; { auto guard = GranulesStorage->StartPackModification(); - if (!LoadGranules(db)) { - return false; - } - if (!LoadColumns(db, lostBlobs)) { + if (!LoadColumns(db)) { return false; } if (!LoadCounters(db)) { @@ -157,12 +149,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl } } - THashSet<ui64> emptyGranulePaths; - for (const auto& [granule, spg] : Granules) { - if (spg->Empty()) { - EmptyGranules.insert(granule); - emptyGranulePaths.insert(spg->PathId()); - } + for (const auto& [pathId, spg] : Tables) { for (const auto& [_, portionInfo] : spg->GetPortions()) { UpdatePortionStats(*portionInfo, EStatsUpdateType::ADD); if (portionInfo->CheckForCleanup()) { @@ -171,40 +158,12 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl } } - // Cleanup empty granules - for (auto& pathId : emptyGranulePaths) { - for (auto& emptyGranules : EmptyGranuleTracks(pathId)) { - // keep first one => merge, keep nothing => drop. - bool keepFirst = !pathsToDrop.contains(pathId); - for (auto& [mark, granule] : emptyGranules) { - if (keepFirst) { - keepFirst = false; - continue; - } - - Y_ABORT_UNLESS(Granules.contains(granule)); - auto spg = Granules[granule]; - Y_ABORT_UNLESS(spg); - GranulesTable->Erase(db, spg->Record); - EraseGranule(pathId, granule, mark); - } - } - } - Y_ABORT_UNLESS(!(LastPortion >> 63), "near to int overflow"); Y_ABORT_UNLESS(!(LastGranule >> 63), "near to int overflow"); return true; } -bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) { - auto callback = [&](const TGranuleRecord& rec) { - SetGranule(rec); - }; - - return GranulesTable->Load(db, callback); -} - -bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) { +bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { TSnapshot lastSnapshot(0, 0); const TIndexInfo* currentIndexInfo = nullptr; auto result = ColumnsTable->Load(db, [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) { @@ -213,13 +172,11 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lastSnapshot = portion.GetMinSnapshot(); } Y_ABORT_UNLESS(portion.ValidSnapshotInfo()); - // Do not count the blob as lost since it exists in the index. - lostBlobs.erase(loadContext.GetBlobRange().BlobId); // Locate granule and append the record. TColumnRecord rec(loadContext, *currentIndexInfo); - GetGranulePtrVerified(portion.GetGranule())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta()); + GetGranulePtrVerified(portion.GetPathId())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta()); }); - for (auto&& i : Granules) { + for (auto&& i : Tables) { i.second->OnAfterPortionsLoad(); } return result; @@ -252,7 +209,6 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st TSaverContext saverContext(StoragesManager->GetInsertOperator(), StoragesManager); auto changes = std::make_shared<TInsertColumnEngineChanges>(DefaultMark(), std::move(dataToIndex), TSplitSettings(), saverContext); - ui32 reserveGranules = 0; auto pkSchema = VersionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey(); for (const auto& data : changes->GetDataToIndex()) { @@ -261,32 +217,14 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st if (changes->PathToGranule.contains(pathId)) { continue; } - - if (PathGranules.contains(pathId)) { - const auto& src = PathGranules[pathId]; - for (auto&& i : src) { - NIndexedReader::TSortableBatchPosition pos(i.first.GetBorder().ToBatch(pkSchema), 0, pkSchema->field_names(), {}, false); - changes->PathToGranule[pathId].emplace(pos, i.second); - for (auto&& pos : GetGranulePtrVerified(i.second)->GetBucketPositions()) { - changes->PathToGranule[pathId].emplace(pos, i.second); - } - } - } else { - // It could reserve more than needed in case of the same pathId in DataToIndex - ++reserveGranules; - } - } - - if (reserveGranules) { - changes->FirstGranuleId = LastGranule + 1; - LastGranule += reserveGranules; + changes->PathToGranule[pathId] = GetGranulePtrVerified(pathId)->GetBucketPositions(); } return changes; } std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept { - auto granule = GranulesStorage->GetGranuleForCompaction(Granules); + auto granule = GranulesStorage->GetGranuleForCompaction(Tables); if (!granule) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no granules for start compaction"); return nullptr; @@ -310,30 +248,21 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( THashSet<ui64> dropPortions; THashSet<ui64> emptyPaths; for (ui64 pathId : pathsToDrop) { - if (!PathGranules.contains(pathId)) { + auto itTable = Tables.find(pathId); + if (itTable == Tables.end()) { emptyPaths.insert(pathId); continue; } - for (const auto& [_, granule]: PathGranules[pathId]) { - Y_ABORT_UNLESS(Granules.contains(granule)); - auto spg = Granules[granule]; - Y_ABORT_UNLESS(spg); - for (auto& [portion, info] : spg->GetPortions()) { - affectedRecords += info->NumChunks(); - changes->PortionsToDrop.push_back(*info); - dropPortions.insert(portion); - } + for (auto& [portion, info] : itTable->second->GetPortions()) { + affectedRecords += info->NumChunks(); + changes->PortionsToDrop.push_back(*info); + dropPortions.insert(portion); if (affectedRecords > maxRecords) { break; } } - - if (affectedRecords > maxRecords) { - changes->NeedRepeat = true; - break; - } } for (ui64 pathId : emptyPaths) { pathsToDrop.erase(pathId); @@ -375,8 +304,8 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering Y_ABORT_UNLESS(context.Changes->Tiering.emplace(pathId, ttl).second); TDuration dWaiting = NYDBTest::TControllers::GetColumnShardController()->GetTTLDefaultWaitingDuration(TDuration::Minutes(1)); - auto itGranules = PathGranules.find(pathId); - if (itGranules == PathGranules.end()) { + auto itTable = Tables.find(pathId); + if (itTable == Tables.end()) { return dWaiting; } @@ -388,85 +317,79 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering auto ttlColumnNames = ttl.GetTtlColumns(); Y_ABORT_UNLESS(ttlColumnNames.size() == 1); // TODO: support different ttl columns ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin()); - for (const auto& [ts, granule] : itGranules->second) { - auto itGranule = Granules.find(granule); - auto spg = itGranule->second; - Y_ABORT_UNLESS(spg); + for (auto& [portion, info] : itTable->second->GetPortions()) { + if (info->HasRemoveSnapshot()) { + continue; + } + if (context.BusyPortions.contains(info->GetAddress())) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip ttl through busy portion")("portion_id", info->GetAddress().DebugString()); + continue; + } - for (auto& [portion, info] : spg->GetPortions()) { - if (info->HasRemoveSnapshot()) { - continue; - } - if (context.BusyPortions.contains(info->GetAddress())) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip ttl through busy portion")("portion_id", info->GetAddress().DebugString()); - continue; + context.AllowEviction = (evictionSize <= context.MaxEvictBytes); + context.AllowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE); + const bool tryEvictPortion = context.AllowEviction && ttl.HasTiers(); + + if (auto max = info->MaxValue(ttlColumnId)) { + bool keep = !expireTimestampOpt; + if (expireTimestampOpt) { + auto mpiOpt = ttl.Ttl->ScalarToInstant(max); + Y_ABORT_UNLESS(mpiOpt); + const TInstant maxTtlPortionInstant = *mpiOpt; + const TDuration d = maxTtlPortionInstant - *expireTimestampOpt; + keep = !!d; + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds()); + if (d && dWaiting > d) { + dWaiting = d; + } } - context.AllowEviction = (evictionSize <= context.MaxEvictBytes); - context.AllowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE); - const bool tryEvictPortion = context.AllowEviction && ttl.HasTiers(); - - if (auto max = info->MaxValue(ttlColumnId)) { - bool keep = !expireTimestampOpt; - if (expireTimestampOpt) { - auto mpiOpt = ttl.Ttl->ScalarToInstant(max); - Y_ABORT_UNLESS(mpiOpt); - const TInstant maxTtlPortionInstant = *mpiOpt; - const TDuration d = maxTtlPortionInstant - *expireTimestampOpt; - keep = !!d; - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds()); - if (d && dWaiting > d) { - dWaiting = d; + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop); + if (keep && tryEvictPortion) { + const TString currentTierName = info->GetMeta().GetTierName() ? info->GetMeta().GetTierName() : IStoragesManager::DefaultStorageId; + TString tierName = ""; + for (auto& tierRef : ttl.GetOrderedTiers()) { + auto& tierInfo = tierRef.Get(); + if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { + SignalCounters.OnPortionNoTtlColumn(info->BlobsBytes()); + continue; } - } - - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop); - if (keep && tryEvictPortion) { - const TString currentTierName = info->GetMeta().GetTierName() ? info->GetMeta().GetTierName() : IStoragesManager::DefaultStorageId; - TString tierName = ""; - for (auto& tierRef : ttl.GetOrderedTiers()) { - auto& tierInfo = tierRef.Get(); - if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { - SignalCounters.OnPortionNoTtlColumn(info->BlobsBytes()); - continue; - } - auto mpiOpt = tierInfo.ScalarToInstant(max); - Y_ABORT_UNLESS(mpiOpt); - const TInstant maxTieringPortionInstant = *mpiOpt; - - const TDuration d = tierInfo.GetEvictInstant(context.Now) - maxTieringPortionInstant; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering_choosing")("max", maxTieringPortionInstant.Seconds()) - ("evict", tierInfo.GetEvictInstant(context.Now).Seconds())("tier_name", tierInfo.GetName())("d", d); - if (d) { - tierName = tierInfo.GetName(); - break; - } else { - auto dWaitLocal = maxTieringPortionInstant - tierInfo.GetEvictInstant(context.Now); - if (dWaiting > dWaitLocal) { - dWaiting = dWaitLocal; - } + auto mpiOpt = tierInfo.ScalarToInstant(max); + Y_ABORT_UNLESS(mpiOpt); + const TInstant maxTieringPortionInstant = *mpiOpt; + + const TDuration d = tierInfo.GetEvictInstant(context.Now) - maxTieringPortionInstant; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering_choosing")("max", maxTieringPortionInstant.Seconds()) + ("evict", tierInfo.GetEvictInstant(context.Now).Seconds())("tier_name", tierInfo.GetName())("d", d); + if (d) { + tierName = tierInfo.GetName(); + break; + } else { + auto dWaitLocal = maxTieringPortionInstant - tierInfo.GetEvictInstant(context.Now); + if (dWaiting > dWaitLocal) { + dWaiting = dWaitLocal; } } - if (!tierName) { - tierName = IStoragesManager::DefaultStorageId; - } - if (currentTierName != tierName) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", currentTierName)("to", tierName); - evictionSize += info->BlobsSizes().first; - context.Changes->AddPortionToEvict(*info, TPortionEvictionFeatures(tierName, pathId, StoragesManager->GetOperator(tierName))); - SignalCounters.OnPortionToEvict(info->BlobsBytes()); - } } - if (!keep && context.AllowDrop) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString()); - dropBlobs += info->NumBlobs(); - AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second); - SignalCounters.OnPortionToDrop(info->BlobsBytes()); + if (!tierName) { + tierName = IStoragesManager::DefaultStorageId; + } + if (currentTierName != tierName) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", currentTierName)("to", tierName); + evictionSize += info->BlobsSizes().first; + context.Changes->AddPortionToEvict(*info, TPortionEvictionFeatures(tierName, pathId, StoragesManager->GetOperator(tierName))); + SignalCounters.OnPortionToEvict(info->BlobsBytes()); } - } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); - SignalCounters.OnPortionNoBorder(info->BlobsBytes()); } + if (!keep && context.AllowDrop) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString()); + dropBlobs += info->NumBlobs(); + AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second); + SignalCounters.OnPortionToDrop(info->BlobsBytes()); + } + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); + SignalCounters.OnPortionNoBorder(info->BlobsBytes()); } } if (dWaiting > TDuration::MilliSeconds(500) && (!context.AllowEviction || !context.AllowDrop)) { @@ -537,31 +460,6 @@ std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const TH return changes; } -std::vector<std::vector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks(ui64 pathId) const { - Y_ABORT_UNLESS(PathGranules.contains(pathId)); - const auto& pathGranules = PathGranules.find(pathId)->second; - - std::vector<std::vector<std::pair<TMark, ui64>>> emptyGranules; - ui64 emptyStart = 0; - for (const auto& [mark, granule]: pathGranules) { - Y_ABORT_UNLESS(Granules.contains(granule)); - auto spg = Granules.find(granule)->second; - Y_ABORT_UNLESS(spg); - - if (spg->Empty()) { - if (!emptyStart) { - emptyGranules.push_back({}); - emptyStart = granule; - } - emptyGranules.back().emplace_back(mark, granule); - } else if (emptyStart) { - emptyStart = 0; - } - } - - return emptyGranules; -} - bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept { { TFinalizationContext context(LastGranule, LastPortion, snapshot); @@ -582,37 +480,11 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE return true; } -void TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_granule")("granule", rec.DebugString()); - const TMark mark(rec.Mark); - - AFL_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second)("event", "marker_duplication")("granule_id", rec.Granule)("old_granule_id", PathGranules[rec.PathId][mark]); - AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second)("event", "granule_duplication") - ("rec_path_id", rec.PathId)("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString()); -} - -std::optional<ui64> TColumnEngineForLogs::NewGranule(const TGranuleRecord& rec) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_granule")("granule", rec.DebugString()); - const TMark mark(rec.Mark); - - auto insertInfo = PathGranules[rec.PathId].emplace(mark, rec.Granule); - if (insertInfo.second) { - AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second)("event", "granule_duplication") - ("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString()); - return {}; - } else { - return insertInfo.first->second; - } -} - -void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& mark) { - Y_ABORT_UNLESS(PathGranules.contains(pathId)); - auto it = Granules.find(granule); - Y_ABORT_UNLESS(it != Granules.end()); +void TColumnEngineForLogs::EraseTable(const ui64 pathId) { + auto it = Tables.find(pathId); + Y_ABORT_UNLESS(it != Tables.end()); Y_ABORT_UNLESS(it->second->IsErasable()); - Granules.erase(it); - EmptyGranules.erase(granule); - PathGranules[pathId].erase(mark); + Tables.erase(it); } void TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, const TPortionInfo* exInfo) { @@ -622,15 +494,13 @@ void TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, const UpdatePortionStats(portionInfo, EStatsUpdateType::ADD); } - GetGranulePtrVerified(portionInfo.GetGranule())->UpsertPortion(portionInfo); + GetGranulePtrVerified(portionInfo.GetPathId())->UpsertPortion(portionInfo); } bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool updateStats) { Y_ABORT_UNLESS(!portionInfo.Empty()); const ui64 portion = portionInfo.GetPortion(); - auto it = Granules.find(portionInfo.GetGranule()); - Y_ABORT_UNLESS(it != Granules.end()); - auto& spg = it->second; + auto spg = GetGranulePtrVerified(portionInfo.GetPathId()); Y_ABORT_UNLESS(spg); auto p = spg->GetPortionPtr(portion); @@ -647,80 +517,28 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool up } std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot snapshot, - const THashSet<ui32>& /*columnIds*/, - const TPKRangesFilter& pkRangesFilter) const -{ + const TPKRangesFilter& pkRangesFilter) const { auto out = std::make_shared<TSelectInfo>(); - if (!PathGranules.contains(pathId)) { + auto itTable = Tables.find(pathId); + if (itTable == Tables.end()) { return out; } - auto& pathGranules = PathGranules.find(pathId)->second; - if (pathGranules.empty()) { - return out; - } - out->Granules.reserve(pathGranules.size()); - std::optional<TMarksMap::const_iterator> previousIterator; - - for (auto&& filter : pkRangesFilter) { - std::optional<NArrow::TReplaceKey> indexKeyFrom = filter.KeyFrom(GetIndexKey()); - std::optional<NArrow::TReplaceKey> indexKeyTo = filter.KeyTo(GetIndexKey()); - - std::shared_ptr<arrow::Scalar> keyFrom; - std::shared_ptr<arrow::Scalar> keyTo; - if (indexKeyFrom) { - keyFrom = NArrow::TReplaceKey::ToScalar(*indexKeyFrom, 0); - } - if (indexKeyTo) { - keyTo = NArrow::TReplaceKey::ToScalar(*indexKeyTo, 0); - } - - auto it = pathGranules.begin(); - if (keyFrom) { - it = pathGranules.lower_bound(*keyFrom); - if (it != pathGranules.begin()) { - --it; - } - } - - if (previousIterator && (previousIterator == pathGranules.end() || it->first < (*previousIterator)->first)) { - it = *previousIterator; - } - for (; it != pathGranules.end(); ++it) { - auto& mark = it->first; - ui64 granule = it->second; - if (keyTo && mark > *keyTo) { - break; - } - - auto it = Granules.find(granule); - Y_ABORT_UNLESS(it != Granules.end()); - auto& spg = it->second; - Y_ABORT_UNLESS(spg); - bool granuleHasDataForSnaphsot = false; - - for (const auto& [_, keyPortions] : spg->GroupOrderedPortionsByPK()) { - for (auto&& [_, portionInfo] : keyPortions) { - if (!portionInfo->IsVisible(snapshot)) { - continue; - } - Y_ABORT_UNLESS(portionInfo->Produced()); - const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo()); - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected") - ("granule", granule)("portion", portionInfo->DebugString()); - if (skipPortion) { - continue; - } - out->PortionsOrderedPK.emplace_back(portionInfo); - granuleHasDataForSnaphsot = true; - } + auto spg = itTable->second; + for (const auto& [indexKey, keyPortions] : spg->GroupOrderedPortionsByPK()) { + for (auto&& [_, portionInfo] : keyPortions) { + if (!portionInfo->IsVisible(snapshot)) { + continue; } - - if (granuleHasDataForSnaphsot) { - out->Granules.push_back(spg->Record); + Y_ABORT_UNLESS(portionInfo->Produced()); + const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo()); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected") + ("pathId", pathId)("portion", portionInfo->DebugString()); + if (skipPortion) { + continue; } + out->PortionsOrderedPK.emplace_back(portionInfo); } - previousIterator = it; } return out; @@ -739,6 +557,10 @@ void TColumnEngineForLogs::OnTieringModified(std::shared_ptr<NColumnShard::TTier } +void TColumnEngineForLogs::DoRegisterTable(const ui64 pathId) { + AFL_VERIFY(Tables.emplace(pathId, std::make_shared<TGranuleMeta>(pathId, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second); +} + TColumnEngineForLogs::TTieringProcessContext::TTieringProcessContext(const ui64 maxEvictBytes, std::shared_ptr<TTTLColumnEngineChanges> changes, const THashSet<TPortionAddress>& busyPortions) : Now(TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now()) , MaxEvictBytes(maxEvictBytes) diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 53a2dbbf8f8..fff06d8a05d 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -143,8 +143,9 @@ public: ui64 MemoryUsage() const override; TSnapshot LastUpdate() const override { return LastSnapshot; } + virtual void DoRegisterTable(const ui64 pathId) override; public: - bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override; + bool Load(IDbWrapper& db) override; std::shared_ptr<TInsertColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept override; std::shared_ptr<TColumnEngineChanges> StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept override; @@ -160,36 +161,46 @@ public: std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, - const THashSet<ui32>& columnIds, const TPKRangesFilter& pkRangesFilter) const override; - bool IsPortionExists(const ui64 granuleId, const ui64 portionId) const { - auto it = Granules.find(granuleId); - if (it == Granules.end()) { + bool IsPortionExists(const ui64 pathId, const ui64 portionId) const { + auto it = Tables.find(pathId); + if (it == Tables.end()) { return false; } return !!it->second->GetPortionPtr(portionId); } - bool IsGranuleExists(const ui64 granuleId) const { - return !!GetGranuleOptional(granuleId); + virtual bool HasDataInPathId(const ui64 pathId) const override { + auto g = GetGranuleOptional(pathId); + if (!g) { + return false; + } + if (g->GetPortions().size()) { + return false; + } + return true; + } + + bool IsGranuleExists(const ui64 pathId) const { + return !!GetGranuleOptional(pathId); } const TGranuleMeta& GetGranuleVerified(const ui64 granuleId) const { - auto it = Granules.find(granuleId); - AFL_VERIFY(it != Granules.end())("granule_id", granuleId)("count", Granules.size()); + auto it = Tables.find(granuleId); + AFL_VERIFY(it != Tables.end())("granule_id", granuleId)("count", Tables.size()); return *it->second; } - std::shared_ptr<TGranuleMeta> GetGranulePtrVerified(const ui64 granuleId) const { - auto result = GetGranuleOptional(granuleId); - Y_ABORT_UNLESS(result); + std::shared_ptr<TGranuleMeta> GetGranulePtrVerified(const ui64 pathId) const { + auto result = GetGranuleOptional(pathId); + AFL_VERIFY(result)("path_id", pathId); return result; } - std::shared_ptr<TGranuleMeta> GetGranuleOptional(const ui64 granuleId) const { - auto it = Granules.find(granuleId); - if (it == Granules.end()) { + std::shared_ptr<TGranuleMeta> GetGranuleOptional(const ui64 pathId) const { + auto it = Tables.find(pathId); + if (it == Tables.end()) { return nullptr; } return it->second; @@ -204,15 +215,10 @@ private: TVersionedIndex VersionedIndex; ui64 TabletId; - std::shared_ptr<TGranulesTable> GranulesTable; std::shared_ptr<TColumnsTable> ColumnsTable; std::shared_ptr<TCountersTable> CountersTable; - THashMap<ui64, std::shared_ptr<TGranuleMeta>> Granules; // granule -> meta - THashMap<ui64, TMarksMap> PathGranules; // path_id -> {mark, granule} + THashMap<ui64, std::shared_ptr<TGranuleMeta>> Tables; // pathId into Granule that equal to Table TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id - /// Set of empty granules. - /// Just for providing count of empty granules. - THashSet<ui64> EmptyGranules; std::map<TSnapshot, std::vector<TPortionInfo>> CleanupPortions; TColumnEngineStats Counters; ui64 LastPortion; @@ -232,15 +238,11 @@ private: return *CachedDefaultMark; } - bool LoadGranules(IDbWrapper& db); - bool LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs); + bool LoadColumns(IDbWrapper& db); bool LoadCounters(IDbWrapper& db); - void EraseGranule(ui64 pathId, ui64 granule, const TMark& mark); + void EraseTable(const ui64 pathId); - /// Insert granule or check if same granule was already inserted. - void SetGranule(const TGranuleRecord& rec); - std::optional<ui64> NewGranule(const TGranuleRecord& rec); void UpsertPortion(const TPortionInfo& portionInfo, const TPortionInfo* exInfo = nullptr); bool ErasePortion(const TPortionInfo& portionInfo, bool updateStats = true); void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT, diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index 4bef7aa1408..ff825546b15 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -40,21 +40,6 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable, return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, insertTable, loadTime); } -void TDbWrapper::WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) { - NIceDb::TNiceDb db(Database); - NColumnShard::Schema::IndexGranules_Write(db, index, engine, row); -} - -void TDbWrapper::EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) { - NIceDb::TNiceDb db(Database); - NColumnShard::Schema::IndexGranules_Erase(db, index, engine, row); -} - -bool TDbWrapper::LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) { - NIceDb::TNiceDb db(Database); - return NColumnShard::Schema::IndexGranules_Load(db, index, engine, callback); -} - void TDbWrapper::WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { NIceDb::TNiceDb db(Database); NColumnShard::Schema::IndexColumns_Write(db, index, portion, row); diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index 26640f5bf11..c44bc713001 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -29,10 +29,6 @@ public: virtual bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) = 0; - virtual void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0; - virtual void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0; - virtual bool LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) = 0; - virtual void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0; virtual void EraseColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0; virtual bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) = 0; @@ -58,10 +54,6 @@ public: bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) override; - void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override; - void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override; - bool LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) override; - void WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; void EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; bool LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override; diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h deleted file mode 100644 index 249ee1d7c99..00000000000 --- a/ydb/core/tx/columnshard/engines/granules_table.h +++ /dev/null @@ -1,70 +0,0 @@ -#pragma once -#include "db_wrapper.h" -#include <ydb/core/formats/arrow/replace_key.h> - -namespace NKikimr::NOlap { - -struct TGranuleRecord { -private: - TSnapshot CreatedAt; -public: - ui64 PathId; - ui64 Granule; - NArrow::TStoreReplaceKey Mark; - - TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const NArrow::TReplaceKey& mark) - : CreatedAt(createdAt) - , PathId(pathId) - , Granule(granule) - , Mark(mark) - { - Y_ABORT_UNLESS(Mark.Size()); - } - - const TSnapshot& GetCreatedAt() const { - return CreatedAt; - } - - bool operator == (const TGranuleRecord& rec) const { - return (PathId == rec.PathId) && (Mark == rec.Mark); - } - - friend IOutputStream& operator << (IOutputStream& out, const TGranuleRecord& rec) { - out << '{'; - auto& snap = rec.CreatedAt; - out << rec.PathId << '#' << rec.Granule << ' ' - << snap; - out << '}'; - return out; - } - - TString DebugString() const { - return TStringBuilder() << *this; - } -}; - -class TGranulesTable { -public: - TGranulesTable(const IColumnEngine& engine, ui32 indexId) - : Engine(engine) - , IndexId(indexId) - {} - - void Write(IDbWrapper& db, const TGranuleRecord& row) { - db.WriteGranule(IndexId, Engine, row); - } - - void Erase(IDbWrapper& db, const TGranuleRecord& row) { - db.EraseGranule(IndexId, Engine, row); - } - - bool Load(IDbWrapper& db, const std::function<void(const TGranuleRecord&)>& callback) { - return db.LoadGranules(IndexId, Engine, callback); - } - -private: - const IColumnEngine& Engine; - ui32 IndexId; -}; - -} diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index b197f226e20..1b7f48b47e3 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -71,7 +71,7 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const { } TPortionInfo TPortionInfo::CopyWithFilteredColumns(const THashSet<ui32>& columnIds) const { - TPortionInfo result(Granule, Portion, GetMinSnapshot(), BlobsOperator); + TPortionInfo result(PathId, Portion, GetMinSnapshot(), BlobsOperator); result.Meta = Meta; result.Records.reserve(columnIds.size()); @@ -112,7 +112,7 @@ int TPortionInfo::CompareMinByPk(const TPortionInfo& item, const TIndexInfo& inf TString TPortionInfo::DebugString(const bool withDetails) const { TStringBuilder sb; sb << "(portion_id:" << Portion << ";" << - "granule_id:" << Granule << ";records_count:" << NumRows() << ";" + "path_id:" << PathId << ";records_count:" << NumRows() << ";" "min_schema_snapshot:(" << MinSnapshot.DebugString() << ");"; if (withDetails) { sb << @@ -184,7 +184,7 @@ size_t TPortionInfo::NumBlobs() const { } bool TPortionInfo::IsEqualWithSnapshots(const TPortionInfo& item) const { - return Granule == item.Granule && MinSnapshot == item.MinSnapshot + return PathId == item.PathId && MinSnapshot == item.MinSnapshot && Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot; } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 87a9af9f7cf..b407c9b7183 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -15,8 +15,8 @@ struct TIndexInfo; class TPortionInfo { private: TPortionInfo() = default; - ui64 Granule = 0; - ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in granule + ui64 PathId = 0; + ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in pathId TSnapshot MinSnapshot = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion} TSnapshot RemoveSnapshot = TSnapshot::Zero(); // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one) @@ -24,6 +24,10 @@ private: TPortionMeta Meta; std::shared_ptr<NOlap::IBlobsStorageOperator> BlobsOperator; public: + ui64 GetPathId() const { + return PathId; + } + bool OlderThen(const TPortionInfo& info) const { return RecordSnapshotMin() < info.RecordSnapshotMin(); } @@ -123,8 +127,8 @@ public: bool Empty() const { return Records.empty(); } bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; } - bool Valid() const { return MinSnapshot.Valid() && Granule && Portion && !Empty() && Produced() && HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } - bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && Granule && Portion; } + bool Valid() const { return MinSnapshot.Valid() && PathId && Portion && !Empty() && Produced() && HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } + bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && PathId && Portion; } bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; } bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; } bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ } @@ -140,8 +144,8 @@ public: return TPortionInfo(); } - TPortionInfo(const ui64 granuleId, const ui64 portionId, const TSnapshot& minSnapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& blobsOperator) - : Granule(granuleId) + TPortionInfo(const ui64 pathId, const ui64 portionId, const TSnapshot& minSnapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& blobsOperator) + : PathId(pathId) , Portion(portionId) , MinSnapshot(minSnapshot) , BlobsOperator(blobsOperator) @@ -176,15 +180,15 @@ public: } ui64 GetGranule() const { - return Granule; + return PathId; } TPortionAddress GetAddress() const { - return TPortionAddress(Granule, Portion); + return TPortionAddress(PathId, Portion); } - void SetGranule(const ui64 granule) { - Granule = granule; + void SetPathId(const ui64 pathId) { + PathId = pathId; } void SetPortion(const ui64 portion) { diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index 3f60021c20a..d7778fbc5f9 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -33,7 +33,6 @@ target_link_libraries(columnshard-engines-reader PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index 27028241125..6b2ab750e53 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -34,7 +34,6 @@ target_link_libraries(columnshard-engines-reader PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index 27028241125..6b2ab750e53 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -34,7 +34,6 @@ target_link_libraries(columnshard-engines-reader PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index 3f60021c20a..d7778fbc5f9 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -33,7 +33,6 @@ target_link_libraries(columnshard-engines-reader PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/common.cpp b/ydb/core/tx/columnshard/engines/reader/common.cpp deleted file mode 100644 index 46168595a26..00000000000 --- a/ydb/core/tx/columnshard/engines/reader/common.cpp +++ /dev/null @@ -1,17 +0,0 @@ -#include "common.h" -#include <util/string/builder.h> - -namespace NKikimr::NOlap::NIndexedReader { - -TString TBatchAddress::ToString() const { - return TStringBuilder() << GranuleId << "," << BatchGranuleIdx; -} - -TBatchAddress::TBatchAddress(const ui32 granuleId, const ui32 batchGranuleIdx) - : GranuleId(granuleId) - , BatchGranuleIdx(batchGranuleIdx) -{ - -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/common.h b/ydb/core/tx/columnshard/engines/reader/common.h deleted file mode 100644 index 30ad233b2e2..00000000000 --- a/ydb/core/tx/columnshard/engines/reader/common.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once -#include <ydb/library/accessor/accessor.h> -#include <util/system/types.h> -#include <util/generic/string.h> - -namespace NKikimr::NOlap::NIndexedReader { - -class TBatchAddress { -private: - ui32 GranuleId = 0; - ui32 BatchGranuleIdx = 0; -public: - TString ToString() const; - - TBatchAddress(const ui32 granuleId, const ui32 batchGranuleIdx); - - ui32 GetGranuleId() const { - return GranuleId; - } - - ui32 GetBatchGranuleIdx() const { - return BatchGranuleIdx; - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index df6f8e6ad35..df02e8933ef 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -13,13 +13,12 @@ TDataStorageAccessor::TDataStorageAccessor(const std::unique_ptr<NOlap::TInsertT , Index(index) {} -std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& columnIds) const { +std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& /*columnIds*/) const { if (readDescription.ReadNothing) { return std::make_shared<NOlap::TSelectInfo>(); } return Index->Select(readDescription.PathId, readDescription.GetSnapshot(), - columnIds, readDescription.PKRangesFilter); } diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make index 2ca09c00d53..f673de8200e 100644 --- a/ydb/core/tx/columnshard/engines/reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/ya.make @@ -1,7 +1,6 @@ LIBRARY() SRCS( - common.cpp conveyor_task.cpp description.cpp queue.cpp diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index 843c3a098da..12dbd0a9919 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -17,9 +17,9 @@ ui64 TGranuleMeta::Size() const { } void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("granule", GetGranuleId()); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("path_id", GetPathId()); auto it = Portions.find(info.GetPortion()); - AFL_VERIFY(info.GetGranule() == GetGranuleId())("event", "incompatible_granule")("portion", info.DebugString())("granule", GetGranuleId()); + AFL_VERIFY(info.GetGranule() == GetPathId())("event", "incompatible_granule")("portion", info.DebugString())("path_id", GetPathId()); AFL_VERIFY(info.Valid())("event", "invalid_portion")("portion", info.DebugString()); AFL_VERIFY(info.ValidSnapshotInfo())("event", "incorrect_portion_snapshots")("portion", info.DebugString()); @@ -41,10 +41,10 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { bool TGranuleMeta::ErasePortion(const ui64 portion) { auto it = Portions.find(portion); if (it == Portions.end()) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased_already")("portion_id", portion)("pathId", Record.PathId); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased_already")("portion_id", portion)("pathId", PathId); return false; } else { - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second->DebugString())("pathId", Record.PathId); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second->DebugString())("pathId", PathId); } OnBeforeChangePortion(it->second); Portions.erase(it); @@ -160,14 +160,14 @@ const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary( return *AdditiveSummaryCache; } -TGranuleMeta::TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex) - : Owner(owner) +TGranuleMeta::TGranuleMeta(const ui64 pathId, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex) + : PathId(pathId) + , Owner(owner) , Counters(counters) , PortionInfoGuard(Owner->GetCounters().BuildPortionBlobsGuard()) - , Record(rec) { Y_ABORT_UNLESS(Owner); - OptimizerPlanner = std::make_shared<NStorageOptimizer::NLevels::TLevelsOptimizerPlanner>(rec.Granule, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey()); + OptimizerPlanner = std::make_shared<NStorageOptimizer::NLevels::TLevelsOptimizerPlanner>(PathId, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey()); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 550d9ea0daf..d7ec376cc9a 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -164,6 +164,7 @@ private: std::set<EActivity> Activity; mutable bool AllowInsertionFlag = false; + const ui64 PathId; std::shared_ptr<TGranulesStorage> Owner; const NColumnShard::TGranuleDataCounters Counters; NColumnShard::TEngineLogsCounters::TPortionsInfoGuard PortionInfoGuard; @@ -229,7 +230,7 @@ public: bool NeedCompaction(const TCompactionLimits& limits) const { if (InCompaction() || Empty()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_skipped_by_state")("granule_id", GetGranuleId())("granule_size", Size()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_skipped_by_state")("path_id", GetPathId())("granule_size", Size()); return false; } return GetCompactionType(limits) != TGranuleAdditiveSummary::ECompactionClass::NoCompaction; @@ -238,7 +239,7 @@ public: bool InCompaction() const; bool IsErasable() const { - return Activity.empty(); + return Activity.empty() && Portions.empty(); } void OnCompactionStarted(); @@ -249,16 +250,14 @@ public: void UpsertPortion(const TPortionInfo& info); TString DebugString() const { - return TStringBuilder() << "(granule:" << GetGranuleId() << ";" - << "path_id:" << Record.PathId << ";" + return TStringBuilder() << "(granule:" << GetPathId() << ";" + << "path_id:" << GetPathId() << ";" << "size:" << GetAdditiveSummary().GetGranuleSize() << ";" << "portions_count:" << Portions.size() << ";" << ")" ; } - const TGranuleRecord Record; - void AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta); const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const { @@ -266,7 +265,11 @@ public: } ui64 GetPathId() const { - return Record.PathId; + return PathId; + } + + ui64 GetGranuleId() const { + return PathId; } const TPortionInfo& GetPortionVerified(const ui64 portion) const { @@ -285,12 +288,8 @@ public: bool ErasePortion(const ui64 portion); - explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex); + explicit TGranuleMeta(const ui64 pathId, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex); - ui64 GetGranuleId() const { - return Record.Granule; - } - ui64 PathId() const noexcept { return Record.PathId; } bool Empty() const noexcept { return Portions.empty(); } ui64 Size() const; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h index 91a5df954ba..db486568fc0 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h @@ -56,8 +56,7 @@ protected: virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) = 0; virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const = 0; virtual TOptimizationPriority DoGetUsefulMetric() const = 0; - virtual void DoActualize(const TInstant /*currentInstant*/) { - } + virtual void DoActualize(const TInstant currentInstant) = 0; virtual TString DoDebugString() const { return ""; } @@ -107,9 +106,7 @@ public: return DoDebugString(); } - virtual std::vector<NIndexedReader::TSortableBatchPosition> GetBucketPositions() const { - return {}; - } + virtual std::vector<NIndexedReader::TSortableBatchPosition> GetBucketPositions() const = 0; NJson::TJsonValue SerializeToJsonVisual() const { return DoSerializeToJsonVisual(); diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h index b7c6933d572..e0f14588424 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h @@ -468,6 +468,10 @@ private: const std::shared_ptr<IStoragesManager> StoragesManager; std::shared_ptr<TCounters> Counters; protected: + virtual std::vector<NIndexedReader::TSortableBatchPosition> GetBucketPositions() const override { + return {}; + } + virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) override { const TInstant currentInstant = TInstant::Now(); for (auto&& i : add) { @@ -501,7 +505,9 @@ protected: virtual TString DoDebugString() const override { return ""; } + virtual void DoActualize(const TInstant /*currentInstant*/) override { + } public: TLevelsOptimizerPlanner(const ui64 granuleId, const std::shared_ptr<IStoragesManager>& storagesManager, const std::shared_ptr<arrow::Schema>& primaryKeysSchema) : TBase(granuleId) diff --git a/ydb/core/tx/columnshard/engines/storage/storage.cpp b/ydb/core/tx/columnshard/engines/storage/storage.cpp index 543f042f527..02ce7fd8e15 100644 --- a/ydb/core/tx/columnshard/engines/storage/storage.cpp +++ b/ydb/core/tx/columnshard/engines/storage/storage.cpp @@ -29,31 +29,6 @@ std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCom return nullptr; } return granule; -/* - for (auto it = GranuleCompactionPrioritySorting.rbegin(); it != GranuleCompactionPrioritySorting.rend(); ++it) { - if (it->first.GetWeight().IsZero()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "zero_granule_reached"); - break; - } - Y_ABORT_UNLESS(it->second.size()); - for (auto&& i : it->second) { - auto itGranule = granules.find(i); - Y_ABORT_UNLESS(itGranule != granules.end()); - if (it->first.GetWeight().GetInternalLevelWeight() > 0 * 1024 * 1024) { - -// if (it->first.GetWeight().GetInternalLevelWeight() / 10000000 > 100 || -// it->first.GetWeight().GetInternalLevelWeight() % 10000000 > 100000) { - - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule")("granule_stats", it->first.DebugString())("granule_id", i); - return itGranule->second; - } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule_skipped")("granule_stats", it->first.DebugString())("granule_id", i)("skip_reason", "too_early_and_low_critical"); - break; - } - } - } - return {}; -*/ } } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index fdb9d385931..5f0848436ae 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -27,10 +27,6 @@ public: return true; } - void WriteGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {} - void EraseGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {} - bool LoadGranules(ui32, const IColumnEngine&, const std::function<void(const TGranuleRecord&)>&) override { return true; } - void WriteColumn(ui32, const TPortionInfo&, const TColumnRecord&) override {} void EraseColumn(ui32, const TPortionInfo&, const TColumnRecord&) override {} bool LoadColumns(ui32, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>&) override { return true; } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 85aead27a33..2f97cca3ce5 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -22,8 +22,7 @@ private: std::map<TPortionAddress, std::map<TChunkAddress, TColumnChunkLoadContext>> LoadContexts; public: struct TIndex { - THashMap<ui64, std::vector<TGranuleRecord>> Granules; // pathId -> granule - THashMap<ui64, THashMap<ui64, TPortionInfo>> Columns; // granule -> portions + THashMap<ui64, THashMap<ui64, TPortionInfo>> Columns; // pathId -> portions THashMap<ui32, ui64> Counters; }; @@ -68,45 +67,6 @@ public: return true; } - void WriteGranule(ui32 index, const IColumnEngine&, const TGranuleRecord& row) override { - auto& granules = Indices[index].Granules[row.PathId]; - - bool replaced = false; - for (auto& rec : granules) { - if (rec == row) { - rec = row; - replaced = true; - break; - } - } - if (!replaced) { - granules.push_back(row); - } - } - - void EraseGranule(ui32 index, const IColumnEngine&, const TGranuleRecord& row) override { - auto& pathGranules = Indices[index].Granules[row.PathId]; - - std::vector<TGranuleRecord> filtered; - filtered.reserve(pathGranules.size()); - for (const TGranuleRecord& rec : pathGranules) { - if (rec.Granule != row.Granule) { - filtered.push_back(rec); - } - } - pathGranules.swap(filtered); - } - - bool LoadGranules(ui32 index, const IColumnEngine&, const std::function<void(const TGranuleRecord&)>& callback) override { - auto& granules = Indices[index].Granules; - for (auto& [pathId, vec] : granules) { - for (const auto& rec : vec) { - callback(rec); - } - } - return true; - } - void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) override { auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk); auto rowProto = row.GetMeta().SerializeToProto(); @@ -439,8 +399,10 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); TSnapshot indexSnaphot(1, 1); engine.UpdateDefaultSchema(indexSnaphot, TIndexInfo(tableInfo)); - THashSet<TUnifiedBlobId> lostBlobs; - engine.Load(db, lostBlobs); + for (auto&& i : paths) { + engine.RegisterTable(i); + } + engine.Load(db); std::vector<TInsertedData> dataToIndex = { TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, 0, {}), @@ -469,21 +431,21 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // select from snap before insert ui64 planStep = 1; ui64 txId = 0; - auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0); } { // select from snap between insert (greater txId) ui64 planStep = 1; ui64 txId = 2; - auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0); } { // select from snap after insert (greater planStep) ui64 planStep = 2; ui64 txId = 1; - auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSpecialColumnNames().size()); } @@ -491,7 +453,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // select another pathId ui64 planStep = 2; ui64 txId = 1; - auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0); } } @@ -516,14 +478,14 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TTestDbWrapper db; TIndexInfo tableInfo = NColumnShard::BuildTableInfo(ydbSchema, key); + ui64 pathId = 1; + ui32 step = 1000; + TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); - THashSet<TUnifiedBlobId> lostBlobs; - engine.Load(db, lostBlobs); - - ui64 pathId = 1; - ui32 step = 1000; + engine.RegisterTable(pathId); + engine.Load(db); // insert ui64 planStep = 1; @@ -562,7 +524,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20); } @@ -576,7 +538,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } NOlap::TPKRangesFilter pkFilter(false); Y_ABORT_UNLESS(pkFilter.Add(gt10k, nullptr, nullptr)); - auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10); } @@ -588,7 +550,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } NOlap::TPKRangesFilter pkFilter(false); Y_ABORT_UNLESS(pkFilter.Add(nullptr, lt10k, nullptr)); - auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 9); } } @@ -621,8 +583,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); TSnapshot indexSnapshot(1, 1); engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); - THashSet<TUnifiedBlobId> lostBlobs; - engine.Load(db, lostBlobs); + engine.RegisterTable(pathId); + engine.Load(db); ui64 numRows = 1000; ui64 rowPos = 0; @@ -648,7 +610,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's overloaded after reload TColumnEngineForLogs tmpEngine(0, TestLimits(), CommonStoragesManager); tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); - tmpEngine.Load(db, lostBlobs); + tmpEngine.RegisterTable(pathId); + tmpEngine.Load(db); } // compact @@ -678,7 +641,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's not overloaded after reload TColumnEngineForLogs tmpEngine(0, TestLimits(), CommonStoragesManager); tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); - tmpEngine.Load(db, lostBlobs); + tmpEngine.RegisterTable(pathId); + tmpEngine.Load(db); } } @@ -691,12 +655,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // insert ui64 planStep = 1; - THashSet<TUnifiedBlobId> lostBlobs; TSnapshot indexSnapshot(1, 1); { TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); - engine.Load(db, lostBlobs); + engine.RegisterTable(pathId); + engine.Load(db); ui64 numRows = 1000; ui64 rowPos = 0; @@ -729,7 +693,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20); } @@ -738,7 +702,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20); } @@ -754,7 +718,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10); } } @@ -762,15 +726,15 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); - engine.Load(db, lostBlobs); - UNIT_ASSERT_VALUES_EQUAL(engine.GetTotalStats().EmptyGranules, 0); + engine.RegisterTable(pathId); + engine.Load(db); const TIndexInfo& indexInfo = engine.GetVersionedIndex().GetLastSchema()->GetIndexInfo(); THashSet<ui32> oneColumnId = {indexInfo.GetColumnId(testColumns[0].first)}; { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10); } } diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 403141f6127..f1fb371120f 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -13,8 +13,7 @@ void TSchemaPreset::Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset& Name = presetProto.GetName(); } -bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId) { - TabletId = tabletId; +bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { { auto rowset = db.Table<Schema::TableInfo>().Select(); if (!rowset.IsReady()) { @@ -138,22 +137,15 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId) { return true; } -bool TTablesManager::LoadIndex(NOlap::TDbWrapper& idxDB, THashSet<NOlap::TUnifiedBlobId>& lostEvictions) { +bool TTablesManager::LoadIndex(NOlap::TDbWrapper& idxDB) { if (PrimaryIndex) { - if (!PrimaryIndex->Load(idxDB, lostEvictions, PathsToDrop)) { + if (!PrimaryIndex->Load(idxDB)) { return false; } } return true; } -void TTablesManager::Clear() { - Tables.clear(); - SchemaPresets.clear(); - PathsToDrop.clear(); - PrimaryIndex.reset(); -} - bool TTablesManager::HasTable(const ui64 pathId) const { auto it = Tables.find(pathId); if (it == Tables.end() || it->second.IsDropped()) { @@ -210,7 +202,11 @@ void TTablesManager::RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db) { Y_ABORT_UNLESS(table.IsEmpty()); Schema::SaveTableInfo(db, table.GetPathId(), table.GetTieringUsage()); - Tables.insert_or_assign(table.GetPathId(), std::move(table)); + const ui64 pathId = table.GetPathId(); + Tables.insert_or_assign(pathId, std::move(table)); + if (PrimaryIndex) { + PrimaryIndex->RegisterTable(pathId); + } } bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db) { @@ -280,6 +276,7 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim NOlap::TSnapshot snapshot{version.Step, version.TxId}; NOlap::TIndexInfo indexInfo = DeserializeIndexInfoFromProto(schema); indexInfo.SetAllKeys(); + const bool isFirstPrimaryIndexInitialization = !PrimaryIndex; if (!PrimaryIndex) { PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, NOlap::TCompactionLimits(), StoragesManager); } else { @@ -288,6 +285,11 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim Y_ABORT_UNLESS(lastIndexInfo.GetIndexKey()->Equals(indexInfo.GetIndexKey())); } PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo)); + if (isFirstPrimaryIndexInitialization) { + for (auto&& i : Tables) { + PrimaryIndex->RegisterTable(i.first); + } + } PrimaryIndex->OnTieringModified(nullptr, Ttl); } @@ -296,4 +298,11 @@ NOlap::TIndexInfo TTablesManager::DeserializeIndexInfoFromProto(const NKikimrSch Y_ABORT_UNLESS(indexInfo); return *indexInfo; } + +TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const ui64 tabletId) + : StoragesManager(storagesManager) + , TabletId(tabletId) +{ +} + } diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 8f8a4779fee..99db7751835 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -139,13 +139,9 @@ private: TTtl Ttl; std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex; std::shared_ptr<NOlap::IStoragesManager> StoragesManager; - ui64 TabletId; + ui64 TabletId = 0; public: - TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager) - : StoragesManager(storagesManager) - { - - } + TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const ui64 tabletId); const TTtl& GetTtl() const { return Ttl; @@ -194,10 +190,8 @@ public: return *PrimaryIndex; } - bool InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId); - bool LoadIndex(NOlap::TDbWrapper& db, THashSet<NOlap::TUnifiedBlobId>& lostEvictions); - - void Clear(); + bool InitFromDB(NIceDb::TNiceDb& db); + bool LoadIndex(NOlap::TDbWrapper& db); const TTableInfo& GetTable(const ui64 pathId) const; ui64 GetMemoryUsage() const; |