diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-01-17 18:36:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-17 18:36:02 +0300 |
commit | a854215ff2665425d20536c7b7f597dc1143f858 (patch) | |
tree | 927f7167bfb714f78cd724ddce09a60449eae78a | |
parent | 5fa4b2f5ec340934d41d70a0ebc24375f12e92d8 (diff) | |
download | ydb-a854215ff2665425d20536c7b7f597dc1143f858.tar.gz |
general steps for data fetching (#1088)
35 files changed, 690 insertions, 823 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp index e09f6d221e..bdfb8d0355 100644 --- a/ydb/core/formats/arrow/arrow_filter.cpp +++ b/ydb/core/formats/arrow/arrow_filter.cpp @@ -323,7 +323,7 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const } if (filter.IsTotalDenyFilter()) { batch = batch->Slice(0, 0); - return false; + return true; } if (filter.IsTotalAllowFilter()) { return true; @@ -343,11 +343,11 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const return false; } -bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) { +bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const { return ApplyImpl<arrow::Datum::TABLE>(*this, batch, startPos, count); } -bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) { +bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const { return ApplyImpl<arrow::Datum::RECORD_BATCH>(*this, batch, startPos, count); } diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h index 309467b5be..29dc8471c5 100644 --- a/ydb/core/formats/arrow/arrow_filter.h +++ b/ydb/core/formats/arrow/arrow_filter.h @@ -171,8 +171,8 @@ public: // It makes a filter using composite predicate static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType); - bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}); - bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}); + bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const; + bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const; void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const; // Combines filters by 'and' operator (extFilter count is true positions count in self, thought extFitler patch exactly that positions) diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp index dbd0d638f6..000b8e7e63 100644 --- a/ydb/core/tx/columnshard/columnshard__read_base.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp @@ -45,6 +45,7 @@ bool TTxReadBase::ParseProgram(NKikimrSchemeOp::EOlapProgramType programType, AFL_VERIFY(namesChecker.emplace(names.back()).second); } NOlap::TProgramContainer container; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "overriden_columns")("columns", JoinSeq(",", names)); container.OverrideProcessingColumns(std::vector<TString>(names.begin(), names.end())); read.SetProgram(std::move(container)); return true; diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 114881eb4e..e4c76ab6cc 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -118,7 +118,7 @@ public: ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId())); ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId())); - std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, + std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); @@ -367,7 +367,7 @@ private: return Finish(); } - auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, ReadMetadataRanges[ReadMetadataIndex], SelfId(), + auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); } @@ -921,29 +921,17 @@ public: Results.emplace_back(std::move(res)); } - void FillResult(std::vector<TPartialReadResult>& result, const bool mergePartsToMax) const { + void FillResult(std::vector<TPartialReadResult>& result) const { if (Results.empty()) { return; } - if (mergePartsToMax) { - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>> guards; - for (auto&& i : Results) { - batches.emplace_back(i.GetResultBatchPtrVerified()); - guards.insert(guards.end(), i.GetResourcesGuards().begin(), i.GetResourcesGuards().end()); - } - auto res = NArrow::CombineBatches(batches); - AFL_VERIFY(res); - result.emplace_back(TPartialReadResult(guards, NArrow::TShardedRecordBatch(res), Results.back().GetLastReadKey())); - } else { - for (auto&& i : Results) { - result.emplace_back(std::move(i)); - } + for (auto&& i : Results) { + result.emplace_back(std::move(i)); } } }; -std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax) { +std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult) { std::vector<TCurrentBatch> resultBatches; TCurrentBatch currentBatch; for (auto&& i : resultsExt) { @@ -960,7 +948,7 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults std::vector<TPartialReadResult> result; for (auto&& i : resultBatches) { - i.FillResult(result, mergePartsToMax); + i.FillResult(result); } return result; } diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index 5c20f2a5bd..2c6dd9c52a 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -47,7 +47,7 @@ public: return ResultBatch.GetRecordsCount(); } - static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax); + static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult); const NArrow::TShardedRecordBatch& GetShardedBatch() const { return ResultBatch; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 00e4fd205a..2ba089bb80 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -97,7 +97,7 @@ public: TSerializationStats GetSerializationStat(const ISnapshotSchema& schema) const { TSerializationStats result; for (auto&& i : Records) { - if (schema.GetFieldByColumnId(i.ColumnId)) { + if (schema.GetFieldByColumnIdOptional(i.ColumnId)) { result.AddStat(i.GetSerializationStat(schema.GetFieldByColumnIdVerified(i.ColumnId)->name())); } } @@ -151,8 +151,7 @@ public: : PathId(pathId) , Portion(portionId) , MinSnapshot(minSnapshot) - , BlobsOperator(blobsOperator) - { + , BlobsOperator(blobsOperator) { } TString DebugString(const bool withDetails = false) const; @@ -399,22 +398,30 @@ public: public: TAssembleBlobInfo(const ui32 rowsCount) : NullRowsCount(rowsCount) { - + AFL_VERIFY(NullRowsCount); } TAssembleBlobInfo(const TString& data) : Data(data) { - + AFL_VERIFY(!!Data); } ui32 GetNullRowsCount() const noexcept { - return NullRowsCount; + return NullRowsCount; } const TString& GetData() const noexcept { return Data; } + bool IsBlob() const { + return !NullRowsCount && !!Data; + } + + bool IsNull() const { + return NullRowsCount && !Data; + } + std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const; }; @@ -437,8 +444,7 @@ public: TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader) : Loader(loader) - , Blobs(std::move(blobs)) - { + , Blobs(std::move(blobs)) { Y_ABORT_UNLESS(Loader); Y_ABORT_UNLESS(Loader->GetExpectedSchema()->num_fields() == 1); } @@ -505,8 +511,7 @@ public: TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount) : Columns(std::move(columns)) , Schema(schema) - , RowsCount(rowsCount) - { + , RowsCount(rowsCount) { } std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const; @@ -525,8 +530,7 @@ public: : ColumnId(resultLoader->GetColumnId()) , NumRows(numRows) , DataLoader(dataLoader) - , ResultLoader(resultLoader) - { + , ResultLoader(resultLoader) { AFL_VERIFY(ResultLoader); if (DataLoader) { AFL_VERIFY(ResultLoader->GetColumnId() == DataLoader->GetColumnId()); @@ -598,8 +602,8 @@ public: } std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema, - const ISnapshotSchema& resultSchema, - THashMap<TBlobRange, TString>& data) const { + const ISnapshotSchema& resultSchema, + THashMap<TBlobRange, TString>& data) const { auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble(); Y_ABORT_UNLESS(batch->Validate().ok()); return batch; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp index 6e73043a44..0d05fe444c 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp @@ -12,6 +12,12 @@ TString TColumnsSet::DebugString() const { } NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsSet& external) const { + if (external.IsEmpty()) { + return *this; + } + if (IsEmpty()) { + return external; + } TColumnsSet result = *this; for (auto&& i : external.ColumnIds) { result.ColumnIds.erase(i); @@ -28,6 +34,12 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsS } NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsSet& external) const { + if (external.IsEmpty()) { + return *this; + } + if (IsEmpty()) { + return external; + } TColumnsSet result = *this; result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end()); auto fields = result.Schema->fields(); @@ -42,7 +54,7 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsS } bool TColumnsSet::ColumnsOnly(const std::vector<std::string>& fieldNames) const { - if (fieldNames.size() != GetSize()) { + if (fieldNames.size() != GetColumnsCount()) { return false; } std::set<std::string> fieldNamesSet; @@ -64,11 +76,7 @@ void TColumnsSet::Rebuild() { ColumnNamesVector.emplace_back(i); ColumnNames.emplace(i); } - if (ColumnIds.size()) { - FilteredSchema = std::make_shared<TFilteredSnapshotSchema>(FullReadSchema, ColumnIds); - } else { - FilteredSchema = FullReadSchema; - } + FilteredSchema = std::make_shared<TFilteredSnapshotSchema>(FullReadSchema, ColumnIds); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h index 5c6eaecb05..9b50c4cfaa 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h @@ -18,12 +18,19 @@ private: public: TColumnsSet() = default; + bool IsEmpty() const { + return ColumnIds.empty(); + } + + bool operator!() const { + return IsEmpty(); + } const std::vector<TString>& GetColumnNamesVector() const { return ColumnNamesVector; } - ui32 GetSize() const { + ui32 GetColumnsCount() const { return ColumnIds.size(); } @@ -96,27 +103,4 @@ public: TColumnsSet operator-(const TColumnsSet& external) const; }; -class TFetchingPlan { -private: - YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FilterStage); - YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FetchingStage); - bool CanUseEarlyFilterImmediatelyFlag = false; -public: - TFetchingPlan(const std::shared_ptr<TColumnsSet>& filterStage, const std::shared_ptr<TColumnsSet>& fetchingStage, const bool canUseEarlyFilterImmediately) - : FilterStage(filterStage) - , FetchingStage(fetchingStage) - , CanUseEarlyFilterImmediatelyFlag(canUseEarlyFilterImmediately) { - - } - - TString DebugString() const { - return TStringBuilder() << "{filter=" << (FilterStage ? FilterStage->DebugString() : "NO") << ";fetching=" << - (FetchingStage ? FetchingStage->DebugString() : "NO") << ";use_filter=" << CanUseEarlyFilterImmediatelyFlag << "}"; - } - - bool CanUseEarlyFilterImmediately() const { - return CanUseEarlyFilterImmediatelyFlag; - } -}; - } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp deleted file mode 100644 index 8ae01b74e1..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp +++ /dev/null @@ -1,44 +0,0 @@ -#include "committed_assembler.h" -#include "plain_read_data.h" - -namespace NKikimr::NOlap::NPlainReader { - -bool TCommittedAssembler::DoExecute() { - ResultBatch = NArrow::DeserializeBatch(BlobData, ReadMetadata->GetBlobSchema(SchemaVersion)); - Y_ABORT_UNLESS(ResultBatch); - ResultBatch = ReadMetadata->GetIndexInfo().AddSpecialColumns(ResultBatch, DataSnapshot); - Y_ABORT_UNLESS(ResultBatch); - ReadMetadata->GetPKRangesFilter().BuildFilter(ResultBatch).Apply(ResultBatch); - auto t = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({ResultBatch})); - EarlyFilter = ReadMetadata->GetProgram().ApplyEarlyFilter(t, false); - ResultBatch = NArrow::ToBatch(t, true); - return true; -} - -bool TCommittedAssembler::DoApply(IDataReader& /*owner*/) const { - if (Source->GetFetchingPlan().GetFilterStage()->GetSchema()) { - Source->InitFilterStageData(nullptr, EarlyFilter, NArrow::ExtractColumns(ResultBatch, Source->GetFetchingPlan().GetFilterStage()->GetSchema(), true), Source); - } else { - Source->InitFilterStageData(nullptr, EarlyFilter, nullptr, Source); - } - if (Source->GetFetchingPlan().GetFetchingStage()->GetSchema()) { - Source->InitFetchStageData(NArrow::ExtractColumns(ResultBatch, Source->GetFetchingPlan().GetFetchingStage()->GetSchema(), true)); - } else { - Source->InitFetchStageData(nullptr); - } - return true; -} - -TCommittedAssembler::TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const std::shared_ptr<IDataSource>& source, - const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard) - : TBase(scanActorId) - , BlobData(blobData) - , ReadMetadata(readMetadata) - , Source(source) - , SchemaVersion(cBlob.GetSchemaVersion()) - , DataSnapshot(cBlob.GetSnapshot()) - , TaskGuard(std::move(taskGuard)) -{ -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h deleted file mode 100644 index eafcc342d9..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once -#include "source.h" -#include <ydb/core/tx/columnshard/engines/reader/conveyor_task.h> -#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h> -#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> -#include <ydb/core/tx/columnshard/counters/common/object_counter.h> -#include <ydb/core/tx/columnshard/counters/scan.h> -#include <ydb/core/formats/arrow/arrow_filter.h> - -namespace NKikimr::NOlap::NPlainReader { -class TCommittedAssembler: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TCommittedAssembler, true> { -private: - using TBase = NColumnShard::IDataTasksProcessor::ITask; - TString BlobData; - TReadMetadata::TConstPtr ReadMetadata; - const std::shared_ptr<IDataSource> Source; - ui64 SchemaVersion; - TSnapshot DataSnapshot; - - std::shared_ptr<NArrow::TColumnFilter> EarlyFilter; - std::shared_ptr<arrow::RecordBatch> ResultBatch; - const NColumnShard::TCounterGuard TaskGuard; -protected: - virtual bool DoExecute() override; - virtual bool DoApply(IDataReader& owner) const override; -public: - virtual TString GetTaskClassIdentifier() const override { - return "PlainReader::TCommittedAssembler"; - } - - TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const std::shared_ptr<IDataSource>& source, - const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard); -}; -} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp index 9c06905261..a4ecce5de3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp @@ -1,51 +1,20 @@ #include "constructor.h" -#include "filter_assembler.h" -#include "column_assembler.h" -#include "committed_assembler.h" -#include <ydb/core/tx/columnshard/engines/reader/read_context.h> -#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> -#include <ydb/core/tx/conveyor/usage/events.h> #include <ydb/core/tx/conveyor/usage/service.h> namespace NKikimr::NOlap::NPlainReader { -THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> TAssembleColumnsTaskConstructor::BuildBatchAssembler() { - auto blobs = ExtractBlobsData(); - THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble; - for (auto&& i : blobs) { - blobsDataAssemble.emplace(i.first, i.second); - } - for (auto&& i : NullBlocks) { - AFL_VERIFY(blobsDataAssemble.emplace(i.first, i.second).second); - } - return blobsDataAssemble; -} - -void TEFTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { - auto task = std::make_shared<TAssembleFilter>(Context, PortionInfo, Source, Columns, UseEarlyFilter, BuildBatchAssembler()); - task->SetPriority(NConveyor::ITask::EPriority::Normal); - NConveyor::TScanServiceOperator::SendTaskToExecute(task); -} - -void TFFColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { - auto task = std::make_shared<TAssembleFFBatch>(Context, PortionInfo, Source, Columns, BuildBatchAssembler(), AppliedFilter); - task->SetPriority(NConveyor::ITask::EPriority::High); - NConveyor::TScanServiceOperator::SendTaskToExecute(task); -} - -void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { - auto blobs = ExtractBlobsData(); - Y_ABORT_UNLESS(NullBlocks.size() == 0); - Y_ABORT_UNLESS(blobs.size() == 1); - auto task = std::make_shared<TCommittedAssembler>(Context->GetCommonContext()->GetScanActorId(), blobs.begin()->second, - Context->GetReadMetadata(), Source, CommittedBlob, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()); - task->SetPriority(NConveyor::ITask::EPriority::High); +void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { + Source->MutableStageData().AddBlobs(ExtractBlobsData()); + AFL_VERIFY(Step->GetNextStep()); + auto task = std::make_shared<TStepAction>(Source, Step->GetNextStep(), Context->GetCommonContext()->GetScanActorId()); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } -bool IFetchTaskConstructor::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId())("status", status.GetErrorMessage())("status_code", status.GetStatus()); - NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(), std::make_unique<NConveyor::TEvExecution::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString()))); +bool TBlobsFetcherTask::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId()) + ("status", status.GetErrorMessage())("status_code", status.GetStatus()); + NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(), + std::make_unique<NConveyor::TEvExecution::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString()))); return false; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h index 54105ee81e..3b5ceca520 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h @@ -8,87 +8,25 @@ namespace NKikimr::NOlap::NPlainReader { -class IFetchTaskConstructor: public NBlobOperations::NRead::ITask { +class TBlobsFetcherTask: public NBlobOperations::NRead::ITask { private: using TBase = NBlobOperations::NRead::ITask; -protected: const std::shared_ptr<IDataSource> Source; + const std::shared_ptr<IFetchingStep> Step; const std::shared_ptr<TSpecialReadContext> Context; - THashMap<TBlobRange, ui32> NullBlocks; - NColumnShard::TCounterGuard TasksGuard; + + virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override; virtual bool DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override; public: - IFetchTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, - const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer) - : TBase(readActions, taskCustomer) + TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, const std::shared_ptr<IDataSource>& sourcePtr, + const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId) + : TBase(readActions, taskCustomer, externalTaskId) , Source(sourcePtr) + , Step(step) , Context(context) - , NullBlocks(std::move(nullBlocks)) - , TasksGuard(context->GetCommonContext()->GetCounters().GetReadTasksGuard()) - { - } -}; - -class TCommittedColumnsTaskConstructor: public IFetchTaskConstructor { -private: - TCommittedBlob CommittedBlob; - using TBase = IFetchTaskConstructor; -protected: - virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override; -public: - TCommittedColumnsTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, - const TCommittedDataSource& source, const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer) - : TBase(context, readActions, std::move(nullBlocks), sourcePtr, taskCustomer) - , CommittedBlob(source.GetCommitted()) - { - - } -}; - -class TAssembleColumnsTaskConstructor: public IFetchTaskConstructor { -private: - using TBase = IFetchTaskConstructor; -protected: - std::shared_ptr<TColumnsSet> Columns; - std::shared_ptr<TPortionInfo> PortionInfo; - THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> BuildBatchAssembler(); -public: - TAssembleColumnsTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, - const std::shared_ptr<TColumnsSet>& columns, const TPortionDataSource& portion, const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer) - : TBase(context, readActions, std::move(nullBlocks), sourcePtr, taskCustomer) - , Columns(columns) - , PortionInfo(portion.GetPortionInfoPtr()) { } }; -class TFFColumnsTaskConstructor: public TAssembleColumnsTaskConstructor { -private: - using TBase = TAssembleColumnsTaskConstructor; - std::shared_ptr<NArrow::TColumnFilter> AppliedFilter; - virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override; -public: - TFFColumnsTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, - const std::shared_ptr<TColumnsSet>& columns, const TPortionDataSource& portion, const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer) - : TBase(context, readActions, std::move(nullBlocks), columns, portion, sourcePtr, taskCustomer) - , AppliedFilter(portion.GetFilterStageData().GetAppliedFilter()) - { - } -}; - -class TEFTaskConstructor: public TAssembleColumnsTaskConstructor { -private: - bool UseEarlyFilter = false; - using TBase = TAssembleColumnsTaskConstructor; - virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override; -public: - TEFTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, - const std::shared_ptr<TColumnsSet>& columns, const TPortionDataSource& portion, const std::shared_ptr<IDataSource>& sourcePtr, const bool useEarlyFilter, const TString& taskCustomer) - : TBase(context, readActions, std::move(nullBlocks), columns, portion, sourcePtr, taskCustomer) - , UseEarlyFilter(useEarlyFilter) - { - } -}; - } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp index 35a90e520b..78a5da19a2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp @@ -8,14 +8,145 @@ std::shared_ptr<NKikimr::NOlap::NIndexedReader::TMergePartialStream> TSpecialRea } ui64 TSpecialReadContext::GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive) { - auto fetchingPlan = GetColumnsFetchingPlan(isExclusive); ui64 result = 0; for (auto&& i : sources) { + auto fetchingPlan = GetColumnsFetchingPlan(i.second, isExclusive); AFL_VERIFY(i.second->GetIntervalsCount()); - result += i.second->GetRawBytes(fetchingPlan.GetFilterStage()->GetColumnIds()) / i.second->GetIntervalsCount(); - result += i.second->GetRawBytes(fetchingPlan.GetFetchingStage()->GetColumnIds()) / i.second->GetIntervalsCount(); + result += fetchingPlan->PredictRawBytes(i.second) / i.second->GetIntervalsCount(); } return result; } +std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source, const bool exclusiveSource) const { + const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax(); + auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0]; + if (!result) { + return std::make_shared<TBuildFakeSpec>(source->GetRecordsCount(), "fake"); + } + return result; +} + +std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const { + if (!EFColumns->GetColumnsCount()) { + TColumnsSet columnsFetch = *FFColumns; + if (needSnapshots) { + columnsFetch = columnsFetch + *SpecColumns; + } + if (!exclusiveSource) { + columnsFetch = columnsFetch + *PKColumns + *SpecColumns; + } + if (columnsFetch.GetColumnsCount()) { + std::shared_ptr<IFetchingStep> result = std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "simple"); + std::shared_ptr<IFetchingStep> current = result; + current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch))); + return result; + } else { + return nullptr; + } + } else if (exclusiveSource) { + TColumnsSet columnsFetch = *EFColumns; + if (needSnapshots || FFColumns->Contains(SpecColumns)) { + columnsFetch = columnsFetch + *SpecColumns; + } + AFL_VERIFY(columnsFetch.GetColumnsCount()); + std::shared_ptr<IFetchingStep> result = std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "ef"); + std::shared_ptr<IFetchingStep> current = result; + + if (needSnapshots || FFColumns->Contains(SpecColumns)) { + current = current->AttachNext(std::make_shared<TAssemblerStep>(SpecColumns)); + current = current->AttachNext(std::make_shared<TSnapshotFilter>()); + columnsFetch = columnsFetch - *SpecColumns; + } + current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch))); + if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { + current = current->AttachNext(std::make_shared<TPredicateFilter>()); + } + for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { + if (!i->IsFilterOnly()) { + break; + } + current = current->AttachNext(std::make_shared<TFilterProgramStep>(i)); + } + const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns; + if (columnsAdditionalFetch.GetColumnsCount()) { + current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch))); + current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch))); + } + return result; + } else { + TColumnsSet columnsFetch = *MergeColumns + *EFColumns; + AFL_VERIFY(columnsFetch.GetColumnsCount()); + std::shared_ptr<IFetchingStep> result = std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "full"); + std::shared_ptr<IFetchingStep> current = result; + + current = current->AttachNext(std::make_shared<TAssemblerStep>(SpecColumns)); + if (needSnapshots) { + current = current->AttachNext(std::make_shared<TSnapshotFilter>()); + } + current = current->AttachNext(std::make_shared<TAssemblerStep>(PKColumns)); + if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { + current = current->AttachNext(std::make_shared<TPredicateFilter>()); + } + const TColumnsSet columnsFetchEF = columnsFetch - *SpecColumns - *PKColumns; + current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetchEF))); + for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { + if (!i->IsFilterOnly()) { + break; + } + current = current->AttachNext(std::make_shared<TFilterProgramStep>(i)); + } + const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns; + if (columnsAdditionalFetch.GetColumnsCount()) { + current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch))); + current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch))); + } + return result; + } +} + +TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext) + : CommonContext(commonContext) +{ + ReadMetadata = dynamic_pointer_cast<const TReadMetadata>(CommonContext->GetReadMetadata()); + Y_ABORT_UNLESS(ReadMetadata); + Y_ABORT_UNLESS(ReadMetadata->SelectInfo); + + auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot()); + SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema); + { + auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); + if (efColumns.size()) { + EFColumns = std::make_shared<TColumnsSet>(efColumns, ReadMetadata->GetIndexInfo(), readSchema); + } else { + EFColumns = std::make_shared<TColumnsSet>(); + } + } + if (ReadMetadata->HasProcessingColumnIds()) { + FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); + if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) { + FFColumns = std::make_shared<TColumnsSet>(*EFColumns + *SpecColumns); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString()); + } else { + AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_first", FFColumns->DebugString()); + } + } else { + FFColumns = EFColumns; + } + if (FFColumns->IsEmpty()) { + ProgramInputColumns = SpecColumns; + } else { + ProgramInputColumns = FFColumns; + } + + PKColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetPKColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); + MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns); + + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); + CacheFetchingScripts[0][0] = BuildColumnsFetchingPlan(false, false); + CacheFetchingScripts[0][1] = BuildColumnsFetchingPlan(false, true); + CacheFetchingScripts[1][0] = BuildColumnsFetchingPlan(true, false); + CacheFetchingScripts[1][1] = BuildColumnsFetchingPlan(true, true); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h index 2c6f213143..3fe05f23d8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h @@ -1,5 +1,6 @@ #pragma once #include "columns_set.h" +#include "fetching.h" #include <ydb/core/tx/columnshard/engines/reader/read_context.h> #include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h> @@ -23,8 +24,8 @@ private: std::shared_ptr<TColumnsSet> PKFFColumns; std::shared_ptr<TColumnsSet> EFPKColumns; std::shared_ptr<TColumnsSet> FFMinusEFColumns; - std::shared_ptr<TColumnsSet> FFMinusEFPKColumns; - bool TrivialEFFlag = false; + std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource) const; + std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2> CacheFetchingScripts; public: ui64 GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive); @@ -43,65 +44,9 @@ public: ; } - TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext) - : CommonContext(commonContext) - { - ReadMetadata = dynamic_pointer_cast<const TReadMetadata>(CommonContext->GetReadMetadata()); - Y_ABORT_UNLESS(ReadMetadata); - Y_ABORT_UNLESS(ReadMetadata->SelectInfo); + TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext); - auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot()); - SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema); - { - auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); - if (efColumns.size()) { - EFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetEarlyFilterColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); - } else { - EFColumns = SpecColumns; - } - } - *EFColumns = *EFColumns + *SpecColumns; - if (ReadMetadata->HasProcessingColumnIds()) { - FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); - AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString()); - *FFColumns = *FFColumns + *EFColumns; - } else { - FFColumns = std::make_shared<TColumnsSet>(*EFColumns); - } - ProgramInputColumns = FFColumns; - - PKColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetPKColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); - MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns); - - TrivialEFFlag = EFColumns->ColumnsOnly(ReadMetadata->GetIndexInfo().ArrowSchemaSnapshot()->field_names()); - - PKFFColumns = std::make_shared<TColumnsSet>(*PKColumns + *FFColumns); - EFPKColumns = std::make_shared<TColumnsSet>(*EFColumns + *PKColumns); - FFMinusEFColumns = std::make_shared<TColumnsSet>(*FFColumns - *EFColumns); - FFMinusEFPKColumns = std::make_shared<TColumnsSet>(*FFColumns - *EFColumns - *PKColumns); - - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); - } - - TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const { - if (CommonContext->GetIsInternalRead()) { - return TFetchingPlan(PKFFColumns, EmptyColumns, exclusiveSource); - } - - if (exclusiveSource) { - if (TrivialEFFlag) { - return TFetchingPlan(FFColumns, EmptyColumns, true); - } else { - return TFetchingPlan(EFColumns, FFMinusEFColumns, true); - } - } else { - if (TrivialEFFlag) { - return TFetchingPlan(PKFFColumns, EmptyColumns, false); - } else { - return TFetchingPlan(EFPKColumns, FFMinusEFPKColumns, false); - } - } - } + std::shared_ptr<IFetchingStep> GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source, const bool exclusiveSource) const; }; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp index 1c9e0df7ff..d4f1a808b7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp @@ -1,5 +1,17 @@ #include "fetched_data.h" +#include <ydb/core/formats/arrow/simple_arrays_cache.h> +#include <ydb/core/formats/arrow/common/validation.h> namespace NKikimr::NOlap { +void TFetchedData::SyncTableColumns(const std::vector<std::shared_ptr<arrow::Field>>& fields) { + for (auto&& i : fields) { + if (Table->GetColumnByName(i->name())) { + continue; + } + Table = NArrow::TStatusValidator::GetValid(Table->AddColumn(Table->num_columns(), i, + std::make_shared<arrow::ChunkedArray>(NArrow::TThreadSimpleArraysCache::GetNull(i->type(), Table->num_rows())))); + } +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index c07efcaf47..4c29f220d3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -1,45 +1,109 @@ #pragma once -#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/table.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h> #include <ydb/core/formats/arrow/arrow_filter.h> +#include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <ydb/library/accessor/accessor.h> +#include <ydb/library/actors/core/log.h> namespace NKikimr::NOlap { class TFetchedData { protected: - YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch); + using TBlobs = THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>; + YDB_ACCESSOR_DEF(TBlobs, Blobs); + YDB_READONLY_DEF(std::shared_ptr<arrow::Table>, Table); + YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter); + YDB_READONLY(bool, UseFilter, false); public: - TFetchedData(const std::shared_ptr<arrow::RecordBatch>& batch) - : Batch(batch) + TFetchedData(const bool useFilter) + : UseFilter(useFilter) { + + } + + void SyncTableColumns(const std::vector<std::shared_ptr<arrow::Field>>& fields); + + std::shared_ptr<NArrow::TColumnFilter> GetAppliedFilter() const { + return UseFilter ? Filter : nullptr; + } + + std::shared_ptr<NArrow::TColumnFilter> GetNotAppliedFilter() const { + return UseFilter ? nullptr : Filter; + } + + TString ExtractBlob(const TBlobRange& bRange) { + auto it = Blobs.find(bRange); + AFL_VERIFY(it != Blobs.end()); + AFL_VERIFY(it->second.IsBlob()); + auto result = it->second.GetData(); + Blobs.erase(it); + return result; + } + + void AddBlobs(THashMap<TBlobRange, TString>&& blobs) { + for (auto&& i : blobs) { + AFL_VERIFY(Blobs.emplace(i.first, std::move(i.second)).second); + } + } + + std::shared_ptr<arrow::RecordBatch> GetBatch() const { + return NArrow::ToBatch(Table, true); + } + + void AddNulls(THashMap<TBlobRange, ui32>&& blobs) { + for (auto&& i : blobs) { + AFL_VERIFY(Blobs.emplace(i.first, i.second).second); + } } -}; -class TFilterStageData: public TFetchedData { -private: - using TBase = TFetchedData; - YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, AppliedFilter); - YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedEarlyFilter); -public: bool IsEmptyFilter() const { - return (AppliedFilter && AppliedFilter->IsTotalDenyFilter()) || (NotAppliedEarlyFilter && NotAppliedEarlyFilter->IsTotalDenyFilter()); + return Filter && Filter->IsTotalDenyFilter(); } - TFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch) - : TBase(batch) - , AppliedFilter(appliedFilter) - , NotAppliedEarlyFilter(earlyFilter) - { + bool IsEmpty() const { + return IsEmptyFilter() || (Table && !Table->num_rows()); + } + void AddFilter(const std::shared_ptr<NArrow::TColumnFilter>& filter) { + if (UseFilter && Table && filter) { + AFL_VERIFY(filter->Apply(Table)); + } + if (!Filter) { + Filter = filter; + } else if (filter) { + *Filter = Filter->CombineSequentialAnd(*filter); + } + } + + void AddFilter(const NArrow::TColumnFilter& filter) { + if (UseFilter && Table) { + AFL_VERIFY(filter.Apply(Table)); + } + if (!Filter) { + Filter = std::make_shared<NArrow::TColumnFilter>(filter); + } else { + *Filter = Filter->CombineSequentialAnd(filter); + } + } + + void AddBatch(const std::shared_ptr<arrow::RecordBatch>& batch) { + return AddBatch(arrow::Table::Make(batch->schema(), batch->columns(), batch->num_rows())); + } + + void AddBatch(const std::shared_ptr<arrow::Table>& table) { + auto tableLocal = table; + if (Filter && UseFilter) { + AFL_VERIFY(Filter->Apply(tableLocal)); + } + if (!Table) { + Table = tableLocal; + } else { + AFL_VERIFY(NArrow::MergeBatchColumns({Table, tableLocal}, Table)); + } } -}; -class TFetchStageData: public TFetchedData { -private: - using TBase = TFetchedData; -public: - using TBase::TBase; }; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp new file mode 100644 index 0000000000..3058b64781 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -0,0 +1,71 @@ +#include "fetching.h" +#include "source.h" +#include <ydb/core/tx/columnshard/engines/filter.h> +#include <ydb/core/formats/arrow/simple_arrays_cache.h> + +namespace NKikimr::NOlap::NPlainReader { + +bool TStepAction::DoApply(IDataReader& /*owner*/) const { + if (FinishedFlag) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply"); + Source->SetIsReady(); + } + return true; +} + +bool TStepAction::DoExecute() { + while (Step) { + if (!Step->ExecuteInplace(Source, Step)) { + return true; + } + if (Source->IsEmptyData()) { + FinishedFlag = true; + return true; + } + Step = Step->GetNextStep(); + } + FinishedFlag = true; + return true; +} + +bool TBlobsFetchingStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const { + return !source->StartFetchingColumns(source, step, Columns); +} + +ui64 TBlobsFetchingStep::PredictRawBytes(const std::shared_ptr<IDataSource>& source) const { + return source->GetRawBytes(Columns->GetColumnIds()); +} + +bool TAssemblerStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const { + source->AssembleColumns(Columns); + return true; +} + +bool TFilterProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const { + auto filter = Step->BuildFilter(source->GetStageData().GetTable()); + source->MutableStageData().AddFilter(filter); + return true; +} + +bool TPredicateFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const { + auto filter = source->GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(source->GetStageData().GetTable()); + source->MutableStageData().AddFilter(filter); + return true; +} + +bool TSnapshotFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const { + auto filter = MakeSnapshotFilter(source->GetStageData().GetTable(), source->GetContext()->GetReadMetadata()->GetSnapshot()); + source->MutableStageData().AddFilter(filter); + return true; +} + +bool TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const { + std::vector<std::shared_ptr<arrow::Array>> columns; + for (auto&& f : TIndexInfo::ArrowSchemaSnapshot()->fields()) { + columns.emplace_back(NArrow::TThreadSimpleArraysCache::GetConst(f->type(), std::make_shared<arrow::UInt64Scalar>(0), Count)); + } + source->MutableStageData().AddBatch(arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), Count, columns)); + return true; +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h new file mode 100644 index 0000000000..297863392a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h @@ -0,0 +1,174 @@ +#pragma once +#include "columns_set.h" +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> +#include <ydb/core/tx/columnshard/engines/reader/conveyor_task.h> +#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h> + +namespace NKikimr::NOlap::NPlainReader { +class IDataSource; + +class IFetchingStep { +private: + std::shared_ptr<IFetchingStep> NextStep; + YDB_READONLY_DEF(TString, Name); + YDB_READONLY(ui32, Index, 0); + YDB_READONLY_DEF(TString, BranchName); +protected: + virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const = 0; + virtual TString DoDebugString() const { + return ""; + } +public: + virtual ~IFetchingStep() = default; + + std::shared_ptr<IFetchingStep> AttachNext(const std::shared_ptr<IFetchingStep>& nextStep) { + AFL_VERIFY(nextStep); + NextStep = nextStep; + nextStep->Index = Index + 1; + nextStep->BranchName = BranchName; + return nextStep; + } + + virtual ui64 PredictRawBytes(const std::shared_ptr<IDataSource>& /*source*/) const { + return 0; + } + + bool ExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", DebugString())("scan_step_idx", GetIndex()); + return DoExecuteInplace(source, step); + } + + const std::shared_ptr<IFetchingStep>& GetNextStep() const { + return NextStep; + } + + IFetchingStep(const TString& name, const TString& branchName = Default<TString>()) + : Name(name) + , BranchName(branchName) + { + + } + + TString DebugString() const { + TStringBuilder sb; + sb << "name=" << Name << ";" << DoDebugString() << ";branch=" << BranchName << ";"; + if (NextStep) { + sb << "next=" << NextStep->DebugString() << ";"; + } + return sb; + } +}; + +class TStepAction: public NColumnShard::IDataTasksProcessor::ITask { +private: + using TBase = NColumnShard::IDataTasksProcessor::ITask; + std::shared_ptr<IDataSource> Source; + std::shared_ptr<IFetchingStep> Step; + bool FinishedFlag = false; +protected: + virtual bool DoApply(IDataReader& /*owner*/) const override; + virtual bool DoExecute() override; +public: + virtual TString GetTaskClassIdentifier() const override { + return "STEP_ACTION"; + } + + TStepAction(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step, const NActors::TActorId& ownerActorId) + : TBase(ownerActorId) + , Source(source) + , Step(step) + { + + } +}; + +class TBuildFakeSpec: public IFetchingStep { +private: + using TBase = IFetchingStep; + const ui32 Count = 0; +protected: + virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const override; +public: + TBuildFakeSpec(const ui32 count, const TString& nameBranch = "") + : TBase("FAKE_SPEC", nameBranch) + , Count(count) + { + AFL_VERIFY(Count); + } +}; + +class TBlobsFetchingStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + std::shared_ptr<TColumnsSet> Columns; +protected: + virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const override; + virtual ui64 PredictRawBytes(const std::shared_ptr<IDataSource>& source) const override; + virtual TString DoDebugString() const override { + return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; + } +public: + TBlobsFetchingStep(const std::shared_ptr<TColumnsSet>& columns, const TString& nameBranch = "") + : TBase("FETCHING", nameBranch) + , Columns(columns) + { + AFL_VERIFY(Columns); + } +}; + +class TAssemblerStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns); + virtual TString DoDebugString() const override { + return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; + } +public: + virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const override; + TAssemblerStep(const std::shared_ptr<TColumnsSet>& columns) + : TBase("ASSEMBLER") + , Columns(columns) + { + AFL_VERIFY(Columns); + } +}; + +class TFilterProgramStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + std::shared_ptr<NSsa::TProgramStep> Step; +public: + virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const override; + TFilterProgramStep(const std::shared_ptr<NSsa::TProgramStep>& step) + : TBase("PROGRAM") + , Step(step) + { + + } +}; + +class TPredicateFilter: public IFetchingStep { +private: + using TBase = IFetchingStep; +public: + virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const override; + TPredicateFilter() + : TBase("PREDICATE") { + + } +}; + +class TSnapshotFilter: public IFetchingStep { +private: + using TBase = IFetchingStep; +public: + virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const override; + TSnapshotFilter() + : TBase("SNAPSHOT") { + + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp deleted file mode 100644 index 7b12b1ebf0..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp +++ /dev/null @@ -1,231 +0,0 @@ -#include "filter_assembler.h" -#include "plain_read_data.h" -#include <ydb/core/formats/arrow/arrow_filter.h> -#include <ydb/core/tx/columnshard/engines/filter.h> - -namespace NKikimr::NOlap::NPlainReader { - -class TFilterContext { -private: - TPortionInfo::TPreparedBatchData BatchAssembler; - YDB_ACCESSOR_DEF(std::shared_ptr<TSpecialReadContext>, Context); - YDB_ACCESSOR_DEF(std::set<ui32>, ReadyColumnIds); - std::optional<ui32> OriginalCount; - YDB_READONLY(std::shared_ptr<NArrow::TColumnFilter>, AppliedFilter, std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildAllowFilter())); - YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, EarlyFilter); - YDB_READONLY_DEF(std::shared_ptr<arrow::Table>, ResultTable); -public: - TFilterContext(TPortionInfo::TPreparedBatchData&& batchAssembler, const std::shared_ptr<TSpecialReadContext>& context) - : BatchAssembler(std::move(batchAssembler)) - , Context(context) - { - - } - - ui32 GetOriginalCountVerified() const { - AFL_VERIFY(OriginalCount); - return *OriginalCount; - } - - std::shared_ptr<arrow::Table> AppendToResult(const std::set<ui32>& columnIds, const std::optional<std::shared_ptr<arrow::Scalar>> constantFill = {}) { - TPortionInfo::TPreparedBatchData::TAssembleOptions options; - options.IncludedColumnIds = columnIds; - for (auto it = options.IncludedColumnIds->begin(); it != options.IncludedColumnIds->end();) { - if (!ReadyColumnIds.emplace(*it).second) { - it = options.IncludedColumnIds->erase(it); - } else { - ++it; - } - } - if (options.IncludedColumnIds->empty()) { - AFL_VERIFY(ResultTable); - return ResultTable; - } - if (constantFill) { - for (auto&& i : *options.IncludedColumnIds) { - options.ConstantColumnIds.emplace(i, *constantFill); - } - } - std::shared_ptr<arrow::Table> table = BatchAssembler.AssembleTable(options); - AFL_VERIFY(table); - if (!OriginalCount) { - OriginalCount = table->num_rows(); - } - AFL_VERIFY(AppliedFilter->Apply(table)); - if (!ResultTable) { - ResultTable = table; - } else { - AFL_VERIFY(NArrow::MergeBatchColumns({ResultTable, table}, ResultTable)); - } - return ResultTable; - } - - void ApplyFilter(const std::shared_ptr<NArrow::TColumnFilter>& filter, const bool useFilter) { - if (useFilter) { - filter->Apply(ResultTable); - *AppliedFilter = AppliedFilter->CombineSequentialAnd(*filter); - } else if (EarlyFilter) { - *EarlyFilter = EarlyFilter->And(*filter); - } else { - EarlyFilter = filter; - } - } -}; - -class IFilterConstructor { -private: - const std::set<ui32> ColumnIds; - const std::optional<std::shared_ptr<arrow::Scalar>> ConstantFill; -protected: - virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& filterContext, const std::shared_ptr<arrow::Table>& data) const = 0; -public: - IFilterConstructor(const std::set<ui32>& columnIds, const std::optional<std::shared_ptr<arrow::Scalar>> constantFill = {}) - : ColumnIds(columnIds) - , ConstantFill(constantFill) - { - AFL_VERIFY(ColumnIds.size()); - } - - virtual ~IFilterConstructor() = default; - - void Execute(TFilterContext& filterContext, const bool useFilter) { - auto result = filterContext.AppendToResult(ColumnIds, ConstantFill); - auto localFilter = BuildFilter(filterContext, result); - AFL_VERIFY(!!localFilter); - filterContext.ApplyFilter(localFilter, useFilter); - } -}; - -class TPredicateFilter: public IFilterConstructor { -private: - using TBase = IFilterConstructor; -protected: - virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& filterContext, const std::shared_ptr<arrow::Table>& data) const override { - return std::make_shared<NArrow::TColumnFilter>(filterContext.GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(data)); - } -public: - TPredicateFilter(const std::shared_ptr<TSpecialReadContext>& ctx) - : TBase(ctx->GetReadMetadata()->GetPKRangesFilter().GetColumnIds(ctx->GetReadMetadata()->GetIndexInfo())) { - - } -}; - -class TRestoreMergeData: public IFilterConstructor { -private: - using TBase = IFilterConstructor; -protected: - virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& /*filterContext*/, const std::shared_ptr<arrow::Table>& /*data*/) const override { - return std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildAllowFilter()); - } -public: - TRestoreMergeData(const std::shared_ptr<TSpecialReadContext>& ctx) - : TBase(ctx->GetMergeColumns()->GetColumnIds()) { - - } -}; - -class TFakeSnapshotData: public IFilterConstructor { -private: - using TBase = IFilterConstructor; -protected: - virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& /*filterContext*/, const std::shared_ptr<arrow::Table>& /*data*/) const override { - return std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildAllowFilter()); - } -public: - TFakeSnapshotData(const std::shared_ptr<TSpecialReadContext>& ctx) - : TBase(ctx->GetSpecColumns()->GetColumnIds(), std::make_shared<arrow::UInt64Scalar>(0)) { - - } -}; - -class TSnapshotFilter: public IFilterConstructor { -private: - using TBase = IFilterConstructor; -protected: - virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& filterContext, const std::shared_ptr<arrow::Table>& data) const override { - return std::make_shared<NArrow::TColumnFilter>(MakeSnapshotFilter(data, filterContext.GetContext()->GetReadMetadata()->GetSnapshot())); - } -public: - TSnapshotFilter(const std::shared_ptr<TSpecialReadContext>& ctx) - : TBase(ctx->GetSpecColumns()->GetColumnIds()) { - - } -}; - -class TProgramStepFilter: public IFilterConstructor { -private: - using TBase = IFilterConstructor; - std::shared_ptr<NSsa::TProgramStep> Step; -protected: - virtual std::shared_ptr<NArrow::TColumnFilter> BuildFilter(const TFilterContext& /*filterContext*/, const std::shared_ptr<arrow::Table>& data) const override { - return Step->BuildFilter(data); - } -public: - TProgramStepFilter(const std::shared_ptr<NSsa::TProgramStep>& step) - : TBase(step->GetFilterOriginalColumnIds()) - , Step(step) - { - } -}; - -bool TAssembleFilter::DoExecute() { - std::vector<std::shared_ptr<IFilterConstructor>> filters; - if (!UseFilter) { - filters.emplace_back(std::make_shared<TRestoreMergeData>(Context)); - } - if (FilterColumns->GetColumnIds().contains((ui32)TIndexInfo::ESpecialColumn::PLAN_STEP)) { - const bool needSnapshotsFilter = ReadMetadata->GetSnapshot() < RecordsMaxSnapshot; - if (needSnapshotsFilter) { - filters.emplace_back(std::make_shared<TSnapshotFilter>(Context)); - } else { - filters.emplace_back(std::make_shared<TFakeSnapshotData>(Context)); - } - } - if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { - filters.emplace_back(std::make_shared<TPredicateFilter>(Context)); - } - - for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { - if (!i->IsFilterOnly()) { - break; - } - - filters.emplace_back(std::make_shared<TProgramStepFilter>(i)); - } - - TFilterContext filterContext(BuildBatchConstructor(FilterColumns->GetFilteredSchemaVerified()), Context); - - for (auto&& f : filters) { - f->Execute(filterContext, UseFilter); - AFL_VERIFY(filterContext.GetResultTable()); - if (filterContext.GetResultTable()->num_rows() == 0 || (filterContext.GetEarlyFilter() && filterContext.GetEarlyFilter()->IsTotalDenyFilter())) { - break; - } - } - - AppliedFilter = filterContext.GetAppliedFilter(); - EarlyFilter = filterContext.GetEarlyFilter(); - - auto batch = filterContext.GetResultTable(); - if (!batch || batch->num_rows() == 0) { - return true; - } - - OriginalCount = filterContext.GetOriginalCountVerified(); - auto fullTable = filterContext.AppendToResult(FilterColumns->GetColumnIds()); - AFL_VERIFY(AppliedFilter->IsTotalAllowFilter() || AppliedFilter->Size() == OriginalCount)("original", OriginalCount)("af_count", AppliedFilter->Size()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data") - ("original_count", OriginalCount)("filtered_count", batch->num_rows())("use_filter", UseFilter) - ("filter_columns", FilterColumns->GetColumnIds().size())("af_count", AppliedFilter->Size())("ef_count", EarlyFilter ? EarlyFilter->Size() : 0); - - FilteredBatch = NArrow::ToBatch(fullTable, true); - return true; -} - -bool TAssembleFilter::DoApply(IDataReader& /*owner*/) const { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply"); - Source->InitFilterStageData(AppliedFilter, EarlyFilter, FilteredBatch, Source); - return true; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h deleted file mode 100644 index 12492a6d8a..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h +++ /dev/null @@ -1,76 +0,0 @@ -#pragma once -#include <ydb/core/tx/columnshard/engines/reader/conveyor_task.h> -#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> -#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h> -#include <ydb/core/tx/columnshard/counters/common/object_counter.h> -#include <ydb/core/formats/arrow/arrow_filter.h> -#include "source.h" -#include "columns_set.h" - -namespace NKikimr::NOlap::NPlainReader { - -class TAssemblerCommon: public NColumnShard::IDataTasksProcessor::ITask { -private: - using TBase = NColumnShard::IDataTasksProcessor::ITask; -protected: - const std::shared_ptr<TSpecialReadContext> Context; - const std::shared_ptr<IDataSource> Source; - const std::shared_ptr<TPortionInfo> PortionInfo; - const TReadMetadata::TConstPtr ReadMetadata; - THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Blobs; - - TPortionInfo::TPreparedBatchData BuildBatchConstructor(const ISnapshotSchema& resultSchema) { - auto blobSchema = ReadMetadata->GetLoadSchema(PortionInfo->GetMinSnapshot()); - return PortionInfo->PrepareForAssemble(*blobSchema, resultSchema, Blobs); - } - -public: - TAssemblerCommon(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo, - const std::shared_ptr<IDataSource>& source, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs) - : TBase(context->GetCommonContext()->GetScanActorId()) - , Context(context) - , Source(source) - , PortionInfo(portionInfo) - , ReadMetadata(Context->GetReadMetadata()) - , Blobs(blobs) - { - AFL_VERIFY(Blobs.size()); - } - -}; - -class TAssembleFilter: public TAssemblerCommon, public NColumnShard::TMonitoringObjectsCounter<TAssembleFilter, true, true> { -private: - using TBase = TAssemblerCommon; - - std::shared_ptr<arrow::RecordBatch> FilteredBatch; - std::shared_ptr<NArrow::TColumnFilter> AppliedFilter; - std::shared_ptr<NArrow::TColumnFilter> EarlyFilter; - const TSnapshot RecordsMaxSnapshot; - ui32 OriginalCount = 0; - std::shared_ptr<TColumnsSet> FilterColumns; - const bool UseFilter = true; - const NColumnShard::TCounterGuard TaskGuard; -protected: - virtual bool DoApply(IDataReader& owner) const override; - virtual bool DoExecute() override; -public: - - virtual TString GetTaskClassIdentifier() const override { - return "PlainReading::TAssembleFilter"; - } - - TAssembleFilter(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo, - const std::shared_ptr<IDataSource>& source, const std::shared_ptr<TColumnsSet>& filterColumns, const bool useFilter, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs) - : TBase(context, portionInfo, source, std::move(blobs)) - , RecordsMaxSnapshot(PortionInfo->RecordSnapshotMax()) - , FilterColumns(filterColumns) - , UseFilter(useFilter) - , TaskGuard(Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) - { - Y_UNUSED(RecordsMaxSnapshot); - TBase::SetPriority(TBase::EPriority::Normal); - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 464127ef02..5b55803cd3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -45,11 +45,7 @@ private: bool EmptyFiltersOnly() const { for (auto&& [_, i] : Sources) { - if (!i->GetFilterStageData().GetBatch() || !i->GetFilterStageData().GetBatch()->num_rows()) { - continue; - } - auto f = i->GetFilterStageData().GetNotAppliedEarlyFilter(); - if (!f || !f->IsTotalDenyFilter()) { + if (!i->IsEmptyData()) { return false; } } @@ -64,7 +60,7 @@ protected: } virtual bool DoExecute() override { if (MergingContext->IsExclusiveInterval()) { - ResultBatch = Sources.begin()->second->GetBatch(); + ResultBatch = Sources.begin()->second->GetStageData().GetBatch(); if (ResultBatch && ResultBatch->num_rows()) { LastPK = Sources.begin()->second->GetLastPK(); ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); @@ -85,8 +81,8 @@ protected: } std::shared_ptr<NIndexedReader::TMergePartialStream> merger = Context->BuildMerger(); for (auto&& [_, i] : Sources) { - if (auto rb = i->GetBatch()) { - merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter()); + if (auto rb = i->GetStageData().GetBatch()) { + merger->AddSource(rb, i->GetStageData().GetNotAppliedFilter()); } } AFL_VERIFY(merger->GetSourcesCount() <= Sources.size()); @@ -196,7 +192,7 @@ void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptr<NResourceBro AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("interval_idx", IntervalIdx)("event", "resources_allocated") ("resources", guard->DebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size()); for (auto&& [_, i] : Sources) { - i->InitFetchingPlan(Context->GetColumnsFetchingPlan(MergingContext->IsExclusiveInterval()), i); + i->InitFetchingPlan(Context->GetColumnsFetchingPlan(i, MergingContext->IsExclusiveInterval()), i, MergingContext->IsExclusiveInterval()); } OnInitResourcesGuard(guard); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp index d84609c623..0720416cdd 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp @@ -52,7 +52,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size(); stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs(); stats->CommittedBatches = GetReadMetadata()->CommittedBlobs.size(); - stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetSize(); + stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount(); stats->CommittedPortionsBytes = committedPortionsBytes; stats->InsertedPortionsBytes = insertedPortionsBytes; stats->CompactedPortionsBytes = compactedPortionsBytes; @@ -60,10 +60,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte } std::vector<NKikimr::NOlap::TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) { - if ((GetContext().GetIsInternalRead() && ReadyResultsCount < maxRowsInBatch) && !Scanner->IsFinished()) { - return {}; - } - auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch, GetContext().GetIsInternalRead()); + auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch); ui32 count = 0; for (auto&& r: result) { count += r.GetRecordsCount(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp index b80813b1d8..7611a4488f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp @@ -131,10 +131,6 @@ bool TScanHead::IsReverse() const { return GetContext().GetReadMetadata()->IsDescSorted(); } -NKikimr::NOlap::NPlainReader::TFetchingPlan TScanHead::GetColumnsFetchingPlan(const bool exclusiveSource) const { - return Context->GetColumnsFetchingPlan(exclusiveSource); -} - void TScanHead::Abort() { for (auto&& i : FetchingIntervals) { i.second->Abort(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h index 971cbc654b..8e071e8f54 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h @@ -48,8 +48,6 @@ private: ui64 ZeroCount = 0; public: - TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const; - bool IsReverse() const; void Abort(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index 254c6205b1..e6a408933f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -1,66 +1,49 @@ #include "source.h" #include "interval.h" #include "fetched_data.h" -#include "constructor.h" #include "plain_read_data.h" +#include "constructor.h" #include <ydb/core/formats/arrow/serializer/full.h> #include <ydb/core/tx/columnshard/blobs_reader/actor.h> #include <ydb/core/tx/columnshard/blobs_reader/events.h> +#include <ydb/core/tx/conveyor/usage/service.h> +#include <ydb/core/formats/arrow/simple_arrays_cache.h> namespace NKikimr::NOlap::NPlainReader { -void IDataSource::InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>& batchExt) { - auto batch = batchExt; - if (!batch && FetchingPlan->GetFetchingStage()->GetSize()) { - const ui32 numRows = GetFilterStageData().GetBatch() ? GetFilterStageData().GetBatch()->num_rows() : 0; - batch = NArrow::MakeEmptyBatch(FetchingPlan->GetFetchingStage()->GetSchema(), numRows); - } - if (batch) { - Y_ABORT_UNLESS((ui32)batch->num_columns() == FetchingPlan->GetFetchingStage()->GetSize()); - } - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchStageData")); - Y_ABORT_UNLESS(!FetchStageData); - FetchStageData = std::make_shared<TFetchStageData>(batch); - for (auto&& i : Intervals) { - i.second->OnSourceFetchStageReady(GetSourceIdx()); - } - Intervals.clear(); -} - -void IDataSource::InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, - const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<IDataSource>& sourcePtr) { - if (IsAborted()) { - return; - } - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFilterStageData")); - Y_ABORT_UNLESS(!FilterStageData); - FilterStageData = std::make_shared<TFilterStageData>(appliedFilter, earlyFilter, batch); - if (batch) { - AFL_VERIFY((ui32)batch->num_columns() == FetchingPlan->GetFilterStage()->GetSize())("batch", batch->schema()->ToString())("filter", FetchingPlan->GetFilterStage()->DebugString()); - } - DoStartFetchStage(sourcePtr); -} - -void IDataSource::InitFetchingPlan(const TFetchingPlan& fetchingPlan, const std::shared_ptr<IDataSource>& sourcePtr) { +void IDataSource::InitFetchingPlan(const std::shared_ptr<IFetchingStep>& fetchingFirstStep, const std::shared_ptr<IDataSource>& sourcePtr, const bool isExclusive) { + AFL_VERIFY(fetchingFirstStep); if (AtomicCas(&FilterStageFlag, 1, 0)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingPlan.DebugString()); - Y_ABORT_UNLESS(!FetchingPlan); - FetchingPlan = fetchingPlan; + StageData = std::make_shared<TFetchedData>(isExclusive); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingFirstStep->DebugString())("source_idx", SourceIdx); NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); if (IsAborted()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "InitFetchingPlanAborted"); return; } - DoStartFilterStage(sourcePtr); + if (fetchingFirstStep->ExecuteInplace(sourcePtr, fetchingFirstStep)) { + auto task = std::make_shared<TStepAction>(sourcePtr, fetchingFirstStep->GetNextStep(), Context->GetCommonContext()->GetScanActorId()); + NConveyor::TScanServiceOperator::SendTaskToExecute(task); + } } } void IDataSource::RegisterInterval(TFetchingInterval& interval) { - if (!FetchStageData) { + if (!IsReadyFlag) { AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second); } } +void IDataSource::SetIsReady() { + AFL_VERIFY(!IsReadyFlag); + IsReadyFlag = true; + for (auto&& i : Intervals) { + i.second->OnSourceFetchStageReady(SourceIdx); + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "source_ready")("intervals_count", Intervals.size())("source_idx", SourceIdx); + Intervals.clear(); +} + void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, const std::shared_ptr<IBlobsReadingAction>& readingAction, THashMap<TBlobRange, ui32>& nullBlocks, const std::shared_ptr<NArrow::TColumnFilter>& filter) { @@ -90,65 +73,59 @@ void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "chunks_stats")("fetch", fetchedChunks)("null", nullChunks)("reading_action", readingAction->GetStorageId())("columns", columnIds.size()); } -void TPortionDataSource::DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetchEF"); - Y_ABORT_UNLESS(FetchingPlan->GetFilterStage()->GetSize()); - auto& columnIds = FetchingPlan->GetFilterStage()->GetColumnIds(); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "StartFilterStage")("fetching_info", FetchingPlan->DebugString()); +bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, + const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName()); + Y_ABORT_UNLESS(columns->GetColumnsCount()); + auto& columnIds = columns->GetColumnIds(); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString()); - auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FILTER"); + auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::" + step->GetName()); readAction->SetIsBackgroundProcess(false); - THashMap<TBlobRange, ui32> nullBlocks; - NeedFetchColumns(columnIds, readAction, nullBlocks, nullptr); + { + THashMap<TBlobRange, ui32> nullBlocks; + NeedFetchColumns(columnIds, readAction, nullBlocks, StageData->GetAppliedFilter()); + StageData->AddNulls(std::move(nullBlocks)); + } std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction}; - auto constructor = std::make_shared<TEFTaskConstructor>(GetContext(), actions, std::move(nullBlocks), FetchingPlan->GetFilterStage(), *this, sourcePtr, FetchingPlan->CanUseEarlyFilterImmediately(), "ReaderFilter"); -// NActors::TActivationContext::AsActorContext().Send(GetContext()->GetCommonContext()->GetReadCoordinatorActorId(), new NOlap::NBlobOperations::NRead::TEvStartReadTask(constructor)); + auto constructor = std::make_shared<TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); + return true; } -void TPortionDataSource::DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) { - Y_ABORT_UNLESS(!FetchStageData); - Y_ABORT_UNLESS(FilterStageData); - if (FetchingPlan->GetFetchingStage()->GetSize() && !FilterStageData->IsEmptyFilter()) { - auto& columnIds = FetchingPlan->GetFetchingStage()->GetColumnIds(); +void TPortionDataSource::DoAbort() { +} - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "RealStartFetchStage")("fetching_info", FetchingPlan->DebugString()); - auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FETCHING"); - readAction->SetIsBackgroundProcess(false); - THashMap<TBlobRange, ui32> nullBlocks; - NeedFetchColumns(columnIds, readAction, nullBlocks, GetFilterStageData().GetAppliedFilter()); - if (readAction->GetExpectedBlobsCount()) { - std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction}; - auto constructor = std::make_shared<TFFColumnsTaskConstructor>(GetContext(), actions, std::move(nullBlocks), FetchingPlan->GetFetchingStage(), *this, sourcePtr, "ReaderFetcher"); - NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); - return; - } - } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DontStartFetchStage")("fetching_info", FetchingPlan->DebugString()); +bool TCommittedDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& /*columns*/) { + if (ReadStarted) { + return false; } - InitFetchStageData(nullptr); -} + ReadStarted = true; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString()); -void TPortionDataSource::DoAbort() { -} + std::shared_ptr<IBlobsStorageOperator> storageOperator = GetContext()->GetCommonContext()->GetStoragesManager()->GetInsertOperator(); + auto readAction = storageOperator->StartReadingAction("CS::READ::" + step->GetName()); -void TCommittedDataSource::DoFetch(const std::shared_ptr<IDataSource>& sourcePtr) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetch"); - if (!ReadStarted) { - Y_ABORT_UNLESS(!ResultReady); - ReadStarted = true; + readAction->SetIsBackgroundProcess(false); + readAction->AddRange(CommittedBlob.GetBlobRange()); - std::shared_ptr<IBlobsStorageOperator> storageOperator = GetContext()->GetCommonContext()->GetStoragesManager()->GetInsertOperator(); - auto readAction = storageOperator->StartReadingAction("CS::READ::COMMITTED"); - readAction->SetIsBackgroundProcess(false); - readAction->AddRange(CommittedBlob.GetBlobRange()); + std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction}; + auto constructor = std::make_shared<TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), ""); + NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); + return true; +} - THashMap<TBlobRange, ui32> nullBlocks; - std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction}; - auto constructor = std::make_shared<TCommittedColumnsTaskConstructor>(GetContext(), actions, std::move(nullBlocks), *this, sourcePtr, "ReaderCommitted"); - NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); +void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) { + if (!GetStageData().GetTable()) { + Y_ABORT_UNLESS(GetStageData().GetBlobs().size() == 1); + auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first); + auto batch = NArrow::DeserializeBatch(bData, GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion())); + Y_ABORT_UNLESS(batch); + batch = GetContext()->GetReadMetadata()->GetIndexInfo().AddSpecialColumns(batch, CommittedBlob.GetSnapshot()); + MutableStageData().AddBatch(batch); } + MutableStageData().SyncTableColumns(columns->GetSchema()->fields()); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index 693a4353bc..34f6a1fe04 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -7,6 +7,7 @@ #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <ydb/core/tx/columnshard/engines/insert_table/data.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> #include <ydb/core/formats/arrow/arrow_helpers.h> namespace NKikimr::NOlap { @@ -18,6 +19,7 @@ namespace NKikimr::NOlap::NPlainReader { class TFetchingInterval; class TPlainReadData; class IFetchTaskConstructor; +class IFetchingStep; class IDataSource { private: @@ -25,6 +27,8 @@ private: YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Start); YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Finish); YDB_READONLY_DEF(std::shared_ptr<TSpecialReadContext>, Context); + YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero()); + std::optional<ui32> RecordsCount; YDB_READONLY(ui32, IntervalsCount, 0); virtual NJson::TJsonValue DoDebugJson() const = 0; bool MergingStartedFlag = false; @@ -32,21 +36,38 @@ private: protected: THashMap<ui32, TFetchingInterval*> Intervals; - std::shared_ptr<TFilterStageData> FilterStageData; - std::shared_ptr<TFetchStageData> FetchStageData; - - std::optional<TFetchingPlan> FetchingPlan; + std::shared_ptr<TFetchedData> StageData; TAtomic FilterStageFlag = 0; + bool IsReadyFlag = false; bool IsAborted() const { return AbortedFlag; } - virtual void DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) = 0; - virtual void DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) = 0; + virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) = 0; + virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) = 0; virtual void DoAbort() = 0; public: + void SetIsReady(); + + bool IsEmptyData() const { + return GetStageData().IsEmpty(); + } + + void AssembleColumns(const std::shared_ptr<TColumnsSet>& columns) { + if (columns->IsEmpty()) { + return; + } + DoAssembleColumns(columns); + } + + bool StartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) { + AFL_VERIFY(columns); + return DoStartFetchingColumns(sourcePtr, step, columns); + } + void InitFetchingPlan(const std::shared_ptr<IFetchingStep>& fetchingFirstStep, const std::shared_ptr<IDataSource>& sourcePtr, const bool isExclusive); + std::shared_ptr<arrow::RecordBatch> GetLastPK() const { return Finish.ExtractSortingPosition(); } @@ -56,11 +77,6 @@ public: virtual ui64 GetRawBytes(const std::set<ui32>& columnIds) const = 0; - const TFetchingPlan& GetFetchingPlan() const { - Y_ABORT_UNLESS(FetchingPlan); - return *FetchingPlan; - } - bool IsMergingStarted() const { return MergingStartedFlag; } @@ -87,35 +103,37 @@ public: bool OnIntervalFinished(const ui32 intervalIdx); - std::shared_ptr<arrow::RecordBatch> GetBatch() const { - if (!FilterStageData || !FetchStageData) { - return nullptr; - } - return NArrow::MergeColumns({FilterStageData->GetBatch(), FetchStageData->GetBatch()}); - } - bool IsDataReady() const { - return !!FilterStageData && !!FetchStageData; + return IsReadyFlag; } - const TFilterStageData& GetFilterStageData() const { - Y_ABORT_UNLESS(FilterStageData); - return *FilterStageData; + const TFetchedData& GetStageData() const { + AFL_VERIFY(StageData); + return *StageData; } - void InitFetchingPlan(const TFetchingPlan& fetchingPlan, const std::shared_ptr<IDataSource>& sourcePtr); + TFetchedData& MutableStageData() { + AFL_VERIFY(StageData); + return *StageData; + } - void InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch - , const std::shared_ptr<IDataSource>& sourcePtr); - void InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>& batch); + ui32 GetRecordsCount() const { + AFL_VERIFY(RecordsCount); + return *RecordsCount; + } void RegisterInterval(TFetchingInterval& interval); - IDataSource(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context, const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) + IDataSource(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context, + const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, + const TSnapshot& recordSnapshotMax, const std::optional<ui32> recordsCount + ) : SourceIdx(sourceIdx) , Start(start) , Finish(finish) , Context(context) + , RecordSnapshotMax(recordSnapshotMax) + , RecordsCount(recordsCount) { if (Start.IsReverseSort()) { std::swap(Start, Finish); @@ -137,8 +155,11 @@ private: const std::shared_ptr<IBlobsReadingAction>& readingAction, THashMap<TBlobRange, ui32>& nullBlocks, const std::shared_ptr<NArrow::TColumnFilter>& filter); - virtual void DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) override; - virtual void DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) override; + virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) override; + virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) override { + auto blobSchema = GetContext()->GetReadMetadata()->GetLoadSchema(Portion->GetMinSnapshot()); + MutableStageData().AddBatch(Portion->PrepareForAssemble(*blobSchema, columns->GetFilteredSchemaVerified(), MutableStageData().MutableBlobs()).AssembleTable()); + } virtual NJson::TJsonValue DoDebugJson() const override { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("type", "portion"); @@ -162,7 +183,7 @@ public: TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TSpecialReadContext>& context, const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) - : TBase(sourceIdx, context, start, finish) + : TBase(sourceIdx, context, start, finish, portion->RecordSnapshotMax(), portion->GetRecordsCount()) , Portion(portion) { } @@ -173,20 +194,14 @@ private: using TBase = IDataSource; TCommittedBlob CommittedBlob; bool ReadStarted = false; - bool ResultReady = false; - void DoFetch(const std::shared_ptr<IDataSource>& sourcePtr); virtual void DoAbort() override { } - virtual void DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) override { - DoFetch(sourcePtr); - } - - virtual void DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) override { - DoFetch(sourcePtr); - } + virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, + const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) override; + virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) override; virtual NJson::TJsonValue DoDebugJson() const override { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("type", "commit"); @@ -204,7 +219,7 @@ public: TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context, const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) - : TBase(sourceIdx, context, start, finish) + : TBase(sourceIdx, context, start, finish, committed.GetSnapshot(), {}) , CommittedBlob(committed) { } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make index c9224b8d78..4a4db941aa 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make @@ -2,16 +2,14 @@ LIBRARY() SRCS( scanner.cpp - source.cpp constructor.cpp + source.cpp interval.cpp fetched_data.cpp plain_read_data.cpp - filter_assembler.cpp - column_assembler.cpp - committed_assembler.cpp columns_set.cpp context.cpp + fetching.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h index 8164f28f82..bc5d5fa1ba 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/read_context.h @@ -57,7 +57,6 @@ class TReadContext { private: YDB_READONLY_DEF(std::shared_ptr<IStoragesManager>, StoragesManager); const NColumnShard::TConcreteScanCounters Counters; - YDB_READONLY(bool, IsInternalRead, false); TReadMetadataBase::TConstPtr ReadMetadata; NResourceBroker::NSubscribe::TTaskContext ResourcesTaskContext; const TActorId ScanActorId; @@ -97,11 +96,10 @@ public: return ResourcesTaskContext; } - TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const bool isInternalRead, const TReadMetadataBase::TConstPtr& readMetadata, + TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const NOlap::TComputeShardingPolicy& computeShardingPolicy) : StoragesManager(storagesManager) , Counters(counters) - , IsInternalRead(isInternalRead) , ReadMetadata(readMetadata) , ResourcesTaskContext("CS::SCAN_READ", counters.ResourcesSubscriberCounters) , ScanActorId(scanActorId) @@ -165,8 +163,6 @@ public: TString DebugString(const bool verbose) const { TStringBuilder sb; - sb << "internal:" << Context->GetIsInternalRead() << ";" - ; sb << DoDebugString(verbose); return sb; } diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 729a50ae7a..988d6553b3 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -56,13 +56,6 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); } } - if (Snapshot.GetPlanStep()) { - auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); - for (auto&& i : snapSchema->fields()) { - result.emplace(indexInfo.GetColumnId(i->name())); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name()); - } - } return result; } @@ -87,7 +80,6 @@ void TReadStats::PrintToLog() { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN) ("event", "statistic") ("begin", BeginTimestamp) - ("selected", SelectedIndex) ("index_granules", IndexGranules) ("index_portions", IndexPortions) ("index_batches", IndexBatches) diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 4d322c3716..4b0580a6bb 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -25,7 +25,6 @@ class TReadContext; struct TReadStats { TInstant BeginTimestamp; - ui32 SelectedIndex{0}; ui64 IndexGranules{0}; ui64 IndexPortions{0}; ui64 IndexBatches{0}; @@ -42,9 +41,8 @@ struct TReadStats { ui32 SelectedRows = 0; - TReadStats(ui32 indexNo = 0) + TReadStats() : BeginTimestamp(TInstant::Now()) - , SelectedIndex(indexNo) {} void PrintToLog(); @@ -166,7 +164,7 @@ public: , IndexVersions(info) , Snapshot(snapshot) , ResultIndexSchema(info.GetSchema(Snapshot)) - , ReadStats(std::make_shared<TReadStats>(info.GetLastSchema()->GetIndexInfo().GetId())) + , ReadStats(std::make_shared<TReadStats>()) { } @@ -271,7 +269,7 @@ public: if (!ResultIndexSchema) { return {}; } - auto f = ResultIndexSchema->GetFieldByColumnId(columnId); + auto f = ResultIndexSchema->GetFieldByColumnIdOptional(columnId); if (!f) { return {}; } diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp index 13dc7a35f1..a80496460e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp @@ -12,7 +12,7 @@ std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByIndex(const int index) } return schema->field(index); } -std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnId(const ui32 columnId) const { +std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnIdOptional(const ui32 columnId) const { return GetFieldByIndex(GetFieldIndex(columnId)); } @@ -111,8 +111,8 @@ ui32 ISnapshotSchema::GetColumnId(const std::string& columnName) const { } std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnIdVerified(const ui32 columnId) const { - auto result = GetFieldByColumnId(columnId); - AFL_VERIFY(result)("event", "unknown_column")("column_id", columnId); + auto result = GetFieldByColumnIdOptional(columnId); + AFL_VERIFY(result)("event", "unknown_column")("column_id", columnId)("schema", DebugString()); return result; } diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h index 9232edcf1d..b063ba7d02 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h @@ -51,7 +51,7 @@ public: ui32 GetColumnId(const std::string& columnName) const; std::shared_ptr<arrow::Field> GetFieldByIndex(const int index) const; - std::shared_ptr<arrow::Field> GetFieldByColumnId(const ui32 columnId) const; + std::shared_ptr<arrow::Field> GetFieldByColumnIdOptional(const ui32 columnId) const; std::shared_ptr<arrow::Field> GetFieldByColumnIdVerified(const ui32 columnId) const; TString DebugString() const { diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 1e72a2e7ea..b742348509 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -206,8 +206,7 @@ public: std::shared_ptr<NOlap::TSerializationStats> BuildSerializationStats(ISnapshotSchema::TPtr schema) const { auto result = std::make_shared<NOlap::TSerializationStats>(); for (auto&& i : GetAdditiveSummary().GetCompacted().GetColumnStats()) { - auto field = schema->GetFieldByColumnId(i.first); - AFL_VERIFY(field)("column_id", i.first)("schema", schema->DebugString()); + auto field = schema->GetFieldByColumnIdVerified(i.first); NOlap::TColumnSerializationStat columnInfo(i.first, field->name()); columnInfo.Merge(i.second); result->AddStat(columnInfo); diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h index 5edc9c18d1..feda743b26 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.h +++ b/ydb/core/tx/columnshard/splitter/batch_slice.h @@ -63,7 +63,7 @@ public: AFL_VERIFY(Stats); } virtual std::shared_ptr<arrow::Field> GetField(const ui32 columnId) const override { - return Schema->GetFieldByColumnId(columnId); + return Schema->GetFieldByColumnIdOptional(columnId); } virtual bool NeedMinMaxForColumn(const ui32 columnId) const override { return Schema->GetIndexInfo().GetMinMaxIdxColumns().contains(columnId); diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index dfc554c4ad..d08e222589 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -685,6 +685,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt std::unique_ptr<NOlap::NTests::TShardReader> reader; if (!misconfig) { reader = std::make_unique<NOlap::NTests::TShardReader>(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>())); + reader->SetReplyColumns({specs[i].TtlColumn}); counter.CaptureReadEvents = specs[i].WaitEmptyAfter ? 0 : 1; // TODO: we need affected by tiering blob here counter.WaitReadsCaptured(runtime); reader->InitializeScanner(); |