aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-01-17 18:36:02 +0300
committerGitHub <noreply@github.com>2024-01-17 18:36:02 +0300
commita854215ff2665425d20536c7b7f597dc1143f858 (patch)
tree927f7167bfb714f78cd724ddce09a60449eae78a
parent5fa4b2f5ec340934d41d70a0ebc24375f12e92d8 (diff)
downloadydb-a854215ff2665425d20536c7b7f597dc1143f858.tar.gz
general steps for data fetching (#1088)
-rw-r--r--ydb/core/formats/arrow/arrow_filter.cpp6
-rw-r--r--ydb/core/formats/arrow/arrow_filter.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_base.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp26
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.h2
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h32
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp20
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h32
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp44
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h34
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp49
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h78
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp137
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/context.h65
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h110
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp71
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h174
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp231
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h76
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp145
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/source.h95
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_context.h6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.h8
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h3
-rw-r--r--ydb/core/tx/columnshard/splitter/batch_slice.h2
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp1
35 files changed, 690 insertions, 823 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp
index e09f6d221e..bdfb8d0355 100644
--- a/ydb/core/formats/arrow/arrow_filter.cpp
+++ b/ydb/core/formats/arrow/arrow_filter.cpp
@@ -323,7 +323,7 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
}
if (filter.IsTotalDenyFilter()) {
batch = batch->Slice(0, 0);
- return false;
+ return true;
}
if (filter.IsTotalAllowFilter()) {
return true;
@@ -343,11 +343,11 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
return false;
}
-bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
+bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl<arrow::Datum::TABLE>(*this, batch, startPos, count);
}
-bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
+bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl<arrow::Datum::RECORD_BATCH>(*this, batch, startPos, count);
}
diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h
index 309467b5be..29dc8471c5 100644
--- a/ydb/core/formats/arrow/arrow_filter.h
+++ b/ydb/core/formats/arrow/arrow_filter.h
@@ -171,8 +171,8 @@ public:
// It makes a filter using composite predicate
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);
- bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {});
- bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {});
+ bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
+ bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const;
// Combines filters by 'and' operator (extFilter count is true positions count in self, thought extFitler patch exactly that positions)
diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp
index dbd0d638f6..000b8e7e63 100644
--- a/ydb/core/tx/columnshard/columnshard__read_base.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp
@@ -45,6 +45,7 @@ bool TTxReadBase::ParseProgram(NKikimrSchemeOp::EOlapProgramType programType,
AFL_VERIFY(namesChecker.emplace(names.back()).second);
}
NOlap::TProgramContainer container;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "overriden_columns")("columns", JoinSeq(",", names));
container.OverrideProcessingColumns(std::vector<TString>(names.begin(), names.end()));
read.SetProgram(std::move(container));
return true;
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 114881eb4e..e4c76ab6cc 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -118,7 +118,7 @@ public:
ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId()));
ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
- std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false,
+ std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool,
ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
@@ -367,7 +367,7 @@ private:
return Finish();
}
- auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
+ auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
}
@@ -921,29 +921,17 @@ public:
Results.emplace_back(std::move(res));
}
- void FillResult(std::vector<TPartialReadResult>& result, const bool mergePartsToMax) const {
+ void FillResult(std::vector<TPartialReadResult>& result) const {
if (Results.empty()) {
return;
}
- if (mergePartsToMax) {
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>> guards;
- for (auto&& i : Results) {
- batches.emplace_back(i.GetResultBatchPtrVerified());
- guards.insert(guards.end(), i.GetResourcesGuards().begin(), i.GetResourcesGuards().end());
- }
- auto res = NArrow::CombineBatches(batches);
- AFL_VERIFY(res);
- result.emplace_back(TPartialReadResult(guards, NArrow::TShardedRecordBatch(res), Results.back().GetLastReadKey()));
- } else {
- for (auto&& i : Results) {
- result.emplace_back(std::move(i));
- }
+ for (auto&& i : Results) {
+ result.emplace_back(std::move(i));
}
}
};
-std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax) {
+std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult) {
std::vector<TCurrentBatch> resultBatches;
TCurrentBatch currentBatch;
for (auto&& i : resultsExt) {
@@ -960,7 +948,7 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults
std::vector<TPartialReadResult> result;
for (auto&& i : resultBatches) {
- i.FillResult(result, mergePartsToMax);
+ i.FillResult(result);
}
return result;
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index 5c20f2a5bd..2c6dd9c52a 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -47,7 +47,7 @@ public:
return ResultBatch.GetRecordsCount();
}
- static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax);
+ static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult);
const NArrow::TShardedRecordBatch& GetShardedBatch() const {
return ResultBatch;
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 00e4fd205a..2ba089bb80 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -97,7 +97,7 @@ public:
TSerializationStats GetSerializationStat(const ISnapshotSchema& schema) const {
TSerializationStats result;
for (auto&& i : Records) {
- if (schema.GetFieldByColumnId(i.ColumnId)) {
+ if (schema.GetFieldByColumnIdOptional(i.ColumnId)) {
result.AddStat(i.GetSerializationStat(schema.GetFieldByColumnIdVerified(i.ColumnId)->name()));
}
}
@@ -151,8 +151,7 @@ public:
: PathId(pathId)
, Portion(portionId)
, MinSnapshot(minSnapshot)
- , BlobsOperator(blobsOperator)
- {
+ , BlobsOperator(blobsOperator) {
}
TString DebugString(const bool withDetails = false) const;
@@ -399,22 +398,30 @@ public:
public:
TAssembleBlobInfo(const ui32 rowsCount)
: NullRowsCount(rowsCount) {
-
+ AFL_VERIFY(NullRowsCount);
}
TAssembleBlobInfo(const TString& data)
: Data(data) {
-
+ AFL_VERIFY(!!Data);
}
ui32 GetNullRowsCount() const noexcept {
- return NullRowsCount;
+ return NullRowsCount;
}
const TString& GetData() const noexcept {
return Data;
}
+ bool IsBlob() const {
+ return !NullRowsCount && !!Data;
+ }
+
+ bool IsNull() const {
+ return NullRowsCount && !Data;
+ }
+
std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const;
};
@@ -437,8 +444,7 @@ public:
TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader)
: Loader(loader)
- , Blobs(std::move(blobs))
- {
+ , Blobs(std::move(blobs)) {
Y_ABORT_UNLESS(Loader);
Y_ABORT_UNLESS(Loader->GetExpectedSchema()->num_fields() == 1);
}
@@ -505,8 +511,7 @@ public:
TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount)
: Columns(std::move(columns))
, Schema(schema)
- , RowsCount(rowsCount)
- {
+ , RowsCount(rowsCount) {
}
std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const;
@@ -525,8 +530,7 @@ public:
: ColumnId(resultLoader->GetColumnId())
, NumRows(numRows)
, DataLoader(dataLoader)
- , ResultLoader(resultLoader)
- {
+ , ResultLoader(resultLoader) {
AFL_VERIFY(ResultLoader);
if (DataLoader) {
AFL_VERIFY(ResultLoader->GetColumnId() == DataLoader->GetColumnId());
@@ -598,8 +602,8 @@ public:
}
std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema,
- const ISnapshotSchema& resultSchema,
- THashMap<TBlobRange, TString>& data) const {
+ const ISnapshotSchema& resultSchema,
+ THashMap<TBlobRange, TString>& data) const {
auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble();
Y_ABORT_UNLESS(batch->Validate().ok());
return batch;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
index 6e73043a44..0d05fe444c 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
@@ -12,6 +12,12 @@ TString TColumnsSet::DebugString() const {
}
NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsSet& external) const {
+ if (external.IsEmpty()) {
+ return *this;
+ }
+ if (IsEmpty()) {
+ return external;
+ }
TColumnsSet result = *this;
for (auto&& i : external.ColumnIds) {
result.ColumnIds.erase(i);
@@ -28,6 +34,12 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsS
}
NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsSet& external) const {
+ if (external.IsEmpty()) {
+ return *this;
+ }
+ if (IsEmpty()) {
+ return external;
+ }
TColumnsSet result = *this;
result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end());
auto fields = result.Schema->fields();
@@ -42,7 +54,7 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsS
}
bool TColumnsSet::ColumnsOnly(const std::vector<std::string>& fieldNames) const {
- if (fieldNames.size() != GetSize()) {
+ if (fieldNames.size() != GetColumnsCount()) {
return false;
}
std::set<std::string> fieldNamesSet;
@@ -64,11 +76,7 @@ void TColumnsSet::Rebuild() {
ColumnNamesVector.emplace_back(i);
ColumnNames.emplace(i);
}
- if (ColumnIds.size()) {
- FilteredSchema = std::make_shared<TFilteredSnapshotSchema>(FullReadSchema, ColumnIds);
- } else {
- FilteredSchema = FullReadSchema;
- }
+ FilteredSchema = std::make_shared<TFilteredSnapshotSchema>(FullReadSchema, ColumnIds);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
index 5c6eaecb05..9b50c4cfaa 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
@@ -18,12 +18,19 @@ private:
public:
TColumnsSet() = default;
+ bool IsEmpty() const {
+ return ColumnIds.empty();
+ }
+
+ bool operator!() const {
+ return IsEmpty();
+ }
const std::vector<TString>& GetColumnNamesVector() const {
return ColumnNamesVector;
}
- ui32 GetSize() const {
+ ui32 GetColumnsCount() const {
return ColumnIds.size();
}
@@ -96,27 +103,4 @@ public:
TColumnsSet operator-(const TColumnsSet& external) const;
};
-class TFetchingPlan {
-private:
- YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FilterStage);
- YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FetchingStage);
- bool CanUseEarlyFilterImmediatelyFlag = false;
-public:
- TFetchingPlan(const std::shared_ptr<TColumnsSet>& filterStage, const std::shared_ptr<TColumnsSet>& fetchingStage, const bool canUseEarlyFilterImmediately)
- : FilterStage(filterStage)
- , FetchingStage(fetchingStage)
- , CanUseEarlyFilterImmediatelyFlag(canUseEarlyFilterImmediately) {
-
- }
-
- TString DebugString() const {
- return TStringBuilder() << "{filter=" << (FilterStage ? FilterStage->DebugString() : "NO") << ";fetching=" <<
- (FetchingStage ? FetchingStage->DebugString() : "NO") << ";use_filter=" << CanUseEarlyFilterImmediatelyFlag << "}";
- }
-
- bool CanUseEarlyFilterImmediately() const {
- return CanUseEarlyFilterImmediatelyFlag;
- }
-};
-
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp
deleted file mode 100644
index 8ae01b74e1..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-#include "committed_assembler.h"
-#include "plain_read_data.h"
-
-namespace NKikimr::NOlap::NPlainReader {
-
-bool TCommittedAssembler::DoExecute() {
- ResultBatch = NArrow::DeserializeBatch(BlobData, ReadMetadata->GetBlobSchema(SchemaVersion));
- Y_ABORT_UNLESS(ResultBatch);
- ResultBatch = ReadMetadata->GetIndexInfo().AddSpecialColumns(ResultBatch, DataSnapshot);
- Y_ABORT_UNLESS(ResultBatch);
- ReadMetadata->GetPKRangesFilter().BuildFilter(ResultBatch).Apply(ResultBatch);
- auto t = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({ResultBatch}));
- EarlyFilter = ReadMetadata->GetProgram().ApplyEarlyFilter(t, false);
- ResultBatch = NArrow::ToBatch(t, true);
- return true;
-}
-
-bool TCommittedAssembler::DoApply(IDataReader& /*owner*/) const {
- if (Source->GetFetchingPlan().GetFilterStage()->GetSchema()) {
- Source->InitFilterStageData(nullptr, EarlyFilter, NArrow::ExtractColumns(ResultBatch, Source->GetFetchingPlan().GetFilterStage()->GetSchema(), true), Source);
- } else {
- Source->InitFilterStageData(nullptr, EarlyFilter, nullptr, Source);
- }
- if (Source->GetFetchingPlan().GetFetchingStage()->GetSchema()) {
- Source->InitFetchStageData(NArrow::ExtractColumns(ResultBatch, Source->GetFetchingPlan().GetFetchingStage()->GetSchema(), true));
- } else {
- Source->InitFetchStageData(nullptr);
- }
- return true;
-}
-
-TCommittedAssembler::TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const std::shared_ptr<IDataSource>& source,
- const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard)
- : TBase(scanActorId)
- , BlobData(blobData)
- , ReadMetadata(readMetadata)
- , Source(source)
- , SchemaVersion(cBlob.GetSchemaVersion())
- , DataSnapshot(cBlob.GetSnapshot())
- , TaskGuard(std::move(taskGuard))
-{
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h
deleted file mode 100644
index eafcc342d9..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h
+++ /dev/null
@@ -1,34 +0,0 @@
-#pragma once
-#include "source.h"
-#include <ydb/core/tx/columnshard/engines/reader/conveyor_task.h>
-#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
-#include <ydb/core/tx/columnshard/counters/scan.h>
-#include <ydb/core/formats/arrow/arrow_filter.h>
-
-namespace NKikimr::NOlap::NPlainReader {
-class TCommittedAssembler: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TCommittedAssembler, true> {
-private:
- using TBase = NColumnShard::IDataTasksProcessor::ITask;
- TString BlobData;
- TReadMetadata::TConstPtr ReadMetadata;
- const std::shared_ptr<IDataSource> Source;
- ui64 SchemaVersion;
- TSnapshot DataSnapshot;
-
- std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
- std::shared_ptr<arrow::RecordBatch> ResultBatch;
- const NColumnShard::TCounterGuard TaskGuard;
-protected:
- virtual bool DoExecute() override;
- virtual bool DoApply(IDataReader& owner) const override;
-public:
- virtual TString GetTaskClassIdentifier() const override {
- return "PlainReader::TCommittedAssembler";
- }
-
- TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const std::shared_ptr<IDataSource>& source,
- const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard);
-};
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
index 9c06905261..a4ecce5de3 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
@@ -1,51 +1,20 @@
#include "constructor.h"
-#include "filter_assembler.h"
-#include "column_assembler.h"
-#include "committed_assembler.h"
-#include <ydb/core/tx/columnshard/engines/reader/read_context.h>
-#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
-#include <ydb/core/tx/conveyor/usage/events.h>
#include <ydb/core/tx/conveyor/usage/service.h>
namespace NKikimr::NOlap::NPlainReader {
-THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> TAssembleColumnsTaskConstructor::BuildBatchAssembler() {
- auto blobs = ExtractBlobsData();
- THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
- for (auto&& i : blobs) {
- blobsDataAssemble.emplace(i.first, i.second);
- }
- for (auto&& i : NullBlocks) {
- AFL_VERIFY(blobsDataAssemble.emplace(i.first, i.second).second);
- }
- return blobsDataAssemble;
-}
-
-void TEFTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
- auto task = std::make_shared<TAssembleFilter>(Context, PortionInfo, Source, Columns, UseEarlyFilter, BuildBatchAssembler());
- task->SetPriority(NConveyor::ITask::EPriority::Normal);
- NConveyor::TScanServiceOperator::SendTaskToExecute(task);
-}
-
-void TFFColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
- auto task = std::make_shared<TAssembleFFBatch>(Context, PortionInfo, Source, Columns, BuildBatchAssembler(), AppliedFilter);
- task->SetPriority(NConveyor::ITask::EPriority::High);
- NConveyor::TScanServiceOperator::SendTaskToExecute(task);
-}
-
-void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
- auto blobs = ExtractBlobsData();
- Y_ABORT_UNLESS(NullBlocks.size() == 0);
- Y_ABORT_UNLESS(blobs.size() == 1);
- auto task = std::make_shared<TCommittedAssembler>(Context->GetCommonContext()->GetScanActorId(), blobs.begin()->second,
- Context->GetReadMetadata(), Source, CommittedBlob, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard());
- task->SetPriority(NConveyor::ITask::EPriority::High);
+void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
+ Source->MutableStageData().AddBlobs(ExtractBlobsData());
+ AFL_VERIFY(Step->GetNextStep());
+ auto task = std::make_shared<TStepAction>(Source, Step->GetNextStep(), Context->GetCommonContext()->GetScanActorId());
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
}
-bool IFetchTaskConstructor::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId())("status", status.GetErrorMessage())("status_code", status.GetStatus());
- NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(), std::make_unique<NConveyor::TEvExecution::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
+bool TBlobsFetcherTask::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId())
+ ("status", status.GetErrorMessage())("status_code", status.GetStatus());
+ NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(),
+ std::make_unique<NConveyor::TEvExecution::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
return false;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
index 54105ee81e..3b5ceca520 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
@@ -8,87 +8,25 @@
namespace NKikimr::NOlap::NPlainReader {
-class IFetchTaskConstructor: public NBlobOperations::NRead::ITask {
+class TBlobsFetcherTask: public NBlobOperations::NRead::ITask {
private:
using TBase = NBlobOperations::NRead::ITask;
-protected:
const std::shared_ptr<IDataSource> Source;
+ const std::shared_ptr<IFetchingStep> Step;
const std::shared_ptr<TSpecialReadContext> Context;
- THashMap<TBlobRange, ui32> NullBlocks;
- NColumnShard::TCounterGuard TasksGuard;
+
+ virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
virtual bool DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override;
public:
- IFetchTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
- const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer)
- : TBase(readActions, taskCustomer)
+ TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, const std::shared_ptr<IDataSource>& sourcePtr,
+ const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId)
+ : TBase(readActions, taskCustomer, externalTaskId)
, Source(sourcePtr)
+ , Step(step)
, Context(context)
- , NullBlocks(std::move(nullBlocks))
- , TasksGuard(context->GetCommonContext()->GetCounters().GetReadTasksGuard())
- {
- }
-};
-
-class TCommittedColumnsTaskConstructor: public IFetchTaskConstructor {
-private:
- TCommittedBlob CommittedBlob;
- using TBase = IFetchTaskConstructor;
-protected:
- virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
-public:
- TCommittedColumnsTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
- const TCommittedDataSource& source, const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer)
- : TBase(context, readActions, std::move(nullBlocks), sourcePtr, taskCustomer)
- , CommittedBlob(source.GetCommitted())
- {
-
- }
-};
-
-class TAssembleColumnsTaskConstructor: public IFetchTaskConstructor {
-private:
- using TBase = IFetchTaskConstructor;
-protected:
- std::shared_ptr<TColumnsSet> Columns;
- std::shared_ptr<TPortionInfo> PortionInfo;
- THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> BuildBatchAssembler();
-public:
- TAssembleColumnsTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
- const std::shared_ptr<TColumnsSet>& columns, const TPortionDataSource& portion, const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer)
- : TBase(context, readActions, std::move(nullBlocks), sourcePtr, taskCustomer)
- , Columns(columns)
- , PortionInfo(portion.GetPortionInfoPtr())
{
}
};
-class TFFColumnsTaskConstructor: public TAssembleColumnsTaskConstructor {
-private:
- using TBase = TAssembleColumnsTaskConstructor;
- std::shared_ptr<NArrow::TColumnFilter> AppliedFilter;
- virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
-public:
- TFFColumnsTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
- const std::shared_ptr<TColumnsSet>& columns, const TPortionDataSource& portion, const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer)
- : TBase(context, readActions, std::move(nullBlocks), columns, portion, sourcePtr, taskCustomer)
- , AppliedFilter(portion.GetFilterStageData().GetAppliedFilter())
- {
- }
-};
-
-class TEFTaskConstructor: public TAssembleColumnsTaskConstructor {
-private:
- bool UseEarlyFilter = false;
- using TBase = TAssembleColumnsTaskConstructor;
- virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
-public:
- TEFTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
- const std::shared_ptr<TColumnsSet>& columns, const TPortionDataSource& portion, const std::shared_ptr<IDataSource>& sourcePtr, const bool useEarlyFilter, const TString& taskCustomer)
- : TBase(context, readActions, std::move(nullBlocks), columns, portion, sourcePtr, taskCustomer)
- , UseEarlyFilter(useEarlyFilter)
- {
- }
-};
-
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
index 35a90e520b..78a5da19a2 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
@@ -8,14 +8,145 @@ std::shared_ptr<NKikimr::NOlap::NIndexedReader::TMergePartialStream> TSpecialRea
}
+ // Estimates the memory needed to read the given sources: for every source the
+ // predicted raw bytes of its fetching plan are divided by the number of
+ // intervals the source takes part in, and the per-source values are summed.
ui64 TSpecialReadContext::GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive) {
- auto fetchingPlan = GetColumnsFetchingPlan(isExclusive);
ui64 result = 0;
for (auto&& i : sources) {
+ auto fetchingPlan = GetColumnsFetchingPlan(i.second, isExclusive);
AFL_VERIFY(i.second->GetIntervalsCount());
- result += i.second->GetRawBytes(fetchingPlan.GetFilterStage()->GetColumnIds()) / i.second->GetIntervalsCount();
- result += i.second->GetRawBytes(fetchingPlan.GetFetchingStage()->GetColumnIds()) / i.second->GetIntervalsCount();
+ // A plan is a chain of steps and more than one of them may fetch blobs,
+ // so the prediction has to be accumulated over the whole chain rather than
+ // taken from the head step only.
+ ui64 planRawBytes = 0;
+ for (const IFetchingStep* step = fetchingPlan.get(); step; step = step->GetNextStep().get()) {
+ planRawBytes += step->PredictRawBytes(i.second);
+ }
+ result += planRawBytes / i.second->GetIntervalsCount();
}
return result;
}
+// Picks the pre-built fetching script for the given source.
+// The cache is indexed as [needSnapshots][exclusiveSource]; when the selected
+// slot is empty a one-step TBuildFakeSpec plan sized by the source's record
+// count is returned instead.
+std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source, const bool exclusiveSource) const {
+    const bool snapshotCheckRequired = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax();
+    const ui32 snapshotIdx = snapshotCheckRequired ? 1 : 0;
+    const ui32 exclusiveIdx = exclusiveSource ? 1 : 0;
+    if (auto cachedPlan = CacheFetchingScripts[snapshotIdx][exclusiveIdx]) {
+        return cachedPlan;
+    }
+    return std::make_shared<TBuildFakeSpec>(source->GetRecordsCount(), "fake");
+}
+
+// Builds the chain of fetching steps for one (needSnapshots, exclusiveSource)
+// combination and returns the head of the chain. Returns nullptr only in the
+// branch without early-filter columns, when the resulting column set is empty.
+std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const {
+ // Branch "simple": no early-filter columns, so everything is fetched by one
+ // blob-fetching step followed by one assembling step.
+ // NOTE(review): no TSnapshotFilter is attached in this branch even when
+ // needSnapshots is set - confirm that snapshot filtering is not expected here.
+ if (!EFColumns->GetColumnsCount()) {
+ TColumnsSet columnsFetch = *FFColumns;
+ if (needSnapshots) {
+ columnsFetch = columnsFetch + *SpecColumns;
+ }
+ if (!exclusiveSource) {
+ columnsFetch = columnsFetch + *PKColumns + *SpecColumns;
+ }
+ if (columnsFetch.GetColumnsCount()) {
+ std::shared_ptr<IFetchingStep> result = std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "simple");
+ std::shared_ptr<IFetchingStep> current = result;
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
+ return result;
+ } else {
+ return nullptr;
+ }
+ // Branch "ef": exclusive source with early-filter columns. The early-filter
+ // columns are fetched first, filters are attached, and the remaining
+ // columns are fetched by a second blob-fetching step at the end of the chain.
+ } else if (exclusiveSource) {
+ TColumnsSet columnsFetch = *EFColumns;
+ if (needSnapshots || FFColumns->Contains(SpecColumns)) {
+ columnsFetch = columnsFetch + *SpecColumns;
+ }
+ AFL_VERIFY(columnsFetch.GetColumnsCount());
+ std::shared_ptr<IFetchingStep> result = std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "ef");
+ std::shared_ptr<IFetchingStep> current = result;
+
+ // Same condition as above: special columns are assembled and filtered on
+ // their own, then removed from the set assembled by the next step.
+ if (needSnapshots || FFColumns->Contains(SpecColumns)) {
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(SpecColumns));
+ current = current->AttachNext(std::make_shared<TSnapshotFilter>());
+ columnsFetch = columnsFetch - *SpecColumns;
+ }
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
+ if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
+ current = current->AttachNext(std::make_shared<TPredicateFilter>());
+ }
+ // Only the leading filter-only program steps are attached; the loop stops
+ // at the first step that is not filter-only.
+ for (auto&& i : ReadMetadata->GetProgram().GetSteps()) {
+ if (!i->IsFilterOnly()) {
+ break;
+ }
+ current = current->AttachNext(std::make_shared<TFilterProgramStep>(i));
+ }
+ const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns;
+ if (columnsAdditionalFetch.GetColumnsCount()) {
+ current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
+ }
+ return result;
+ // Branch "full": non-exclusive source with early-filter columns. Merge
+ // columns (PK + special) are fetched together with the early-filter columns.
+ } else {
+ TColumnsSet columnsFetch = *MergeColumns + *EFColumns;
+ AFL_VERIFY(columnsFetch.GetColumnsCount());
+ std::shared_ptr<IFetchingStep> result = std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "full");
+ std::shared_ptr<IFetchingStep> current = result;
+
+ // Special columns are always assembled here; the snapshot filter is
+ // attached only when needSnapshots is set.
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(SpecColumns));
+ if (needSnapshots) {
+ current = current->AttachNext(std::make_shared<TSnapshotFilter>());
+ }
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(PKColumns));
+ if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
+ current = current->AttachNext(std::make_shared<TPredicateFilter>());
+ }
+ const TColumnsSet columnsFetchEF = columnsFetch - *SpecColumns - *PKColumns;
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetchEF)));
+ // Only the leading filter-only program steps are attached; the loop stops
+ // at the first step that is not filter-only.
+ for (auto&& i : ReadMetadata->GetProgram().GetSteps()) {
+ if (!i->IsFilterOnly()) {
+ break;
+ }
+ current = current->AttachNext(std::make_shared<TFilterProgramStep>(i));
+ }
+ const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns;
+ if (columnsAdditionalFetch.GetColumnsCount()) {
+ current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
+ }
+ return result;
+ }
+}
+
+// Resolves the column sets used by the reader from the read metadata and
+// pre-builds the fetching script for every (needSnapshots, exclusiveSource)
+// combination. Aborts when the metadata is not a TReadMetadata or has no
+// SelectInfo.
+TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
+ : CommonContext(commonContext)
+{
+ ReadMetadata = dynamic_pointer_cast<const TReadMetadata>(CommonContext->GetReadMetadata());
+ Y_ABORT_UNLESS(ReadMetadata);
+ Y_ABORT_UNLESS(ReadMetadata->SelectInfo);
+
+ auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot());
+ SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema);
+ {
+ // Early-filter columns; an empty set when the metadata lists none.
+ auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
+ if (efColumns.size()) {
+ EFColumns = std::make_shared<TColumnsSet>(efColumns, ReadMetadata->GetIndexInfo(), readSchema);
+ } else {
+ EFColumns = std::make_shared<TColumnsSet>();
+ }
+ }
+ if (ReadMetadata->HasProcessingColumnIds()) {
+ FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), ReadMetadata->GetIndexInfo(), readSchema);
+ // When the special columns cover all processing columns and early-filter
+ // columns exist, the processing set is replaced by EF + Spec.
+ if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) {
+ FFColumns = std::make_shared<TColumnsSet>(*EFColumns + *SpecColumns);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString());
+ } else {
+ AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_first", FFColumns->DebugString());
+ }
+ } else {
+ // No processing columns: FFColumns shares the very same set object as EFColumns.
+ FFColumns = EFColumns;
+ }
+ if (FFColumns->IsEmpty()) {
+ ProgramInputColumns = SpecColumns;
+ } else {
+ ProgramInputColumns = FFColumns;
+ }
+
+ PKColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetPKColumnIds(), ReadMetadata->GetIndexInfo(), readSchema);
+ MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);
+
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
+ // Cache layout: CacheFetchingScripts[needSnapshots][exclusiveSource].
+ // Must run after all column sets above are initialized, since the builder reads them.
+ CacheFetchingScripts[0][0] = BuildColumnsFetchingPlan(false, false);
+ CacheFetchingScripts[0][1] = BuildColumnsFetchingPlan(false, true);
+ CacheFetchingScripts[1][0] = BuildColumnsFetchingPlan(true, false);
+ CacheFetchingScripts[1][1] = BuildColumnsFetchingPlan(true, true);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
index 2c6f213143..3fe05f23d8 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
@@ -1,5 +1,6 @@
#pragma once
#include "columns_set.h"
+#include "fetching.h"
#include <ydb/core/tx/columnshard/engines/reader/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h>
@@ -23,8 +24,8 @@ private:
std::shared_ptr<TColumnsSet> PKFFColumns;
std::shared_ptr<TColumnsSet> EFPKColumns;
std::shared_ptr<TColumnsSet> FFMinusEFColumns;
- std::shared_ptr<TColumnsSet> FFMinusEFPKColumns;
- bool TrivialEFFlag = false;
+ std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource) const;
+ std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2> CacheFetchingScripts;
public:
ui64 GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive);
@@ -43,65 +44,9 @@ public:
;
}
- TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
- : CommonContext(commonContext)
- {
- ReadMetadata = dynamic_pointer_cast<const TReadMetadata>(CommonContext->GetReadMetadata());
- Y_ABORT_UNLESS(ReadMetadata);
- Y_ABORT_UNLESS(ReadMetadata->SelectInfo);
+ TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext);
- auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot());
- SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema);
- {
- auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
- if (efColumns.size()) {
- EFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetEarlyFilterColumnIds(), ReadMetadata->GetIndexInfo(), readSchema);
- } else {
- EFColumns = SpecColumns;
- }
- }
- *EFColumns = *EFColumns + *SpecColumns;
- if (ReadMetadata->HasProcessingColumnIds()) {
- FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), ReadMetadata->GetIndexInfo(), readSchema);
- AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString());
- *FFColumns = *FFColumns + *EFColumns;
- } else {
- FFColumns = std::make_shared<TColumnsSet>(*EFColumns);
- }
- ProgramInputColumns = FFColumns;
-
- PKColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetPKColumnIds(), ReadMetadata->GetIndexInfo(), readSchema);
- MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);
-
- TrivialEFFlag = EFColumns->ColumnsOnly(ReadMetadata->GetIndexInfo().ArrowSchemaSnapshot()->field_names());
-
- PKFFColumns = std::make_shared<TColumnsSet>(*PKColumns + *FFColumns);
- EFPKColumns = std::make_shared<TColumnsSet>(*EFColumns + *PKColumns);
- FFMinusEFColumns = std::make_shared<TColumnsSet>(*FFColumns - *EFColumns);
- FFMinusEFPKColumns = std::make_shared<TColumnsSet>(*FFColumns - *EFColumns - *PKColumns);
-
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
- }
-
- TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const {
- if (CommonContext->GetIsInternalRead()) {
- return TFetchingPlan(PKFFColumns, EmptyColumns, exclusiveSource);
- }
-
- if (exclusiveSource) {
- if (TrivialEFFlag) {
- return TFetchingPlan(FFColumns, EmptyColumns, true);
- } else {
- return TFetchingPlan(EFColumns, FFMinusEFColumns, true);
- }
- } else {
- if (TrivialEFFlag) {
- return TFetchingPlan(PKFFColumns, EmptyColumns, false);
- } else {
- return TFetchingPlan(EFPKColumns, FFMinusEFPKColumns, false);
- }
- }
- }
+ std::shared_ptr<IFetchingStep> GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source, const bool exclusiveSource) const;
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp
index 1c9e0df7ff..d4f1a808b7 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp
@@ -1,5 +1,17 @@
#include "fetched_data.h"
+#include <ydb/core/formats/arrow/simple_arrays_cache.h>
+#include <ydb/core/formats/arrow/common/validation.h>
namespace NKikimr::NOlap {
+// Makes sure the table has a column for every given field: a field whose name
+// is not present yet is appended as the last column, filled with nulls for all
+// current rows.
+void TFetchedData::SyncTableColumns(const std::vector<std::shared_ptr<arrow::Field>>& fields) {
+    for (const auto& field : fields) {
+        if (!Table->GetColumnByName(field->name())) {
+            auto nullColumn = std::make_shared<arrow::ChunkedArray>(NArrow::TThreadSimpleArraysCache::GetNull(field->type(), Table->num_rows()));
+            Table = NArrow::TStatusValidator::GetValid(Table->AddColumn(Table->num_columns(), field, nullColumn));
+        }
+    }
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
index c07efcaf47..4c29f220d3 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
@@ -1,45 +1,109 @@
#pragma once
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/table.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/actors/core/log.h>
namespace NKikimr::NOlap {
class TFetchedData {
protected:
- YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
+ using TBlobs = THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>;
+ YDB_ACCESSOR_DEF(TBlobs, Blobs);
+ YDB_READONLY_DEF(std::shared_ptr<arrow::Table>, Table);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
+ YDB_READONLY(bool, UseFilter, false);
public:
- TFetchedData(const std::shared_ptr<arrow::RecordBatch>& batch)
- : Batch(batch)
+ TFetchedData(const bool useFilter)
+ : UseFilter(useFilter)
{
+
+ }
+
+ void SyncTableColumns(const std::vector<std::shared_ptr<arrow::Field>>& fields);
+
+    // Returns the accumulated filter when filters are applied to the table
+    // (UseFilter is set); nullptr otherwise.
+    std::shared_ptr<NArrow::TColumnFilter> GetAppliedFilter() const {
+        if (!UseFilter) {
+            return nullptr;
+        }
+        return Filter;
+    }
+
+    // Returns the accumulated filter when filters are only collected and not
+    // applied to the table (UseFilter is not set); nullptr otherwise.
+    std::shared_ptr<NArrow::TColumnFilter> GetNotAppliedFilter() const {
+        if (UseFilter) {
+            return nullptr;
+        }
+        return Filter;
+    }
+
+    // Removes the entry for the given range from Blobs and returns its data.
+    // The range must be present and its entry must report IsBlob().
+    TString ExtractBlob(const TBlobRange& bRange) {
+        auto blobIt = Blobs.find(bRange);
+        AFL_VERIFY(blobIt != Blobs.end());
+        auto& blobInfo = blobIt->second;
+        AFL_VERIFY(blobInfo.IsBlob());
+        // Take the data out before the entry is erased.
+        auto data = blobInfo.GetData();
+        Blobs.erase(blobIt);
+        return data;
+    }
+
+    // Registers fetched blob data; every range must be new for this object.
+    void AddBlobs(THashMap<TBlobRange, TString>&& blobs) {
+        for (auto& [range, data] : blobs) {
+            const bool inserted = Blobs.emplace(range, std::move(data)).second;
+            AFL_VERIFY(inserted);
+        }
+    }
+
+    // Converts the accumulated table into a record batch via NArrow::ToBatch.
+    std::shared_ptr<arrow::RecordBatch> GetBatch() const {
+        auto batch = NArrow::ToBatch(Table, true);
+        return batch;
+    }
+
+    // Registers ranges that carry only a ui32 value instead of blob data;
+    // every range must be new for this object.
+    void AddNulls(THashMap<TBlobRange, ui32>&& blobs) {
+        for (auto& [range, value] : blobs) {
+            const bool inserted = Blobs.emplace(range, value).second;
+            AFL_VERIFY(inserted);
+        }
+    }
-};
-class TFilterStageData: public TFetchedData {
-private:
- using TBase = TFetchedData;
- YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, AppliedFilter);
- YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedEarlyFilter);
-public:
bool IsEmptyFilter() const {
- return (AppliedFilter && AppliedFilter->IsTotalDenyFilter()) || (NotAppliedEarlyFilter && NotAppliedEarlyFilter->IsTotalDenyFilter());
+ return Filter && Filter->IsTotalDenyFilter();
}
- TFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch)
- : TBase(batch)
- , AppliedFilter(appliedFilter)
- , NotAppliedEarlyFilter(earlyFilter)
- {
+    // True when the filter denies every row or the table exists and has no rows.
+    bool IsEmpty() const {
+        if (IsEmptyFilter()) {
+            return true;
+        }
+        return Table && !Table->num_rows();
+    }
+    // Adds one more row filter to the stage data. A null pointer means
+    // "no filter" and is ignored.
+    void AddFilter(const std::shared_ptr<NArrow::TColumnFilter>& filter) {
+        if (filter) {
+            AddFilter(*filter);
+        }
+    }
+
+    // Adds one more row filter to the stage data.
+    // With UseFilter set the filter is applied to the table right away, so the
+    // next filter is built over already reduced data and is merged with
+    // CombineSequentialAnd. Without UseFilter the table is left untouched, every
+    // filter covers the same rows, and filters are merged with a plain And.
+    void AddFilter(const NArrow::TColumnFilter& filter) {
+        if (UseFilter && Table) {
+            AFL_VERIFY(filter.Apply(Table));
+        }
+        if (!Filter) {
+            // Keep a private copy: Filter is overwritten in place by later calls
+            // and must not alias an object owned by the caller.
+            Filter = std::make_shared<NArrow::TColumnFilter>(filter);
+        } else if (UseFilter) {
+            *Filter = Filter->CombineSequentialAnd(filter);
+        } else {
+            *Filter = Filter->And(filter);
+        }
+    }
+
+    // Wraps the record batch into a table and adds it through the table overload.
+    void AddBatch(const std::shared_ptr<arrow::RecordBatch>& batch) {
+        const std::shared_ptr<arrow::Table> asTable = arrow::Table::Make(batch->schema(), batch->columns(), batch->num_rows());
+        AddBatch(asTable);
+    }
+
+    // Adds columns to the stage table. When filters are applied eagerly
+    // (UseFilter) the accumulated filter is applied to the incoming table first.
+    // The first table becomes the stage table; later ones are merged into it
+    // column-wise.
+    void AddBatch(const std::shared_ptr<arrow::Table>& table) {
+        std::shared_ptr<arrow::Table> incoming = table;
+        if (UseFilter && Filter) {
+            AFL_VERIFY(Filter->Apply(incoming));
+        }
+        if (Table) {
+            AFL_VERIFY(NArrow::MergeBatchColumns({Table, incoming}, Table));
+            return;
+        }
+        Table = incoming;
+    }
-};
-class TFetchStageData: public TFetchedData {
-private:
- using TBase = TFetchedData;
-public:
- using TBase::TBase;
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp
new file mode 100644
index 0000000000..3058b64781
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp
@@ -0,0 +1,71 @@
+#include "fetching.h"
+#include "source.h"
+#include <ydb/core/tx/columnshard/engines/filter.h>
+#include <ydb/core/formats/arrow/simple_arrays_cache.h>
+
+namespace NKikimr::NOlap::NPlainReader {
+
+// Marks the source as ready once the whole step chain has been finished;
+// does nothing for an unfinished task. Always reports success.
+bool TStepAction::DoApply(IDataReader& /*owner*/) const {
+    if (!FinishedFlag) {
+        return true;
+    }
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply");
+    Source->SetIsReady();
+    return true;
+}
+
+// Runs the steps of the chain one after another starting from Step.
+// A step returning false stops the run without marking the task finished and
+// leaves Step pointing at that step. The task is marked finished when the
+// chain ends or the source data becomes empty.
+bool TStepAction::DoExecute() {
+    for (; Step; Step = Step->GetNextStep()) {
+        if (!Step->ExecuteInplace(Source, Step)) {
+            return true;
+        }
+        if (Source->IsEmptyData()) {
+            break;
+        }
+    }
+    FinishedFlag = true;
+    return true;
+}
+
+// Asks the source to start fetching this step's columns and returns the
+// negation of what StartFetchingColumns reports.
+bool TBlobsFetchingStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const {
+    const bool fetchingStarted = source->StartFetchingColumns(source, step, Columns);
+    return !fetchingStarted;
+}
+
+// Raw bytes the source reports for the columns fetched by this step.
+ui64 TBlobsFetchingStep::PredictRawBytes(const std::shared_ptr<IDataSource>& source) const {
+    const auto& columnIds = Columns->GetColumnIds();
+    return source->GetRawBytes(columnIds);
+}
+
+// Delegates assembling of this step's columns to the source; always continues
+// with the next step.
+bool TAssemblerStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const {
+    const bool continueChain = true;
+    source->AssembleColumns(Columns);
+    return continueChain;
+}
+
+// Builds the program step's filter over the current stage table and adds it to
+// the source's stage data; always continues with the next step.
+bool TFilterProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const {
+    const auto& stageTable = source->GetStageData().GetTable();
+    auto programFilter = Step->BuildFilter(stageTable);
+    source->MutableStageData().AddFilter(programFilter);
+    return true;
+}
+
+// Builds the PK-ranges filter of the read metadata over the current stage
+// table and adds it to the source's stage data; always continues with the
+// next step.
+bool TPredicateFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const {
+    const auto& stageTable = source->GetStageData().GetTable();
+    auto rangesFilter = source->GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(stageTable);
+    source->MutableStageData().AddFilter(rangesFilter);
+    return true;
+}
+
+// Builds the snapshot filter for the read snapshot over the current stage
+// table and adds it to the source's stage data; always continues with the
+// next step.
+bool TSnapshotFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const {
+    const auto& stageTable = source->GetStageData().GetTable();
+    auto snapshotFilter = MakeSnapshotFilter(stageTable, source->GetContext()->GetReadMetadata()->GetSnapshot());
+    source->MutableStageData().AddFilter(snapshotFilter);
+    return true;
+}
+
+// Adds a batch of Count rows to the stage data in which every field of the
+// snapshot schema is a constant column built from a zero UInt64 scalar.
+bool TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const {
+    std::vector<std::shared_ptr<arrow::Array>> constantColumns;
+    for (const auto& field : TIndexInfo::ArrowSchemaSnapshot()->fields()) {
+        auto zeroScalar = std::make_shared<arrow::UInt64Scalar>(0);
+        constantColumns.emplace_back(NArrow::TThreadSimpleArraysCache::GetConst(field->type(), zeroScalar, Count));
+    }
+    auto fakeBatch = arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), Count, constantColumns);
+    source->MutableStageData().AddBatch(fakeBatch);
+    return true;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h
new file mode 100644
index 0000000000..297863392a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h
@@ -0,0 +1,174 @@
+#pragma once
+#include "columns_set.h"
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
+#include <ydb/core/tx/columnshard/engines/reader/conveyor_task.h>
+#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
+
+namespace NKikimr::NOlap::NPlainReader {
+class IDataSource;
+
+// One step of a data-fetching script. Steps form a singly linked chain through
+// NextStep; each step carries a name, its position in the chain (Index) and
+// the branch name inherited from the step it was attached to.
+class IFetchingStep {
+private:
+    std::shared_ptr<IFetchingStep> NextStep;
+    YDB_READONLY_DEF(TString, Name);
+    YDB_READONLY(ui32, Index, 0);
+    YDB_READONLY_DEF(TString, BranchName);
+protected:
+    // Step-specific work, invoked through ExecuteInplace.
+    virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const = 0;
+    // Step-specific part of DebugString; empty by default.
+    virtual TString DoDebugString() const {
+        return "";
+    }
+public:
+    IFetchingStep(const TString& name, const TString& branchName = Default<TString>())
+        : Name(name)
+        , BranchName(branchName)
+    {
+    }
+
+    virtual ~IFetchingStep() = default;
+
+    // Links nextStep after this step, giving it the following index and this
+    // step's branch name. Returns nextStep so that calls can be chained.
+    std::shared_ptr<IFetchingStep> AttachNext(const std::shared_ptr<IFetchingStep>& nextStep) {
+        AFL_VERIFY(nextStep);
+        nextStep->BranchName = BranchName;
+        nextStep->Index = Index + 1;
+        NextStep = nextStep;
+        return nextStep;
+    }
+
+    const std::shared_ptr<IFetchingStep>& GetNextStep() const {
+        return NextStep;
+    }
+
+    // Bytes this step alone is expected to read for the source; zero by default.
+    virtual ui64 PredictRawBytes(const std::shared_ptr<IDataSource>& /*source*/) const {
+        return 0;
+    }
+
+    // Logs the step and runs its step-specific work.
+    bool ExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", DebugString())("scan_step_idx", GetIndex());
+        return DoExecuteInplace(source, step);
+    }
+
+    // Description of this step followed, recursively, by the rest of the chain.
+    TString DebugString() const {
+        TStringBuilder description;
+        description << "name=" << Name << ";";
+        description << DoDebugString();
+        description << ";branch=" << BranchName << ";";
+        if (NextStep) {
+            description << "next=" << NextStep->DebugString() << ";";
+        }
+        return description;
+    }
+};
+
+// Task (NColumnShard::IDataTasksProcessor::ITask) that executes a chain of
+// fetching steps for one data source, starting from the given step.
+class TStepAction: public NColumnShard::IDataTasksProcessor::ITask {
+private:
+    using TBase = NColumnShard::IDataTasksProcessor::ITask;
+    std::shared_ptr<IDataSource> Source;
+    // Step to execute next; advanced along the chain by DoExecute.
+    std::shared_ptr<IFetchingStep> Step;
+    // Set by DoExecute once the chain is done or the source data is empty.
+    bool FinishedFlag = false;
+protected:
+    virtual bool DoExecute() override;
+    virtual bool DoApply(IDataReader& /*owner*/) const override;
+public:
+    TStepAction(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step, const NActors::TActorId& ownerActorId)
+        : TBase(ownerActorId)
+        , Source(source)
+        , Step(step)
+    {
+    }
+
+    virtual TString GetTaskClassIdentifier() const override {
+        return "STEP_ACTION";
+    }
+};
+
+// Step named "FAKE_SPEC" that is parameterized by a non-zero row count.
+class TBuildFakeSpec: public IFetchingStep {
+private:
+    using TBase = IFetchingStep;
+    const ui32 Count = 0;
+protected:
+    virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const override;
+public:
+    TBuildFakeSpec(const ui32 count, const TString& nameBranch = "")
+        : TBase("FAKE_SPEC", nameBranch)
+        , Count(count)
+    {
+        // A zero row count is rejected.
+        AFL_VERIFY(Count);
+    }
+};
+
+// Step named "FETCHING" that is bound to a non-null set of columns.
+class TBlobsFetchingStep: public IFetchingStep {
+private:
+    using TBase = IFetchingStep;
+    std::shared_ptr<TColumnsSet> Columns;
+protected:
+    virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const override;
+    virtual ui64 PredictRawBytes(const std::shared_ptr<IDataSource>& source) const override;
+    virtual TString DoDebugString() const override {
+        TStringBuilder description;
+        description << "columns=" << Columns->DebugString() << ";";
+        return description;
+    }
+public:
+    TBlobsFetchingStep(const std::shared_ptr<TColumnsSet>& columns, const TString& nameBranch = "")
+        : TBase("FETCHING", nameBranch)
+        , Columns(columns)
+    {
+        // The column set is mandatory.
+        AFL_VERIFY(Columns);
+    }
+};
+
+// Step named "ASSEMBLER" that is bound to a non-null set of columns.
+class TAssemblerStep: public IFetchingStep {
+private:
+    using TBase = IFetchingStep;
+    YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns);
+    virtual TString DoDebugString() const override {
+        TStringBuilder description;
+        description << "columns=" << Columns->DebugString() << ";";
+        return description;
+    }
+public:
+    TAssemblerStep(const std::shared_ptr<TColumnsSet>& columns)
+        : TBase("ASSEMBLER")
+        , Columns(columns)
+    {
+        // The column set is mandatory.
+        AFL_VERIFY(Columns);
+    }
+
+    virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const override;
+};
+
+// Step named "PROGRAM" that wraps one NSsa::TProgramStep.
+class TFilterProgramStep: public IFetchingStep {
+private:
+    using TBase = IFetchingStep;
+    std::shared_ptr<NSsa::TProgramStep> Step;
+public:
+    TFilterProgramStep(const std::shared_ptr<NSsa::TProgramStep>& step)
+        : TBase("PROGRAM")
+        , Step(step)
+    {
+    }
+
+    virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const override;
+};
+
+// Stateless step named "PREDICATE".
+class TPredicateFilter: public IFetchingStep {
+private:
+    using TBase = IFetchingStep;
+public:
+    TPredicateFilter()
+        : TBase("PREDICATE")
+    {
+    }
+
+    virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const override;
+};
+
+// Stateless step named "SNAPSHOT".
+class TSnapshotFilter: public IFetchingStep {
+private:
+    using TBase = IFetchingStep;
+public:
+    TSnapshotFilter()
+        : TBase("SNAPSHOT")
+    {
+    }
+
+    virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const override;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
deleted file mode 100644
index 7b12b1ebf0..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
+++ /dev/null
@@ -1,231 +0,0 @@
-#include "filter_assembler.h"
-#include "plain_read_data.h"
-#include <ydb/core/formats/arrow/arrow_filter.h>
-#include <ydb/core/tx/columnshard/engines/filter.h>
-
-namespace NKikimr::NOlap::NPlainReader {
-
-class TFilterContext {
-private:
- TPortionInfo::TPreparedBatchData BatchAssembler;
- YDB_ACCESSOR_DEF(std::shared_ptr<TSpecialReadContext>, Context);
- YDB_ACCESSOR_DEF(std::set<ui32>, ReadyColumnIds);
- std::optional<ui32> OriginalCount;
- YDB_READONLY(std::shared_ptr<NArrow::TColumnFilter>, AppliedFilter, std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildAllowFilter()));
- YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, EarlyFilter);
- YDB_READONLY_DEF(std::shared_ptr<arrow::Table>, ResultTable);
-public:
- TFilterContext(TPortionInfo::TPreparedBatchData&& batchAssembler, const std::shared_ptr<TSpecialReadContext>& context)
- : BatchAssembler(std::move(batchAssembler))
- , Context(context)
- {
-
- }
-
- ui32 GetOriginalCountVerified() const {
- AFL_VERIFY(OriginalCount);
- return *OriginalCount;
- }
-
- std::shared_ptr<arrow::Table> AppendToResult(const std::set<ui32>& columnIds, const std::optional<std::shared_ptr<arrow::Scalar>> constantFill = {}) {
- TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- options.IncludedColumnIds = columnIds;
- for (auto it = options.IncludedColumnIds->begin(); it != options.IncludedColumnIds->end();) {
- if (!ReadyColumnIds.emplace(*it).second) {
- it = options.IncludedColumnIds->erase(it);
- } else {
- ++it;
- }
- }
- if (options.IncludedColumnIds->empty()) {
- AFL_VERIFY(ResultTable);
- return ResultTable;
- }
- if (constantFill) {
- for (auto&& i : *options.IncludedColumnIds) {
- options.ConstantColumnIds.emplace(i, *constantFill);
- }
- }
- std::shared_ptr<arrow::Table> table = BatchAssembler.AssembleTable(options);
- AFL_VERIFY(table);
- if (!OriginalCount) {
- OriginalCount = table->num_rows();
- }
- AFL_VERIFY(AppliedFilter->Apply(table));
- if (!ResultTable) {
- ResultTable = table;
- } else {
- AFL_VERIFY(NArrow::MergeBatchColumns({ResultTable, table}, ResultTable));
- }
- return ResultTable;
- }
-
- void ApplyFilter(const std::shared_ptr<NArrow::TColumnFilter>& filter, const bool useFilter) {
- if (useFilter) {
- filter->Apply(ResultTable);
- *AppliedFilter = AppliedFilter->CombineSequentialAnd(*filter);
- } else if (EarlyFilter) {
- *EarlyFilter = EarlyFilter->And(*filter);
- } else {
- EarlyFilter = filter;
- }
- }
-};
-
-class IFilterConstructor {
-private:
- const std::set<ui32> ColumnIds;
- const std::optional<std::shared_ptr<arrow::Scalar>> ConstantFill;
-protected:
- virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& filterContext, const std::shared_ptr<arrow::Table>& data) const = 0;
-public:
- IFilterConstructor(const std::set<ui32>& columnIds, const std::optional<std::shared_ptr<arrow::Scalar>> constantFill = {})
- : ColumnIds(columnIds)
- , ConstantFill(constantFill)
- {
- AFL_VERIFY(ColumnIds.size());
- }
-
- virtual ~IFilterConstructor() = default;
-
- void Execute(TFilterContext& filterContext, const bool useFilter) {
- auto result = filterContext.AppendToResult(ColumnIds, ConstantFill);
- auto localFilter = BuildFilter(filterContext, result);
- AFL_VERIFY(!!localFilter);
- filterContext.ApplyFilter(localFilter, useFilter);
- }
-};
-
-class TPredicateFilter: public IFilterConstructor {
-private:
- using TBase = IFilterConstructor;
-protected:
- virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& filterContext, const std::shared_ptr<arrow::Table>& data) const override {
- return std::make_shared<NArrow::TColumnFilter>(filterContext.GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(data));
- }
-public:
- TPredicateFilter(const std::shared_ptr<TSpecialReadContext>& ctx)
- : TBase(ctx->GetReadMetadata()->GetPKRangesFilter().GetColumnIds(ctx->GetReadMetadata()->GetIndexInfo())) {
-
- }
-};
-
-class TRestoreMergeData: public IFilterConstructor {
-private:
- using TBase = IFilterConstructor;
-protected:
- virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& /*filterContext*/, const std::shared_ptr<arrow::Table>& /*data*/) const override {
- return std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildAllowFilter());
- }
-public:
- TRestoreMergeData(const std::shared_ptr<TSpecialReadContext>& ctx)
- : TBase(ctx->GetMergeColumns()->GetColumnIds()) {
-
- }
-};
-
-class TFakeSnapshotData: public IFilterConstructor {
-private:
- using TBase = IFilterConstructor;
-protected:
- virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& /*filterContext*/, const std::shared_ptr<arrow::Table>& /*data*/) const override {
- return std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildAllowFilter());
- }
-public:
- TFakeSnapshotData(const std::shared_ptr<TSpecialReadContext>& ctx)
- : TBase(ctx->GetSpecColumns()->GetColumnIds(), std::make_shared<arrow::UInt64Scalar>(0)) {
-
- }
-};
-
-class TSnapshotFilter: public IFilterConstructor {
-private:
- using TBase = IFilterConstructor;
-protected:
- virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& filterContext, const std::shared_ptr<arrow::Table>& data) const override {
- return std::make_shared<NArrow::TColumnFilter>(MakeSnapshotFilter(data, filterContext.GetContext()->GetReadMetadata()->GetSnapshot()));
- }
-public:
- TSnapshotFilter(const std::shared_ptr<TSpecialReadContext>& ctx)
- : TBase(ctx->GetSpecColumns()->GetColumnIds()) {
-
- }
-};
-
-class TProgramStepFilter: public IFilterConstructor {
-private:
- using TBase = IFilterConstructor;
- std::shared_ptr<NSsa::TProgramStep> Step;
-protected:
- virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& /*filterContext*/, const std::shared_ptr<arrow::Table>& data) const override {
- return Step->BuildFilter(data);
- }
-public:
- TProgramStepFilter(const std::shared_ptr<NSsa::TProgramStep>& step)
- : TBase(step->GetFilterOriginalColumnIds())
- , Step(step)
- {
- }
-};
-
-bool TAssembleFilter::DoExecute() {
- std::vector<std::shared_ptr<IFilterConstructor>> filters;
- if (!UseFilter) {
- filters.emplace_back(std::make_shared<TRestoreMergeData>(Context));
- }
- if (FilterColumns->GetColumnIds().contains((ui32)TIndexInfo::ESpecialColumn::PLAN_STEP)) {
- const bool needSnapshotsFilter = ReadMetadata->GetSnapshot() < RecordsMaxSnapshot;
- if (needSnapshotsFilter) {
- filters.emplace_back(std::make_shared<TSnapshotFilter>(Context));
- } else {
- filters.emplace_back(std::make_shared<TFakeSnapshotData>(Context));
- }
- }
- if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
- filters.emplace_back(std::make_shared<TPredicateFilter>(Context));
- }
-
- for (auto&& i : ReadMetadata->GetProgram().GetSteps()) {
- if (!i->IsFilterOnly()) {
- break;
- }
-
- filters.emplace_back(std::make_shared<TProgramStepFilter>(i));
- }
-
- TFilterContext filterContext(BuildBatchConstructor(FilterColumns->GetFilteredSchemaVerified()), Context);
-
- for (auto&& f : filters) {
- f->Execute(filterContext, UseFilter);
- AFL_VERIFY(filterContext.GetResultTable());
- if (filterContext.GetResultTable()->num_rows() == 0 || (filterContext.GetEarlyFilter() && filterContext.GetEarlyFilter()->IsTotalDenyFilter())) {
- break;
- }
- }
-
- AppliedFilter = filterContext.GetAppliedFilter();
- EarlyFilter = filterContext.GetEarlyFilter();
-
- auto batch = filterContext.GetResultTable();
- if (!batch || batch->num_rows() == 0) {
- return true;
- }
-
- OriginalCount = filterContext.GetOriginalCountVerified();
- auto fullTable = filterContext.AppendToResult(FilterColumns->GetColumnIds());
- AFL_VERIFY(AppliedFilter->IsTotalAllowFilter() || AppliedFilter->Size() == OriginalCount)("original", OriginalCount)("af_count", AppliedFilter->Size());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")
- ("original_count", OriginalCount)("filtered_count", batch->num_rows())("use_filter", UseFilter)
- ("filter_columns", FilterColumns->GetColumnIds().size())("af_count", AppliedFilter->Size())("ef_count", EarlyFilter ? EarlyFilter->Size() : 0);
-
- FilteredBatch = NArrow::ToBatch(fullTable, true);
- return true;
-}
-
-bool TAssembleFilter::DoApply(IDataReader& /*owner*/) const {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply");
- Source->InitFilterStageData(AppliedFilter, EarlyFilter, FilteredBatch, Source);
- return true;
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h
deleted file mode 100644
index 12492a6d8a..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h
+++ /dev/null
@@ -1,76 +0,0 @@
-#pragma once
-#include <ydb/core/tx/columnshard/engines/reader/conveyor_task.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
-#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
-#include <ydb/core/formats/arrow/arrow_filter.h>
-#include "source.h"
-#include "columns_set.h"
-
-namespace NKikimr::NOlap::NPlainReader {
-
-class TAssemblerCommon: public NColumnShard::IDataTasksProcessor::ITask {
-private:
- using TBase = NColumnShard::IDataTasksProcessor::ITask;
-protected:
- const std::shared_ptr<TSpecialReadContext> Context;
- const std::shared_ptr<IDataSource> Source;
- const std::shared_ptr<TPortionInfo> PortionInfo;
- const TReadMetadata::TConstPtr ReadMetadata;
- THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Blobs;
-
- TPortionInfo::TPreparedBatchData BuildBatchConstructor(const ISnapshotSchema& resultSchema) {
- auto blobSchema = ReadMetadata->GetLoadSchema(PortionInfo->GetMinSnapshot());
- return PortionInfo->PrepareForAssemble(*blobSchema, resultSchema, Blobs);
- }
-
-public:
- TAssemblerCommon(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo,
- const std::shared_ptr<IDataSource>& source, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs)
- : TBase(context->GetCommonContext()->GetScanActorId())
- , Context(context)
- , Source(source)
- , PortionInfo(portionInfo)
- , ReadMetadata(Context->GetReadMetadata())
- , Blobs(blobs)
- {
- AFL_VERIFY(Blobs.size());
- }
-
-};
-
-class TAssembleFilter: public TAssemblerCommon, public NColumnShard::TMonitoringObjectsCounter<TAssembleFilter, true, true> {
-private:
- using TBase = TAssemblerCommon;
-
- std::shared_ptr<arrow::RecordBatch> FilteredBatch;
- std::shared_ptr<NArrow::TColumnFilter> AppliedFilter;
- std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
- const TSnapshot RecordsMaxSnapshot;
- ui32 OriginalCount = 0;
- std::shared_ptr<TColumnsSet> FilterColumns;
- const bool UseFilter = true;
- const NColumnShard::TCounterGuard TaskGuard;
-protected:
- virtual bool DoApply(IDataReader& owner) const override;
- virtual bool DoExecute() override;
-public:
-
- virtual TString GetTaskClassIdentifier() const override {
- return "PlainReading::TAssembleFilter";
- }
-
- TAssembleFilter(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo,
- const std::shared_ptr<IDataSource>& source, const std::shared_ptr<TColumnsSet>& filterColumns, const bool useFilter, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs)
- : TBase(context, portionInfo, source, std::move(blobs))
- , RecordsMaxSnapshot(PortionInfo->RecordSnapshotMax())
- , FilterColumns(filterColumns)
- , UseFilter(useFilter)
- , TaskGuard(Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard())
- {
- Y_UNUSED(RecordsMaxSnapshot);
- TBase::SetPriority(TBase::EPriority::Normal);
- }
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
index 464127ef02..5b55803cd3 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
@@ -45,11 +45,7 @@ private:
bool EmptyFiltersOnly() const {
for (auto&& [_, i] : Sources) {
- if (!i->GetFilterStageData().GetBatch() || !i->GetFilterStageData().GetBatch()->num_rows()) {
- continue;
- }
- auto f = i->GetFilterStageData().GetNotAppliedEarlyFilter();
- if (!f || !f->IsTotalDenyFilter()) {
+ if (!i->IsEmptyData()) {
return false;
}
}
@@ -64,7 +60,7 @@ protected:
}
virtual bool DoExecute() override {
if (MergingContext->IsExclusiveInterval()) {
- ResultBatch = Sources.begin()->second->GetBatch();
+ ResultBatch = Sources.begin()->second->GetStageData().GetBatch();
if (ResultBatch && ResultBatch->num_rows()) {
LastPK = Sources.begin()->second->GetLastPK();
ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector());
@@ -85,8 +81,8 @@ protected:
}
std::shared_ptr<NIndexedReader::TMergePartialStream> merger = Context->BuildMerger();
for (auto&& [_, i] : Sources) {
- if (auto rb = i->GetBatch()) {
- merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter());
+ if (auto rb = i->GetStageData().GetBatch()) {
+ merger->AddSource(rb, i->GetStageData().GetNotAppliedFilter());
}
}
AFL_VERIFY(merger->GetSourcesCount() <= Sources.size());
@@ -196,7 +192,7 @@ void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptr<NResourceBro
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("interval_idx", IntervalIdx)("event", "resources_allocated")
("resources", guard->DebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size());
for (auto&& [_, i] : Sources) {
- i->InitFetchingPlan(Context->GetColumnsFetchingPlan(MergingContext->IsExclusiveInterval()), i);
+ i->InitFetchingPlan(Context->GetColumnsFetchingPlan(i, MergingContext->IsExclusiveInterval()), i, MergingContext->IsExclusiveInterval());
}
OnInitResourcesGuard(guard);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
index d84609c623..0720416cdd 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
@@ -52,7 +52,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte
stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
stats->CommittedBatches = GetReadMetadata()->CommittedBlobs.size();
- stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetSize();
+ stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
stats->CommittedPortionsBytes = committedPortionsBytes;
stats->InsertedPortionsBytes = insertedPortionsBytes;
stats->CompactedPortionsBytes = compactedPortionsBytes;
@@ -60,10 +60,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte
}
std::vector<NKikimr::NOlap::TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) {
- if ((GetContext().GetIsInternalRead() && ReadyResultsCount < maxRowsInBatch) && !Scanner->IsFinished()) {
- return {};
- }
- auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch, GetContext().GetIsInternalRead());
+ auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
ui32 count = 0;
for (auto&& r: result) {
count += r.GetRecordsCount();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
index b80813b1d8..7611a4488f 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
@@ -131,10 +131,6 @@ bool TScanHead::IsReverse() const {
return GetContext().GetReadMetadata()->IsDescSorted();
}
-NKikimr::NOlap::NPlainReader::TFetchingPlan TScanHead::GetColumnsFetchingPlan(const bool exclusiveSource) const {
- return Context->GetColumnsFetchingPlan(exclusiveSource);
-}
-
void TScanHead::Abort() {
for (auto&& i : FetchingIntervals) {
i.second->Abort();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
index 971cbc654b..8e071e8f54 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
@@ -48,8 +48,6 @@ private:
ui64 ZeroCount = 0;
public:
- TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const;
-
bool IsReverse() const;
void Abort();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index 254c6205b1..e6a408933f 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -1,66 +1,49 @@
#include "source.h"
#include "interval.h"
#include "fetched_data.h"
-#include "constructor.h"
#include "plain_read_data.h"
+#include "constructor.h"
#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
#include <ydb/core/tx/columnshard/blobs_reader/events.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
+#include <ydb/core/formats/arrow/simple_arrays_cache.h>
namespace NKikimr::NOlap::NPlainReader {
-void IDataSource::InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>& batchExt) {
- auto batch = batchExt;
- if (!batch && FetchingPlan->GetFetchingStage()->GetSize()) {
- const ui32 numRows = GetFilterStageData().GetBatch() ? GetFilterStageData().GetBatch()->num_rows() : 0;
- batch = NArrow::MakeEmptyBatch(FetchingPlan->GetFetchingStage()->GetSchema(), numRows);
- }
- if (batch) {
- Y_ABORT_UNLESS((ui32)batch->num_columns() == FetchingPlan->GetFetchingStage()->GetSize());
- }
- NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchStageData"));
- Y_ABORT_UNLESS(!FetchStageData);
- FetchStageData = std::make_shared<TFetchStageData>(batch);
- for (auto&& i : Intervals) {
- i.second->OnSourceFetchStageReady(GetSourceIdx());
- }
- Intervals.clear();
-}
-
-void IDataSource::InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter,
- const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<IDataSource>& sourcePtr) {
- if (IsAborted()) {
- return;
- }
- NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFilterStageData"));
- Y_ABORT_UNLESS(!FilterStageData);
- FilterStageData = std::make_shared<TFilterStageData>(appliedFilter, earlyFilter, batch);
- if (batch) {
- AFL_VERIFY((ui32)batch->num_columns() == FetchingPlan->GetFilterStage()->GetSize())("batch", batch->schema()->ToString())("filter", FetchingPlan->GetFilterStage()->DebugString());
- }
- DoStartFetchStage(sourcePtr);
-}
-
-void IDataSource::InitFetchingPlan(const TFetchingPlan& fetchingPlan, const std::shared_ptr<IDataSource>& sourcePtr) {
+void IDataSource::InitFetchingPlan(const std::shared_ptr<IFetchingStep>& fetchingFirstStep, const std::shared_ptr<IDataSource>& sourcePtr, const bool isExclusive) {
+ AFL_VERIFY(fetchingFirstStep);
if (AtomicCas(&FilterStageFlag, 1, 0)) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingPlan.DebugString());
- Y_ABORT_UNLESS(!FetchingPlan);
- FetchingPlan = fetchingPlan;
+ StageData = std::make_shared<TFetchedData>(isExclusive);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingFirstStep->DebugString())("source_idx", SourceIdx);
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
if (IsAborted()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "InitFetchingPlanAborted");
return;
}
- DoStartFilterStage(sourcePtr);
+ if (fetchingFirstStep->ExecuteInplace(sourcePtr, fetchingFirstStep)) {
+ auto task = std::make_shared<TStepAction>(sourcePtr, fetchingFirstStep->GetNextStep(), Context->GetCommonContext()->GetScanActorId());
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+ }
}
}
void IDataSource::RegisterInterval(TFetchingInterval& interval) {
- if (!FetchStageData) {
+ if (!IsReadyFlag) {
AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second);
}
}
+void IDataSource::SetIsReady() {
+ AFL_VERIFY(!IsReadyFlag);
+ IsReadyFlag = true;
+ for (auto&& i : Intervals) {
+ i.second->OnSourceFetchStageReady(SourceIdx);
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "source_ready")("intervals_count", Intervals.size())("source_idx", SourceIdx);
+ Intervals.clear();
+}
+
void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds,
const std::shared_ptr<IBlobsReadingAction>& readingAction, THashMap<TBlobRange, ui32>& nullBlocks,
const std::shared_ptr<NArrow::TColumnFilter>& filter) {
@@ -90,65 +73,59 @@ void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds,
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "chunks_stats")("fetch", fetchedChunks)("null", nullChunks)("reading_action", readingAction->GetStorageId())("columns", columnIds.size());
}
-void TPortionDataSource::DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetchEF");
- Y_ABORT_UNLESS(FetchingPlan->GetFilterStage()->GetSize());
- auto& columnIds = FetchingPlan->GetFilterStage()->GetColumnIds();
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "StartFilterStage")("fetching_info", FetchingPlan->DebugString());
+bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr,
+ const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName());
+ Y_ABORT_UNLESS(columns->GetColumnsCount());
+ auto& columnIds = columns->GetColumnIds();
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString());
- auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FILTER");
+ auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::" + step->GetName());
readAction->SetIsBackgroundProcess(false);
- THashMap<TBlobRange, ui32> nullBlocks;
- NeedFetchColumns(columnIds, readAction, nullBlocks, nullptr);
+ {
+ THashMap<TBlobRange, ui32> nullBlocks;
+ NeedFetchColumns(columnIds, readAction, nullBlocks, StageData->GetAppliedFilter());
+ StageData->AddNulls(std::move(nullBlocks));
+ }
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
- auto constructor = std::make_shared<TEFTaskConstructor>(GetContext(), actions, std::move(nullBlocks), FetchingPlan->GetFilterStage(), *this, sourcePtr, FetchingPlan->CanUseEarlyFilterImmediately(), "ReaderFilter");
-// NActors::TActivationContext::AsActorContext().Send(GetContext()->GetCommonContext()->GetReadCoordinatorActorId(), new NOlap::NBlobOperations::NRead::TEvStartReadTask(constructor));
+ auto constructor = std::make_shared<TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), "");
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
+ return true;
}
-void TPortionDataSource::DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) {
- Y_ABORT_UNLESS(!FetchStageData);
- Y_ABORT_UNLESS(FilterStageData);
- if (FetchingPlan->GetFetchingStage()->GetSize() && !FilterStageData->IsEmptyFilter()) {
- auto& columnIds = FetchingPlan->GetFetchingStage()->GetColumnIds();
+void TPortionDataSource::DoAbort() {
+}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "RealStartFetchStage")("fetching_info", FetchingPlan->DebugString());
- auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FETCHING");
- readAction->SetIsBackgroundProcess(false);
- THashMap<TBlobRange, ui32> nullBlocks;
- NeedFetchColumns(columnIds, readAction, nullBlocks, GetFilterStageData().GetAppliedFilter());
- if (readAction->GetExpectedBlobsCount()) {
- std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
- auto constructor = std::make_shared<TFFColumnsTaskConstructor>(GetContext(), actions, std::move(nullBlocks), FetchingPlan->GetFetchingStage(), *this, sourcePtr, "ReaderFetcher");
- NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
- return;
- }
- } else {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DontStartFetchStage")("fetching_info", FetchingPlan->DebugString());
+bool TCommittedDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& /*columns*/) {
+ if (ReadStarted) {
+ return false;
}
- InitFetchStageData(nullptr);
-}
+ ReadStarted = true;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString());
-void TPortionDataSource::DoAbort() {
-}
+ std::shared_ptr<IBlobsStorageOperator> storageOperator = GetContext()->GetCommonContext()->GetStoragesManager()->GetInsertOperator();
+ auto readAction = storageOperator->StartReadingAction("CS::READ::" + step->GetName());
-void TCommittedDataSource::DoFetch(const std::shared_ptr<IDataSource>& sourcePtr) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetch");
- if (!ReadStarted) {
- Y_ABORT_UNLESS(!ResultReady);
- ReadStarted = true;
+ readAction->SetIsBackgroundProcess(false);
+ readAction->AddRange(CommittedBlob.GetBlobRange());
- std::shared_ptr<IBlobsStorageOperator> storageOperator = GetContext()->GetCommonContext()->GetStoragesManager()->GetInsertOperator();
- auto readAction = storageOperator->StartReadingAction("CS::READ::COMMITTED");
- readAction->SetIsBackgroundProcess(false);
- readAction->AddRange(CommittedBlob.GetBlobRange());
+ std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
+ auto constructor = std::make_shared<TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), "");
+ NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
+ return true;
+}
- THashMap<TBlobRange, ui32> nullBlocks;
- std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
- auto constructor = std::make_shared<TCommittedColumnsTaskConstructor>(GetContext(), actions, std::move(nullBlocks), *this, sourcePtr, "ReaderCommitted");
- NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
+void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) {
+ if (!GetStageData().GetTable()) {
+ Y_ABORT_UNLESS(GetStageData().GetBlobs().size() == 1);
+ auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first);
+ auto batch = NArrow::DeserializeBatch(bData, GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion()));
+ Y_ABORT_UNLESS(batch);
+ batch = GetContext()->GetReadMetadata()->GetIndexInfo().AddSpecialColumns(batch, CommittedBlob.GetSnapshot());
+ MutableStageData().AddBatch(batch);
}
+ MutableStageData().SyncTableColumns(columns->GetSchema()->fields());
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
index 693a4353bc..34f6a1fe04 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
@@ -7,6 +7,7 @@
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/insert_table/data.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
namespace NKikimr::NOlap {
@@ -18,6 +19,7 @@ namespace NKikimr::NOlap::NPlainReader {
class TFetchingInterval;
class TPlainReadData;
class IFetchTaskConstructor;
+class IFetchingStep;
class IDataSource {
private:
@@ -25,6 +27,8 @@ private:
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Start);
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Finish);
YDB_READONLY_DEF(std::shared_ptr<TSpecialReadContext>, Context);
+ YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero());
+ std::optional<ui32> RecordsCount;
YDB_READONLY(ui32, IntervalsCount, 0);
virtual NJson::TJsonValue DoDebugJson() const = 0;
bool MergingStartedFlag = false;
@@ -32,21 +36,38 @@ private:
protected:
THashMap<ui32, TFetchingInterval*> Intervals;
- std::shared_ptr<TFilterStageData> FilterStageData;
- std::shared_ptr<TFetchStageData> FetchStageData;
-
- std::optional<TFetchingPlan> FetchingPlan;
+ std::shared_ptr<TFetchedData> StageData;
TAtomic FilterStageFlag = 0;
+ bool IsReadyFlag = false;
bool IsAborted() const {
return AbortedFlag;
}
- virtual void DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) = 0;
- virtual void DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) = 0;
+ virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) = 0;
+ virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) = 0;
virtual void DoAbort() = 0;
public:
+ void SetIsReady();
+
+ bool IsEmptyData() const {
+ return GetStageData().IsEmpty();
+ }
+
+ void AssembleColumns(const std::shared_ptr<TColumnsSet>& columns) {
+ if (columns->IsEmpty()) {
+ return;
+ }
+ DoAssembleColumns(columns);
+ }
+
+ bool StartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) {
+ AFL_VERIFY(columns);
+ return DoStartFetchingColumns(sourcePtr, step, columns);
+ }
+ void InitFetchingPlan(const std::shared_ptr<IFetchingStep>& fetchingFirstStep, const std::shared_ptr<IDataSource>& sourcePtr, const bool isExclusive);
+
std::shared_ptr<arrow::RecordBatch> GetLastPK() const {
return Finish.ExtractSortingPosition();
}
@@ -56,11 +77,6 @@ public:
virtual ui64 GetRawBytes(const std::set<ui32>& columnIds) const = 0;
- const TFetchingPlan& GetFetchingPlan() const {
- Y_ABORT_UNLESS(FetchingPlan);
- return *FetchingPlan;
- }
-
bool IsMergingStarted() const {
return MergingStartedFlag;
}
@@ -87,35 +103,37 @@ public:
bool OnIntervalFinished(const ui32 intervalIdx);
- std::shared_ptr<arrow::RecordBatch> GetBatch() const {
- if (!FilterStageData || !FetchStageData) {
- return nullptr;
- }
- return NArrow::MergeColumns({FilterStageData->GetBatch(), FetchStageData->GetBatch()});
- }
-
bool IsDataReady() const {
- return !!FilterStageData && !!FetchStageData;
+ return IsReadyFlag;
}
- const TFilterStageData& GetFilterStageData() const {
- Y_ABORT_UNLESS(FilterStageData);
- return *FilterStageData;
+ const TFetchedData& GetStageData() const {
+ AFL_VERIFY(StageData);
+ return *StageData;
}
- void InitFetchingPlan(const TFetchingPlan& fetchingPlan, const std::shared_ptr<IDataSource>& sourcePtr);
+ TFetchedData& MutableStageData() {
+ AFL_VERIFY(StageData);
+ return *StageData;
+ }
- void InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch
- , const std::shared_ptr<IDataSource>& sourcePtr);
- void InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>& batch);
+ ui32 GetRecordsCount() const {
+ AFL_VERIFY(RecordsCount);
+ return *RecordsCount;
+ }
void RegisterInterval(TFetchingInterval& interval);
- IDataSource(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context, const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
+ IDataSource(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context,
+ const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
+ const TSnapshot& recordSnapshotMax, const std::optional<ui32> recordsCount
+ )
: SourceIdx(sourceIdx)
, Start(start)
, Finish(finish)
, Context(context)
+ , RecordSnapshotMax(recordSnapshotMax)
+ , RecordsCount(recordsCount)
{
if (Start.IsReverseSort()) {
std::swap(Start, Finish);
@@ -137,8 +155,11 @@ private:
const std::shared_ptr<IBlobsReadingAction>& readingAction, THashMap<TBlobRange, ui32>& nullBlocks,
const std::shared_ptr<NArrow::TColumnFilter>& filter);
- virtual void DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) override;
- virtual void DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) override;
+ virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) override;
+ virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) override {
+ auto blobSchema = GetContext()->GetReadMetadata()->GetLoadSchema(Portion->GetMinSnapshot());
+ MutableStageData().AddBatch(Portion->PrepareForAssemble(*blobSchema, columns->GetFilteredSchemaVerified(), MutableStageData().MutableBlobs()).AssembleTable());
+ }
virtual NJson::TJsonValue DoDebugJson() const override {
NJson::TJsonValue result = NJson::JSON_MAP;
result.InsertValue("type", "portion");
@@ -162,7 +183,7 @@ public:
TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TSpecialReadContext>& context,
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
- : TBase(sourceIdx, context, start, finish)
+ : TBase(sourceIdx, context, start, finish, portion->RecordSnapshotMax(), portion->GetRecordsCount())
, Portion(portion) {
}
@@ -173,20 +194,14 @@ private:
using TBase = IDataSource;
TCommittedBlob CommittedBlob;
bool ReadStarted = false;
- bool ResultReady = false;
- void DoFetch(const std::shared_ptr<IDataSource>& sourcePtr);
virtual void DoAbort() override {
}
- virtual void DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) override {
- DoFetch(sourcePtr);
- }
-
- virtual void DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) override {
- DoFetch(sourcePtr);
- }
+ virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr,
+ const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) override;
+ virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) override;
virtual NJson::TJsonValue DoDebugJson() const override {
NJson::TJsonValue result = NJson::JSON_MAP;
result.InsertValue("type", "commit");
@@ -204,7 +219,7 @@ public:
TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context,
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
- : TBase(sourceIdx, context, start, finish)
+ : TBase(sourceIdx, context, start, finish, committed.GetSnapshot(), {})
, CommittedBlob(committed) {
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
index c9224b8d78..4a4db941aa 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
@@ -2,16 +2,14 @@ LIBRARY()
SRCS(
scanner.cpp
- source.cpp
constructor.cpp
+ source.cpp
interval.cpp
fetched_data.cpp
plain_read_data.cpp
- filter_assembler.cpp
- column_assembler.cpp
- committed_assembler.cpp
columns_set.cpp
context.cpp
+ fetching.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h
index 8164f28f82..bc5d5fa1ba 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_context.h
@@ -57,7 +57,6 @@ class TReadContext {
private:
YDB_READONLY_DEF(std::shared_ptr<IStoragesManager>, StoragesManager);
const NColumnShard::TConcreteScanCounters Counters;
- YDB_READONLY(bool, IsInternalRead, false);
TReadMetadataBase::TConstPtr ReadMetadata;
NResourceBroker::NSubscribe::TTaskContext ResourcesTaskContext;
const TActorId ScanActorId;
@@ -97,11 +96,10 @@ public:
return ResourcesTaskContext;
}
- TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const bool isInternalRead, const TReadMetadataBase::TConstPtr& readMetadata,
+ TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata,
const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const NOlap::TComputeShardingPolicy& computeShardingPolicy)
: StoragesManager(storagesManager)
, Counters(counters)
- , IsInternalRead(isInternalRead)
, ReadMetadata(readMetadata)
, ResourcesTaskContext("CS::SCAN_READ", counters.ResourcesSubscriberCounters)
, ScanActorId(scanActorId)
@@ -165,8 +163,6 @@ public:
TString DebugString(const bool verbose) const {
TStringBuilder sb;
- sb << "internal:" << Context->GetIsInternalRead() << ";"
- ;
sb << DoDebugString(verbose);
return sb;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index 729a50ae7a..988d6553b3 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -56,13 +56,6 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
}
}
- if (Snapshot.GetPlanStep()) {
- auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
- for (auto&& i : snapSchema->fields()) {
- result.emplace(indexInfo.GetColumnId(i->name()));
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name());
- }
- }
return result;
}
@@ -87,7 +80,6 @@ void TReadStats::PrintToLog() {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
("event", "statistic")
("begin", BeginTimestamp)
- ("selected", SelectedIndex)
("index_granules", IndexGranules)
("index_portions", IndexPortions)
("index_batches", IndexBatches)
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 4d322c3716..4b0580a6bb 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -25,7 +25,6 @@ class TReadContext;
struct TReadStats {
TInstant BeginTimestamp;
- ui32 SelectedIndex{0};
ui64 IndexGranules{0};
ui64 IndexPortions{0};
ui64 IndexBatches{0};
@@ -42,9 +41,8 @@ struct TReadStats {
ui32 SelectedRows = 0;
- TReadStats(ui32 indexNo = 0)
+ TReadStats()
: BeginTimestamp(TInstant::Now())
- , SelectedIndex(indexNo)
{}
void PrintToLog();
@@ -166,7 +164,7 @@ public:
, IndexVersions(info)
, Snapshot(snapshot)
, ResultIndexSchema(info.GetSchema(Snapshot))
- , ReadStats(std::make_shared<TReadStats>(info.GetLastSchema()->GetIndexInfo().GetId()))
+ , ReadStats(std::make_shared<TReadStats>())
{
}
@@ -271,7 +269,7 @@ public:
if (!ResultIndexSchema) {
return {};
}
- auto f = ResultIndexSchema->GetFieldByColumnId(columnId);
+ auto f = ResultIndexSchema->GetFieldByColumnIdOptional(columnId);
if (!f) {
return {};
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
index 13dc7a35f1..a80496460e 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
@@ -12,7 +12,7 @@ std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByIndex(const int index)
}
return schema->field(index);
}
-std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnId(const ui32 columnId) const {
+std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnIdOptional(const ui32 columnId) const {
return GetFieldByIndex(GetFieldIndex(columnId));
}
@@ -111,8 +111,8 @@ ui32 ISnapshotSchema::GetColumnId(const std::string& columnName) const {
}
std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnIdVerified(const ui32 columnId) const {
- auto result = GetFieldByColumnId(columnId);
- AFL_VERIFY(result)("event", "unknown_column")("column_id", columnId);
+ auto result = GetFieldByColumnIdOptional(columnId);
+ AFL_VERIFY(result)("event", "unknown_column")("column_id", columnId)("schema", DebugString());
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
index 9232edcf1d..b063ba7d02 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
@@ -51,7 +51,7 @@ public:
ui32 GetColumnId(const std::string& columnName) const;
std::shared_ptr<arrow::Field> GetFieldByIndex(const int index) const;
- std::shared_ptr<arrow::Field> GetFieldByColumnId(const ui32 columnId) const;
+ std::shared_ptr<arrow::Field> GetFieldByColumnIdOptional(const ui32 columnId) const;
std::shared_ptr<arrow::Field> GetFieldByColumnIdVerified(const ui32 columnId) const;
TString DebugString() const {
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 1e72a2e7ea..b742348509 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -206,8 +206,7 @@ public:
std::shared_ptr<NOlap::TSerializationStats> BuildSerializationStats(ISnapshotSchema::TPtr schema) const {
auto result = std::make_shared<NOlap::TSerializationStats>();
for (auto&& i : GetAdditiveSummary().GetCompacted().GetColumnStats()) {
- auto field = schema->GetFieldByColumnId(i.first);
- AFL_VERIFY(field)("column_id", i.first)("schema", schema->DebugString());
+ auto field = schema->GetFieldByColumnIdVerified(i.first);
NOlap::TColumnSerializationStat columnInfo(i.first, field->name());
columnInfo.Merge(i.second);
result->AddStat(columnInfo);
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h
index 5edc9c18d1..feda743b26 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.h
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.h
@@ -63,7 +63,7 @@ public:
AFL_VERIFY(Stats);
}
virtual std::shared_ptr<arrow::Field> GetField(const ui32 columnId) const override {
- return Schema->GetFieldByColumnId(columnId);
+ return Schema->GetFieldByColumnIdOptional(columnId);
}
virtual bool NeedMinMaxForColumn(const ui32 columnId) const override {
return Schema->GetIndexInfo().GetMinMaxIdxColumns().contains(columnId);
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index dfc554c4ad..d08e222589 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -685,6 +685,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
std::unique_ptr<NOlap::NTests::TShardReader> reader;
if (!misconfig) {
reader = std::make_unique<NOlap::NTests::TShardReader>(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>()));
+ reader->SetReplyColumns({specs[i].TtlColumn});
counter.CaptureReadEvents = specs[i].WaitEmptyAfter ? 0 : 1; // TODO: we need affected by tiering blob here
counter.WaitReadsCaptured(runtime);
reader->InitializeScanner();