author    | nsofya <[email protected]> | 2023-05-07 23:10:01 +0300
committer | nsofya <[email protected]> | 2023-05-07 23:10:01 +0300
commit    | 7516af0e3476946c5739294eb0ab41f465dea027 (patch)
tree      | c8ee46dcc444ae17a94d302f9aeb3aa639e52082
parent    | 71fbfcec68afef3f4fbac197ad143759c320f11e (diff)
Preparation for VersionedIndex usage
28 files changed, 389 insertions, 200 deletions
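The central addition in this patch is TVersionedIndex (see the column_engine.h hunk below): schema versions are kept in a std::map keyed by TSnapshot, and readers resolve the schema that was active at their read snapshot instead of always using a single TIndexInfo. Note that in this commit GetSchema() still short-circuits to GetLastSchema() and keeps the snapshot lookup commented out, presumably the follow-up this "preparation" commit is setting up. The sketch below is a simplified, self-contained illustration of that idea; the type names, the SchemaVersion placeholder, and the main() driver are illustrative stand-ins, not the actual YDB classes.

```cpp
#include <cassert>
#include <compare>
#include <cstdint>
#include <map>
#include <memory>
#include <string>

// Snapshots are ordered by (PlanStep, TxId); std::map keeps schema versions sorted by that key.
struct Snapshot {
    uint64_t PlanStep = 0;
    uint64_t TxId = 0;
    auto operator<=>(const Snapshot&) const = default;
};

// Stand-in for ISnapshotSchema: the real object carries TIndexInfo and an arrow::Schema.
struct SchemaVersion {
    Snapshot Since;          // snapshot at which this schema version became active
    std::string Description; // placeholder for the real column metadata
};

class VersionedIndex {
    std::map<Snapshot, std::shared_ptr<SchemaVersion>> Snapshots;
public:
    void AddIndex(const Snapshot& version, std::string description) {
        Snapshots.emplace(version,
                          std::make_shared<SchemaVersion>(SchemaVersion{version, std::move(description)}));
    }

    // Latest schema registered at or before `version` -- the lookup the patch prepares for.
    std::shared_ptr<SchemaVersion> GetSchema(const Snapshot& version) const {
        for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) {
            if (it->first <= version) {
                return it->second;
            }
        }
        return nullptr;
    }

    std::shared_ptr<SchemaVersion> GetLastSchema() const {
        assert(!Snapshots.empty());
        return Snapshots.rbegin()->second;
    }
};

int main() {
    VersionedIndex index;
    index.AddIndex(Snapshot{100, 1}, "v1: columns a, b");
    index.AddIndex(Snapshot{200, 5}, "v2: columns a, b, c");

    // A read started at snapshot 150:2 must resolve the v1 schema, not the latest one.
    auto schema = index.GetSchema(Snapshot{150, 2});
    assert(schema && schema->Description == "v1: columns a, b");
    return 0;
}
```

The rest of the diff threads a TSnapshot (or the resolved schema) through the read path so that TReadMetadata, filters, and resolvers no longer assume a single current TIndexInfo.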
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index 096a68500c3..92d1295cb31 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -46,9 +46,9 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { txc.DB.NoMoreReadsForTx(); - const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(); auto& record = Proto(Ev->Get()); - + const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(NOlap::TSnapshot().SetPlanStep(record.GetPlanStep()).SetTxId(record.GetTxId())); + ui64 metaShard = record.GetTxInitiator(); NOlap::TReadDescription read(false); @@ -80,7 +80,7 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(read.PKRangesFilter.Add(fromPredicate, toPredicate, &indexInfo)); bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, - TIndexColumnResolver(Self->TablesManager.GetIndexInfo())); + TIndexColumnResolver(indexInfo)); std::shared_ptr<NOlap::TReadMetadata> metadata; if (parseResult) { diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp index fa3c6ac3e2c..550144ad25c 100644 --- a/ydb/core/tx/columnshard/columnshard__read_base.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp @@ -5,6 +5,7 @@ namespace NKikimr::NColumnShard { + std::shared_ptr<NOlap::TReadMetadata> TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDescription& read, const std::unique_ptr<NOlap::TInsertTable>& insertTable, @@ -22,103 +23,14 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDes return {}; } - const NOlap::TIndexInfo& indexInfo = index->GetIndexInfo(); - auto spOut = std::make_shared<NOlap::TReadMetadata>(indexInfo, isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC); - auto& out = *spOut; - - out.PlanStep = read.PlanStep; - out.TxId = read.TxId; - - // schemas - - out.BlobSchema = indexInfo.ArrowSchema(); - if (read.ColumnIds.size()) { - out.ResultSchema = indexInfo.ArrowSchema(read.ColumnIds); - } else if (read.ColumnNames.size()) { - out.ResultSchema = indexInfo.ArrowSchema(read.ColumnNames); - } else { - error = "Empty column list requested"; - return {}; - } - - if (!out.BlobSchema) { - error = "Could not get BlobSchema."; - return {}; - } - - if (!out.ResultSchema) { - error = "Could not get ResultSchema."; + NOlap::TDataStorageAccessor dataAccessor(insertTable, index, batchCache); + auto readSnapshot = NOlap::TSnapshot().SetPlanStep(read.PlanStep).SetTxId(read.TxId); + auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), readSnapshot, isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC); + + if (!readMetadata->Init(read, dataAccessor, error)) { return {}; } - - // insert table - - out.CommittedBlobs = insertTable->Read(read.PathId, read.PlanStep, read.TxId); - for (auto& cmt : out.CommittedBlobs) { - if (auto batch = batchCache.Get(cmt.BlobId)) { - out.CommittedBatches.emplace(cmt.BlobId, batch); - } - } - - // index - - /// @note We could have column name changes between schema versions: - /// Add '1:foo', Drop '1:foo', Add '2:foo'. Drop should hide '1:foo' from reads. - /// It's expected that we have only one version on 'foo' in blob and could split them by schema {planStep:txId}. 
- /// So '1:foo' would be omitted in blob records for the column in new snapshots. And '2:foo' - in old ones. - /// It's not possible for blobs with several columns. There should be a special logic for them. - TVector<TString> columns = read.ColumnNames; - if (!read.ColumnIds.empty()) { - columns = indexInfo.GetColumnNames(read.ColumnIds); - } - Y_VERIFY(!columns.empty(), "Empty column list"); - - { // Add more columns: snapshot, replace, predicate - // Key columns (replace, sort) - THashSet<TString> requiredColumns = indexInfo.GetRequiredColumns(); - - // Snapshot columns - requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_PLAN_STEP); - requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_TX_ID); - - for (auto&& i : read.PKRangesFilter.GetColumnNames()) { - requiredColumns.emplace(i); - } - out.SetPKRangesFilter(read.PKRangesFilter); - - for (auto& col : columns) { - requiredColumns.erase(col); - } - - for (auto& reqCol : requiredColumns) { - columns.push_back(reqCol); - } - } - - out.LoadSchema = indexInfo.AddColumns(out.ResultSchema, columns); - if (!out.LoadSchema) { - return {}; - } - - THashSet<ui32> columnIds; - for (auto& field : out.LoadSchema->fields()) { - TString column(field->name().data(), field->name().size()); - columnIds.insert(indexInfo.GetColumnId(column)); - } - - out.Program = std::move(read.Program); - if (out.Program) { - for (auto& [id, name] : out.Program->SourceColumns) { - columnIds.insert(id); - } - } - - if (read.ReadNothing) { - out.SelectInfo = std::make_shared<NOlap::TSelectInfo>(); - } else { - out.SelectInfo = index->Select(read.PathId, {read.PlanStep, read.TxId}, columnIds, out.GetPKRangesFilter()); - } - return spOut; + return readMetadata; } bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapProgramType programType, diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 8445f0d227d..279587bbdae 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -852,12 +852,17 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE); read.ColumnIds.assign(record.GetColumnTags().begin(), record.GetColumnTags().end()); + const NOlap::TIndexInfo* indexInfo = nullptr; + if (!isIndexStats) { + indexInfo = &(Self->TablesManager.GetIndexInfo(NOlap::TSnapshot().SetPlanStep(snapshot.GetStep()).SetTxId(snapshot.GetTxId()))); + } + // TODO: move this to CreateReadMetadata? if (read.ColumnIds.empty()) { // "SELECT COUNT(*)" requests empty column list but we need non-empty list for PrepareReadMetadata. // So we add first PK column to the request. if (!isIndexStats) { - read.ColumnIds.push_back(Self->TablesManager.GetIndexInfo().GetPKFirstColumnId()); + read.ColumnIds.push_back(indexInfo->GetPKFirstColumnId()); } else { read.ColumnIds.push_back(PrimaryIndexStatsSchema.KeyColumns.front()); } @@ -866,7 +871,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { bool parseResult; if (!isIndexStats) { - TIndexColumnResolver columnResolver(Self->TablesManager.GetIndexInfo()); + TIndexColumnResolver columnResolver(*indexInfo); parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver); } else { TStatsColumnResolver columnResolver; @@ -892,7 +897,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { auto ydbKey = isIndexStats ? 
NOlap::GetColumns(PrimaryIndexStatsSchema, PrimaryIndexStatsSchema.KeyColumns) : - Self->TablesManager.GetIndexInfo().GetPrimaryKey(); + indexInfo->GetPrimaryKey(); for (auto& range: record.GetRanges()) { if (!FillPredicatesFromRange(read, range, ydbKey, Self->TabletID(), isIndexStats ? nullptr : &Self->TablesManager.GetIndexInfo())) { diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index 29e8931d5b5..1c6e132e76c 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -1,6 +1,6 @@ #include "columnshard_impl.h" #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> -#include <ydb/core/tx/columnshard/engines/index_logic.h> +#include <ydb/core/tx/columnshard/engines/index_logic_logs.h> #include "blob_cache.h" namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt index 39c7a31a10f..27b30276865 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt @@ -35,7 +35,7 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt index 7d878829f2b..cb47e22ffae 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt @@ -36,7 +36,7 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt index 7d878829f2b..cb47e22ffae 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt @@ -36,7 +36,7 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt index 39c7a31a10f..27b30276865 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt @@ -35,7 +35,7 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 230e0c90bd9..9d1eb13e3d3 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -335,11 +335,40 @@ struct TColumnEngineStats { } }; +class TVersionedIndex { + std::map<TSnapshot, ISnapshotSchema::TPtr> Snapshots; +public: + ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const { + Y_UNUSED(version); + return GetLastSchema(); + /* + for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) { + if (it->first <= version) { + return it->second; + } + } + Y_VERIFY(false); + return nullptr; + */ + } + + ISnapshotSchema::TPtr GetLastSchema() const { + Y_VERIFY(!Snapshots.empty()); + return Snapshots.rbegin()->second; + } + + void AddIndex(const TSnapshot& version, TIndexInfo&& indexInfo) { + Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version)); + } +}; + + class IColumnEngine { public: virtual ~IColumnEngine() = default; virtual const TIndexInfo& GetIndexInfo() const = 0; + virtual const TVersionedIndex& GetVersionedIndex() const = 0; virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return GetIndexInfo().GetReplaceKey(); } virtual const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return GetIndexInfo().GetSortingKey(); } virtual const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return GetIndexInfo().GetIndexKey(); } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 28b6060c41d..2b1cee15799 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -1,6 +1,6 @@ #include "column_engine_logs.h" #include "indexed_read_data.h" -#include "index_logic.h" +#include "index_logic_logs.h" #include "filter.h" @@ -333,7 +333,6 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c } void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) { - Y_UNUSED(snapshot); if (!GranulesTable) { MarkSchema = info.GetEffectiveKey(); ui32 indexId = info.GetId(); @@ -341,7 +340,7 @@ void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndex ColumnsTable = std::make_shared<TColumnsTable>(indexId); CountersTable = std::make_shared<TCountersTable>(indexId); } - IndexInfo = 
std::move(info); + VersionedIndex.AddIndex(snapshot, std::move(info)); } bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop) { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 4bea5aa452e..b71585a8b8b 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -173,8 +173,11 @@ public: TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits = {}); const TIndexInfo& GetIndexInfo() const override { - Y_VERIFY(IndexInfo); - return *IndexInfo; + return VersionedIndex.GetLastSchema()->GetIndexInfo(); + } + + const TVersionedIndex& GetVersionedIndex() const override { + return VersionedIndex; } const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override { @@ -234,7 +237,7 @@ private: bool Empty() const noexcept { return Portions.empty(); } }; - std::optional<TIndexInfo> IndexInfo; + TVersionedIndex VersionedIndex; TCompactionLimits Limits; ui64 TabletId; std::shared_ptr<arrow::Schema> MarkSchema; diff --git a/ydb/core/tx/columnshard/engines/defs.h b/ydb/core/tx/columnshard/engines/defs.h index 7c271b1888b..575b60a2599 100644 --- a/ydb/core/tx/columnshard/engines/defs.h +++ b/ydb/core/tx/columnshard/engines/defs.h @@ -16,6 +16,24 @@ struct TSnapshot { ui64 PlanStep{0}; ui64 TxId{0}; + ui64 GetPlanStep() const { + return PlanStep; + } + + ui64 GetTxId() const { + return TxId; + } + + TSnapshot& SetPlanStep(const ui64 value) { + PlanStep = value; + return *this; + } + + TSnapshot& SetTxId(const ui64 value) { + TxId = value; + return *this; + } + constexpr bool IsZero() const noexcept { return PlanStep == 0 && TxId == 0; } @@ -27,7 +45,7 @@ struct TSnapshot { constexpr auto operator <=> (const TSnapshot&) const noexcept = default; static constexpr TSnapshot Max() noexcept { - return TSnapshot{(ui64)-1ll, (ui64)-1ll}; + return TSnapshot().SetPlanStep(-1ll).SetTxId(-1ll); } friend IOutputStream& operator << (IOutputStream& out, const TSnapshot& s) { diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp index 4310c0eee31..2e04d3defc9 100644 --- a/ydb/core/tx/columnshard/engines/filter.cpp +++ b/ydb/core/tx/columnshard/engines/filter.cpp @@ -11,12 +11,10 @@ class TSnapshotGetter { private: const arrow::UInt64Array::value_type* RawSteps; const arrow::UInt64Array::value_type* RawIds; - const ui64 PlanStep; - const ui64 TxId; + const TSnapshot Snapshot; public: - TSnapshotGetter(std::shared_ptr<arrow::Array> steps, std::shared_ptr<arrow::Array> ids, const ui64 planStep, const ui64 txId) - : PlanStep(planStep) - , TxId(txId) + TSnapshotGetter(std::shared_ptr<arrow::Array> steps, std::shared_ptr<arrow::Array> ids, const TSnapshot& snapshot) + : Snapshot(snapshot) { Y_VERIFY(steps); Y_VERIFY(ids); @@ -28,20 +26,20 @@ public: } bool operator[](const ui32 idx) const { - return SnapLessOrEqual(RawSteps[idx], RawIds[idx], PlanStep, TxId); + return SnapLessOrEqual(RawSteps[idx], RawIds[idx], Snapshot.GetPlanStep(), Snapshot.GetTxId()); } }; NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& snapSchema, - ui64 planStep, ui64 txId) { + const TSnapshot& snapshot) { Y_VERIFY(batch); Y_VERIFY(snapSchema); Y_VERIFY(snapSchema->num_fields() == 2); auto steps = batch->GetColumnByName(snapSchema->fields()[0]->name()); auto ids = 
batch->GetColumnByName(snapSchema->fields()[1]->name()); NArrow::TColumnFilter result; - TSnapshotGetter getter(steps, ids, planStep, txId); + TSnapshotGetter getter(steps, ids, snapshot); result.Reset(steps->length(), std::move(getter)); return result; } @@ -82,9 +80,9 @@ NArrow::TColumnFilter MakeReplaceFilterLastWins(const std::shared_ptr<arrow::Rec NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata) { Y_VERIFY(portion); NArrow::TColumnFilter result = readMetadata.GetPKRangesFilter().BuildFilter(portion); - if (readMetadata.PlanStep) { + if (readMetadata.GetSnapshot().GetPlanStep()) { auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); - result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.PlanStep, readMetadata.TxId)); + result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot())); } return result; diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h index 45dc4ab596c..f9628fe90bc 100644 --- a/ydb/core/tx/columnshard/engines/filter.h +++ b/ydb/core/tx/columnshard/engines/filter.h @@ -1,5 +1,6 @@ #pragma once +#include "defs.h" #include <ydb/core/formats/arrow/program.h> #include <ydb/core/formats/arrow/replace_key.h> @@ -7,7 +8,7 @@ namespace NKikimr::NOlap { NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& snapSchema, - ui64 planStep, ui64 txId); + const TSnapshot& snapshot); NArrow::TColumnFilter MakeReplaceFilter(const std::shared_ptr<arrow::RecordBatch>& batch, THashSet<NArrow::TReplaceKey>& keys); NArrow::TColumnFilter MakeReplaceFilterLastWins(const std::shared_ptr<arrow::RecordBatch>& batch, THashSet<NArrow::TReplaceKey>& keys); diff --git a/ydb/core/tx/columnshard/engines/index_logic.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index e02dd2ebb6d..eed53c1896b 100644 --- a/ydb/core/tx/columnshard/engines/index_logic.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -1,4 +1,4 @@ -#include "index_logic.h" +#include "index_logic_logs.h" #include <span> diff --git a/ydb/core/tx/columnshard/engines/index_logic.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h index 86158ca4cfc..86158ca4cfc 100644 --- a/ydb/core/tx/columnshard/engines/index_logic.h +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index b85513d74c6..31ebe70139f 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -128,13 +128,11 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader:: } void TIndexedReadData::InitRead(ui32 inputBatch) { - Y_VERIFY(ReadMetadata->BlobSchema); - Y_VERIFY(ReadMetadata->LoadSchema); - Y_VERIFY(ReadMetadata->ResultSchema); - Y_VERIFY(IndexInfo().GetSortingKey()); - Y_VERIFY(IndexInfo().GetIndexKey() && IndexInfo().GetIndexKey()->num_fields()); + auto& indexInfo = ReadMetadata->GetIndexInfo(); + Y_VERIFY(indexInfo.GetSortingKey()); + Y_VERIFY(indexInfo.GetIndexKey() && indexInfo.GetIndexKey()->num_fields()); - SortReplaceDescription = IndexInfo().SortReplaceDescription(); + SortReplaceDescription = indexInfo.SortReplaceDescription(); NotIndexed.resize(inputBatch); @@ -156,7 +154,7 @@ void TIndexedReadData::InitRead(ui32 inputBatch) { stats->IndexPortions = ReadMetadata->SelectInfo->Portions.size(); 
stats->IndexBatches = ReadMetadata->NumIndexedBlobs(); stats->CommittedBatches = ReadMetadata->CommittedBlobs.size(); - stats->SchemaColumns = ReadMetadata->LoadSchema->num_fields(); + stats->SchemaColumns = ReadMetadata->GetSchemaColumnsCount(); stats->FilterColumns = GranulesContext->GetEarlyFilterColumns().size(); stats->AdditionalColumns = GranulesContext->GetPostFilterColumns().size(); stats->PortionsBytes = portionsBytes; @@ -190,8 +188,9 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& // Extract columns (without check), filter, attach snapshot, extract columns with check // (do not filter snapshot columns) + auto loadSchema = ReadMetadata->GetLoadSchema(TSnapshot().SetPlanStep(planStep).SetTxId(txId)); - auto batch = NArrow::ExtractExistedColumns(srcBatch, ReadMetadata->LoadSchema); + auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema); Y_VERIFY(batch); auto filter = FilterNotIndexed(batch, *ReadMetadata); @@ -203,7 +202,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, planStep, txId); Y_VERIFY(preparedBatch); - preparedBatch = NArrow::ExtractColumns(preparedBatch, ReadMetadata->LoadSchema); + preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema); Y_VERIFY(preparedBatch); filter.Apply(preparedBatch); @@ -212,6 +211,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxRowsInBatch) { Y_VERIFY(SortReplaceDescription); + auto& indexInfo = ReadMetadata->GetIndexInfo(); if (NotIndexed.size() != ReadyNotIndexed) { // Wait till we have all not indexed data so we could replace keys in granules @@ -228,10 +228,10 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR // committed data before the first granule would be placed in fake preceding granule // committed data after the last granule would be placed into the last granule - marksGranules.MakePrecedingMark(IndexInfo()); + marksGranules.MakePrecedingMark(indexInfo); Y_VERIFY(!marksGranules.Empty()); - auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, IndexInfo()); + auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, indexInfo); GranulesContext->DrainNotIndexedBatches(&outNotIndexed); Y_VERIFY(outNotIndexed.size() <= 1); if (outNotIndexed.size() == 1) { @@ -254,7 +254,7 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR auto out = MakeResult(ReadyToOut(), maxRowsInBatch); if (requireResult && out.empty()) { out.push_back(TPartialReadResult{ - .ResultBatch = NArrow::MakeEmptyBatch(ReadMetadata->ResultSchema) + .ResultBatch = NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema()) }); } return out; @@ -265,6 +265,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: Y_VERIFY(SortReplaceDescription); Y_VERIFY(GranulesContext); + auto& indexInfo = ReadMetadata->GetIndexInfo(); std::vector<NIndexedReader::TGranule*> ready = GranulesContext->DetachReadyInOrder(); std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out; out.reserve(ready.size() + 1); @@ -289,7 +290,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey)); } #if 1 // optimization - auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, 
granule->GetBatchesToDedup()); + auto deduped = SpecialMergeSorted(inGranule, indexInfo, SortReplaceDescription, granule->GetBatchesToDedup()); out.emplace_back(std::move(deduped)); #else out.push_back({}); @@ -313,7 +314,8 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: std::shared_ptr<arrow::RecordBatch> TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const { Y_VERIFY(ReadMetadata->IsSorted()); - Y_VERIFY(IndexInfo().GetSortingKey()); + auto& indexInfo = ReadMetadata->GetIndexInfo(); + Y_VERIFY(indexInfo.GetSortingKey()); { const auto pred = [](const std::shared_ptr<arrow::RecordBatch>& b) { @@ -327,7 +329,6 @@ TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch } // We could merge data here only if backpressure limits committed data size. KIKIMR-12520 - auto& indexInfo = IndexInfo(); auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription()); Y_VERIFY(merged); Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey())); @@ -380,6 +381,7 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco int64_t maxRowsInBatch) const { Y_VERIFY(ReadMetadata->IsSorted()); Y_VERIFY(SortReplaceDescription); + auto& indexInfo = ReadMetadata->GetIndexInfo(); TVector<TPartialReadResult> out; @@ -413,7 +415,7 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco } for (auto& batch : batches) { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, IndexInfo().GetReplaceKey(), isDesc)); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey(), isDesc)); if (batch->num_rows() == 0) { Y_VERIFY_DEBUG(false, "Unexpected empty batch"); @@ -421,11 +423,11 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco } // Extract the last row's PK - auto keyBatch = NArrow::ExtractColumns(batch, IndexInfo().GetReplaceKey()); + auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); auto lastKey = keyBatch->Slice(keyBatch->num_rows()-1, 1); // Leave only requested columns - auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->ResultSchema); + auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->GetResultSchema()); out.emplace_back(TPartialReadResult{ .ResultBatch = std::move(resultBatch), .LastReadKey = std::move(lastKey) diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 94a605e461f..ad22e964339 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -66,7 +66,7 @@ public: TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch); void AddNotIndexed(ui32 batchNo, TString blob, ui64 planStep, ui64 txId) { - auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->BlobSchema); + auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(TSnapshot().SetPlanStep(planStep).SetTxId(txId))); AddNotIndexed(batchNo, batch, planStep, txId); } @@ -93,10 +93,6 @@ public: } private: - const TIndexInfo& IndexInfo() const { - return ReadMetadata->IndexInfo; - } - std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch( const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const; diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index dd52019d249..8f364f92ec4 100644 --- 
a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -9,6 +9,53 @@ namespace NKikimr::NOlap { +class ISnapshotSchema { +public: + using TPtr = std::shared_ptr<ISnapshotSchema>; + + virtual ~ISnapshotSchema() {} + virtual int GetFieldIndex(const ui32 columnId) const = 0; + virtual std::shared_ptr<arrow::Field> GetField(const int index) const = 0; + virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0; + virtual const TIndexInfo& GetIndexInfo() const = 0; + virtual const TSnapshot& GetSnapshot() const = 0; +}; + +class TSnapshotSchema : public ISnapshotSchema { + TIndexInfo IndexInfo; + std::shared_ptr<arrow::Schema> Schema; + TSnapshot Snapshot; +public: + TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot) + : IndexInfo(std::move(indexInfo)) + , Schema(IndexInfo.ArrowSchemaWithSpecials()) + , Snapshot(snapshot) + { + } + + int GetFieldIndex(const ui32 columnId) const override { + TString columnName = IndexInfo.GetColumnName(columnId); + std::string name(columnName.data(), columnName.size()); + return Schema->GetFieldIndex(name); + } + + std::shared_ptr<arrow::Field> GetField(const int index) const override { + return Schema->field(index); + } + + const std::shared_ptr<arrow::Schema>& GetSchema() const override { + return Schema; + } + + const TIndexInfo& GetIndexInfo() const override { + return IndexInfo; + } + + const TSnapshot& GetSnapshot() const override { + return Snapshot; + } +}; + struct TPortionMeta { // NOTE: These values are persisted in LocalDB so they must be stable enum EProduced : ui32 { diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 0cfbcf5d4c4..f6efac5e725 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -28,7 +28,8 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard Y_VERIFY(WaitIndexed.empty()); Y_VERIFY(PortionInfo->Produced()); Y_VERIFY(!FetchedInfo.GetFilteredBatch()); - auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data, CurrentColumnIds); + auto loadSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot()); + auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->GetIndexInfo(), loadSchema, Data, CurrentColumnIds); Data.clear(); if (!FetchedInfo.GetFilter()) { return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index 217e46911fb..8ad6a8b871a 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -23,11 +23,12 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco NonSortableBatches.emplace_back(batch); } + auto& indexInfo = Owner->GetReadMetadata()->GetIndexInfo(); if (!batchInfo.IsDuplicationsAvailable()) { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false)); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey(), false)); } else { AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion_on_ready"); - Y_VERIFY_DEBUG(NArrow::IsSorted(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false)); + Y_VERIFY_DEBUG(NArrow::IsSorted(batch, indexInfo.GetReplaceKey(), false)); 
Y_VERIFY(IsDuplicationsAvailable()); BatchesToDedup.insert(batch.get()); } @@ -67,9 +68,10 @@ std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata: batches.emplace_back(&i); } const int reverseKff = reverse ? -1 : 0; - const auto pred = [reverseKff, readMetadata](const TBatch* l, const TBatch* r) { + auto& indexInfo = readMetadata->GetIndexInfo(); + const auto pred = [reverseKff, indexInfo](const TBatch* l, const TBatch* r) { if (l->IsSortableInGranule() && r->IsSortableInGranule()) { - return l->GetPortionInfo().CompareMinByPk(r->GetPortionInfo(), readMetadata->IndexInfo) * reverseKff < 0; + return l->GetPortionInfo().CompareMinByPk(r->GetPortionInfo(), indexInfo) * reverseKff < 0; } else if (l->IsSortableInGranule()) { return false; } else if (r->IsSortableInGranule()) { @@ -85,7 +87,7 @@ std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata: auto& l = *batches[i]; auto& r = *batches[i + 1]; Y_VERIFY(r.IsSortableInGranule()); - Y_VERIFY(l.GetPortionInfo().CompareSelfMaxItemMinByPk(r.GetPortionInfo(), readMetadata->IndexInfo) * reverseKff <= 0); + Y_VERIFY(l.GetPortionInfo().CompareSelfMaxItemMinByPk(r.GetPortionInfo(), indexInfo) * reverseKff <= 0); nonCompactedSerial = false; } else { Y_VERIFY(nonCompactedSerial); diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp index 7ba4e4c2507..1e2beba67e6 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp @@ -109,7 +109,7 @@ std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata) :TBase(readMetadata) - , MergeStream(readMetadata->IndexInfo.GetReplaceKey(), readMetadata->IsDescSorted()) + , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), readMetadata->IsDescSorted()) { CurrentItemsLimit = ReadMetadata->Limit; } diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index ef197c7ca55..6f43e2e95d3 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -5,25 +5,145 @@ namespace NKikimr::NOlap { +TDataStorageAccessor::TDataStorageAccessor(const std::unique_ptr<NOlap::TInsertTable>& insertTable, + const std::unique_ptr<NOlap::IColumnEngine>& index, + const NColumnShard::TBatchCache& batchCache) + : InsertTable(insertTable) + , Index(index) + , BatchCache(batchCache) +{} + +std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& columnIds) const { + if (readDescription.ReadNothing) { + return std::make_shared<NOlap::TSelectInfo>(); + } + return Index->Select(readDescription.PathId, + {readDescription.PlanStep, readDescription.TxId}, + columnIds, + readDescription.PKRangesFilter); +} + +std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const { + return std::move(InsertTable->Read(readDescription.PathId, readDescription.PlanStep, readDescription.TxId)); +} + +std::shared_ptr<arrow::RecordBatch> TDataStorageAccessor::GetCachedBatch(const TUnifiedBlobId& blobId) const { + return BatchCache.Get(blobId); +} + std::unique_ptr<NColumnShard::TScanIteratorBase> 
TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const { return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters); } +bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor, std::string& error) { + auto& indexInfo = ResultIndexSchema->GetIndexInfo(); + + TVector<ui32> resultColumnsIds; + if (readDescription.ColumnIds.size()) { + resultColumnsIds = readDescription.ColumnIds; + } else if (readDescription.ColumnNames.size()) { + resultColumnsIds = indexInfo.GetColumnIds(readDescription.ColumnNames); + } else { + error = "Empty column list requested"; + return false; + } + ResultColumnsIds.swap(resultColumnsIds); + + if (!GetResultSchema()) { + error = "Could not get ResultSchema."; + return false; + } + + SetPKRangesFilter(readDescription.PKRangesFilter); + + /// @note We could have column name changes between schema versions: + /// Add '1:foo', Drop '1:foo', Add '2:foo'. Drop should hide '1:foo' from reads. + /// It's expected that we have only one version on 'foo' in blob and could split them by schema {planStep:txId}. + /// So '1:foo' would be omitted in blob records for the column in new snapshots. And '2:foo' - in old ones. + /// It's not possible for blobs with several columns. There should be a special logic for them. + { + Y_VERIFY(!ResultColumnsIds.empty(), "Empty column list"); + THashSet<TString> requiredColumns = indexInfo.GetRequiredColumns(); + + // Snapshot columns + requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_PLAN_STEP); + requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_TX_ID); + + for (auto&& i : readDescription.PKRangesFilter.GetColumnNames()) { + requiredColumns.emplace(i); + } + + for (auto& col : ResultColumnsIds) { + requiredColumns.erase(indexInfo.GetColumnName(col)); + } + + TVector<ui32> auxiliaryColumns; + auxiliaryColumns.reserve(requiredColumns.size()); + for (auto& reqCol : requiredColumns) { + auxiliaryColumns.push_back(indexInfo.GetColumnId(reqCol)); + } + AllColumns.reserve(AllColumns.size() + ResultColumnsIds.size() + auxiliaryColumns.size()); + AllColumns.insert(AllColumns.end(), ResultColumnsIds.begin(), ResultColumnsIds.end()); + AllColumns.insert(AllColumns.end(), auxiliaryColumns.begin(), auxiliaryColumns.end()); + } + + CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription); + for (auto& cmt : CommittedBlobs) { + if (auto batch = dataAccessor.GetCachedBatch(cmt.BlobId)) { + CommittedBatches.emplace(cmt.BlobId, batch); + } + } + + auto loadSchema = GetLoadSchema(Snapshot); + if (!loadSchema) { + return false; + } + + THashSet<ui32> columnIds; + for (auto& field : loadSchema->fields()) { + TString column(field->name().data(), field->name().size()); + columnIds.insert(indexInfo.GetColumnId(column)); + } + + Program = readDescription.Program; + if (Program) { + for (auto& [id, name] : Program->SourceColumns) { + columnIds.insert(id); + } + } + + SelectInfo = dataAccessor.Select(readDescription, columnIds); + return true; +} + std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const { - std::set<ui32> result = GetPKRangesFilter().GetColumnIds(IndexInfo); + auto& indexInfo = ResultIndexSchema->GetIndexInfo(); + std::set<ui32> result = GetPKRangesFilter().GetColumnIds(indexInfo); + if (LessPredicate) { + for (auto&& i : LessPredicate->ColumnNames()) { + result.emplace(indexInfo.GetColumnId(i)); + 
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); + } + } + if (GreaterPredicate) { + for (auto&& i : GreaterPredicate->ColumnNames()) { + result.emplace(indexInfo.GetColumnId(i)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); + } + } if (Program) { for (auto&& i : Program->GetEarlyFilterColumns()) { - auto id = IndexInfo.GetColumnIdOptional(i); + auto id = indexInfo.GetColumnIdOptional(i); if (id) { result.emplace(*id); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); } } } - if (PlanStep) { + if (Snapshot.GetPlanStep()) { auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); for (auto&& i : snapSchema->fields()) { - result.emplace(IndexInfo.GetColumnId(i->name())); + result.emplace(indexInfo.GetColumnId(i->name())); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name()); } } @@ -32,27 +152,26 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const { std::set<ui32> TReadMetadata::GetPKColumnIds() const { std::set<ui32> result; - for (auto&& i : IndexInfo.GetPrimaryKey()) { - Y_VERIFY(result.emplace(IndexInfo.GetColumnId(i.first)).second); + auto& indexInfo = ResultIndexSchema->GetIndexInfo(); + for (auto&& i : indexInfo.GetPrimaryKey()) { + Y_VERIFY(result.emplace(indexInfo.GetColumnId(i.first)).second); } return result; } std::set<ui32> TReadMetadata::GetUsedColumnIds() const { std::set<ui32> result; - if (PlanStep) { + auto& indexInfo = ResultIndexSchema->GetIndexInfo(); + if (Snapshot.GetPlanStep()) { auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); for (auto&& i : snapSchema->fields()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", i->name()); - result.emplace(IndexInfo.GetColumnId(i->name())); + result.emplace(indexInfo.GetColumnId(i->name())); } } - for (auto&& f : LoadSchema->fields()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", f->name()); - result.emplace(IndexInfo.GetColumnId(f->name())); - } - for (auto&& i : IndexInfo.GetPrimaryKey()) { - Y_VERIFY(result.contains(IndexInfo.GetColumnId(i.first))); + result.insert(AllColumns.begin(), AllColumns.end()); + for (auto&& i : indexInfo.GetPrimaryKey()) { + Y_VERIFY(result.contains(indexInfo.GetColumnId(i.first))); } return result; } @@ -90,13 +209,14 @@ void TReadStats::PrintToLog() { } NIndexedReader::IOrderPolicy::TPtr TReadMetadata::BuildSortingPolicy() const { - if (Limit && IsSorted() && IndexInfo.IsSorted() && IndexInfo.GetSortingKey()->num_fields()) { + auto& indexInfo = ResultIndexSchema->GetIndexInfo(); + if (Limit && IsSorted() && indexInfo.IsSorted() && indexInfo.GetSortingKey()->num_fields()) { ui32 idx = 0; - for (auto&& i : IndexInfo.GetPrimaryKey()) { - if (idx >= IndexInfo.GetSortingKey()->fields().size()) { + for (auto&& i : indexInfo.GetPrimaryKey()) { + if (idx >= indexInfo.GetSortingKey()->fields().size()) { break; } - if (IndexInfo.GetSortingKey()->fields()[idx]->name() != i.first) { + if (indexInfo.GetSortingKey()->fields()[idx]->name() != i.first) { return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this()); } ++idx; diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index fec567b55dd..843bb4486c2 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -1,9 +1,11 @@ #pragma once #include "conveyor_task.h" +#include "description.h" #include <ydb/library/accessor/accessor.h> #include 
<ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/counters.h> #include <ydb/core/tx/columnshard/columnshard__scan.h> +#include <ydb/core/tx/columnshard/columnshard_common.h> #include <ydb/core/tx/columnshard/engines/predicate/predicate.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> #include <ydb/core/scheme_types/scheme_type_info.h> @@ -49,6 +51,21 @@ struct TReadStats { } }; +class TDataStorageAccessor { +private: + const std::unique_ptr<NOlap::TInsertTable>& InsertTable; + const std::unique_ptr<NOlap::IColumnEngine>& Index; + const NColumnShard::TBatchCache& BatchCache; + +public: + TDataStorageAccessor(const std::unique_ptr<NOlap::TInsertTable>& insertTable, + const std::unique_ptr<NOlap::IColumnEngine>& index, + const NColumnShard::TBatchCache& batchCache); + std::shared_ptr<NOlap::TSelectInfo> Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& columnIds) const; + std::vector<NOlap::TCommittedBlob> GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const; + std::shared_ptr<arrow::RecordBatch> GetCachedBatch(const TUnifiedBlobId& blobId) const; +}; + // Holds all metadata that is needed to perform read/scan struct TReadMetadataBase { public: @@ -81,9 +98,8 @@ public: } virtual ~TReadMetadataBase() = default; - std::shared_ptr<arrow::Schema> BlobSchema; - std::shared_ptr<arrow::Schema> LoadSchema; // ResultSchema + required for intermediate operations - std::shared_ptr<arrow::Schema> ResultSchema; // TODO: add Program modifications + std::shared_ptr<NOlap::TPredicate> LessPredicate; + std::shared_ptr<NOlap::TPredicate> GreaterPredicate; std::shared_ptr<NSsa::TProgram> Program; std::shared_ptr<const THashSet<TUnifiedBlobId>> ExternBlobs; ui64 Limit{0}; // TODO @@ -110,30 +126,66 @@ public: // Holds all metadata that is needed to perform read/scan struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadMetadata> { -private: using TBase = TReadMetadataBase; +private: + TVersionedIndex IndexVersions; + TSnapshot Snapshot; + std::shared_ptr<ISnapshotSchema> ResultIndexSchema; + TVector<ui32> AllColumns; + TVector<ui32> ResultColumnsIds; public: using TConstPtr = std::shared_ptr<const TReadMetadata>; - TIndexInfo IndexInfo; - ui64 PlanStep = 0; - ui64 TxId = 0; std::shared_ptr<TSelectInfo> SelectInfo; std::vector<TCommittedBlob> CommittedBlobs; THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches; std::shared_ptr<TReadStats> ReadStats; + const TSnapshot& GetSnapshot() const { + return Snapshot; + } + std::shared_ptr<NIndexedReader::IOrderPolicy> BuildSortingPolicy() const; - TReadMetadata(const TIndexInfo& info, const ESorting sorting) + TReadMetadata(const TVersionedIndex& info, const TSnapshot& snapshot, const ESorting sorting) : TBase(sorting) - , IndexInfo(info) - , ReadStats(std::make_shared<TReadStats>(info.GetId())) - {} + , IndexVersions(info) + , Snapshot(snapshot) + , ResultIndexSchema(info.GetSchema(Snapshot)) + , ReadStats(std::make_shared<TReadStats>(info.GetLastSchema()->GetIndexInfo().GetId())) + { + } + + bool Init(const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor, std::string& error); + + ui64 GetSchemaColumnsCount() const { + return AllColumns.size(); + } + + std::shared_ptr<arrow::Schema> GetLoadSchema(const TSnapshot& version) const { + const auto& indexInfo = IndexVersions.GetSchema(version)->GetIndexInfo(); + return indexInfo.ArrowSchema(AllColumns, true); + } + + std::shared_ptr<arrow::Schema> 
GetBlobSchema(const TSnapshot& version) const { + return IndexVersions.GetSchema(version)->GetIndexInfo().ArrowSchema(); + } + + std::shared_ptr<arrow::Schema> GetResultSchema() const { + return ResultIndexSchema->GetIndexInfo().ArrowSchema(ResultColumnsIds); + } + + const TIndexInfo& GetIndexInfo(const std::optional<TSnapshot>& version = {}) const { + if (version && version < Snapshot) { + return IndexVersions.GetSchema(*version)->GetIndexInfo(); + } + return ResultIndexSchema->GetIndexInfo(); + } std::vector<std::string> GetColumnsOrder() const { + auto loadSchema = GetLoadSchema(Snapshot); std::vector<std::string> result; - for (auto&& i : LoadSchema->fields()) { + for (auto&& i : loadSchema->fields()) { result.emplace_back(i->name()); } return result; @@ -149,25 +201,28 @@ public: } std::shared_ptr<arrow::Schema> GetSortingKey() const { - return IndexInfo.GetSortingKey(); + return ResultIndexSchema->GetIndexInfo().GetSortingKey(); } std::shared_ptr<arrow::Schema> GetReplaceKey() const { - return IndexInfo.GetReplaceKey(); + return ResultIndexSchema->GetIndexInfo().GetReplaceKey(); } TVector<TNameTypeInfo> GetResultYqlSchema() const override { + auto& indexInfo = ResultIndexSchema->GetIndexInfo(); + auto resultSchema = GetResultSchema(); + Y_VERIFY(resultSchema); TVector<NTable::TTag> columnIds; - columnIds.reserve(ResultSchema->num_fields()); - for (const auto& field: ResultSchema->fields()) { + columnIds.reserve(resultSchema->num_fields()); + for (const auto& field: resultSchema->fields()) { TString name = TStringBuilder() << field->name(); - columnIds.emplace_back(IndexInfo.GetColumnId(name)); + columnIds.emplace_back(indexInfo.GetColumnId(name)); } - return IndexInfo.GetColumns(columnIds); + return indexInfo.GetColumns(columnIds); } TVector<TNameTypeInfo> GetKeyYqlSchema() const override { - return IndexInfo.GetPrimaryKey(); + return ResultIndexSchema->GetIndexInfo().GetPrimaryKey(); } size_t NumIndexedRecords() const { @@ -183,12 +238,12 @@ public: std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override; void Dump(IOutputStream& out) const override { - out << "columns: " << (LoadSchema ? LoadSchema->num_fields() : 0) + out << "columns: " << GetSchemaColumnsCount() << " index records: " << NumIndexedRecords() << " index blobs: " << NumIndexedBlobs() << " committed blobs: " << CommittedBlobs.size() << " with program steps: " << (Program ? 
Program->Steps.size() : 0) - << " at snapshot: " << PlanStep << ":" << TxId; + << " at snapshot: " << Snapshot.GetPlanStep() << ":" << Snapshot.GetTxId(); TBase::Dump(out); if (SelectInfo) { out << ", " << *SelectInfo; diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index e351c6c5001..71c9ce8956e 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -1,7 +1,7 @@ #include <library/cpp/testing/unittest/registar.h> #include "column_engine_logs.h" -#include "index_logic.h" #include "predicate/predicate.h" +#include "index_logic_logs.h" namespace NKikimr { diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp index b5af4486629..69cd682b6fd 100644 --- a/ydb/core/tx/columnshard/eviction_actor.cpp +++ b/ydb/core/tx/columnshard/eviction_actor.cpp @@ -1,6 +1,6 @@ #include "columnshard_impl.h" #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> -#include <ydb/core/tx/columnshard/engines/index_logic.h> +#include <ydb/core/tx/columnshard/engines/index_logic_logs.h> #include "blob_cache.h" namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index 29f574f1805..2c45bb7e562 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -1,6 +1,6 @@ #include "columnshard_impl.h" #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> -#include <ydb/core/tx/columnshard/engines/index_logic.h> +#include <ydb/core/tx/columnshard/engines/index_logic_logs.h> #include "blob_cache.h" namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 562458d4dd7..99f4d17485a 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -270,14 +270,15 @@ private: mutable TString SerializedSchema; TString GetSerializedSchema(const std::shared_ptr<arrow::RecordBatch>& batch) const { - Y_VERIFY(ReadMetadata->ResultSchema); + auto resultSchema = ReadMetadata->GetResultSchema(); + Y_VERIFY(resultSchema); // TODO: make real ResultSchema with SSA effects - if (ReadMetadata->ResultSchema->Equals(batch->schema())) { + if (resultSchema->Equals(batch->schema())) { if (!SerializedSchema.empty()) { return SerializedSchema; } - SerializedSchema = NArrow::SerializeSchema(*ReadMetadata->ResultSchema); + SerializedSchema = NArrow::SerializeSchema(*resultSchema); return SerializedSchema; } |
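Many of the call-site changes above replace separate (planStep, txId) arguments with a TSnapshot assembled through the fluent setters added in engines/defs.h. A minimal compile-ready sketch of that pattern, assuming only the fields and methods visible in the defs.h hunk (the main() driver and the literal values are illustrative):

```cpp
#include <cstdint>
#include <iostream>

// Simplified copy of the accessors/setters this patch adds to NOlap::TSnapshot in engines/defs.h.
struct TSnapshot {
    uint64_t PlanStep = 0;
    uint64_t TxId = 0;

    uint64_t GetPlanStep() const { return PlanStep; }
    uint64_t GetTxId() const { return TxId; }

    // Returning *this lets call sites assemble a snapshot in a single expression.
    TSnapshot& SetPlanStep(uint64_t value) { PlanStep = value; return *this; }
    TSnapshot& SetTxId(uint64_t value) { TxId = value; return *this; }
};

int main() {
    // Mirrors the pattern used at the updated call sites, e.g.
    //   GetIndexInfo(NOlap::TSnapshot().SetPlanStep(record.GetPlanStep()).SetTxId(record.GetTxId()))
    const TSnapshot readSnapshot = TSnapshot().SetPlanStep(1000).SetTxId(42);
    std::cout << readSnapshot.GetPlanStep() << ":" << readSnapshot.GetTxId() << std::endl;
    return 0;
}
```

Having the setters return *this keeps each call site to a single expression, at the cost of allowing a half-initialized snapshot if one of the two setters is omitted.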