aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-22 16:08:15 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-22 16:25:15 +0300
commitd40a08d163c0ec4b3cf546ce187466c8fc8af0b7 (patch)
tree68f5dce9cc173ee67e58f301d8457a8152161c70
parent12d1c39c9c20ca46bd4a788422bb653ebdfdef58 (diff)
downloadydb-d40a08d163c0ec4b3cf546ce187466c8fc8af0b7.tar.gz
KIKIMR-19091: remove granules. currently granule == path in shard
-rw-r--r--ydb/core/formats/arrow/reader/read_filter_merger.h79
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h84
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract/compaction_info.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/cleanup.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp58
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.h2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/mark_granules.cpp105
-rw-r--r--ydb/core/tx/columnshard/engines/changes/mark_granules.h38
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp38
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.h2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h30
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp398
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h56
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.h8
-rw-r--r--ydb/core/tx/columnshard/engines/granules_table.h70
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h24
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common.h26
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp16
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h23
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h7
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h6
-rw-r--r--ydb/core/tx/columnshard/engines/storage/storage.cpp25
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp98
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp33
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h14
46 files changed, 343 insertions, 985 deletions
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h
index c5f9de0fddc..6fef5323074 100644
--- a/ydb/core/formats/arrow/reader/read_filter_merger.h
+++ b/ydb/core/formats/arrow/reader/read_filter_merger.h
@@ -103,31 +103,6 @@ public:
}
};
- template <class TContainer>
- class TAssociatedContainerIterator {
- private:
- typename TContainer::const_iterator Current;
- typename TContainer::const_iterator End;
- public:
- TAssociatedContainerIterator(const TContainer& container)
- : Current(container.begin())
- , End(container.end())
- {
- }
-
- bool IsValid() const {
- return Current != End;
- }
-
- void Next() {
- ++Current;
- }
-
- const auto& CurrentPosition() const {
- return Current->first;
- }
- };
-
// (-inf, it1), [it1, it2), [it2, it3), ..., [itLast, +inf)
template <class TBordersIterator>
static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBorders(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, TBordersIterator& it) {
@@ -174,11 +149,65 @@ public:
}
template <class TContainer>
+ class TAssociatedContainerIterator {
+ private:
+ typename TContainer::const_iterator Current;
+ typename TContainer::const_iterator End;
+ public:
+ TAssociatedContainerIterator(const TContainer& container)
+ : Current(container.begin())
+ , End(container.end()) {
+ }
+
+ bool IsValid() const {
+ return Current != End;
+ }
+
+ void Next() {
+ ++Current;
+ }
+
+ const auto& CurrentPosition() const {
+ return Current->first;
+ }
+ };
+
+ template <class TContainer>
static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInAssociativeContainer(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, const TContainer& container) {
TAssociatedContainerIterator<TContainer> it(container);
return SplitByBorders(batch, columnNames, it);
}
+ template <class TContainer>
+ class TSequentialContainerIterator {
+ private:
+ typename TContainer::const_iterator Current;
+ typename TContainer::const_iterator End;
+ public:
+ TSequentialContainerIterator(const TContainer& container)
+ : Current(container.begin())
+ , End(container.end()) {
+ }
+
+ bool IsValid() const {
+ return Current != End;
+ }
+
+ void Next() {
+ ++Current;
+ }
+
+ const auto& CurrentPosition() const {
+ return *Current;
+ }
+ };
+
+ template <class TContainer>
+ static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInSequentialContainer(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, const TContainer& container) {
+ TSequentialContainerIterator<TContainer> it(container);
+ return SplitByBorders(batch, columnNames, it);
+ }
+
static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition);
TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound);
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index a14d2817421..365d8e6d024 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -204,8 +204,6 @@ void TColumnShard::UpdateIndexCounters() {
auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats();
SetCounter(COUNTER_INDEX_TABLES, stats.Tables);
- SetCounter(COUNTER_INDEX_GRANULES, stats.Granules);
- SetCounter(COUNTER_INDEX_EMPTY_GRANULES, stats.EmptyGranules);
SetCounter(COUNTER_INDEX_COLUMN_RECORDS, stats.ColumnRecords);
SetCounter(COUNTER_INDEX_COLUMN_METADATA_BYTES, stats.ColumnMetadataBytes);
SetCounter(COUNTER_INSERTED_PORTIONS, stats.GetInsertedStats().Portions);
@@ -235,7 +233,6 @@ void TColumnShard::UpdateIndexCounters() {
SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.GetEvictedStats().RawBytes);
LOG_S_DEBUG("Index: tables " << stats.Tables
- << " granules " << stats.Granules << " (empty " << stats.EmptyGranules << ")"
<< " inserted " << stats.GetInsertedStats().DebugString()
<< " compacted " << stats.GetCompactedStats().DebugString()
<< " s-compacted " << stats.GetSplitCompactedStats().DebugString()
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index fb7e6705b8a..d7fc92abe3b 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -41,7 +41,6 @@ void TTxInit::SetDefaults() {
Self->OwnerPath.clear();
Self->AltersInFlight.clear();
Self->CommitsInFlight.clear();
- Self->TablesManager.Clear();
Self->LongTxWrites.clear();
Self->LongTxWritesByUniqueId.clear();
}
@@ -131,13 +130,12 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
{
ACFL_INFO("step", "TTablesManager::Load_Start");
- TTablesManager tManagerLocal(Self->StoragesManager);
- THashSet<TUnifiedBlobId> lostEvictions;
- if (!tManagerLocal.InitFromDB(db, Self->TabletID())) {
+ TTablesManager tManagerLocal(Self->StoragesManager, Self->TabletID());
+ if (!tManagerLocal.InitFromDB(db)) {
ACFL_ERROR("step", "TTablesManager::InitFromDB_Fails");
return false;
}
- if (!tManagerLocal.LoadIndex(dbTable, lostEvictions)) {
+ if (!tManagerLocal.LoadIndex(dbTable)) {
ACFL_ERROR("step", "TTablesManager::Load_Fails");
return false;
}
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index a88639a7543..c2816e27e84 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -133,7 +133,6 @@ void TTxRead::Complete(const TActorContext& ctx) {
std::static_pointer_cast<const NOlap::TReadMetadataBase>(ReadMetadata));
auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta();
- Self->IncCounter(COUNTER_READ_INDEX_GRANULES, statsDelta.Granules);
Self->IncCounter(COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions);
Self->IncCounter(COUNTER_READ_INDEX_BLOBS, statsDelta.Blobs);
Self->IncCounter(COUNTER_READ_INDEX_ROWS, statsDelta.Rows);
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 59b6d22c169..40ad858f28d 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -959,7 +959,6 @@ void TTxScan::Complete(const TActorContext& ctx) {
ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRanges);
auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta();
- Self->IncCounter(COUNTER_READ_INDEX_GRANULES, statsDelta.Granules);
Self->IncCounter(COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions);
Self->IncCounter(COUNTER_READ_INDEX_BLOBS, statsDelta.Blobs);
Self->IncCounter(COUNTER_READ_INDEX_ROWS, statsDelta.Rows);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index da061fff0c7..e30f6e32db7 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -158,7 +158,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, ProgressTxController(std::make_unique<TTxController>(*this))
, StoragesManager(std::make_shared<TStoragesManager>(*this))
, InFlightReadsTracker(StoragesManager)
- , TablesManager(StoragesManager)
+ , TablesManager(StoragesManager, info->TabletID)
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
, SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>())
diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp
index 8959782e98b..34ffa4062a7 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/columnshard_schema.cpp
@@ -9,7 +9,7 @@ bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* ds
while (!rowset.EndOfSet()) {
NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty();
- portion.SetGranule(rowset.GetValue<IndexColumns::Granule>());
+ portion.SetPathId(rowset.GetValue<IndexColumns::PathId>());
portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>());
portion.SetPortion(rowset.GetValue<IndexColumns::Portion>());
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 209ab2c275f..f92ecc667d4 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -6,7 +6,6 @@
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h>
-#include <ydb/core/tx/columnshard/engines/granules_table.h>
#include <ydb/core/tx/columnshard/engines/columns_table.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/operations/write.h>
@@ -32,12 +31,10 @@ struct Schema : NIceDb::Schema {
using TSettings = SchemaSettings<EmptySettings>;
using TInsertedData = NOlap::TInsertedData;
- using TGranuleRecord = NOlap::TGranuleRecord;
using TColumnRecord = NOlap::TColumnRecord;
enum EIndexTables : ui32 {
InsertTableId = 255,
- GranulesTableId,
ColumnsTableId,
CountersTableId,
OperationsTableId,
@@ -218,22 +215,9 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion>;
};
- struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> {
- struct Index : Column<1, NScheme::NTypeIds::Uint32> {};
- struct PathId : Column<2, NScheme::NTypeIds::Uint64> {}; // Logical table (if many)
- struct IndexKey : Column<3, NScheme::NTypeIds::String> {}; // Effective part of PK (serialized)
- struct Granule : Column<4, NScheme::NTypeIds::Uint64> {}; // FK: {Index, Granule} -> TIndexColumns
- struct PlanStep : Column<5, NScheme::NTypeIds::Uint64> {};
- struct TxId : Column<6, NScheme::NTypeIds::Uint64> {};
- struct Metadata : Column<7, NScheme::NTypeIds::String> {}; // NKikimrTxColumnShard.TIndexGranuleMeta
-
- using TKey = TableKey<Index, PathId, IndexKey>;
- using TColumns = TableColumns<Index, PathId, IndexKey, Granule, PlanStep, TxId, Metadata>;
- };
-
struct IndexColumns : NIceDb::Schema::Table<ColumnsTableId> {
struct Index : Column<1, NScheme::NTypeIds::Uint32> {};
- struct Granule : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct PathId : Column<2, NScheme::NTypeIds::Uint64> {};
struct ColumnIdx : Column<3, NScheme::NTypeIds::Uint32> {};
struct PlanStep : Column<4, NScheme::NTypeIds::Uint64> {};
struct TxId : Column<5, NScheme::NTypeIds::Uint64> {};
@@ -246,8 +230,8 @@ struct Schema : NIceDb::Schema {
struct Offset : Column<12, NScheme::NTypeIds::Uint32> {};
struct Size : Column<13, NScheme::NTypeIds::Uint32> {};
- using TKey = TableKey<Index, Granule, ColumnIdx, PlanStep, TxId, Portion, Chunk>;
- using TColumns = TableColumns<Index, Granule, ColumnIdx, PlanStep, TxId, Portion, Chunk,
+ using TKey = TableKey<Index, PathId, ColumnIdx, PlanStep, TxId, Portion, Chunk>;
+ using TColumns = TableColumns<Index, PathId, ColumnIdx, PlanStep, TxId, Portion, Chunk,
XPlanStep, XTxId, Blob, Metadata, Offset, Size>;
};
@@ -301,7 +285,6 @@ struct Schema : NIceDb::Schema {
BlobsToKeep,
BlobsToDelete,
InsertTable,
- IndexGranules,
IndexColumns,
IndexCounters,
SmallBlobs,
@@ -504,63 +487,6 @@ struct Schema : NIceDb::Schema {
NOlap::TInsertTableAccessor& insertTable,
const TInstant& loadTime);
- // IndexGranules activities
-
- static void IndexGranules_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine,
- const TGranuleRecord& row) {
- TString metaStr;
- const auto& indexInfo = engine.GetVersionedIndex().GetLastSchema()->GetIndexInfo();
-
- NKikimrTxColumnShard::TIndexGranuleMeta meta;
- Y_ABORT_UNLESS(indexInfo.GetIndexKey());
- meta.SetMarkSize(indexInfo.GetIndexKey()->num_fields());
- Y_ABORT_UNLESS(meta.SerializeToString(&metaStr));
-
- db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Update(
- NIceDb::TUpdate<IndexGranules::Granule>(row.Granule),
- NIceDb::TUpdate<IndexGranules::PlanStep>(row.GetCreatedAt().GetPlanStep()),
- NIceDb::TUpdate<IndexGranules::TxId>(row.GetCreatedAt().GetTxId()),
- NIceDb::TUpdate<IndexGranules::Metadata>(metaStr)
- );
- }
-
- static void IndexGranules_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine,
- const TGranuleRecord& row) {
- db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Delete();
- }
-
- static bool IndexGranules_Load(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine,
- const std::function<void(const TGranuleRecord&)>& callback) {
- auto rowset = db.Table<IndexGranules>().Prefix(index).Select();
- if (!rowset.IsReady())
- return false;
-
- while (!rowset.EndOfSet()) {
- ui64 pathId = rowset.GetValue<IndexGranules::PathId>();
- TString indexKey = rowset.GetValue<IndexGranules::IndexKey>();
- ui64 granule = rowset.GetValue<IndexGranules::Granule>();
- ui64 planStep = rowset.GetValue<IndexGranules::PlanStep>();
- ui64 txId = rowset.GetValue<IndexGranules::TxId>();
- TString metaStr = rowset.GetValue<IndexGranules::Metadata>();
-
- std::optional<ui32> markNumKeys;
- if (metaStr.size()) {
- NKikimrTxColumnShard::TIndexGranuleMeta meta;
- Y_ABORT_UNLESS(meta.ParseFromString(metaStr));
- if (meta.HasMarkSize()) {
- markNumKeys = meta.GetMarkSize();
- }
- }
-
- callback(TGranuleRecord(pathId, granule, NOlap::TSnapshot(planStep, txId),
- engine.DeserializeMark(indexKey, markNumKeys)));
-
- if (!rowset.Next())
- return false;
- }
- return true;
- }
-
// IndexColumns activities
static void IndexColumns_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
@@ -569,7 +495,7 @@ struct Schema : NIceDb::Schema {
if (proto) {
*rowProto.MutablePortionMeta() = std::move(*proto);
}
- db.Table<IndexColumns>().Key(index, portion.GetGranule(), row.ColumnId,
+ db.Table<IndexColumns>().Key(index, portion.GetPathId(), row.ColumnId,
portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update(
NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()),
NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()),
@@ -581,7 +507,7 @@ struct Schema : NIceDb::Schema {
}
static void IndexColumns_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
- db.Table<IndexColumns>().Key(index, portion.GetGranule(), row.ColumnId,
+ db.Table<IndexColumns>().Key(index, portion.GetPathId(), row.ColumnId,
portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete();
}
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
index bab0224d428..8cc56b5bc31 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
@@ -30,7 +30,6 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
index 96d8d37c049..9c190e512ec 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
@@ -31,7 +31,6 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
index 96d8d37c049..9c190e512ec 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
@@ -31,7 +31,6 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
index bab0224d428..8cc56b5bc31 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
@@ -30,7 +30,6 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/compaction_info.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/compaction_info.cpp
index 213d088db1b..e3e2a73955d 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/compaction_info.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/compaction_info.cpp
@@ -1,5 +1,4 @@
#include "compaction_info.h"
-#include <ydb/core/tx/columnshard/engines/storage/granule.h>
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
index 18952183ed2..542943c9ca4 100644
--- a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
@@ -2,6 +2,7 @@
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
namespace NKikimr::NOlap {
@@ -14,16 +15,28 @@ void TCleanupColumnEngineChanges::DoDebugString(TStringOutput& out) const {
}
}
-void TCleanupColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& /*context*/) {
+void TCleanupColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
self.IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
THashSet<TUnifiedBlobId> blobIds;
+ THashSet<ui64> pathIds;
for (auto&& p : PortionsToDrop) {
auto removing = BlobsAction.GetRemoving(p);
for (auto&& r : p.Records) {
removing->DeclareRemove(r.BlobRange.BlobId);
}
+ pathIds.emplace(p.GetPathId());
self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.RawBytesSum());
}
+ for (auto&& p: pathIds) {
+ auto itDrop = self.TablesManager.GetPathsToDrop().find(p);
+ if (itDrop != self.TablesManager.GetPathsToDrop().end()) {
+ if (!self.TablesManager.GetPrimaryIndexSafe().HasDataInPathId(p)) {
+ self.TablesManager.MutablePathsToDrop().erase(itDrop);
+ NIceDb::TNiceDb db(context.Txc.DB);
+ NColumnShard::Schema::EraseTableInfo(db, p);
+ }
+ }
+ }
}
bool TCleanupColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) {
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index 2335394095a..8c23b15af2a 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -1,5 +1,4 @@
#include "indexation.h"
-#include "mark_granules.h"
#include <ydb/core/tx/columnshard/blob_cache.h>
#include <ydb/core/protos/counters_columnshard.pb.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
@@ -25,19 +24,6 @@ void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self,
}
}
-std::optional<ui64> TInsertColumnEngineChanges::AddPathIfNotExists(ui64 pathId) {
- if (PathToGranule.contains(pathId)) {
- return {};
- }
-
- Y_ABORT_UNLESS(FirstGranuleId);
- ui64 granule = FirstGranuleId;
- ++FirstGranuleId;
-
- NewGranules.emplace(granule, std::make_pair(pathId, DefaultMark));
- return granule;
-}
-
void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
TBase::DoStart(self);
Y_ABORT_UNLESS(DataToIndex.size());
@@ -111,7 +97,6 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
Y_ABORT_UNLESS(Blobs.empty());
const std::vector<std::string> comparableColumns = resultSchema->GetIndexInfo().GetReplaceKey()->field_names();
for (auto& [pathId, batches] : pathBatches) {
- auto newGranuleId = AddPathIfNotExists(pathId);
NIndexedReader::TMergePartialStream stream(resultSchema->GetIndexInfo().GetReplaceKey(), resultSchema->GetIndexInfo().ArrowSchemaWithSpecials(), false);
THashMap<std::string, ui64> fieldSizes;
ui64 rowsCount = 0;
@@ -127,37 +112,22 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
stream.SetPossibleSameVersion(true);
stream.DrainAll(builder);
- THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> batchChunks;
- if (PathToGranule[pathId].empty()) {
- AFL_VERIFY(newGranuleId);
- batchChunks[*newGranuleId].emplace_back(builder.Finalize());
- } else {
- auto& markers = PathToGranule[pathId];
- std::vector<std::shared_ptr<arrow::RecordBatch>> result = NIndexedReader::TSortableBatchPosition::SplitByBordersInAssociativeContainer(builder.Finalize(), comparableColumns, markers);
- AFL_VERIFY(result.size() == markers.size() + 1)("result", result.size())("markers", markers.size());
- ui32 idx = 0;
- for (auto&& i : markers) {
- auto chunk = result[++idx];
- if (chunk) {
- batchChunks[i.second].emplace_back(chunk);
- }
+ auto itGranule = PathToGranule.find(pathId);
+ AFL_VERIFY(itGranule != PathToGranule.end());
+ std::vector<std::shared_ptr<arrow::RecordBatch>> result = NIndexedReader::TSortableBatchPosition::SplitByBordersInSequentialContainer(builder.Finalize(), comparableColumns, itGranule->second);
+ for (auto&& b : result) {
+ if (!b) {
+ continue;
}
- if (result.front()) {
- batchChunks[markers.begin()->second].emplace_back(result.front());
+ if (b->num_rows() < 100) {
+ SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::UNCOMPRESSED));
+ } else {
+ SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::LZ4_FRAME));
}
- }
- for (auto&& g : batchChunks) {
- for (auto&& b : g.second) {
- if (b->num_rows() < 100) {
- SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::UNCOMPRESSED));
- } else {
- SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::LZ4_FRAME));
- }
- auto portions = MakeAppendedPortions(b, g.first, maxSnapshot, nullptr, context);
- Y_ABORT_UNLESS(portions.size());
- for (auto& portion : portions) {
- AppendedPortions.emplace_back(std::move(portion));
- }
+ auto portions = MakeAppendedPortions(b, pathId, maxSnapshot, nullptr, context);
+ Y_ABORT_UNLESS(portions.size());
+ for (auto& portion : portions) {
+ AppendedPortions.emplace_back(std::move(portion));
}
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h
index f28ddfdd77e..0a89d493034 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.h
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.h
@@ -23,7 +23,7 @@ protected:
virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override;
public:
const TMark DefaultMark;
- THashMap<ui64, std::map<NIndexedReader::TSortableBatchPosition, ui64>> PathToGranule; // pathId -> {pos, granule}
+ THashMap<ui64, std::vector<NIndexedReader::TSortableBatchPosition>> PathToGranule; // pathId -> positions (sorted by pk)
public:
TInsertColumnEngineChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& dataToIndex, const TSplitSettings& splitSettings, const TSaverContext& saverContext)
: TBase(splitSettings, saverContext, StaticTypeName())
diff --git a/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp b/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
deleted file mode 100644
index 55b8534441a..00000000000
--- a/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-#include "mark_granules.h"
-
-namespace NKikimr::NOlap {
-TMarksGranules::TMarksGranules(std::vector<TPair>&& marks) noexcept
- : Marks(std::move(marks)) {
- Y_DEBUG_ABORT_UNLESS(std::is_sorted(Marks.begin(), Marks.end()));
-}
-
-TMarksGranules::TMarksGranules(std::vector<TMark>&& points) {
- std::sort(points.begin(), points.end());
-
- Marks.reserve(points.size());
-
- for (size_t i = 0, end = points.size(); i != end; ++i) {
- Marks.emplace_back(std::move(points[i]), i + 1);
- }
-}
-
-TMarksGranules::TMarksGranules(const TSelectInfo& selectInfo) {
- Marks.reserve(selectInfo.Granules.size());
-
- for (const auto& rec : selectInfo.Granules) {
- Marks.emplace_back(std::make_pair(rec.Mark, rec.Granule));
- }
-
- std::sort(Marks.begin(), Marks.end(), [](const TPair& a, const TPair& b) {
- return a.first < b.first;
- });
-}
-
-bool TMarksGranules::MakePrecedingMark(const TIndexInfo& indexInfo) {
- ui64 minGranule = 0;
- TMark minMark(TMark::MinBorder(indexInfo.GetIndexKey()));
- if (Marks.empty()) {
- Marks.emplace_back(std::move(minMark), minGranule);
- return true;
- }
-
- if (minMark < Marks[0].first) {
- std::vector<TPair> copy;
- copy.reserve(Marks.size() + 1);
- copy.emplace_back(std::move(minMark), minGranule);
- for (auto&& [mark, granule] : Marks) {
- copy.emplace_back(std::move(mark), granule);
- }
- Marks.swap(copy);
- return true;
- }
- return false;
-}
-
-THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> TMarksGranules::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::map<NArrow::TReplaceKey, ui64>& granules, const TIndexInfo& indexInfo) {
- Y_ABORT_UNLESS(batch);
- if (batch->num_rows() == 0) {
- return {};
- }
- THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
-
- if (granules.size() == 1) {
- out[granules.begin()->second].emplace_back(batch);
- } else {
- const auto effKey = GetEffectiveKey(batch, indexInfo);
- Y_ABORT_UNLESS(effKey->num_columns() && effKey->num_rows());
-
- std::vector<NArrow::TRawReplaceKey> keys;
- {
- const auto& columns = effKey->columns();
- keys.reserve(effKey->num_rows());
- for (i64 i = 0; i < effKey->num_rows(); ++i) {
- keys.emplace_back(NArrow::TRawReplaceKey(&columns, i));
- }
- }
-
- i64 offset = 0;
- auto itNext = granules.begin();
- ++itNext;
- for (auto it = granules.begin(); it != granules.end() && offset < effKey->num_rows(); ++it) {
- const i64 end = (itNext == granules.end())
- // Just take the number of elements in the key column for the last granule.
- ? effKey->num_rows()
- // Locate position of the next granule in the key.
- : NArrow::TReplaceKeyHelper::LowerBound(keys, itNext->first, offset);
-
- if (const i64 size = end - offset) {
- out[it->second].emplace_back(batch->Slice(offset, size));
- }
-
- offset = end;
- if (itNext != granules.end()) {
- ++itNext;
- }
- }
- }
- return out;
-}
-
-std::shared_ptr<arrow::RecordBatch> TMarksGranules::GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch, const TIndexInfo& indexInfo) {
- const auto& key = indexInfo.GetIndexKey();
- auto resBatch = NArrow::ExtractColumns(batch, key);
- Y_VERIFY_S(resBatch, "Cannot extract effective key " << key->ToString()
- << " from batch " << batch->schema()->ToString());
- return resBatch;
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/changes/mark_granules.h b/ydb/core/tx/columnshard/engines/changes/mark_granules.h
deleted file mode 100644
index b1026bf5776..00000000000
--- a/ydb/core/tx/columnshard/engines/changes/mark_granules.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-#include "abstract/mark.h"
-#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
-#include <ydb/core/tx/columnshard/engines/column_engine.h>
-
-namespace NKikimr::NOlap {
-
-class TMarksGranules {
-public:
- using TPair = std::pair<TMark, ui64>;
-
- TMarksGranules() = default;
- TMarksGranules(std::vector<TPair>&& marks) noexcept;
- TMarksGranules(std::vector<TMark>&& points);
- TMarksGranules(const TSelectInfo& selectInfo);
-
- const std::vector<TPair>& GetOrderedMarks() const noexcept {
- return Marks;
- }
-
- bool Empty() const noexcept {
- return Marks.empty();
- }
-
- bool MakePrecedingMark(const TIndexInfo& indexInfo);
-
- static THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::map<NArrow::TReplaceKey, ui64>& granules,
- const TIndexInfo& indexInfo);
-
- static std::shared_ptr<arrow::RecordBatch> GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch,
- const TIndexInfo& indexInfo);
-
-private:
- std::vector<TPair> Marks;
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index 33d8e6ba65a..5d28cc9b8ae 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -55,36 +55,7 @@ void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIn
}
bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) {
- // Save new granules
- std::map<ui64, ui64> remapGranules;
- for (auto& [granule, p] : NewGranules) {
- ui64 pathId = p.first;
- TMark mark = p.second;
- TGranuleRecord rec(pathId, granule, context.Snapshot, mark.GetBorder());
- auto oldGranuleId = self.NewGranule(rec);
- if (!oldGranuleId) {
- self.GranulesTable->Write(context.DB, rec);
- } else {
- remapGranules.emplace(rec.Granule, *oldGranuleId);
- }
- }
// Save new portions (their column records)
-
- THashMap<ui64, std::shared_ptr<TGranuleMeta>> granules;
- for (auto& portionInfoWithBlobs : AppendedPortions) {
- auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
- Y_ABORT_UNLESS(!portionInfo.Empty());
- auto it = remapGranules.find(portionInfo.GetGranule());
- if (it != remapGranules.end()) {
- granules.emplace(it->second, self.GetGranulePtrVerified(it->second));
- } else {
- granules.emplace(portionInfo.GetGranule(), self.GetGranulePtrVerified(portionInfo.GetGranule()));
- }
- }
- for (auto& [_, portionInfo] : PortionsToRemove) {
- granules.emplace(portionInfo.GetGranule(), self.GetGranulePtrVerified(portionInfo.GetGranule()));
- }
- NJson::TJsonValue sbJson = NJson::JSON_MAP;
{
auto g = self.GranulesStorage->StartPackModification();
@@ -92,10 +63,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
Y_ABORT_UNLESS(!portionInfo.Empty());
Y_ABORT_UNLESS(portionInfo.HasRemoveSnapshot());
- const ui64 granule = portionInfo.GetGranule();
- const ui64 portion = portionInfo.GetPortion();
-
- const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion);
+ const TPortionInfo& oldInfo = self.GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortion());
self.UpsertPortion(portionInfo, &oldInfo);
@@ -106,10 +74,6 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
for (auto& portionInfoWithBlobs : AppendedPortions) {
auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
Y_ABORT_UNLESS(!portionInfo.Empty());
- auto it = remapGranules.find(portionInfo.GetGranule());
- if (it != remapGranules.end()) {
- portionInfo.SetGranule(it->second);
- }
self.UpsertPortion(portionInfo);
for (auto& record : portionInfo.Records) {
self.ColumnsTable->Write(context.DB, portionInfo, record);
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index 912ca123e79..bfc9ec09b4a 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -46,8 +46,6 @@ public:
THashMap<TPortionAddress, TPortionInfo> PortionsToRemove;
std::vector<TPortionInfoWithBlobs> AppendedPortions;
- THashMap<ui64, std::pair<ui64, TMark>> NewGranules;
- ui64 FirstGranuleId = 0;
virtual ui32 GetWritePortionsCount() const override {
return AppendedPortions.size();
}
diff --git a/ydb/core/tx/columnshard/engines/changes/ya.make b/ydb/core/tx/columnshard/engines/changes/ya.make
index fb399e43a85..faf74a7c05b 100644
--- a/ydb/core/tx/columnshard/engines/changes/ya.make
+++ b/ydb/core/tx/columnshard/engines/changes/ya.make
@@ -5,7 +5,6 @@ SRCS(
ttl.cpp
indexation.cpp
cleanup.cpp
- mark_granules.cpp
with_appended.cpp
general_compaction.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 2c856247e2a..faad27bcb60 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -1,5 +1,5 @@
#pragma once
-#include "granules_table.h"
+#include "db_wrapper.h"
#include "portions/portion_info.h"
#include "scheme/snapshot_scheme.h"
#include "predicate/filter.h"
@@ -21,7 +21,6 @@ class TCleanupColumnEngineChanges;
struct TSelectInfo {
struct TStats {
- size_t Granules{};
size_t Portions{};
size_t Records{};
size_t Blobs{};
@@ -29,7 +28,6 @@ struct TSelectInfo {
size_t Bytes{};
const TStats& operator += (const TStats& stats) {
- Granules += stats.Granules;
Portions += stats.Portions;
Records += stats.Records;
Blobs += stats.Blobs;
@@ -39,17 +37,12 @@ struct TSelectInfo {
}
};
- std::vector<TGranuleRecord> Granules; // ordered by key (ascending)
std::vector<std::shared_ptr<TPortionInfo>> PortionsOrderedPK;
NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>> GetPortionsOrdered(const bool reverse) const {
return NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>>(PortionsOrderedPK, reverse);
}
- NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>> GetGranulesOrdered(const bool reverse) const {
- return NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>>(Granules, reverse);
- }
-
size_t NumChunks() const {
size_t records = 0;
for (auto& portionInfo : PortionsOrderedPK) {
@@ -60,7 +53,6 @@ struct TSelectInfo {
TStats Stats() const {
TStats out;
- out.Granules = Granules.size();
out.Portions = PortionsOrderedPK.size();
THashSet<TUnifiedBlobId> uniqBlob;
@@ -79,13 +71,6 @@ struct TSelectInfo {
}
friend IOutputStream& operator << (IOutputStream& out, const TSelectInfo& info) {
- if (info.Granules.size()) {
- out << "granules:";
- for (auto& rec : info.Granules) {
- out << " " << rec;
- }
- out << "; ";
- }
if (info.PortionsOrderedPK.size()) {
out << "portions:";
for (auto& portionInfo : info.PortionsOrderedPK) {
@@ -170,8 +155,6 @@ public:
};
i64 Tables{};
- i64 Granules{};
- i64 EmptyGranules{};
i64 ColumnRecords{};
i64 ColumnMetadataBytes{};
THashMap<TPortionMeta::EProduced, TPortionsStats> StatsByType;
@@ -362,6 +345,8 @@ public:
class IColumnEngine {
+protected:
+ virtual void DoRegisterTable(const ui64 pathId) = 0;
public:
virtual ~IColumnEngine() = default;
@@ -373,10 +358,13 @@ public:
virtual TString SerializeMark(const NArrow::TReplaceKey& key) const = 0;
virtual NArrow::TReplaceKey DeserializeMark(const TString& key, std::optional<ui32> markNumKeys) const = 0;
- virtual bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) = 0;
-
+ virtual bool HasDataInPathId(const ui64 pathId) const = 0;
+ virtual bool Load(IDbWrapper& db) = 0;
+ void RegisterTable(const ui64 pathId) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "RegisterTable")("path_id", pathId);
+ return DoRegisterTable(pathId);
+ }
virtual std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot,
- const THashSet<ui32>& columnIds,
const TPKRangesFilter& pkRangesFilter) const = 0;
virtual std::shared_ptr<TInsertColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 1cbf1132704..ccfe9715e5e 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -29,7 +29,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimit
ui64 TColumnEngineForLogs::MemoryUsage() const {
auto numPortions = Counters.GetPortionsCount();
- return Counters.Granules * (sizeof(TGranuleMeta) + sizeof(ui64)) +
+ return Counters.Tables * (sizeof(TGranuleMeta) + sizeof(ui64)) +
numPortions * (sizeof(TPortionInfo) + sizeof(ui64)) +
Counters.ColumnRecords * sizeof(TColumnRecord) +
Counters.ColumnMetadataBytes;
@@ -40,9 +40,7 @@ const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& TColumnEngineForLogs::Get
}
const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() {
- Counters.Tables = PathGranules.size();
- Counters.Granules = Granules.size();
- Counters.EmptyGranules = EmptyGranules.size();
+ Counters.Tables = Tables.size();
return Counters;
}
@@ -51,10 +49,7 @@ void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, E
const TPortionInfo* exPortionInfo) {
UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo);
- const ui64 granule = portionInfo.GetGranule();
- Y_ABORT_UNLESS(granule);
- Y_ABORT_UNLESS(Granules.contains(granule));
- ui64 pathId = Granules[granule]->PathId();
+ const ui64 pathId = portionInfo.GetGranule();
Y_ABORT_UNLESS(pathId);
if (!PathStats.contains(pathId)) {
auto& stats = PathStats[pathId];
@@ -132,24 +127,21 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
}
void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) {
- if (!GranulesTable) {
+ if (!ColumnsTable) {
ui32 indexId = info.GetId();
- GranulesTable = std::make_shared<TGranulesTable>(*this, indexId);
ColumnsTable = std::make_shared<TColumnsTable>(indexId);
CountersTable = std::make_shared<TCountersTable>(indexId);
}
VersionedIndex.AddIndex(snapshot, std::move(info));
}
-bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop) {
+bool TColumnEngineForLogs::Load(IDbWrapper& db) {
Y_ABORT_UNLESS(!Loaded);
Loaded = true;
+ THashMap<ui64, ui64> granuleToPathIdDecoder;
{
auto guard = GranulesStorage->StartPackModification();
- if (!LoadGranules(db)) {
- return false;
- }
- if (!LoadColumns(db, lostBlobs)) {
+ if (!LoadColumns(db)) {
return false;
}
if (!LoadCounters(db)) {
@@ -157,12 +149,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
}
}
- THashSet<ui64> emptyGranulePaths;
- for (const auto& [granule, spg] : Granules) {
- if (spg->Empty()) {
- EmptyGranules.insert(granule);
- emptyGranulePaths.insert(spg->PathId());
- }
+ for (const auto& [pathId, spg] : Tables) {
for (const auto& [_, portionInfo] : spg->GetPortions()) {
UpdatePortionStats(*portionInfo, EStatsUpdateType::ADD);
if (portionInfo->CheckForCleanup()) {
@@ -171,40 +158,12 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
}
}
- // Cleanup empty granules
- for (auto& pathId : emptyGranulePaths) {
- for (auto& emptyGranules : EmptyGranuleTracks(pathId)) {
- // keep first one => merge, keep nothing => drop.
- bool keepFirst = !pathsToDrop.contains(pathId);
- for (auto& [mark, granule] : emptyGranules) {
- if (keepFirst) {
- keepFirst = false;
- continue;
- }
-
- Y_ABORT_UNLESS(Granules.contains(granule));
- auto spg = Granules[granule];
- Y_ABORT_UNLESS(spg);
- GranulesTable->Erase(db, spg->Record);
- EraseGranule(pathId, granule, mark);
- }
- }
- }
-
Y_ABORT_UNLESS(!(LastPortion >> 63), "near to int overflow");
Y_ABORT_UNLESS(!(LastGranule >> 63), "near to int overflow");
return true;
}
-bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) {
- auto callback = [&](const TGranuleRecord& rec) {
- SetGranule(rec);
- };
-
- return GranulesTable->Load(db, callback);
-}
-
-bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) {
+bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
TSnapshot lastSnapshot(0, 0);
const TIndexInfo* currentIndexInfo = nullptr;
auto result = ColumnsTable->Load(db, [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
@@ -213,13 +172,11 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>&
lastSnapshot = portion.GetMinSnapshot();
}
Y_ABORT_UNLESS(portion.ValidSnapshotInfo());
- // Do not count the blob as lost since it exists in the index.
- lostBlobs.erase(loadContext.GetBlobRange().BlobId);
// Locate granule and append the record.
TColumnRecord rec(loadContext, *currentIndexInfo);
- GetGranulePtrVerified(portion.GetGranule())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta());
+ GetGranulePtrVerified(portion.GetPathId())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta());
});
- for (auto&& i : Granules) {
+ for (auto&& i : Tables) {
i.second->OnAfterPortionsLoad();
}
return result;
@@ -252,7 +209,6 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st
TSaverContext saverContext(StoragesManager->GetInsertOperator(), StoragesManager);
auto changes = std::make_shared<TInsertColumnEngineChanges>(DefaultMark(), std::move(dataToIndex), TSplitSettings(), saverContext);
- ui32 reserveGranules = 0;
auto pkSchema = VersionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey();
for (const auto& data : changes->GetDataToIndex()) {
@@ -261,32 +217,14 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st
if (changes->PathToGranule.contains(pathId)) {
continue;
}
-
- if (PathGranules.contains(pathId)) {
- const auto& src = PathGranules[pathId];
- for (auto&& i : src) {
- NIndexedReader::TSortableBatchPosition pos(i.first.GetBorder().ToBatch(pkSchema), 0, pkSchema->field_names(), {}, false);
- changes->PathToGranule[pathId].emplace(pos, i.second);
- for (auto&& pos : GetGranulePtrVerified(i.second)->GetBucketPositions()) {
- changes->PathToGranule[pathId].emplace(pos, i.second);
- }
- }
- } else {
- // It could reserve more than needed in case of the same pathId in DataToIndex
- ++reserveGranules;
- }
- }
-
- if (reserveGranules) {
- changes->FirstGranuleId = LastGranule + 1;
- LastGranule += reserveGranules;
+ changes->PathToGranule[pathId] = GetGranulePtrVerified(pathId)->GetBucketPositions();
}
return changes;
}
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept {
- auto granule = GranulesStorage->GetGranuleForCompaction(Granules);
+ auto granule = GranulesStorage->GetGranuleForCompaction(Tables);
if (!granule) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no granules for start compaction");
return nullptr;
@@ -310,30 +248,21 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
THashSet<ui64> dropPortions;
THashSet<ui64> emptyPaths;
for (ui64 pathId : pathsToDrop) {
- if (!PathGranules.contains(pathId)) {
+ auto itTable = Tables.find(pathId);
+ if (itTable == Tables.end()) {
emptyPaths.insert(pathId);
continue;
}
- for (const auto& [_, granule]: PathGranules[pathId]) {
- Y_ABORT_UNLESS(Granules.contains(granule));
- auto spg = Granules[granule];
- Y_ABORT_UNLESS(spg);
- for (auto& [portion, info] : spg->GetPortions()) {
- affectedRecords += info->NumChunks();
- changes->PortionsToDrop.push_back(*info);
- dropPortions.insert(portion);
- }
+ for (auto& [portion, info] : itTable->second->GetPortions()) {
+ affectedRecords += info->NumChunks();
+ changes->PortionsToDrop.push_back(*info);
+ dropPortions.insert(portion);
if (affectedRecords > maxRecords) {
break;
}
}
-
- if (affectedRecords > maxRecords) {
- changes->NeedRepeat = true;
- break;
- }
}
for (ui64 pathId : emptyPaths) {
pathsToDrop.erase(pathId);
@@ -375,8 +304,8 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
Y_ABORT_UNLESS(context.Changes->Tiering.emplace(pathId, ttl).second);
TDuration dWaiting = NYDBTest::TControllers::GetColumnShardController()->GetTTLDefaultWaitingDuration(TDuration::Minutes(1));
- auto itGranules = PathGranules.find(pathId);
- if (itGranules == PathGranules.end()) {
+ auto itTable = Tables.find(pathId);
+ if (itTable == Tables.end()) {
return dWaiting;
}
@@ -388,85 +317,79 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
auto ttlColumnNames = ttl.GetTtlColumns();
Y_ABORT_UNLESS(ttlColumnNames.size() == 1); // TODO: support different ttl columns
ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin());
- for (const auto& [ts, granule] : itGranules->second) {
- auto itGranule = Granules.find(granule);
- auto spg = itGranule->second;
- Y_ABORT_UNLESS(spg);
+ for (auto& [portion, info] : itTable->second->GetPortions()) {
+ if (info->HasRemoveSnapshot()) {
+ continue;
+ }
+ if (context.BusyPortions.contains(info->GetAddress())) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip ttl through busy portion")("portion_id", info->GetAddress().DebugString());
+ continue;
+ }
- for (auto& [portion, info] : spg->GetPortions()) {
- if (info->HasRemoveSnapshot()) {
- continue;
- }
- if (context.BusyPortions.contains(info->GetAddress())) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip ttl through busy portion")("portion_id", info->GetAddress().DebugString());
- continue;
+ context.AllowEviction = (evictionSize <= context.MaxEvictBytes);
+ context.AllowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
+ const bool tryEvictPortion = context.AllowEviction && ttl.HasTiers();
+
+ if (auto max = info->MaxValue(ttlColumnId)) {
+ bool keep = !expireTimestampOpt;
+ if (expireTimestampOpt) {
+ auto mpiOpt = ttl.Ttl->ScalarToInstant(max);
+ Y_ABORT_UNLESS(mpiOpt);
+ const TInstant maxTtlPortionInstant = *mpiOpt;
+ const TDuration d = maxTtlPortionInstant - *expireTimestampOpt;
+ keep = !!d;
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds());
+ if (d && dWaiting > d) {
+ dWaiting = d;
+ }
}
- context.AllowEviction = (evictionSize <= context.MaxEvictBytes);
- context.AllowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
- const bool tryEvictPortion = context.AllowEviction && ttl.HasTiers();
-
- if (auto max = info->MaxValue(ttlColumnId)) {
- bool keep = !expireTimestampOpt;
- if (expireTimestampOpt) {
- auto mpiOpt = ttl.Ttl->ScalarToInstant(max);
- Y_ABORT_UNLESS(mpiOpt);
- const TInstant maxTtlPortionInstant = *mpiOpt;
- const TDuration d = maxTtlPortionInstant - *expireTimestampOpt;
- keep = !!d;
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds());
- if (d && dWaiting > d) {
- dWaiting = d;
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop);
+ if (keep && tryEvictPortion) {
+ const TString currentTierName = info->GetMeta().GetTierName() ? info->GetMeta().GetTierName() : IStoragesManager::DefaultStorageId;
+ TString tierName = "";
+ for (auto& tierRef : ttl.GetOrderedTiers()) {
+ auto& tierInfo = tierRef.Get();
+ if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) {
+ SignalCounters.OnPortionNoTtlColumn(info->BlobsBytes());
+ continue;
}
- }
-
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop);
- if (keep && tryEvictPortion) {
- const TString currentTierName = info->GetMeta().GetTierName() ? info->GetMeta().GetTierName() : IStoragesManager::DefaultStorageId;
- TString tierName = "";
- for (auto& tierRef : ttl.GetOrderedTiers()) {
- auto& tierInfo = tierRef.Get();
- if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) {
- SignalCounters.OnPortionNoTtlColumn(info->BlobsBytes());
- continue;
- }
- auto mpiOpt = tierInfo.ScalarToInstant(max);
- Y_ABORT_UNLESS(mpiOpt);
- const TInstant maxTieringPortionInstant = *mpiOpt;
-
- const TDuration d = tierInfo.GetEvictInstant(context.Now) - maxTieringPortionInstant;
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering_choosing")("max", maxTieringPortionInstant.Seconds())
- ("evict", tierInfo.GetEvictInstant(context.Now).Seconds())("tier_name", tierInfo.GetName())("d", d);
- if (d) {
- tierName = tierInfo.GetName();
- break;
- } else {
- auto dWaitLocal = maxTieringPortionInstant - tierInfo.GetEvictInstant(context.Now);
- if (dWaiting > dWaitLocal) {
- dWaiting = dWaitLocal;
- }
+ auto mpiOpt = tierInfo.ScalarToInstant(max);
+ Y_ABORT_UNLESS(mpiOpt);
+ const TInstant maxTieringPortionInstant = *mpiOpt;
+
+ const TDuration d = tierInfo.GetEvictInstant(context.Now) - maxTieringPortionInstant;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering_choosing")("max", maxTieringPortionInstant.Seconds())
+ ("evict", tierInfo.GetEvictInstant(context.Now).Seconds())("tier_name", tierInfo.GetName())("d", d);
+ if (d) {
+ tierName = tierInfo.GetName();
+ break;
+ } else {
+ auto dWaitLocal = maxTieringPortionInstant - tierInfo.GetEvictInstant(context.Now);
+ if (dWaiting > dWaitLocal) {
+ dWaiting = dWaitLocal;
}
}
- if (!tierName) {
- tierName = IStoragesManager::DefaultStorageId;
- }
- if (currentTierName != tierName) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", currentTierName)("to", tierName);
- evictionSize += info->BlobsSizes().first;
- context.Changes->AddPortionToEvict(*info, TPortionEvictionFeatures(tierName, pathId, StoragesManager->GetOperator(tierName)));
- SignalCounters.OnPortionToEvict(info->BlobsBytes());
- }
}
- if (!keep && context.AllowDrop) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString());
- dropBlobs += info->NumBlobs();
- AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second);
- SignalCounters.OnPortionToDrop(info->BlobsBytes());
+ if (!tierName) {
+ tierName = IStoragesManager::DefaultStorageId;
+ }
+ if (currentTierName != tierName) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", currentTierName)("to", tierName);
+ evictionSize += info->BlobsSizes().first;
+ context.Changes->AddPortionToEvict(*info, TPortionEvictionFeatures(tierName, pathId, StoragesManager->GetOperator(tierName)));
+ SignalCounters.OnPortionToEvict(info->BlobsBytes());
}
- } else {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max");
- SignalCounters.OnPortionNoBorder(info->BlobsBytes());
}
+ if (!keep && context.AllowDrop) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString());
+ dropBlobs += info->NumBlobs();
+ AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second);
+ SignalCounters.OnPortionToDrop(info->BlobsBytes());
+ }
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max");
+ SignalCounters.OnPortionNoBorder(info->BlobsBytes());
}
}
if (dWaiting > TDuration::MilliSeconds(500) && (!context.AllowEviction || !context.AllowDrop)) {
@@ -537,31 +460,6 @@ std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const TH
return changes;
}
-std::vector<std::vector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks(ui64 pathId) const {
- Y_ABORT_UNLESS(PathGranules.contains(pathId));
- const auto& pathGranules = PathGranules.find(pathId)->second;
-
- std::vector<std::vector<std::pair<TMark, ui64>>> emptyGranules;
- ui64 emptyStart = 0;
- for (const auto& [mark, granule]: pathGranules) {
- Y_ABORT_UNLESS(Granules.contains(granule));
- auto spg = Granules.find(granule)->second;
- Y_ABORT_UNLESS(spg);
-
- if (spg->Empty()) {
- if (!emptyStart) {
- emptyGranules.push_back({});
- emptyStart = granule;
- }
- emptyGranules.back().emplace_back(mark, granule);
- } else if (emptyStart) {
- emptyStart = 0;
- }
- }
-
- return emptyGranules;
-}
-
bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept {
{
TFinalizationContext context(LastGranule, LastPortion, snapshot);
@@ -582,37 +480,11 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
return true;
}
-void TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_granule")("granule", rec.DebugString());
- const TMark mark(rec.Mark);
-
- AFL_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second)("event", "marker_duplication")("granule_id", rec.Granule)("old_granule_id", PathGranules[rec.PathId][mark]);
- AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second)("event", "granule_duplication")
- ("rec_path_id", rec.PathId)("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString());
-}
-
-std::optional<ui64> TColumnEngineForLogs::NewGranule(const TGranuleRecord& rec) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_granule")("granule", rec.DebugString());
- const TMark mark(rec.Mark);
-
- auto insertInfo = PathGranules[rec.PathId].emplace(mark, rec.Granule);
- if (insertInfo.second) {
- AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second)("event", "granule_duplication")
- ("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString());
- return {};
- } else {
- return insertInfo.first->second;
- }
-}
-
-void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& mark) {
- Y_ABORT_UNLESS(PathGranules.contains(pathId));
- auto it = Granules.find(granule);
- Y_ABORT_UNLESS(it != Granules.end());
+void TColumnEngineForLogs::EraseTable(const ui64 pathId) {
+ auto it = Tables.find(pathId);
+ Y_ABORT_UNLESS(it != Tables.end());
Y_ABORT_UNLESS(it->second->IsErasable());
- Granules.erase(it);
- EmptyGranules.erase(granule);
- PathGranules[pathId].erase(mark);
+ Tables.erase(it);
}
void TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, const TPortionInfo* exInfo) {
@@ -622,15 +494,13 @@ void TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, const
UpdatePortionStats(portionInfo, EStatsUpdateType::ADD);
}
- GetGranulePtrVerified(portionInfo.GetGranule())->UpsertPortion(portionInfo);
+ GetGranulePtrVerified(portionInfo.GetPathId())->UpsertPortion(portionInfo);
}
bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool updateStats) {
Y_ABORT_UNLESS(!portionInfo.Empty());
const ui64 portion = portionInfo.GetPortion();
- auto it = Granules.find(portionInfo.GetGranule());
- Y_ABORT_UNLESS(it != Granules.end());
- auto& spg = it->second;
+ auto spg = GetGranulePtrVerified(portionInfo.GetPathId());
Y_ABORT_UNLESS(spg);
auto p = spg->GetPortionPtr(portion);
@@ -647,80 +517,28 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool up
}
std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot snapshot,
- const THashSet<ui32>& /*columnIds*/,
- const TPKRangesFilter& pkRangesFilter) const
-{
+ const TPKRangesFilter& pkRangesFilter) const {
auto out = std::make_shared<TSelectInfo>();
- if (!PathGranules.contains(pathId)) {
+ auto itTable = Tables.find(pathId);
+ if (itTable == Tables.end()) {
return out;
}
- auto& pathGranules = PathGranules.find(pathId)->second;
- if (pathGranules.empty()) {
- return out;
- }
- out->Granules.reserve(pathGranules.size());
- std::optional<TMarksMap::const_iterator> previousIterator;
-
- for (auto&& filter : pkRangesFilter) {
- std::optional<NArrow::TReplaceKey> indexKeyFrom = filter.KeyFrom(GetIndexKey());
- std::optional<NArrow::TReplaceKey> indexKeyTo = filter.KeyTo(GetIndexKey());
-
- std::shared_ptr<arrow::Scalar> keyFrom;
- std::shared_ptr<arrow::Scalar> keyTo;
- if (indexKeyFrom) {
- keyFrom = NArrow::TReplaceKey::ToScalar(*indexKeyFrom, 0);
- }
- if (indexKeyTo) {
- keyTo = NArrow::TReplaceKey::ToScalar(*indexKeyTo, 0);
- }
-
- auto it = pathGranules.begin();
- if (keyFrom) {
- it = pathGranules.lower_bound(*keyFrom);
- if (it != pathGranules.begin()) {
- --it;
- }
- }
-
- if (previousIterator && (previousIterator == pathGranules.end() || it->first < (*previousIterator)->first)) {
- it = *previousIterator;
- }
- for (; it != pathGranules.end(); ++it) {
- auto& mark = it->first;
- ui64 granule = it->second;
- if (keyTo && mark > *keyTo) {
- break;
- }
-
- auto it = Granules.find(granule);
- Y_ABORT_UNLESS(it != Granules.end());
- auto& spg = it->second;
- Y_ABORT_UNLESS(spg);
- bool granuleHasDataForSnaphsot = false;
-
- for (const auto& [_, keyPortions] : spg->GroupOrderedPortionsByPK()) {
- for (auto&& [_, portionInfo] : keyPortions) {
- if (!portionInfo->IsVisible(snapshot)) {
- continue;
- }
- Y_ABORT_UNLESS(portionInfo->Produced());
- const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo());
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")
- ("granule", granule)("portion", portionInfo->DebugString());
- if (skipPortion) {
- continue;
- }
- out->PortionsOrderedPK.emplace_back(portionInfo);
- granuleHasDataForSnaphsot = true;
- }
+ auto spg = itTable->second;
+ for (const auto& [indexKey, keyPortions] : spg->GroupOrderedPortionsByPK()) {
+ for (auto&& [_, portionInfo] : keyPortions) {
+ if (!portionInfo->IsVisible(snapshot)) {
+ continue;
}
-
- if (granuleHasDataForSnaphsot) {
- out->Granules.push_back(spg->Record);
+ Y_ABORT_UNLESS(portionInfo->Produced());
+ const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo());
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")
+ ("pathId", pathId)("portion", portionInfo->DebugString());
+ if (skipPortion) {
+ continue;
}
+ out->PortionsOrderedPK.emplace_back(portionInfo);
}
- previousIterator = it;
}
return out;
@@ -739,6 +557,10 @@ void TColumnEngineForLogs::OnTieringModified(std::shared_ptr<NColumnShard::TTier
}
+void TColumnEngineForLogs::DoRegisterTable(const ui64 pathId) {
+ AFL_VERIFY(Tables.emplace(pathId, std::make_shared<TGranuleMeta>(pathId, GranulesStorage, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex)).second);
+}
+
TColumnEngineForLogs::TTieringProcessContext::TTieringProcessContext(const ui64 maxEvictBytes, std::shared_ptr<TTTLColumnEngineChanges> changes, const THashSet<TPortionAddress>& busyPortions)
: Now(TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now())
, MaxEvictBytes(maxEvictBytes)
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 53a2dbbf8f8..fff06d8a05d 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -143,8 +143,9 @@ public:
ui64 MemoryUsage() const override;
TSnapshot LastUpdate() const override { return LastSnapshot; }
+ virtual void DoRegisterTable(const ui64 pathId) override;
public:
- bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override;
+ bool Load(IDbWrapper& db) override;
std::shared_ptr<TInsertColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept override;
std::shared_ptr<TColumnEngineChanges> StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept override;
@@ -160,36 +161,46 @@ public:
std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot,
- const THashSet<ui32>& columnIds,
const TPKRangesFilter& pkRangesFilter) const override;
- bool IsPortionExists(const ui64 granuleId, const ui64 portionId) const {
- auto it = Granules.find(granuleId);
- if (it == Granules.end()) {
+ bool IsPortionExists(const ui64 pathId, const ui64 portionId) const {
+ auto it = Tables.find(pathId);
+ if (it == Tables.end()) {
return false;
}
return !!it->second->GetPortionPtr(portionId);
}
- bool IsGranuleExists(const ui64 granuleId) const {
- return !!GetGranuleOptional(granuleId);
+ virtual bool HasDataInPathId(const ui64 pathId) const override {
+ auto g = GetGranuleOptional(pathId);
+ if (!g) {
+ return false;
+ }
+ if (g->GetPortions().size()) {
+ return false;
+ }
+ return true;
+ }
+
+ bool IsGranuleExists(const ui64 pathId) const {
+ return !!GetGranuleOptional(pathId);
}
const TGranuleMeta& GetGranuleVerified(const ui64 granuleId) const {
- auto it = Granules.find(granuleId);
- AFL_VERIFY(it != Granules.end())("granule_id", granuleId)("count", Granules.size());
+ auto it = Tables.find(granuleId);
+ AFL_VERIFY(it != Tables.end())("granule_id", granuleId)("count", Tables.size());
return *it->second;
}
- std::shared_ptr<TGranuleMeta> GetGranulePtrVerified(const ui64 granuleId) const {
- auto result = GetGranuleOptional(granuleId);
- Y_ABORT_UNLESS(result);
+ std::shared_ptr<TGranuleMeta> GetGranulePtrVerified(const ui64 pathId) const {
+ auto result = GetGranuleOptional(pathId);
+ AFL_VERIFY(result)("path_id", pathId);
return result;
}
- std::shared_ptr<TGranuleMeta> GetGranuleOptional(const ui64 granuleId) const {
- auto it = Granules.find(granuleId);
- if (it == Granules.end()) {
+ std::shared_ptr<TGranuleMeta> GetGranuleOptional(const ui64 pathId) const {
+ auto it = Tables.find(pathId);
+ if (it == Tables.end()) {
return nullptr;
}
return it->second;
@@ -204,15 +215,10 @@ private:
TVersionedIndex VersionedIndex;
ui64 TabletId;
- std::shared_ptr<TGranulesTable> GranulesTable;
std::shared_ptr<TColumnsTable> ColumnsTable;
std::shared_ptr<TCountersTable> CountersTable;
- THashMap<ui64, std::shared_ptr<TGranuleMeta>> Granules; // granule -> meta
- THashMap<ui64, TMarksMap> PathGranules; // path_id -> {mark, granule}
+ THashMap<ui64, std::shared_ptr<TGranuleMeta>> Tables; // pathId into Granule that equal to Table
TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
- /// Set of empty granules.
- /// Just for providing count of empty granules.
- THashSet<ui64> EmptyGranules;
std::map<TSnapshot, std::vector<TPortionInfo>> CleanupPortions;
TColumnEngineStats Counters;
ui64 LastPortion;
@@ -232,15 +238,11 @@ private:
return *CachedDefaultMark;
}
- bool LoadGranules(IDbWrapper& db);
- bool LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs);
+ bool LoadColumns(IDbWrapper& db);
bool LoadCounters(IDbWrapper& db);
- void EraseGranule(ui64 pathId, ui64 granule, const TMark& mark);
+ void EraseTable(const ui64 pathId);
- /// Insert granule or check if same granule was already inserted.
- void SetGranule(const TGranuleRecord& rec);
- std::optional<ui64> NewGranule(const TGranuleRecord& rec);
void UpsertPortion(const TPortionInfo& portionInfo, const TPortionInfo* exInfo = nullptr);
bool ErasePortion(const TPortionInfo& portionInfo, bool updateStats = true);
void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT,
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index 4bef7aa1408..ff825546b15 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -40,21 +40,6 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable,
return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, insertTable, loadTime);
}
-void TDbWrapper::WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) {
- NIceDb::TNiceDb db(Database);
- NColumnShard::Schema::IndexGranules_Write(db, index, engine, row);
-}
-
-void TDbWrapper::EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) {
- NIceDb::TNiceDb db(Database);
- NColumnShard::Schema::IndexGranules_Erase(db, index, engine, row);
-}
-
-bool TDbWrapper::LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) {
- NIceDb::TNiceDb db(Database);
- return NColumnShard::Schema::IndexGranules_Load(db, index, engine, callback);
-}
-
void TDbWrapper::WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
NColumnShard::Schema::IndexColumns_Write(db, index, portion, row);
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h
index 26640f5bf11..c44bc713001 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.h
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.h
@@ -29,10 +29,6 @@ public:
virtual bool Load(TInsertTableAccessor& insertTable,
const TInstant& loadTime) = 0;
- virtual void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0;
- virtual void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0;
- virtual bool LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) = 0;
-
virtual void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual void EraseColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) = 0;
@@ -58,10 +54,6 @@ public:
bool Load(TInsertTableAccessor& insertTable,
const TInstant& loadTime) override;
- void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override;
- void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override;
- bool LoadGranules(ui32 index, const IColumnEngine& engine, const std::function<void(const TGranuleRecord&)>& callback) override;
-
void WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
void EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
bool LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override;
diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h
deleted file mode 100644
index 249ee1d7c99..00000000000
--- a/ydb/core/tx/columnshard/engines/granules_table.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#pragma once
-#include "db_wrapper.h"
-#include <ydb/core/formats/arrow/replace_key.h>
-
-namespace NKikimr::NOlap {
-
-struct TGranuleRecord {
-private:
- TSnapshot CreatedAt;
-public:
- ui64 PathId;
- ui64 Granule;
- NArrow::TStoreReplaceKey Mark;
-
- TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const NArrow::TReplaceKey& mark)
- : CreatedAt(createdAt)
- , PathId(pathId)
- , Granule(granule)
- , Mark(mark)
- {
- Y_ABORT_UNLESS(Mark.Size());
- }
-
- const TSnapshot& GetCreatedAt() const {
- return CreatedAt;
- }
-
- bool operator == (const TGranuleRecord& rec) const {
- return (PathId == rec.PathId) && (Mark == rec.Mark);
- }
-
- friend IOutputStream& operator << (IOutputStream& out, const TGranuleRecord& rec) {
- out << '{';
- auto& snap = rec.CreatedAt;
- out << rec.PathId << '#' << rec.Granule << ' '
- << snap;
- out << '}';
- return out;
- }
-
- TString DebugString() const {
- return TStringBuilder() << *this;
- }
-};
-
-class TGranulesTable {
-public:
- TGranulesTable(const IColumnEngine& engine, ui32 indexId)
- : Engine(engine)
- , IndexId(indexId)
- {}
-
- void Write(IDbWrapper& db, const TGranuleRecord& row) {
- db.WriteGranule(IndexId, Engine, row);
- }
-
- void Erase(IDbWrapper& db, const TGranuleRecord& row) {
- db.EraseGranule(IndexId, Engine, row);
- }
-
- bool Load(IDbWrapper& db, const std::function<void(const TGranuleRecord&)>& callback) {
- return db.LoadGranules(IndexId, Engine, callback);
- }
-
-private:
- const IColumnEngine& Engine;
- ui32 IndexId;
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index b197f226e20..1b7f48b47e3 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -71,7 +71,7 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
}
TPortionInfo TPortionInfo::CopyWithFilteredColumns(const THashSet<ui32>& columnIds) const {
- TPortionInfo result(Granule, Portion, GetMinSnapshot(), BlobsOperator);
+ TPortionInfo result(PathId, Portion, GetMinSnapshot(), BlobsOperator);
result.Meta = Meta;
result.Records.reserve(columnIds.size());
@@ -112,7 +112,7 @@ int TPortionInfo::CompareMinByPk(const TPortionInfo& item, const TIndexInfo& inf
TString TPortionInfo::DebugString(const bool withDetails) const {
TStringBuilder sb;
sb << "(portion_id:" << Portion << ";" <<
- "granule_id:" << Granule << ";records_count:" << NumRows() << ";"
+ "path_id:" << PathId << ";records_count:" << NumRows() << ";"
"min_schema_snapshot:(" << MinSnapshot.DebugString() << ");";
if (withDetails) {
sb <<
@@ -184,7 +184,7 @@ size_t TPortionInfo::NumBlobs() const {
}
bool TPortionInfo::IsEqualWithSnapshots(const TPortionInfo& item) const {
- return Granule == item.Granule && MinSnapshot == item.MinSnapshot
+ return PathId == item.PathId && MinSnapshot == item.MinSnapshot
&& Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot;
}
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 87a9af9f7cf..b407c9b7183 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -15,8 +15,8 @@ struct TIndexInfo;
class TPortionInfo {
private:
TPortionInfo() = default;
- ui64 Granule = 0;
- ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in granule
+ ui64 PathId = 0;
+ ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in pathId
TSnapshot MinSnapshot = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion}
TSnapshot RemoveSnapshot = TSnapshot::Zero(); // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one)
@@ -24,6 +24,10 @@ private:
TPortionMeta Meta;
std::shared_ptr<NOlap::IBlobsStorageOperator> BlobsOperator;
public:
+ ui64 GetPathId() const {
+ return PathId;
+ }
+
bool OlderThen(const TPortionInfo& info) const {
return RecordSnapshotMin() < info.RecordSnapshotMin();
}
@@ -123,8 +127,8 @@ public:
bool Empty() const { return Records.empty(); }
bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; }
- bool Valid() const { return MinSnapshot.Valid() && Granule && Portion && !Empty() && Produced() && HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
- bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && Granule && Portion; }
+ bool Valid() const { return MinSnapshot.Valid() && PathId && Portion && !Empty() && Produced() && HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
+ bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && PathId && Portion; }
bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; }
bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; }
bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ }
@@ -140,8 +144,8 @@ public:
return TPortionInfo();
}
- TPortionInfo(const ui64 granuleId, const ui64 portionId, const TSnapshot& minSnapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& blobsOperator)
- : Granule(granuleId)
+ TPortionInfo(const ui64 pathId, const ui64 portionId, const TSnapshot& minSnapshot, const std::shared_ptr<NOlap::IBlobsStorageOperator>& blobsOperator)
+ : PathId(pathId)
, Portion(portionId)
, MinSnapshot(minSnapshot)
, BlobsOperator(blobsOperator)
@@ -176,15 +180,15 @@ public:
}
ui64 GetGranule() const {
- return Granule;
+ return PathId;
}
TPortionAddress GetAddress() const {
- return TPortionAddress(Granule, Portion);
+ return TPortionAddress(PathId, Portion);
}
- void SetGranule(const ui64 granule) {
- Granule = granule;
+ void SetPathId(const ui64 pathId) {
+ PathId = pathId;
}
void SetPortion(const ui64 portion) {
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index 3f60021c20a..d7778fbc5f9 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -33,7 +33,6 @@ target_link_libraries(columnshard-engines-reader PUBLIC
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index 27028241125..6b2ab750e53 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -34,7 +34,6 @@ target_link_libraries(columnshard-engines-reader PUBLIC
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index 27028241125..6b2ab750e53 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -34,7 +34,6 @@ target_link_libraries(columnshard-engines-reader PUBLIC
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index 3f60021c20a..d7778fbc5f9 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -33,7 +33,6 @@ target_link_libraries(columnshard-engines-reader PUBLIC
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/common.cpp b/ydb/core/tx/columnshard/engines/reader/common.cpp
deleted file mode 100644
index 46168595a26..00000000000
--- a/ydb/core/tx/columnshard/engines/reader/common.cpp
+++ /dev/null
@@ -1,17 +0,0 @@
-#include "common.h"
-#include <util/string/builder.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-TString TBatchAddress::ToString() const {
- return TStringBuilder() << GranuleId << "," << BatchGranuleIdx;
-}
-
-TBatchAddress::TBatchAddress(const ui32 granuleId, const ui32 batchGranuleIdx)
- : GranuleId(granuleId)
- , BatchGranuleIdx(batchGranuleIdx)
-{
-
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/common.h b/ydb/core/tx/columnshard/engines/reader/common.h
deleted file mode 100644
index 30ad233b2e2..00000000000
--- a/ydb/core/tx/columnshard/engines/reader/common.h
+++ /dev/null
@@ -1,26 +0,0 @@
-#pragma once
-#include <ydb/library/accessor/accessor.h>
-#include <util/system/types.h>
-#include <util/generic/string.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TBatchAddress {
-private:
- ui32 GranuleId = 0;
- ui32 BatchGranuleIdx = 0;
-public:
- TString ToString() const;
-
- TBatchAddress(const ui32 granuleId, const ui32 batchGranuleIdx);
-
- ui32 GetGranuleId() const {
- return GranuleId;
- }
-
- ui32 GetBatchGranuleIdx() const {
- return BatchGranuleIdx;
- }
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index df6f8e6ad35..df02e8933ef 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -13,13 +13,12 @@ TDataStorageAccessor::TDataStorageAccessor(const std::unique_ptr<NOlap::TInsertT
, Index(index)
{}
-std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& columnIds) const {
+std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& /*columnIds*/) const {
if (readDescription.ReadNothing) {
return std::make_shared<NOlap::TSelectInfo>();
}
return Index->Select(readDescription.PathId,
readDescription.GetSnapshot(),
- columnIds,
readDescription.PKRangesFilter);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make
index 2ca09c00d53..f673de8200e 100644
--- a/ydb/core/tx/columnshard/engines/reader/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/ya.make
@@ -1,7 +1,6 @@
LIBRARY()
SRCS(
- common.cpp
conveyor_task.cpp
description.cpp
queue.cpp
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index 843c3a098da..12dbd0a9919 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -17,9 +17,9 @@ ui64 TGranuleMeta::Size() const {
}
void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("granule", GetGranuleId());
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("path_id", GetPathId());
auto it = Portions.find(info.GetPortion());
- AFL_VERIFY(info.GetGranule() == GetGranuleId())("event", "incompatible_granule")("portion", info.DebugString())("granule", GetGranuleId());
+ AFL_VERIFY(info.GetGranule() == GetPathId())("event", "incompatible_granule")("portion", info.DebugString())("path_id", GetPathId());
AFL_VERIFY(info.Valid())("event", "invalid_portion")("portion", info.DebugString());
AFL_VERIFY(info.ValidSnapshotInfo())("event", "incorrect_portion_snapshots")("portion", info.DebugString());
@@ -41,10 +41,10 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
bool TGranuleMeta::ErasePortion(const ui64 portion) {
auto it = Portions.find(portion);
if (it == Portions.end()) {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased_already")("portion_id", portion)("pathId", Record.PathId);
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased_already")("portion_id", portion)("pathId", PathId);
return false;
} else {
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second->DebugString())("pathId", Record.PathId);
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second->DebugString())("pathId", PathId);
}
OnBeforeChangePortion(it->second);
Portions.erase(it);
@@ -160,14 +160,14 @@ const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary(
return *AdditiveSummaryCache;
}
-TGranuleMeta::TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex)
- : Owner(owner)
+TGranuleMeta::TGranuleMeta(const ui64 pathId, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex)
+ : PathId(pathId)
+ , Owner(owner)
, Counters(counters)
, PortionInfoGuard(Owner->GetCounters().BuildPortionBlobsGuard())
- , Record(rec)
{
Y_ABORT_UNLESS(Owner);
- OptimizerPlanner = std::make_shared<NStorageOptimizer::NLevels::TLevelsOptimizerPlanner>(rec.Granule, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey());
+ OptimizerPlanner = std::make_shared<NStorageOptimizer::NLevels::TLevelsOptimizerPlanner>(PathId, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey());
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 550d9ea0daf..d7ec376cc9a 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -164,6 +164,7 @@ private:
std::set<EActivity> Activity;
mutable bool AllowInsertionFlag = false;
+ const ui64 PathId;
std::shared_ptr<TGranulesStorage> Owner;
const NColumnShard::TGranuleDataCounters Counters;
NColumnShard::TEngineLogsCounters::TPortionsInfoGuard PortionInfoGuard;
@@ -229,7 +230,7 @@ public:
bool NeedCompaction(const TCompactionLimits& limits) const {
if (InCompaction() || Empty()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_skipped_by_state")("granule_id", GetGranuleId())("granule_size", Size());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_skipped_by_state")("path_id", GetPathId())("granule_size", Size());
return false;
}
return GetCompactionType(limits) != TGranuleAdditiveSummary::ECompactionClass::NoCompaction;
@@ -238,7 +239,7 @@ public:
bool InCompaction() const;
bool IsErasable() const {
- return Activity.empty();
+ return Activity.empty() && Portions.empty();
}
void OnCompactionStarted();
@@ -249,16 +250,14 @@ public:
void UpsertPortion(const TPortionInfo& info);
TString DebugString() const {
- return TStringBuilder() << "(granule:" << GetGranuleId() << ";"
- << "path_id:" << Record.PathId << ";"
+ return TStringBuilder() << "(granule:" << GetPathId() << ";"
+ << "path_id:" << GetPathId() << ";"
<< "size:" << GetAdditiveSummary().GetGranuleSize() << ";"
<< "portions_count:" << Portions.size() << ";"
<< ")"
;
}
- const TGranuleRecord Record;
-
void AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta);
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const {
@@ -266,7 +265,11 @@ public:
}
ui64 GetPathId() const {
- return Record.PathId;
+ return PathId;
+ }
+
+ ui64 GetGranuleId() const {
+ return PathId;
}
const TPortionInfo& GetPortionVerified(const ui64 portion) const {
@@ -285,12 +288,8 @@ public:
bool ErasePortion(const ui64 portion);
- explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex);
+ explicit TGranuleMeta(const ui64 pathId, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex);
- ui64 GetGranuleId() const {
- return Record.Granule;
- }
- ui64 PathId() const noexcept { return Record.PathId; }
bool Empty() const noexcept { return Portions.empty(); }
ui64 Size() const;
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
index 91a5df954ba..db486568fc0 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
@@ -56,8 +56,7 @@ protected:
virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) = 0;
virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const = 0;
virtual TOptimizationPriority DoGetUsefulMetric() const = 0;
- virtual void DoActualize(const TInstant /*currentInstant*/) {
- }
+ virtual void DoActualize(const TInstant currentInstant) = 0;
virtual TString DoDebugString() const {
return "";
}
@@ -107,9 +106,7 @@ public:
return DoDebugString();
}
- virtual std::vector<NIndexedReader::TSortableBatchPosition> GetBucketPositions() const {
- return {};
- }
+ virtual std::vector<NIndexedReader::TSortableBatchPosition> GetBucketPositions() const = 0;
NJson::TJsonValue SerializeToJsonVisual() const {
return DoSerializeToJsonVisual();
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h
index b7c6933d572..e0f14588424 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h
@@ -468,6 +468,10 @@ private:
const std::shared_ptr<IStoragesManager> StoragesManager;
std::shared_ptr<TCounters> Counters;
protected:
+ virtual std::vector<NIndexedReader::TSortableBatchPosition> GetBucketPositions() const override {
+ return {};
+ }
+
virtual void DoModifyPortions(const std::vector<std::shared_ptr<TPortionInfo>>& add, const std::vector<std::shared_ptr<TPortionInfo>>& remove) override {
const TInstant currentInstant = TInstant::Now();
for (auto&& i : add) {
@@ -501,7 +505,9 @@ protected:
virtual TString DoDebugString() const override {
return "";
}
+ virtual void DoActualize(const TInstant /*currentInstant*/) override {
+ }
public:
TLevelsOptimizerPlanner(const ui64 granuleId, const std::shared_ptr<IStoragesManager>& storagesManager, const std::shared_ptr<arrow::Schema>& primaryKeysSchema)
: TBase(granuleId)
diff --git a/ydb/core/tx/columnshard/engines/storage/storage.cpp b/ydb/core/tx/columnshard/engines/storage/storage.cpp
index 543f042f527..02ce7fd8e15 100644
--- a/ydb/core/tx/columnshard/engines/storage/storage.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/storage.cpp
@@ -29,31 +29,6 @@ std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCom
return nullptr;
}
return granule;
-/*
- for (auto it = GranuleCompactionPrioritySorting.rbegin(); it != GranuleCompactionPrioritySorting.rend(); ++it) {
- if (it->first.GetWeight().IsZero()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "zero_granule_reached");
- break;
- }
- Y_ABORT_UNLESS(it->second.size());
- for (auto&& i : it->second) {
- auto itGranule = granules.find(i);
- Y_ABORT_UNLESS(itGranule != granules.end());
- if (it->first.GetWeight().GetInternalLevelWeight() > 0 * 1024 * 1024) {
-
-// if (it->first.GetWeight().GetInternalLevelWeight() / 10000000 > 100 ||
-// it->first.GetWeight().GetInternalLevelWeight() % 10000000 > 100000) {
-
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule")("granule_stats", it->first.DebugString())("granule_id", i);
- return itGranule->second;
- } else {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule_skipped")("granule_stats", it->first.DebugString())("granule_id", i)("skip_reason", "too_early_and_low_critical");
- break;
- }
- }
- }
- return {};
-*/
}
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index fdb9d385931..5f0848436ae 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -27,10 +27,6 @@ public:
return true;
}
- void WriteGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {}
- void EraseGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {}
- bool LoadGranules(ui32, const IColumnEngine&, const std::function<void(const TGranuleRecord&)>&) override { return true; }
-
void WriteColumn(ui32, const TPortionInfo&, const TColumnRecord&) override {}
void EraseColumn(ui32, const TPortionInfo&, const TColumnRecord&) override {}
bool LoadColumns(ui32, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>&) override { return true; }
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 85aead27a33..2f97cca3ce5 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -22,8 +22,7 @@ private:
std::map<TPortionAddress, std::map<TChunkAddress, TColumnChunkLoadContext>> LoadContexts;
public:
struct TIndex {
- THashMap<ui64, std::vector<TGranuleRecord>> Granules; // pathId -> granule
- THashMap<ui64, THashMap<ui64, TPortionInfo>> Columns; // granule -> portions
+ THashMap<ui64, THashMap<ui64, TPortionInfo>> Columns; // pathId -> portions
THashMap<ui32, ui64> Counters;
};
@@ -68,45 +67,6 @@ public:
return true;
}
- void WriteGranule(ui32 index, const IColumnEngine&, const TGranuleRecord& row) override {
- auto& granules = Indices[index].Granules[row.PathId];
-
- bool replaced = false;
- for (auto& rec : granules) {
- if (rec == row) {
- rec = row;
- replaced = true;
- break;
- }
- }
- if (!replaced) {
- granules.push_back(row);
- }
- }
-
- void EraseGranule(ui32 index, const IColumnEngine&, const TGranuleRecord& row) override {
- auto& pathGranules = Indices[index].Granules[row.PathId];
-
- std::vector<TGranuleRecord> filtered;
- filtered.reserve(pathGranules.size());
- for (const TGranuleRecord& rec : pathGranules) {
- if (rec.Granule != row.Granule) {
- filtered.push_back(rec);
- }
- }
- pathGranules.swap(filtered);
- }
-
- bool LoadGranules(ui32 index, const IColumnEngine&, const std::function<void(const TGranuleRecord&)>& callback) override {
- auto& granules = Indices[index].Granules;
- for (auto& [pathId, vec] : granules) {
- for (const auto& rec : vec) {
- callback(rec);
- }
- }
- return true;
- }
-
void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) override {
auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk);
auto rowProto = row.GetMeta().SerializeToProto();
@@ -439,8 +399,10 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
TSnapshot indexSnaphot(1, 1);
engine.UpdateDefaultSchema(indexSnaphot, TIndexInfo(tableInfo));
- THashSet<TUnifiedBlobId> lostBlobs;
- engine.Load(db, lostBlobs);
+ for (auto&& i : paths) {
+ engine.RegisterTable(i);
+ }
+ engine.Load(db);
std::vector<TInsertedData> dataToIndex = {
TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, 0, {}),
@@ -469,21 +431,21 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // select from snap before insert
ui64 planStep = 1;
ui64 txId = 0;
- auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
}
{ // select from snap between insert (greater txId)
ui64 planStep = 1;
ui64 txId = 2;
- auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
}
{ // select from snap after insert (greater planStep)
ui64 planStep = 2;
ui64 txId = 1;
- auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSpecialColumnNames().size());
}
@@ -491,7 +453,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // select another pathId
ui64 planStep = 2;
ui64 txId = 1;
- auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
}
}
@@ -516,14 +478,14 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TTestDbWrapper db;
TIndexInfo tableInfo = NColumnShard::BuildTableInfo(ydbSchema, key);
+ ui64 pathId = 1;
+ ui32 step = 1000;
+
TSnapshot indexSnapshot(1, 1);
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
- THashSet<TUnifiedBlobId> lostBlobs;
- engine.Load(db, lostBlobs);
-
- ui64 pathId = 1;
- ui32 step = 1000;
+ engine.RegisterTable(pathId);
+ engine.Load(db);
// insert
ui64 planStep = 1;
@@ -562,7 +524,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
}
@@ -576,7 +538,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
NOlap::TPKRangesFilter pkFilter(false);
Y_ABORT_UNLESS(pkFilter.Add(gt10k, nullptr, nullptr));
- auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter);
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
}
@@ -588,7 +550,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
NOlap::TPKRangesFilter pkFilter(false);
Y_ABORT_UNLESS(pkFilter.Add(nullptr, lt10k, nullptr));
- auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter);
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 9);
}
}
@@ -621,8 +583,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
TSnapshot indexSnapshot(1, 1);
engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
- THashSet<TUnifiedBlobId> lostBlobs;
- engine.Load(db, lostBlobs);
+ engine.RegisterTable(pathId);
+ engine.Load(db);
ui64 numRows = 1000;
ui64 rowPos = 0;
@@ -648,7 +610,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // check it's overloaded after reload
TColumnEngineForLogs tmpEngine(0, TestLimits(), CommonStoragesManager);
tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
- tmpEngine.Load(db, lostBlobs);
+ tmpEngine.RegisterTable(pathId);
+ tmpEngine.Load(db);
}
// compact
@@ -678,7 +641,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // check it's not overloaded after reload
TColumnEngineForLogs tmpEngine(0, TestLimits(), CommonStoragesManager);
tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
- tmpEngine.Load(db, lostBlobs);
+ tmpEngine.RegisterTable(pathId);
+ tmpEngine.Load(db);
}
}
@@ -691,12 +655,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// insert
ui64 planStep = 1;
- THashSet<TUnifiedBlobId> lostBlobs;
TSnapshot indexSnapshot(1, 1);
{
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
- engine.Load(db, lostBlobs);
+ engine.RegisterTable(pathId);
+ engine.Load(db);
ui64 numRows = 1000;
ui64 rowPos = 0;
@@ -729,7 +693,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
}
@@ -738,7 +702,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
}
@@ -754,7 +718,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
}
}
@@ -762,15 +726,15 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// load
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
- engine.Load(db, lostBlobs);
- UNIT_ASSERT_VALUES_EQUAL(engine.GetTotalStats().EmptyGranules, 0);
+ engine.RegisterTable(pathId);
+ engine.Load(db);
const TIndexInfo& indexInfo = engine.GetVersionedIndex().GetLastSchema()->GetIndexInfo();
THashSet<ui32> oneColumnId = {indexInfo.GetColumnId(testColumns[0].first)};
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
}
}
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index 403141f6127..f1fb371120f 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -13,8 +13,7 @@ void TSchemaPreset::Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset&
Name = presetProto.GetName();
}
-bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId) {
- TabletId = tabletId;
+bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
{
auto rowset = db.Table<Schema::TableInfo>().Select();
if (!rowset.IsReady()) {
@@ -138,22 +137,15 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId) {
return true;
}
-bool TTablesManager::LoadIndex(NOlap::TDbWrapper& idxDB, THashSet<NOlap::TUnifiedBlobId>& lostEvictions) {
+bool TTablesManager::LoadIndex(NOlap::TDbWrapper& idxDB) {
if (PrimaryIndex) {
- if (!PrimaryIndex->Load(idxDB, lostEvictions, PathsToDrop)) {
+ if (!PrimaryIndex->Load(idxDB)) {
return false;
}
}
return true;
}
-void TTablesManager::Clear() {
- Tables.clear();
- SchemaPresets.clear();
- PathsToDrop.clear();
- PrimaryIndex.reset();
-}
-
bool TTablesManager::HasTable(const ui64 pathId) const {
auto it = Tables.find(pathId);
if (it == Tables.end() || it->second.IsDropped()) {
@@ -210,7 +202,11 @@ void TTablesManager::RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db) {
Y_ABORT_UNLESS(table.IsEmpty());
Schema::SaveTableInfo(db, table.GetPathId(), table.GetTieringUsage());
- Tables.insert_or_assign(table.GetPathId(), std::move(table));
+ const ui64 pathId = table.GetPathId();
+ Tables.insert_or_assign(pathId, std::move(table));
+ if (PrimaryIndex) {
+ PrimaryIndex->RegisterTable(pathId);
+ }
}
bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db) {
@@ -280,6 +276,7 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim
NOlap::TSnapshot snapshot{version.Step, version.TxId};
NOlap::TIndexInfo indexInfo = DeserializeIndexInfoFromProto(schema);
indexInfo.SetAllKeys();
+ const bool isFirstPrimaryIndexInitialization = !PrimaryIndex;
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, NOlap::TCompactionLimits(), StoragesManager);
} else {
@@ -288,6 +285,11 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim
Y_ABORT_UNLESS(lastIndexInfo.GetIndexKey()->Equals(indexInfo.GetIndexKey()));
}
PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo));
+ if (isFirstPrimaryIndexInitialization) {
+ for (auto&& i : Tables) {
+ PrimaryIndex->RegisterTable(i.first);
+ }
+ }
PrimaryIndex->OnTieringModified(nullptr, Ttl);
}
@@ -296,4 +298,11 @@ NOlap::TIndexInfo TTablesManager::DeserializeIndexInfoFromProto(const NKikimrSch
Y_ABORT_UNLESS(indexInfo);
return *indexInfo;
}
+
+TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const ui64 tabletId)
+ : StoragesManager(storagesManager)
+ , TabletId(tabletId)
+{
+}
+
}
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index 8f8a4779fee..99db7751835 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -139,13 +139,9 @@ private:
TTtl Ttl;
std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
- ui64 TabletId;
+ ui64 TabletId = 0;
public:
- TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager)
- : StoragesManager(storagesManager)
- {
-
- }
+ TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const ui64 tabletId);
const TTtl& GetTtl() const {
return Ttl;
@@ -194,10 +190,8 @@ public:
return *PrimaryIndex;
}
- bool InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId);
- bool LoadIndex(NOlap::TDbWrapper& db, THashSet<NOlap::TUnifiedBlobId>& lostEvictions);
-
- void Clear();
+ bool InitFromDB(NIceDb::TNiceDb& db);
+ bool LoadIndex(NOlap::TDbWrapper& db);
const TTableInfo& GetTable(const ui64 pathId) const;
ui64 GetMemoryUsage() const;