author    nsofya <[email protected]>  2023-05-07 23:10:01 +0300
committer nsofya <[email protected]>  2023-05-07 23:10:01 +0300
commit    7516af0e3476946c5739294eb0ab41f465dea027 (patch)
tree      c8ee46dcc444ae17a94d302f9aeb3aa639e52082
parent    71fbfcec68afef3f4fbac197ad143759c320f11e (diff)
Preparation for VersionedIndex usage
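The cross-cutting change in this patch is the new TVersionedIndex in column_engine.h: instead of a single TIndexInfo, TColumnEngineForLogs now keeps a map from TSnapshot (plan step + tx id) to a snapshot-bound schema (TSnapshotSchema), and readers such as TReadMetadata resolve schemas through GetSchema(snapshot). For now GetSchema still falls back to GetLastSchema(); the version-aware lookup is present but commented out. Below is a minimal, self-contained sketch of that intended lookup, using simplified stand-in types (Snapshot, Schema, VersionedIndex are illustrative, not the YDB classes).

#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <tuple>

// Simplified stand-ins for illustration only; the real TSnapshot, ISnapshotSchema
// and TVersionedIndex live under ydb/core/tx/columnshard/engines.
struct Snapshot {
    uint64_t PlanStep = 0;
    uint64_t TxId = 0;
    bool operator<(const Snapshot& other) const {
        return std::tie(PlanStep, TxId) < std::tie(other.PlanStep, other.TxId);
    }
};

struct Schema {
    std::string Description; // placeholder for a snapshot-bound TIndexInfo
};

// Sketch of the lookup that TVersionedIndex::GetSchema is expected to perform
// once the commented-out branch in this patch is enabled: walk registered
// versions from newest to oldest and return the first schema whose snapshot
// is less than or equal to the requested one.
class VersionedIndex {
    std::map<Snapshot, std::shared_ptr<Schema>> Snapshots;
public:
    void AddIndex(const Snapshot& version, Schema&& schema) {
        Snapshots.emplace(version, std::make_shared<Schema>(std::move(schema)));
    }

    std::shared_ptr<Schema> GetSchema(const Snapshot& version) const {
        for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) {
            if (!(version < it->first)) { // it->first <= version
                return it->second;
            }
        }
        return nullptr; // no schema registered at or before this snapshot
    }

    std::shared_ptr<Schema> GetLastSchema() const {
        assert(!Snapshots.empty());
        return Snapshots.rbegin()->second;
    }
};

int main() {
    VersionedIndex index;
    index.AddIndex(Snapshot{.PlanStep = 10, .TxId = 1}, Schema{"v1"});
    index.AddIndex(Snapshot{.PlanStep = 20, .TxId = 1}, Schema{"v2"});

    // A read pinned to plan step 15 still resolves to the older schema.
    assert(index.GetSchema(Snapshot{.PlanStep = 15, .TxId = 1})->Description == "v1");
    // A read at or after plan step 20 resolves to the newer one.
    assert(index.GetSchema(Snapshot{.PlanStep = 25, .TxId = 1})->Description == "v2");
    return 0;
}

Callers in the patch build the lookup key through the new fluent setters on TSnapshot, e.g. NOlap::TSnapshot().SetPlanStep(read.PlanStep).SetTxId(read.TxId).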
-rw-r--r--  ydb/core/tx/columnshard/columnshard__read.cpp  6
-rw-r--r--  ydb/core/tx/columnshard/columnshard__read_base.cpp  102
-rw-r--r--  ydb/core/tx/columnshard/columnshard__scan.cpp  11
-rw-r--r--  ydb/core/tx/columnshard/compaction_actor.cpp  2
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt  2
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt  2
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt  2
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt  2
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine.h  29
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.cpp  5
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.h  9
-rw-r--r--  ydb/core/tx/columnshard/engines/defs.h  20
-rw-r--r--  ydb/core/tx/columnshard/engines/filter.cpp  18
-rw-r--r--  ydb/core/tx/columnshard/engines/filter.h  3
-rw-r--r--  ydb/core/tx/columnshard/engines/index_logic_logs.cpp (renamed from ydb/core/tx/columnshard/engines/index_logic.cpp)  2
-rw-r--r--  ydb/core/tx/columnshard/engines/index_logic_logs.h (renamed from ydb/core/tx/columnshard/engines/index_logic.h)  0
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.cpp  38
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.h  6
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.h  47
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/batch.cpp  3
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule.cpp  12
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_controller.cpp  2
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_metadata.cpp  156
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_metadata.h  97
-rw-r--r--  ydb/core/tx/columnshard/engines/ut_logs_engine.cpp  2
-rw-r--r--  ydb/core/tx/columnshard/eviction_actor.cpp  2
-rw-r--r--  ydb/core/tx/columnshard/indexing_actor.cpp  2
-rw-r--r--  ydb/core/tx/columnshard/read_actor.cpp  7
28 files changed, 389 insertions, 200 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index 096a68500c3..92d1295cb31 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -46,9 +46,9 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
txc.DB.NoMoreReadsForTx();
- const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo();
auto& record = Proto(Ev->Get());
-
+ const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(NOlap::TSnapshot().SetPlanStep(record.GetPlanStep()).SetTxId(record.GetTxId()));
+
ui64 metaShard = record.GetTxInitiator();
NOlap::TReadDescription read(false);
@@ -80,7 +80,7 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_VERIFY(read.PKRangesFilter.Add(fromPredicate, toPredicate, &indexInfo));
bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read,
- TIndexColumnResolver(Self->TablesManager.GetIndexInfo()));
+ TIndexColumnResolver(indexInfo));
std::shared_ptr<NOlap::TReadMetadata> metadata;
if (parseResult) {
diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp
index fa3c6ac3e2c..550144ad25c 100644
--- a/ydb/core/tx/columnshard/columnshard__read_base.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp
@@ -5,6 +5,7 @@
namespace NKikimr::NColumnShard {
+
std::shared_ptr<NOlap::TReadMetadata>
TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDescription& read,
const std::unique_ptr<NOlap::TInsertTable>& insertTable,
@@ -22,103 +23,14 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDes
return {};
}
- const NOlap::TIndexInfo& indexInfo = index->GetIndexInfo();
- auto spOut = std::make_shared<NOlap::TReadMetadata>(indexInfo, isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC);
- auto& out = *spOut;
-
- out.PlanStep = read.PlanStep;
- out.TxId = read.TxId;
-
- // schemas
-
- out.BlobSchema = indexInfo.ArrowSchema();
- if (read.ColumnIds.size()) {
- out.ResultSchema = indexInfo.ArrowSchema(read.ColumnIds);
- } else if (read.ColumnNames.size()) {
- out.ResultSchema = indexInfo.ArrowSchema(read.ColumnNames);
- } else {
- error = "Empty column list requested";
- return {};
- }
-
- if (!out.BlobSchema) {
- error = "Could not get BlobSchema.";
- return {};
- }
-
- if (!out.ResultSchema) {
- error = "Could not get ResultSchema.";
+ NOlap::TDataStorageAccessor dataAccessor(insertTable, index, batchCache);
+ auto readSnapshot = NOlap::TSnapshot().SetPlanStep(read.PlanStep).SetTxId(read.TxId);
+ auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), readSnapshot, isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC);
+
+ if (!readMetadata->Init(read, dataAccessor, error)) {
return {};
}
-
- // insert table
-
- out.CommittedBlobs = insertTable->Read(read.PathId, read.PlanStep, read.TxId);
- for (auto& cmt : out.CommittedBlobs) {
- if (auto batch = batchCache.Get(cmt.BlobId)) {
- out.CommittedBatches.emplace(cmt.BlobId, batch);
- }
- }
-
- // index
-
- /// @note We could have column name changes between schema versions:
- /// Add '1:foo', Drop '1:foo', Add '2:foo'. Drop should hide '1:foo' from reads.
- /// It's expected that we have only one version on 'foo' in blob and could split them by schema {planStep:txId}.
- /// So '1:foo' would be omitted in blob records for the column in new snapshots. And '2:foo' - in old ones.
- /// It's not possible for blobs with several columns. There should be a special logic for them.
- TVector<TString> columns = read.ColumnNames;
- if (!read.ColumnIds.empty()) {
- columns = indexInfo.GetColumnNames(read.ColumnIds);
- }
- Y_VERIFY(!columns.empty(), "Empty column list");
-
- { // Add more columns: snapshot, replace, predicate
- // Key columns (replace, sort)
- THashSet<TString> requiredColumns = indexInfo.GetRequiredColumns();
-
- // Snapshot columns
- requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_PLAN_STEP);
- requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_TX_ID);
-
- for (auto&& i : read.PKRangesFilter.GetColumnNames()) {
- requiredColumns.emplace(i);
- }
- out.SetPKRangesFilter(read.PKRangesFilter);
-
- for (auto& col : columns) {
- requiredColumns.erase(col);
- }
-
- for (auto& reqCol : requiredColumns) {
- columns.push_back(reqCol);
- }
- }
-
- out.LoadSchema = indexInfo.AddColumns(out.ResultSchema, columns);
- if (!out.LoadSchema) {
- return {};
- }
-
- THashSet<ui32> columnIds;
- for (auto& field : out.LoadSchema->fields()) {
- TString column(field->name().data(), field->name().size());
- columnIds.insert(indexInfo.GetColumnId(column));
- }
-
- out.Program = std::move(read.Program);
- if (out.Program) {
- for (auto& [id, name] : out.Program->SourceColumns) {
- columnIds.insert(id);
- }
- }
-
- if (read.ReadNothing) {
- out.SelectInfo = std::make_shared<NOlap::TSelectInfo>();
- } else {
- out.SelectInfo = index->Select(read.PathId, {read.PlanStep, read.TxId}, columnIds, out.GetPKRangesFilter());
- }
- return spOut;
+ return readMetadata;
}
bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapProgramType programType,
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 8445f0d227d..279587bbdae 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -852,12 +852,17 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE);
read.ColumnIds.assign(record.GetColumnTags().begin(), record.GetColumnTags().end());
+ const NOlap::TIndexInfo* indexInfo = nullptr;
+ if (!isIndexStats) {
+ indexInfo = &(Self->TablesManager.GetIndexInfo(NOlap::TSnapshot().SetPlanStep(snapshot.GetStep()).SetTxId(snapshot.GetTxId())));
+ }
+
// TODO: move this to CreateReadMetadata?
if (read.ColumnIds.empty()) {
// "SELECT COUNT(*)" requests empty column list but we need non-empty list for PrepareReadMetadata.
// So we add first PK column to the request.
if (!isIndexStats) {
- read.ColumnIds.push_back(Self->TablesManager.GetIndexInfo().GetPKFirstColumnId());
+ read.ColumnIds.push_back(indexInfo->GetPKFirstColumnId());
} else {
read.ColumnIds.push_back(PrimaryIndexStatsSchema.KeyColumns.front());
}
@@ -866,7 +871,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
bool parseResult;
if (!isIndexStats) {
- TIndexColumnResolver columnResolver(Self->TablesManager.GetIndexInfo());
+ TIndexColumnResolver columnResolver(*indexInfo);
parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver);
} else {
TStatsColumnResolver columnResolver;
@@ -892,7 +897,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
auto ydbKey = isIndexStats ?
NOlap::GetColumns(PrimaryIndexStatsSchema, PrimaryIndexStatsSchema.KeyColumns) :
- Self->TablesManager.GetIndexInfo().GetPrimaryKey();
+ indexInfo->GetPrimaryKey();
for (auto& range: record.GetRanges()) {
if (!FillPredicatesFromRange(read, range, ydbKey, Self->TabletID(), isIndexStats ? nullptr : &Self->TablesManager.GetIndexInfo())) {
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index 29e8931d5b5..1c6e132e76c 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -1,6 +1,6 @@
#include "columnshard_impl.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
-#include <ydb/core/tx/columnshard/engines/index_logic.h>
+#include <ydb/core/tx/columnshard/engines/index_logic_logs.h>
#include "blob_cache.h"
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
index 39c7a31a10f..27b30276865 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
@@ -35,7 +35,7 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
index 7d878829f2b..cb47e22ffae 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
@@ -36,7 +36,7 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
index 7d878829f2b..cb47e22ffae 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
@@ -36,7 +36,7 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
index 39c7a31a10f..27b30276865 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
@@ -35,7 +35,7 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 230e0c90bd9..9d1eb13e3d3 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -335,11 +335,40 @@ struct TColumnEngineStats {
}
};
+class TVersionedIndex {
+ std::map<TSnapshot, ISnapshotSchema::TPtr> Snapshots;
+public:
+ ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const {
+ Y_UNUSED(version);
+ return GetLastSchema();
+ /*
+ for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) {
+ if (it->first <= version) {
+ return it->second;
+ }
+ }
+ Y_VERIFY(false);
+ return nullptr;
+ */
+ }
+
+ ISnapshotSchema::TPtr GetLastSchema() const {
+ Y_VERIFY(!Snapshots.empty());
+ return Snapshots.rbegin()->second;
+ }
+
+ void AddIndex(const TSnapshot& version, TIndexInfo&& indexInfo) {
+ Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version));
+ }
+};
+
+
class IColumnEngine {
public:
virtual ~IColumnEngine() = default;
virtual const TIndexInfo& GetIndexInfo() const = 0;
+ virtual const TVersionedIndex& GetVersionedIndex() const = 0;
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return GetIndexInfo().GetReplaceKey(); }
virtual const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return GetIndexInfo().GetSortingKey(); }
virtual const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return GetIndexInfo().GetIndexKey(); }
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 28b6060c41d..2b1cee15799 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1,6 +1,6 @@
#include "column_engine_logs.h"
#include "indexed_read_data.h"
-#include "index_logic.h"
+#include "index_logic_logs.h"
#include "filter.h"
@@ -333,7 +333,6 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
}
void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) {
- Y_UNUSED(snapshot);
if (!GranulesTable) {
MarkSchema = info.GetEffectiveKey();
ui32 indexId = info.GetId();
@@ -341,7 +340,7 @@ void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndex
ColumnsTable = std::make_shared<TColumnsTable>(indexId);
CountersTable = std::make_shared<TCountersTable>(indexId);
}
- IndexInfo = std::move(info);
+ VersionedIndex.AddIndex(snapshot, std::move(info));
}
bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 4bea5aa452e..b71585a8b8b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -173,8 +173,11 @@ public:
TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits = {});
const TIndexInfo& GetIndexInfo() const override {
- Y_VERIFY(IndexInfo);
- return *IndexInfo;
+ return VersionedIndex.GetLastSchema()->GetIndexInfo();
+ }
+
+ const TVersionedIndex& GetVersionedIndex() const override {
+ return VersionedIndex;
}
const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override {
@@ -234,7 +237,7 @@ private:
bool Empty() const noexcept { return Portions.empty(); }
};
- std::optional<TIndexInfo> IndexInfo;
+ TVersionedIndex VersionedIndex;
TCompactionLimits Limits;
ui64 TabletId;
std::shared_ptr<arrow::Schema> MarkSchema;
diff --git a/ydb/core/tx/columnshard/engines/defs.h b/ydb/core/tx/columnshard/engines/defs.h
index 7c271b1888b..575b60a2599 100644
--- a/ydb/core/tx/columnshard/engines/defs.h
+++ b/ydb/core/tx/columnshard/engines/defs.h
@@ -16,6 +16,24 @@ struct TSnapshot {
ui64 PlanStep{0};
ui64 TxId{0};
+ ui64 GetPlanStep() const {
+ return PlanStep;
+ }
+
+ ui64 GetTxId() const {
+ return TxId;
+ }
+
+ TSnapshot& SetPlanStep(const ui64 value) {
+ PlanStep = value;
+ return *this;
+ }
+
+ TSnapshot& SetTxId(const ui64 value) {
+ TxId = value;
+ return *this;
+ }
+
constexpr bool IsZero() const noexcept {
return PlanStep == 0 && TxId == 0;
}
@@ -27,7 +45,7 @@ struct TSnapshot {
constexpr auto operator <=> (const TSnapshot&) const noexcept = default;
static constexpr TSnapshot Max() noexcept {
- return TSnapshot{(ui64)-1ll, (ui64)-1ll};
+ return TSnapshot().SetPlanStep(-1ll).SetTxId(-1ll);
}
friend IOutputStream& operator << (IOutputStream& out, const TSnapshot& s) {
diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp
index 4310c0eee31..2e04d3defc9 100644
--- a/ydb/core/tx/columnshard/engines/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/filter.cpp
@@ -11,12 +11,10 @@ class TSnapshotGetter {
private:
const arrow::UInt64Array::value_type* RawSteps;
const arrow::UInt64Array::value_type* RawIds;
- const ui64 PlanStep;
- const ui64 TxId;
+ const TSnapshot Snapshot;
public:
- TSnapshotGetter(std::shared_ptr<arrow::Array> steps, std::shared_ptr<arrow::Array> ids, const ui64 planStep, const ui64 txId)
- : PlanStep(planStep)
- , TxId(txId)
+ TSnapshotGetter(std::shared_ptr<arrow::Array> steps, std::shared_ptr<arrow::Array> ids, const TSnapshot& snapshot)
+ : Snapshot(snapshot)
{
Y_VERIFY(steps);
Y_VERIFY(ids);
@@ -28,20 +26,20 @@ public:
}
bool operator[](const ui32 idx) const {
- return SnapLessOrEqual(RawSteps[idx], RawIds[idx], PlanStep, TxId);
+ return SnapLessOrEqual(RawSteps[idx], RawIds[idx], Snapshot.GetPlanStep(), Snapshot.GetTxId());
}
};
NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& snapSchema,
- ui64 planStep, ui64 txId) {
+ const TSnapshot& snapshot) {
Y_VERIFY(batch);
Y_VERIFY(snapSchema);
Y_VERIFY(snapSchema->num_fields() == 2);
auto steps = batch->GetColumnByName(snapSchema->fields()[0]->name());
auto ids = batch->GetColumnByName(snapSchema->fields()[1]->name());
NArrow::TColumnFilter result;
- TSnapshotGetter getter(steps, ids, planStep, txId);
+ TSnapshotGetter getter(steps, ids, snapshot);
result.Reset(steps->length(), std::move(getter));
return result;
}
@@ -82,9 +80,9 @@ NArrow::TColumnFilter MakeReplaceFilterLastWins(const std::shared_ptr<arrow::Rec
NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata) {
Y_VERIFY(portion);
NArrow::TColumnFilter result = readMetadata.GetPKRangesFilter().BuildFilter(portion);
- if (readMetadata.PlanStep) {
+ if (readMetadata.GetSnapshot().GetPlanStep()) {
auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
- result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.PlanStep, readMetadata.TxId));
+ result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot()));
}
return result;
diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h
index 45dc4ab596c..f9628fe90bc 100644
--- a/ydb/core/tx/columnshard/engines/filter.h
+++ b/ydb/core/tx/columnshard/engines/filter.h
@@ -1,5 +1,6 @@
#pragma once
+#include "defs.h"
#include <ydb/core/formats/arrow/program.h>
#include <ydb/core/formats/arrow/replace_key.h>
@@ -7,7 +8,7 @@ namespace NKikimr::NOlap {
NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& snapSchema,
- ui64 planStep, ui64 txId);
+ const TSnapshot& snapshot);
NArrow::TColumnFilter MakeReplaceFilter(const std::shared_ptr<arrow::RecordBatch>& batch, THashSet<NArrow::TReplaceKey>& keys);
NArrow::TColumnFilter MakeReplaceFilterLastWins(const std::shared_ptr<arrow::RecordBatch>& batch, THashSet<NArrow::TReplaceKey>& keys);
diff --git a/ydb/core/tx/columnshard/engines/index_logic.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index e02dd2ebb6d..eed53c1896b 100644
--- a/ydb/core/tx/columnshard/engines/index_logic.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -1,4 +1,4 @@
-#include "index_logic.h"
+#include "index_logic_logs.h"
#include <span>
diff --git a/ydb/core/tx/columnshard/engines/index_logic.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h
index 86158ca4cfc..86158ca4cfc 100644
--- a/ydb/core/tx/columnshard/engines/index_logic.h
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index b85513d74c6..31ebe70139f 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -128,13 +128,11 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::
}
void TIndexedReadData::InitRead(ui32 inputBatch) {
- Y_VERIFY(ReadMetadata->BlobSchema);
- Y_VERIFY(ReadMetadata->LoadSchema);
- Y_VERIFY(ReadMetadata->ResultSchema);
- Y_VERIFY(IndexInfo().GetSortingKey());
- Y_VERIFY(IndexInfo().GetIndexKey() && IndexInfo().GetIndexKey()->num_fields());
+ auto& indexInfo = ReadMetadata->GetIndexInfo();
+ Y_VERIFY(indexInfo.GetSortingKey());
+ Y_VERIFY(indexInfo.GetIndexKey() && indexInfo.GetIndexKey()->num_fields());
- SortReplaceDescription = IndexInfo().SortReplaceDescription();
+ SortReplaceDescription = indexInfo.SortReplaceDescription();
NotIndexed.resize(inputBatch);
@@ -156,7 +154,7 @@ void TIndexedReadData::InitRead(ui32 inputBatch) {
stats->IndexPortions = ReadMetadata->SelectInfo->Portions.size();
stats->IndexBatches = ReadMetadata->NumIndexedBlobs();
stats->CommittedBatches = ReadMetadata->CommittedBlobs.size();
- stats->SchemaColumns = ReadMetadata->LoadSchema->num_fields();
+ stats->SchemaColumns = ReadMetadata->GetSchemaColumnsCount();
stats->FilterColumns = GranulesContext->GetEarlyFilterColumns().size();
stats->AdditionalColumns = GranulesContext->GetPostFilterColumns().size();
stats->PortionsBytes = portionsBytes;
@@ -190,8 +188,9 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
// Extract columns (without check), filter, attach snapshot, extract columns with check
// (do not filter snapshot columns)
+ auto loadSchema = ReadMetadata->GetLoadSchema(TSnapshot().SetPlanStep(planStep).SetTxId(txId));
- auto batch = NArrow::ExtractExistedColumns(srcBatch, ReadMetadata->LoadSchema);
+ auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema);
Y_VERIFY(batch);
auto filter = FilterNotIndexed(batch, *ReadMetadata);
@@ -203,7 +202,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, planStep, txId);
Y_VERIFY(preparedBatch);
- preparedBatch = NArrow::ExtractColumns(preparedBatch, ReadMetadata->LoadSchema);
+ preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema);
Y_VERIFY(preparedBatch);
filter.Apply(preparedBatch);
@@ -212,6 +211,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxRowsInBatch) {
Y_VERIFY(SortReplaceDescription);
+ auto& indexInfo = ReadMetadata->GetIndexInfo();
if (NotIndexed.size() != ReadyNotIndexed) {
// Wait till we have all not indexed data so we could replace keys in granules
@@ -228,10 +228,10 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
// committed data before the first granule would be placed in fake preceding granule
// committed data after the last granule would be placed into the last granule
- marksGranules.MakePrecedingMark(IndexInfo());
+ marksGranules.MakePrecedingMark(indexInfo);
Y_VERIFY(!marksGranules.Empty());
- auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, IndexInfo());
+ auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, indexInfo);
GranulesContext->DrainNotIndexedBatches(&outNotIndexed);
Y_VERIFY(outNotIndexed.size() <= 1);
if (outNotIndexed.size() == 1) {
@@ -254,7 +254,7 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
auto out = MakeResult(ReadyToOut(), maxRowsInBatch);
if (requireResult && out.empty()) {
out.push_back(TPartialReadResult{
- .ResultBatch = NArrow::MakeEmptyBatch(ReadMetadata->ResultSchema)
+ .ResultBatch = NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema())
});
}
return out;
@@ -265,6 +265,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
Y_VERIFY(SortReplaceDescription);
Y_VERIFY(GranulesContext);
+ auto& indexInfo = ReadMetadata->GetIndexInfo();
std::vector<NIndexedReader::TGranule*> ready = GranulesContext->DetachReadyInOrder();
std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
out.reserve(ready.size() + 1);
@@ -289,7 +290,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey));
}
#if 1 // optimization
- auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, granule->GetBatchesToDedup());
+ auto deduped = SpecialMergeSorted(inGranule, indexInfo, SortReplaceDescription, granule->GetBatchesToDedup());
out.emplace_back(std::move(deduped));
#else
out.push_back({});
@@ -313,7 +314,8 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
std::shared_ptr<arrow::RecordBatch>
TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const {
Y_VERIFY(ReadMetadata->IsSorted());
- Y_VERIFY(IndexInfo().GetSortingKey());
+ auto& indexInfo = ReadMetadata->GetIndexInfo();
+ Y_VERIFY(indexInfo.GetSortingKey());
{
const auto pred = [](const std::shared_ptr<arrow::RecordBatch>& b) {
@@ -327,7 +329,6 @@ TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch
}
// We could merge data here only if backpressure limits committed data size. KIKIMR-12520
- auto& indexInfo = IndexInfo();
auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription());
Y_VERIFY(merged);
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey()));
@@ -380,6 +381,7 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco
int64_t maxRowsInBatch) const {
Y_VERIFY(ReadMetadata->IsSorted());
Y_VERIFY(SortReplaceDescription);
+ auto& indexInfo = ReadMetadata->GetIndexInfo();
TVector<TPartialReadResult> out;
@@ -413,7 +415,7 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco
}
for (auto& batch : batches) {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, IndexInfo().GetReplaceKey(), isDesc));
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey(), isDesc));
if (batch->num_rows() == 0) {
Y_VERIFY_DEBUG(false, "Unexpected empty batch");
@@ -421,11 +423,11 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco
}
// Extract the last row's PK
- auto keyBatch = NArrow::ExtractColumns(batch, IndexInfo().GetReplaceKey());
+ auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
auto lastKey = keyBatch->Slice(keyBatch->num_rows()-1, 1);
// Leave only requested columns
- auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->ResultSchema);
+ auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->GetResultSchema());
out.emplace_back(TPartialReadResult{
.ResultBatch = std::move(resultBatch),
.LastReadKey = std::move(lastKey)
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 94a605e461f..ad22e964339 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -66,7 +66,7 @@ public:
TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
void AddNotIndexed(ui32 batchNo, TString blob, ui64 planStep, ui64 txId) {
- auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->BlobSchema);
+ auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(TSnapshot().SetPlanStep(planStep).SetTxId(txId)));
AddNotIndexed(batchNo, batch, planStep, txId);
}
@@ -93,10 +93,6 @@ public:
}
private:
- const TIndexInfo& IndexInfo() const {
- return ReadMetadata->IndexInfo;
- }
-
std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const;
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index dd52019d249..8f364f92ec4 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -9,6 +9,53 @@
namespace NKikimr::NOlap {
+class ISnapshotSchema {
+public:
+ using TPtr = std::shared_ptr<ISnapshotSchema>;
+
+ virtual ~ISnapshotSchema() {}
+ virtual int GetFieldIndex(const ui32 columnId) const = 0;
+ virtual std::shared_ptr<arrow::Field> GetField(const int index) const = 0;
+ virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0;
+ virtual const TIndexInfo& GetIndexInfo() const = 0;
+ virtual const TSnapshot& GetSnapshot() const = 0;
+};
+
+class TSnapshotSchema : public ISnapshotSchema {
+ TIndexInfo IndexInfo;
+ std::shared_ptr<arrow::Schema> Schema;
+ TSnapshot Snapshot;
+public:
+ TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot)
+ : IndexInfo(std::move(indexInfo))
+ , Schema(IndexInfo.ArrowSchemaWithSpecials())
+ , Snapshot(snapshot)
+ {
+ }
+
+ int GetFieldIndex(const ui32 columnId) const override {
+ TString columnName = IndexInfo.GetColumnName(columnId);
+ std::string name(columnName.data(), columnName.size());
+ return Schema->GetFieldIndex(name);
+ }
+
+ std::shared_ptr<arrow::Field> GetField(const int index) const override {
+ return Schema->field(index);
+ }
+
+ const std::shared_ptr<arrow::Schema>& GetSchema() const override {
+ return Schema;
+ }
+
+ const TIndexInfo& GetIndexInfo() const override {
+ return IndexInfo;
+ }
+
+ const TSnapshot& GetSnapshot() const override {
+ return Snapshot;
+ }
+};
+
struct TPortionMeta {
// NOTE: These values are persisted in LocalDB so they must be stable
enum EProduced : ui32 {
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 0cfbcf5d4c4..f6efac5e725 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -28,7 +28,8 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard
Y_VERIFY(WaitIndexed.empty());
Y_VERIFY(PortionInfo->Produced());
Y_VERIFY(!FetchedInfo.GetFilteredBatch());
- auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data, CurrentColumnIds);
+ auto loadSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot());
+ auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->GetIndexInfo(), loadSchema, Data, CurrentColumnIds);
Data.clear();
if (!FetchedInfo.GetFilter()) {
return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata,
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index 217e46911fb..8ad6a8b871a 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -23,11 +23,12 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
NonSortableBatches.emplace_back(batch);
}
+ auto& indexInfo = Owner->GetReadMetadata()->GetIndexInfo();
if (!batchInfo.IsDuplicationsAvailable()) {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false));
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey(), false));
} else {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion_on_ready");
- Y_VERIFY_DEBUG(NArrow::IsSorted(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false));
+ Y_VERIFY_DEBUG(NArrow::IsSorted(batch, indexInfo.GetReplaceKey(), false));
Y_VERIFY(IsDuplicationsAvailable());
BatchesToDedup.insert(batch.get());
}
@@ -67,9 +68,10 @@ std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata:
batches.emplace_back(&i);
}
const int reverseKff = reverse ? -1 : 0;
- const auto pred = [reverseKff, readMetadata](const TBatch* l, const TBatch* r) {
+ auto& indexInfo = readMetadata->GetIndexInfo();
+ const auto pred = [reverseKff, indexInfo](const TBatch* l, const TBatch* r) {
if (l->IsSortableInGranule() && r->IsSortableInGranule()) {
- return l->GetPortionInfo().CompareMinByPk(r->GetPortionInfo(), readMetadata->IndexInfo) * reverseKff < 0;
+ return l->GetPortionInfo().CompareMinByPk(r->GetPortionInfo(), indexInfo) * reverseKff < 0;
} else if (l->IsSortableInGranule()) {
return false;
} else if (r->IsSortableInGranule()) {
@@ -85,7 +87,7 @@ std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata:
auto& l = *batches[i];
auto& r = *batches[i + 1];
Y_VERIFY(r.IsSortableInGranule());
- Y_VERIFY(l.GetPortionInfo().CompareSelfMaxItemMinByPk(r.GetPortionInfo(), readMetadata->IndexInfo) * reverseKff <= 0);
+ Y_VERIFY(l.GetPortionInfo().CompareSelfMaxItemMinByPk(r.GetPortionInfo(), indexInfo) * reverseKff <= 0);
nonCompactedSerial = false;
} else {
Y_VERIFY(nonCompactedSerial);
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
index 7ba4e4c2507..1e2beba67e6 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
@@ -109,7 +109,7 @@ std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64,
TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
:TBase(readMetadata)
- , MergeStream(readMetadata->IndexInfo.GetReplaceKey(), readMetadata->IsDescSorted())
+ , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), readMetadata->IsDescSorted())
{
CurrentItemsLimit = ReadMetadata->Limit;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index ef197c7ca55..6f43e2e95d3 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -5,25 +5,145 @@
namespace NKikimr::NOlap {
+TDataStorageAccessor::TDataStorageAccessor(const std::unique_ptr<NOlap::TInsertTable>& insertTable,
+ const std::unique_ptr<NOlap::IColumnEngine>& index,
+ const NColumnShard::TBatchCache& batchCache)
+ : InsertTable(insertTable)
+ , Index(index)
+ , BatchCache(batchCache)
+{}
+
+std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& columnIds) const {
+ if (readDescription.ReadNothing) {
+ return std::make_shared<NOlap::TSelectInfo>();
+ }
+ return Index->Select(readDescription.PathId,
+ {readDescription.PlanStep, readDescription.TxId},
+ columnIds,
+ readDescription.PKRangesFilter);
+}
+
+std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const {
+ return std::move(InsertTable->Read(readDescription.PathId, readDescription.PlanStep, readDescription.TxId));
+}
+
+std::shared_ptr<arrow::RecordBatch> TDataStorageAccessor::GetCachedBatch(const TUnifiedBlobId& blobId) const {
+ return BatchCache.Get(blobId);
+}
+
std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const {
return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters);
}
+bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor, std::string& error) {
+ auto& indexInfo = ResultIndexSchema->GetIndexInfo();
+
+ TVector<ui32> resultColumnsIds;
+ if (readDescription.ColumnIds.size()) {
+ resultColumnsIds = readDescription.ColumnIds;
+ } else if (readDescription.ColumnNames.size()) {
+ resultColumnsIds = indexInfo.GetColumnIds(readDescription.ColumnNames);
+ } else {
+ error = "Empty column list requested";
+ return false;
+ }
+ ResultColumnsIds.swap(resultColumnsIds);
+
+ if (!GetResultSchema()) {
+ error = "Could not get ResultSchema.";
+ return false;
+ }
+
+ SetPKRangesFilter(readDescription.PKRangesFilter);
+
+ /// @note We could have column name changes between schema versions:
+ /// Add '1:foo', Drop '1:foo', Add '2:foo'. Drop should hide '1:foo' from reads.
+ /// It's expected that we have only one version on 'foo' in blob and could split them by schema {planStep:txId}.
+ /// So '1:foo' would be omitted in blob records for the column in new snapshots. And '2:foo' - in old ones.
+ /// It's not possible for blobs with several columns. There should be a special logic for them.
+ {
+ Y_VERIFY(!ResultColumnsIds.empty(), "Empty column list");
+ THashSet<TString> requiredColumns = indexInfo.GetRequiredColumns();
+
+ // Snapshot columns
+ requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_PLAN_STEP);
+ requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_TX_ID);
+
+ for (auto&& i : readDescription.PKRangesFilter.GetColumnNames()) {
+ requiredColumns.emplace(i);
+ }
+
+ for (auto& col : ResultColumnsIds) {
+ requiredColumns.erase(indexInfo.GetColumnName(col));
+ }
+
+ TVector<ui32> auxiliaryColumns;
+ auxiliaryColumns.reserve(requiredColumns.size());
+ for (auto& reqCol : requiredColumns) {
+ auxiliaryColumns.push_back(indexInfo.GetColumnId(reqCol));
+ }
+ AllColumns.reserve(AllColumns.size() + ResultColumnsIds.size() + auxiliaryColumns.size());
+ AllColumns.insert(AllColumns.end(), ResultColumnsIds.begin(), ResultColumnsIds.end());
+ AllColumns.insert(AllColumns.end(), auxiliaryColumns.begin(), auxiliaryColumns.end());
+ }
+
+ CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription);
+ for (auto& cmt : CommittedBlobs) {
+ if (auto batch = dataAccessor.GetCachedBatch(cmt.BlobId)) {
+ CommittedBatches.emplace(cmt.BlobId, batch);
+ }
+ }
+
+ auto loadSchema = GetLoadSchema(Snapshot);
+ if (!loadSchema) {
+ return false;
+ }
+
+ THashSet<ui32> columnIds;
+ for (auto& field : loadSchema->fields()) {
+ TString column(field->name().data(), field->name().size());
+ columnIds.insert(indexInfo.GetColumnId(column));
+ }
+
+ Program = readDescription.Program;
+ if (Program) {
+ for (auto& [id, name] : Program->SourceColumns) {
+ columnIds.insert(id);
+ }
+ }
+
+ SelectInfo = dataAccessor.Select(readDescription, columnIds);
+ return true;
+}
+
std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
- std::set<ui32> result = GetPKRangesFilter().GetColumnIds(IndexInfo);
+ auto& indexInfo = ResultIndexSchema->GetIndexInfo();
+ std::set<ui32> result = GetPKRangesFilter().GetColumnIds(indexInfo);
+ if (LessPredicate) {
+ for (auto&& i : LessPredicate->ColumnNames()) {
+ result.emplace(indexInfo.GetColumnId(i));
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
+ }
+ }
+ if (GreaterPredicate) {
+ for (auto&& i : GreaterPredicate->ColumnNames()) {
+ result.emplace(indexInfo.GetColumnId(i));
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
+ }
+ }
if (Program) {
for (auto&& i : Program->GetEarlyFilterColumns()) {
- auto id = IndexInfo.GetColumnIdOptional(i);
+ auto id = indexInfo.GetColumnIdOptional(i);
if (id) {
result.emplace(*id);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
}
}
}
- if (PlanStep) {
+ if (Snapshot.GetPlanStep()) {
auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
for (auto&& i : snapSchema->fields()) {
- result.emplace(IndexInfo.GetColumnId(i->name()));
+ result.emplace(indexInfo.GetColumnId(i->name()));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name());
}
}
@@ -32,27 +152,26 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
std::set<ui32> TReadMetadata::GetPKColumnIds() const {
std::set<ui32> result;
- for (auto&& i : IndexInfo.GetPrimaryKey()) {
- Y_VERIFY(result.emplace(IndexInfo.GetColumnId(i.first)).second);
+ auto& indexInfo = ResultIndexSchema->GetIndexInfo();
+ for (auto&& i : indexInfo.GetPrimaryKey()) {
+ Y_VERIFY(result.emplace(indexInfo.GetColumnId(i.first)).second);
}
return result;
}
std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
std::set<ui32> result;
- if (PlanStep) {
+ auto& indexInfo = ResultIndexSchema->GetIndexInfo();
+ if (Snapshot.GetPlanStep()) {
auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
for (auto&& i : snapSchema->fields()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", i->name());
- result.emplace(IndexInfo.GetColumnId(i->name()));
+ result.emplace(indexInfo.GetColumnId(i->name()));
}
}
- for (auto&& f : LoadSchema->fields()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", f->name());
- result.emplace(IndexInfo.GetColumnId(f->name()));
- }
- for (auto&& i : IndexInfo.GetPrimaryKey()) {
- Y_VERIFY(result.contains(IndexInfo.GetColumnId(i.first)));
+ result.insert(AllColumns.begin(), AllColumns.end());
+ for (auto&& i : indexInfo.GetPrimaryKey()) {
+ Y_VERIFY(result.contains(indexInfo.GetColumnId(i.first)));
}
return result;
}
@@ -90,13 +209,14 @@ void TReadStats::PrintToLog() {
}
NIndexedReader::IOrderPolicy::TPtr TReadMetadata::BuildSortingPolicy() const {
- if (Limit && IsSorted() && IndexInfo.IsSorted() && IndexInfo.GetSortingKey()->num_fields()) {
+ auto& indexInfo = ResultIndexSchema->GetIndexInfo();
+ if (Limit && IsSorted() && indexInfo.IsSorted() && indexInfo.GetSortingKey()->num_fields()) {
ui32 idx = 0;
- for (auto&& i : IndexInfo.GetPrimaryKey()) {
- if (idx >= IndexInfo.GetSortingKey()->fields().size()) {
+ for (auto&& i : indexInfo.GetPrimaryKey()) {
+ if (idx >= indexInfo.GetSortingKey()->fields().size()) {
break;
}
- if (IndexInfo.GetSortingKey()->fields()[idx]->name() != i.first) {
+ if (indexInfo.GetSortingKey()->fields()[idx]->name() != i.first) {
return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
}
++idx;
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index fec567b55dd..843bb4486c2 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -1,9 +1,11 @@
#pragma once
#include "conveyor_task.h"
+#include "description.h"
#include <ydb/library/accessor/accessor.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/counters.h>
#include <ydb/core/tx/columnshard/columnshard__scan.h>
+#include <ydb/core/tx/columnshard/columnshard_common.h>
#include <ydb/core/tx/columnshard/engines/predicate/predicate.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/scheme_types/scheme_type_info.h>
@@ -49,6 +51,21 @@ struct TReadStats {
}
};
+class TDataStorageAccessor {
+private:
+ const std::unique_ptr<NOlap::TInsertTable>& InsertTable;
+ const std::unique_ptr<NOlap::IColumnEngine>& Index;
+ const NColumnShard::TBatchCache& BatchCache;
+
+public:
+ TDataStorageAccessor(const std::unique_ptr<NOlap::TInsertTable>& insertTable,
+ const std::unique_ptr<NOlap::IColumnEngine>& index,
+ const NColumnShard::TBatchCache& batchCache);
+ std::shared_ptr<NOlap::TSelectInfo> Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& columnIds) const;
+ std::vector<NOlap::TCommittedBlob> GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const;
+ std::shared_ptr<arrow::RecordBatch> GetCachedBatch(const TUnifiedBlobId& blobId) const;
+};
+
// Holds all metadata that is needed to perform read/scan
struct TReadMetadataBase {
public:
@@ -81,9 +98,8 @@ public:
}
virtual ~TReadMetadataBase() = default;
- std::shared_ptr<arrow::Schema> BlobSchema;
- std::shared_ptr<arrow::Schema> LoadSchema; // ResultSchema + required for intermediate operations
- std::shared_ptr<arrow::Schema> ResultSchema; // TODO: add Program modifications
+ std::shared_ptr<NOlap::TPredicate> LessPredicate;
+ std::shared_ptr<NOlap::TPredicate> GreaterPredicate;
std::shared_ptr<NSsa::TProgram> Program;
std::shared_ptr<const THashSet<TUnifiedBlobId>> ExternBlobs;
ui64 Limit{0}; // TODO
@@ -110,30 +126,66 @@ public:
// Holds all metadata that is needed to perform read/scan
struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadMetadata> {
-private:
using TBase = TReadMetadataBase;
+private:
+ TVersionedIndex IndexVersions;
+ TSnapshot Snapshot;
+ std::shared_ptr<ISnapshotSchema> ResultIndexSchema;
+ TVector<ui32> AllColumns;
+ TVector<ui32> ResultColumnsIds;
public:
using TConstPtr = std::shared_ptr<const TReadMetadata>;
- TIndexInfo IndexInfo;
- ui64 PlanStep = 0;
- ui64 TxId = 0;
std::shared_ptr<TSelectInfo> SelectInfo;
std::vector<TCommittedBlob> CommittedBlobs;
THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches;
std::shared_ptr<TReadStats> ReadStats;
+ const TSnapshot& GetSnapshot() const {
+ return Snapshot;
+ }
+
std::shared_ptr<NIndexedReader::IOrderPolicy> BuildSortingPolicy() const;
- TReadMetadata(const TIndexInfo& info, const ESorting sorting)
+ TReadMetadata(const TVersionedIndex& info, const TSnapshot& snapshot, const ESorting sorting)
: TBase(sorting)
- , IndexInfo(info)
- , ReadStats(std::make_shared<TReadStats>(info.GetId()))
- {}
+ , IndexVersions(info)
+ , Snapshot(snapshot)
+ , ResultIndexSchema(info.GetSchema(Snapshot))
+ , ReadStats(std::make_shared<TReadStats>(info.GetLastSchema()->GetIndexInfo().GetId()))
+ {
+ }
+
+ bool Init(const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor, std::string& error);
+
+ ui64 GetSchemaColumnsCount() const {
+ return AllColumns.size();
+ }
+
+ std::shared_ptr<arrow::Schema> GetLoadSchema(const TSnapshot& version) const {
+ const auto& indexInfo = IndexVersions.GetSchema(version)->GetIndexInfo();
+ return indexInfo.ArrowSchema(AllColumns, true);
+ }
+
+ std::shared_ptr<arrow::Schema> GetBlobSchema(const TSnapshot& version) const {
+ return IndexVersions.GetSchema(version)->GetIndexInfo().ArrowSchema();
+ }
+
+ std::shared_ptr<arrow::Schema> GetResultSchema() const {
+ return ResultIndexSchema->GetIndexInfo().ArrowSchema(ResultColumnsIds);
+ }
+
+ const TIndexInfo& GetIndexInfo(const std::optional<TSnapshot>& version = {}) const {
+ if (version && version < Snapshot) {
+ return IndexVersions.GetSchema(*version)->GetIndexInfo();
+ }
+ return ResultIndexSchema->GetIndexInfo();
+ }
std::vector<std::string> GetColumnsOrder() const {
+ auto loadSchema = GetLoadSchema(Snapshot);
std::vector<std::string> result;
- for (auto&& i : LoadSchema->fields()) {
+ for (auto&& i : loadSchema->fields()) {
result.emplace_back(i->name());
}
return result;
@@ -149,25 +201,28 @@ public:
}
std::shared_ptr<arrow::Schema> GetSortingKey() const {
- return IndexInfo.GetSortingKey();
+ return ResultIndexSchema->GetIndexInfo().GetSortingKey();
}
std::shared_ptr<arrow::Schema> GetReplaceKey() const {
- return IndexInfo.GetReplaceKey();
+ return ResultIndexSchema->GetIndexInfo().GetReplaceKey();
}
TVector<TNameTypeInfo> GetResultYqlSchema() const override {
+ auto& indexInfo = ResultIndexSchema->GetIndexInfo();
+ auto resultSchema = GetResultSchema();
+ Y_VERIFY(resultSchema);
TVector<NTable::TTag> columnIds;
- columnIds.reserve(ResultSchema->num_fields());
- for (const auto& field: ResultSchema->fields()) {
+ columnIds.reserve(resultSchema->num_fields());
+ for (const auto& field: resultSchema->fields()) {
TString name = TStringBuilder() << field->name();
- columnIds.emplace_back(IndexInfo.GetColumnId(name));
+ columnIds.emplace_back(indexInfo.GetColumnId(name));
}
- return IndexInfo.GetColumns(columnIds);
+ return indexInfo.GetColumns(columnIds);
}
TVector<TNameTypeInfo> GetKeyYqlSchema() const override {
- return IndexInfo.GetPrimaryKey();
+ return ResultIndexSchema->GetIndexInfo().GetPrimaryKey();
}
size_t NumIndexedRecords() const {
@@ -183,12 +238,12 @@ public:
std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
void Dump(IOutputStream& out) const override {
- out << "columns: " << (LoadSchema ? LoadSchema->num_fields() : 0)
+ out << "columns: " << GetSchemaColumnsCount()
<< " index records: " << NumIndexedRecords()
<< " index blobs: " << NumIndexedBlobs()
<< " committed blobs: " << CommittedBlobs.size()
<< " with program steps: " << (Program ? Program->Steps.size() : 0)
- << " at snapshot: " << PlanStep << ":" << TxId;
+ << " at snapshot: " << Snapshot.GetPlanStep() << ":" << Snapshot.GetTxId();
TBase::Dump(out);
if (SelectInfo) {
out << ", " << *SelectInfo;
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index e351c6c5001..71c9ce8956e 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -1,7 +1,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include "column_engine_logs.h"
-#include "index_logic.h"
#include "predicate/predicate.h"
+#include "index_logic_logs.h"
namespace NKikimr {
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index b5af4486629..69cd682b6fd 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -1,6 +1,6 @@
#include "columnshard_impl.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
-#include <ydb/core/tx/columnshard/engines/index_logic.h>
+#include <ydb/core/tx/columnshard/engines/index_logic_logs.h>
#include "blob_cache.h"
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index 29f574f1805..2c45bb7e562 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -1,6 +1,6 @@
#include "columnshard_impl.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
-#include <ydb/core/tx/columnshard/engines/index_logic.h>
+#include <ydb/core/tx/columnshard/engines/index_logic_logs.h>
#include "blob_cache.h"
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 562458d4dd7..99f4d17485a 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -270,14 +270,15 @@ private:
mutable TString SerializedSchema;
TString GetSerializedSchema(const std::shared_ptr<arrow::RecordBatch>& batch) const {
- Y_VERIFY(ReadMetadata->ResultSchema);
+ auto resultSchema = ReadMetadata->GetResultSchema();
+ Y_VERIFY(resultSchema);
// TODO: make real ResultSchema with SSA effects
- if (ReadMetadata->ResultSchema->Equals(batch->schema())) {
+ if (resultSchema->Equals(batch->schema())) {
if (!SerializedSchema.empty()) {
return SerializedSchema;
}
- SerializedSchema = NArrow::SerializeSchema(*ReadMetadata->ResultSchema);
+ SerializedSchema = NArrow::SerializeSchema(*resultSchema);
return SerializedSchema;
}