diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-25 08:36:56 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-25 09:17:49 +0300 |
commit | a13962045e5307dc6ab38d061e75b1f518a8ae29 (patch) | |
tree | f60b1182eb0fd833c4cc83daf023d2a6dacc4237 | |
parent | 01216eb22028efa53b8d26d3c9a84be78cef40ba (diff) | |
download | ydb-a13962045e5307dc6ab38d061e75b1f518a8ae29.tar.gz |
KIKIMR-19213: use conveyors on merge results
27 files changed, 284 insertions, 101 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 568ea59f6a..f9951b9432 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -161,6 +161,9 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow:: std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const std::vector<TString>& columnNames) { + if (!srcBatch) { + return srcBatch; + } if (columnNames.empty()) { return nullptr; } diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h index b8f97af026..1c05f52796 100644 --- a/ydb/core/formats/arrow/reader/read_filter_merger.h +++ b/ydb/core/formats/arrow/reader/read_filter_merger.h @@ -100,6 +100,9 @@ public: bool IsEqual() const { return !GreaterIfNotEqual; } + bool IsLess() const { + return !!GreaterIfNotEqual && !*GreaterIfNotEqual; + } bool IsGreater() const { return !!GreaterIfNotEqual && *GreaterIfNotEqual; } diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 2664631c23..fb7e93ade7 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -87,10 +87,10 @@ public: return ReadyResults.size(); } - virtual TString DebugString() const override { + virtual TString DebugString(const bool verbose) const override { return TStringBuilder() << "ready_results:(" << ReadyResults.DebugString() << ");" - << "indexed_data:(" << IndexedData->DebugString() << ")" + << "indexed_data:(" << IndexedData->DebugString(verbose) << ")" ; } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 40ad858f28..a7c9873356 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -281,13 +281,7 @@ private: const i64 maxSteps = ReadMetadataRanges.size(); for (i64 step = 0; step <= maxSteps; ++step) { ContinueProcessingStep(); - - // Only exist the loop if either: - // * we have finished scanning ALL the ranges - // * or there is an in-flight blob read or ScanData message for which - // we will get a reply and will be able to proceed further - if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads - || MemoryAccessor->InWaiting()) { + if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads || MemoryAccessor->InWaiting() || ScanCountersPool.InWaiting()) { return; } } @@ -436,7 +430,7 @@ private: << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " bytes: " << Bytes << " rows: " << Rows << " page faults: " << Result->PageFaults << " finished: " << Result->Finished << " pageFault: " << Result->PageFault - << " stats:" << Stats.DebugString(); + << " stats:" << Stats.DebugString() << ";iterator:" << (ScanIterator ? ScanIterator->DebugString(false) : "NO"); } else { Y_ABORT_UNLESS(ChunksLimiter.Take(Bytes)); Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore(); diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index 5ec177b6ae..f3a4fdc946 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -122,7 +122,8 @@ public: } virtual std::shared_ptr<NOlap::NBlobOperations::NRead::ITask> GetNextTaskToRead() { return nullptr; } - virtual TString DebugString() const { + virtual TString DebugString(const bool verbose = false) const { + Y_UNUSED(verbose); return "NO_DATA"; } }; diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index 437080b402..8a3d6db77d 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -162,12 +162,54 @@ public: TScanAggregations BuildAggregations(); }; +class TCounterGuard: TNonCopyable { +private: + std::shared_ptr<TAtomicCounter> Counter; +public: + TCounterGuard(TCounterGuard&& guard) { + Counter = guard.Counter; + guard.Counter = nullptr; + } + + TCounterGuard(const std::shared_ptr<TAtomicCounter>& counter) + : Counter(counter) + { + AFL_VERIFY(Counter); + Counter->Inc(); + } + ~TCounterGuard() { + if (Counter) { + AFL_VERIFY(Counter->Dec() >= 0); + } + } + +}; + class TConcreteScanCounters: public TScanCounters { private: using TBase = TScanCounters; + std::shared_ptr<TAtomicCounter> MergeTasksCount; + std::shared_ptr<TAtomicCounter> AssembleTasksCount; + std::shared_ptr<TAtomicCounter> ReadTasksCount; public: TScanAggregations Aggregations; + TCounterGuard GetMergeTasksGuard() const { + return TCounterGuard(MergeTasksCount); + } + + TCounterGuard GetReadTasksGuard() const { + return TCounterGuard(ReadTasksCount); + } + + TCounterGuard GetAssembleTasksGuard() const { + return TCounterGuard(AssembleTasksCount); + } + + bool InWaiting() const { + return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val(); + } + void OnBlobsWaitDuration(const TDuration d, const TDuration fullScanDuration) const { TBase::OnBlobsWaitDuration(d); Aggregations.OnBlobWaitingDuration(d, fullScanDuration); @@ -175,6 +217,9 @@ public: TConcreteScanCounters(const TScanCounters& counters) : TBase(counters) + , MergeTasksCount(std::make_shared<TAtomicCounter>()) + , AssembleTasksCount(std::make_shared<TAtomicCounter>()) + , ReadTasksCount(std::make_shared<TAtomicCounter>()) , Aggregations(TBase::BuildAggregations()) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt index 10911162af..067782793e 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt @@ -13,6 +13,7 @@ target_link_libraries(engines-reader-plain_reader PUBLIC yutil core-formats-arrow tx-columnshard-blobs_action + tx-conveyor-usage ) target_sources(engines-reader-plain_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt index 52f5d3db88..7907ec8619 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt @@ -14,6 +14,7 @@ target_link_libraries(engines-reader-plain_reader PUBLIC yutil core-formats-arrow tx-columnshard-blobs_action + tx-conveyor-usage ) target_sources(engines-reader-plain_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt index 52f5d3db88..7907ec8619 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(engines-reader-plain_reader PUBLIC yutil core-formats-arrow tx-columnshard-blobs_action + tx-conveyor-usage ) target_sources(engines-reader-plain_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt index 10911162af..067782793e 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt @@ -13,6 +13,7 @@ target_link_libraries(engines-reader-plain_reader PUBLIC yutil core-formats-arrow tx-columnshard-blobs_action + tx-conveyor-usage ) target_sources(engines-reader-plain_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp index c755f3263b..047dbf52ab 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp @@ -27,11 +27,12 @@ bool TAssembleFFBatch::DoApply(IDataReader& owner) const { } TAssembleBatch::TAssembleBatch(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor, - const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter) + const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter, NColumnShard::TCounterGuard&& taskGuard) : TBase(scanActorId) , BatchConstructor(batchConstructor) , Filter(filter) , SourceIdx(sourceIdx) + , TaskGuard(std::move(taskGuard)) { TBase::SetPriority(TBase::EPriority::High); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h index 3a8262b2ce..fc3df48012 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h @@ -3,6 +3,7 @@ #include <ydb/core/tx/columnshard/engines/reader/conveyor_task.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <ydb/core/tx/columnshard/counters/common/object_counter.h> +#include <ydb/core/tx/columnshard/counters/scan.h> #include <ydb/core/formats/arrow/arrow_filter.h> namespace NKikimr::NOlap::NPlainReader { @@ -15,6 +16,7 @@ private: protected: std::shared_ptr<arrow::RecordBatch> Result; const ui32 SourceIdx; + const NColumnShard::TCounterGuard TaskGuard; virtual bool DoExecute() override; public: virtual TString GetTaskClassIdentifier() const override { @@ -22,7 +24,7 @@ public: } TAssembleBatch(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor, - const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter); + const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter, NColumnShard::TCounterGuard&& taskGuard); }; class TAssembleFFBatch: public TAssembleBatch { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp index ab5e5372c5..69520d4bdd 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp @@ -21,13 +21,14 @@ bool TCommittedAssembler::DoApply(IDataReader& owner) const { } TCommittedAssembler::TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const ui32 sourceIdx, - const TCommittedBlob& cBlob) + const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard) : TBase(scanActorId) , BlobData(blobData) , ReadMetadata(readMetadata) , SourceIdx(sourceIdx) , SchemaVersion(cBlob.GetSchemaVersion()) , DataSnapshot(cBlob.GetSnapshot()) + , TaskGuard(std::move(taskGuard)) { } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h index f0b00d990c..322a17efbf 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h @@ -4,6 +4,7 @@ #include <ydb/core/tx/columnshard/engines/reader/read_metadata.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <ydb/core/tx/columnshard/counters/common/object_counter.h> +#include <ydb/core/tx/columnshard/counters/scan.h> #include <ydb/core/formats/arrow/arrow_filter.h> namespace NKikimr::NOlap::NPlainReader { @@ -18,6 +19,7 @@ private: std::shared_ptr<NArrow::TColumnFilter> EarlyFilter; std::shared_ptr<arrow::RecordBatch> ResultBatch; + const NColumnShard::TCounterGuard TaskGuard; protected: virtual bool DoExecute() override; virtual bool DoApply(IDataReader& owner) const override; @@ -27,6 +29,6 @@ public: } TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const ui32 sourceIdx, - const TCommittedBlob& cBlob); + const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard); }; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp index b00f3ef17b..b0d1dd2d36 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp @@ -32,12 +32,12 @@ TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAsse void TEFTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFilter>(ScanActorId, BuildBatchAssembler(), - ReadMetadata, SourceIdx, ColumnIds, UseEarlyFilter)); + ReadMetadata, SourceIdx, ColumnIds, UseEarlyFilter, Context.GetCounters().GetAssembleTasksGuard())); } void TFFColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFFBatch>(ScanActorId, BuildBatchAssembler(), - SourceIdx, AppliedFilter)); + SourceIdx, AppliedFilter, Context.GetCounters().GetAssembleTasksGuard())); } void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { @@ -45,7 +45,7 @@ void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NReso Y_ABORT_UNLESS(NullBlocks.size() == 0); Y_ABORT_UNLESS(blobs.size() == 1); NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TCommittedAssembler>(ScanActorId, blobs.begin()->second, - ReadMetadata, SourceIdx, CommittedBlob)); + ReadMetadata, SourceIdx, CommittedBlob, Context.GetCounters().GetAssembleTasksGuard())); } bool IFetchTaskConstructor::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h index c24d463720..a9d71aa349 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h @@ -17,6 +17,7 @@ protected: std::shared_ptr<const TReadMetadata> ReadMetadata; TReadContext Context; THashMap<TBlobRange, ui32> NullBlocks; + NColumnShard::TCounterGuard TasksGuard; virtual bool DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override; public: IFetchTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, const IDataSource& source, const TString& taskCustomer) @@ -26,6 +27,7 @@ public: , ReadMetadata(reader.GetReadMetadata()) , Context(reader.GetContext()) , NullBlocks(std::move(nullBlocks)) + , TasksGuard(reader.GetCounters().GetReadTasksGuard()) { } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h index 30ece24411..ec7eb52c2b 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h @@ -21,6 +21,7 @@ namespace NKikimr::NOlap::NPlainReader { bool AllowEarlyFilter = false; std::set<ui32> FilterColumnIds; const bool UseFilter = true; + const NColumnShard::TCounterGuard TaskGuard; protected: virtual bool DoApply(IDataReader& owner) const override; virtual bool DoExecute() override; @@ -31,13 +32,14 @@ namespace NKikimr::NOlap::NPlainReader { } TAssembleFilter(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata, - const ui32 sourceIdx, const std::set<ui32>& filterColumnIds, const bool useFilter) + const ui32 sourceIdx, const std::set<ui32>& filterColumnIds, const bool useFilter, NColumnShard::TCounterGuard&& taskGuard) : TBase(scanActorId) , BatchConstructor(batchConstructor) , SourceIdx(sourceIdx) , ReadMetadata(readMetadata) , FilterColumnIds(filterColumnIds) , UseFilter(useFilter) + , TaskGuard(std::move(taskGuard)) { TBase::SetPriority(TBase::EPriority::Normal); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index b55dbba5e3..e43fa12260 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -1,5 +1,7 @@ #include "interval.h" #include "scanner.h" +#include "plain_read_data.h" +#include <ydb/core/tx/conveyor/usage/service.h> namespace NKikimr::NOlap::NPlainReader { @@ -7,40 +9,84 @@ bool TFetchingInterval::IsExclusiveSource() const { return IncludeStart && Sources.size() == 1 && IncludeFinish; } +class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask, public TMergingContext { +private: + std::shared_ptr<NIndexedReader::TMergePartialStream> Merger; + std::shared_ptr<arrow::RecordBatch> ResultBatch; + NColumnShard::TConcreteScanCounters Counters; + const NColumnShard::TCounterGuard Guard; + const bool IsExclusiveSource = false; + const ui32 OriginalSourcesCount; +protected: + virtual bool DoApply(NOlap::IDataReader& indexedDataRead) const override { + auto& reader = static_cast<TPlainReadData&>(indexedDataRead); + if (Merger->GetSourcesCount() == 1 && ResultBatch) { + auto batch = NArrow::ExtractColumnsValidate(ResultBatch, reader.GetScanner().GetResultFieldNames()); + AFL_VERIFY(batch); + reader.MutableScanner().OnIntervalResult(batch, IntervalIdx); + } else { + reader.MutableScanner().OnIntervalResult(ResultBatch, IntervalIdx); + } + return true; + } + virtual bool DoExecute() override { + Merger->SkipToLowerBound(Start, IncludeStart); + if (Merger->GetSourcesCount() == 1) { + ResultBatch = Merger->SingleSourceDrain(Finish, IncludeFinish); + if (ResultBatch) { + if (IsExclusiveSource) { + Counters.OnNoScanInterval(ResultBatch->num_rows()); + } else { + Counters.OnLogScanInterval(ResultBatch->num_rows()); + } + } + if (IncludeFinish && OriginalSourcesCount == 1) { + Y_ABORT_UNLESS(Merger->IsEmpty()); + } + } else { + Merger->DrainCurrentTo(*RBBuilder, Finish, IncludeFinish); + Counters.OnLinearScanInterval(RBBuilder->GetRecordsCount()); + ResultBatch = RBBuilder->Finalize(); + } + return true; + } +public: + virtual TString GetTaskClassIdentifier() const override { + return "CS::MERGE_RESULT"; + } + + TMergeTask(const std::shared_ptr<NIndexedReader::TMergePartialStream>& merger, + const TMergingContext& context, const NColumnShard::TConcreteScanCounters& counters, const bool isExclusiveSource, const ui32 sourcesCount) + : TMergingContext(context) + , Merger(merger) + , Counters(counters) + , Guard(Counters.GetMergeTasksGuard()) + , IsExclusiveSource(isExclusiveSource) + , OriginalSourcesCount(sourcesCount) + { + } +}; + void TFetchingInterval::ConstructResult() { - if (!Merger || !IsSourcesReady()) { + if (!IsSourcesReady()) { return; } + AFL_VERIFY(!ResultConstructionInProgress); + ResultConstructionInProgress = true; + auto merger = Scanner.BuildMerger(); for (auto&& [_, i] : Sources) { - if (i->GetStart().Compare(Start) == std::partial_ordering::equivalent && !i->IsMergingStarted()) { - if (auto rb = i->GetBatch()) { - Merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter()); - } - i->StartMerging(); + if (auto rb = i->GetBatch()) { + merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter()); } } - std::shared_ptr<arrow::RecordBatch> simpleBatch; - AFL_VERIFY(Merger->GetSourcesCount() <= Sources.size()); - if (Sources.size() == 1) { - simpleBatch = Merger->SingleSourceDrain(Finish, IncludeFinish); - if (simpleBatch) { - if (IsExclusiveSource()) { - Scanner.GetContext().GetCounters().OnNoScanInterval(simpleBatch->num_rows()); - } else { - Scanner.GetContext().GetCounters().OnLogScanInterval(simpleBatch->num_rows()); - } - simpleBatch = NArrow::ExtractColumnsValidate(simpleBatch, Scanner.GetResultFieldNames()); - AFL_VERIFY(simpleBatch); - } - if (IncludeFinish) { - Y_ABORT_UNLESS(Merger->IsEmpty()); - } + AFL_VERIFY(merger->GetSourcesCount() <= Sources.size()); + if (merger->GetSourcesCount() == 0) { + Scanner.OnIntervalResult(nullptr, IntervalIdx); } else { - Merger->DrainCurrentTo(*RBBuilder, Finish, IncludeFinish); - Scanner.GetContext().GetCounters().OnLinearScanInterval(RBBuilder->GetRecordsCount()); - simpleBatch = RBBuilder->Finalize(); + auto task = std::make_shared<TMergeTask>(merger, *this, Scanner.GetContext().GetCounters(), IsExclusiveSource(), Sources.size()); + task->SetPriority(NConveyor::ITask::EPriority::High); + NConveyor::TScanServiceOperator::SendTaskToExecute(task); } - Scanner.OnIntervalResult(simpleBatch, GetIntervalIdx()); } void TFetchingInterval::OnSourceFetchStageReady(const ui32 /*sourceIdx*/) { @@ -51,23 +97,12 @@ void TFetchingInterval::OnSourceFilterStageReady(const ui32 /*sourceIdx*/) { ConstructResult(); } -void TFetchingInterval::StartMerge(std::shared_ptr<NIndexedReader::TMergePartialStream> merger) { - Y_ABORT_UNLESS(!Merger); - Merger = merger; - ConstructResult(); -} - TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, TScanHead& scanner, std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish, const bool includeStart) - : Scanner(scanner) - , Start(start) - , Finish(finish) - , IncludeFinish(includeFinish) - , IncludeStart(includeStart) + : TBase(start, finish, intervalIdx, builder, includeFinish, includeStart) + , Scanner(scanner) , Sources(sources) - , IntervalIdx(intervalIdx) - , RBBuilder(builder) { Y_ABORT_UNLESS(Sources.size()); for (auto&& [_, i] : Sources) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h index c67eced0ea..17977320ee 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h @@ -6,17 +6,37 @@ namespace NKikimr::NOlap::NPlainReader { class TScanHead; -class TFetchingInterval: TNonCopyable { -private: - TScanHead& Scanner; +class TMergingContext { +protected: NIndexedReader::TSortableBatchPosition Start; NIndexedReader::TSortableBatchPosition Finish; const bool IncludeFinish = true; const bool IncludeStart = false; - std::map<ui32, std::shared_ptr<IDataSource>> Sources; - YDB_READONLY(ui32, IntervalIdx, 0); - std::shared_ptr<NIndexedReader::TMergePartialStream> Merger; std::shared_ptr<NIndexedReader::TRecordBatchBuilder> RBBuilder; + ui32 IntervalIdx = 0; +public: + TMergingContext(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, + const ui32 intervalIdx, std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish, const bool includeStart) + : Start(start) + , Finish(finish) + , IncludeFinish(includeFinish) + , IncludeStart(includeStart) + , RBBuilder(builder) + , IntervalIdx(intervalIdx) { + + } + + ui32 GetIntervalIdx() const { + return IntervalIdx; + } +}; + +class TFetchingInterval: public TMergingContext, TNonCopyable { +private: + using TBase = TMergingContext; + bool ResultConstructionInProgress = false; + TScanHead& Scanner; + std::map<ui32, std::shared_ptr<IDataSource>> Sources; bool IsExclusiveSource() const; void ConstructResult(); @@ -58,10 +78,6 @@ public: return result; } - bool HasMerger() const { - return !!Merger; - } - void OnSourceFetchStageReady(const ui32 sourceIdx); void OnSourceFilterStageReady(const ui32 sourceIdx); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp index 501c714751..193b4cac84 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp @@ -60,7 +60,6 @@ TPlainReadData::TPlainReadData(TReadMetadata::TConstPtr readMetadata, const TRea } std::vector<NKikimr::NOlap::TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) { - Scanner->DrainResults(); if (ReadyResultsCount < maxRowsInBatch && !Scanner->IsFinished()) { return {}; } @@ -104,7 +103,7 @@ std::shared_ptr<NBlobOperations::NRead::ITask> TPlainReadData::DoExtractNextRead return nullptr; } -void TPlainReadData::OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch) { +void TPlainReadData::OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch) { if (batch && batch->num_rows()) { TPartialReadResult result(std::make_shared<TScanMemoryLimiter::TGuard>(Context.GetMemoryAccessor()), batch); ReadyResultsCount += result.GetRecordsCount(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h index d2505ddf44..6332bb36ae 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h @@ -28,12 +28,16 @@ private: TFetchBlobsQueue PriorityQueue; bool AbortedFlag = false; protected: - virtual TString DoDebugString() const override { - return TStringBuilder() << + virtual TString DoDebugString(const bool verbose) const override { + TStringBuilder sb; + sb << "ef=" << EFColumns->DebugString() << ";" << "pk=" << PKColumns->DebugString() << ";" << - "ff=" << FFColumns->DebugString() << ";" - ; + "ff=" << FFColumns->DebugString() << ";"; + if (verbose) { + sb << "intervals_schema=" << Scanner->DebugString(); + } + return sb; } virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) override; @@ -64,7 +68,15 @@ public: } } - void OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch); + const TScanHead& GetScanner() const { + return *Scanner; + } + + TScanHead& MutableScanner() { + return *Scanner; + } + + void OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch); TPlainReadData(TReadMetadata::TConstPtr readMetadata, const TReadContext& context); ~TPlainReadData() { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp index f34b4fe716..6c5f60fe14 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp @@ -6,11 +6,22 @@ namespace NKikimr::NOlap::NPlainReader { void TScanHead::OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 intervalIdx) { + AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, batch).second); Y_ABORT_UNLESS(FetchingIntervals.size()); - Y_ABORT_UNLESS(FetchingIntervals.front().GetIntervalIdx() == intervalIdx); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "interval_result")("interval", FetchingIntervals.front().GetIntervalIdx())("count", batch ? batch->num_rows() : 0); - FetchingIntervals.pop_front(); - Reader.OnIntervalResult(batch); + while (FetchingIntervals.size()) { + auto it = ReadyIntervals.find(FetchingIntervals.front().GetIntervalIdx()); + if (it == ReadyIntervals.end()) { + break; + } + const std::shared_ptr<arrow::RecordBatch>& batch = it->second; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "interval_result")("interval", FetchingIntervals.front().GetIntervalIdx())("count", batch ? batch->num_rows() : 0); + FetchingIntervals.pop_front(); + Reader.OnIntervalResult(batch); + ReadyIntervals.erase(it); + } + if (FetchingIntervals.empty()) { + AFL_VERIFY(ReadyIntervals.empty()); + } } TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, TPlainReadData& reader) @@ -22,15 +33,14 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, TPlainR ResultFields.emplace_back(resultSchema->GetFieldByColumnIdVerified(f)); ResultFieldNames.emplace_back(ResultFields.back()->name()); } - Merger = std::make_shared<NIndexedReader::TMergePartialStream>(reader.GetReadMetadata()->GetReplaceKey(), std::make_shared<arrow::Schema>(ResultFields), reader.GetReadMetadata()->IsDescSorted()); + ResultSchema = std::make_shared<arrow::Schema>(ResultFields); DrainSources(); } bool TScanHead::BuildNextInterval() { while (BorderPoints.size()) { - auto position = BorderPoints.begin()->first; auto firstBorderPointInfo = std::move(BorderPoints.begin()->second); - const bool isIncludeStart = CurrentSegments.empty(); + bool includeStart = firstBorderPointInfo.GetStartSources().size(); for (auto&& i : firstBorderPointInfo.GetStartSources()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("add_source", i->GetSourceIdx()); @@ -38,9 +48,11 @@ bool TScanHead::BuildNextInterval() { } if (firstBorderPointInfo.GetStartSources().size() && firstBorderPointInfo.GetFinishSources().size()) { + includeStart = false; FetchingIntervals.emplace_back( BorderPoints.begin()->first, BorderPoints.begin()->first, SegmentIdxCounter++, CurrentSegments, *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), true, true); + IntervalStats.emplace_back(CurrentSegments.size(), true); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "new_interval")("interval", FetchingIntervals.back().DebugJson()); } @@ -56,25 +68,18 @@ bool TScanHead::BuildNextInterval() { const bool includeFinish = BorderPoints.begin()->second.GetStartSources().empty(); FetchingIntervals.emplace_back( *CurrentStart, BorderPoints.begin()->first, SegmentIdxCounter++, CurrentSegments, - *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), includeFinish, isIncludeStart); + *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), includeFinish, includeStart); + IntervalStats.emplace_back(CurrentSegments.size(), false); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "new_interval")("interval", FetchingIntervals.back().DebugJson()); return true; + } else { + IntervalStats.emplace_back(CurrentSegments.size(), false); } } return false; } -void TScanHead::DrainResults() { - while (FetchingIntervals.size()) { - if (!FetchingIntervals.front().HasMerger()) { - FetchingIntervals.front().StartMerge(Merger); - } else { - break; - } - } -} - void TScanHead::DrainSources() { while (Sources.size()) { auto source = Sources.front(); @@ -96,4 +101,8 @@ NKikimr::NOlap::NPlainReader::TFetchingPlan TScanHead::GetColumnsFetchingPlan(co return Reader.GetColumnsFetchingPlan(exclusiveSource); } +std::shared_ptr<NKikimr::NOlap::NIndexedReader::TMergePartialStream> TScanHead::BuildMerger() const { + return std::make_shared<NIndexedReader::TMergePartialStream>(Reader.GetReadMetadata()->GetReplaceKey(), ResultSchema, Reader.GetReadMetadata()->IsDescSorted()); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h index 74f20ec565..20579b6452 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h @@ -20,20 +20,34 @@ public: } }; +class TIntervalStat { +private: + YDB_READONLY(ui32, SourcesCount, 0); + YDB_READONLY(bool, IsPoint, false); +public: + TIntervalStat(const ui32 sourcesCount, const bool isPoint) + : SourcesCount(sourcesCount) + , IsPoint(isPoint) + { + + } +}; + class TScanHead { private: TPlainReadData& Reader; std::deque<std::shared_ptr<IDataSource>> Sources; std::vector<std::shared_ptr<arrow::Field>> ResultFields; + std::shared_ptr<arrow::Schema> ResultSchema; YDB_READONLY_DEF(std::vector<TString>, ResultFieldNames); THashMap<ui32, std::shared_ptr<IDataSource>> SourceByIdx; std::map<NIndexedReader::TSortableBatchPosition, TDataSourceEndpoint> BorderPoints; std::map<ui32, std::shared_ptr<IDataSource>> CurrentSegments; std::optional<NIndexedReader::TSortableBatchPosition> CurrentStart; std::deque<TFetchingInterval> FetchingIntervals; + THashMap<ui32, std::shared_ptr<arrow::RecordBatch>> ReadyIntervals; ui32 SegmentIdxCounter = 0; - std::shared_ptr<NIndexedReader::TMergePartialStream> Merger; - + std::vector<TIntervalStat> IntervalStats; void DrainSources(); public: @@ -67,6 +81,14 @@ public: TReadContext& GetContext(); + TString DebugString() const { + TStringBuilder sb; + for (auto&& i : IntervalStats) { + sb << (i.GetIsPoint() ? "^" : "") << i.GetSourcesCount() << ";"; + } + return sb; + } + void OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 intervalIdx); std::shared_ptr<IDataSource> GetSourceVerified(const ui32 idx) const { auto it = SourceByIdx.find(idx); @@ -78,7 +100,7 @@ public: bool BuildNextInterval(); - void DrainResults(); + std::shared_ptr<NIndexedReader::TMergePartialStream> BuildMerger() const; }; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make index e500dac1ef..7a1e596b79 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make @@ -16,6 +16,7 @@ SRCS( PEERDIR( ydb/core/formats/arrow ydb/core/tx/columnshard/blobs_action + ydb/core/tx/conveyor/usage ) END() diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h index 59492588a7..30b40b2112 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/read_context.h @@ -55,7 +55,7 @@ protected: TReadContext Context; std::shared_ptr<const TReadMetadata> ReadMetadata; virtual std::shared_ptr<NBlobOperations::NRead::ITask> DoExtractNextReadTask(const bool hasReadyResults) = 0; - virtual TString DoDebugString() const = 0; + virtual TString DoDebugString(const bool verbose) const = 0; virtual void DoAbort() = 0; virtual bool DoIsFinished() const = 0; virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0; @@ -109,12 +109,12 @@ public: return DoIsFinished(); } - TString DebugString() const { + TString DebugString(const bool verbose) const { TStringBuilder sb; sb << "internal:" << Context.GetIsInternalRead() << ";" << "has_buffer:" << (GetMemoryAccessor() ? GetMemoryAccessor()->HasBuffer() : true) << ";" ; - sb << DoDebugString(); + sb << DoDebugString(verbose); return sb; } std::shared_ptr<NBlobOperations::NRead::ITask> ExtractNextReadTask(const bool hasReadyResults) { diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index 3c99dc6d69..c1e68c034d 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -291,6 +291,35 @@ public: Y_ABORT_UNLESS(!DataSchema || DataSchema->num_fields()); } + void SkipToLowerBound(const TSortableBatchPosition& pos, const bool include) { + if (SortHeap.Empty()) { + return; + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("pos", pos.DebugJson().GetStringRobust())("heap", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust()); + while (!SortHeap.Empty()) { + const auto cmpResult = SortHeap.Current().GetKeyColumns().Compare(pos); + if (cmpResult == std::partial_ordering::greater) { + break; + } + if (cmpResult == std::partial_ordering::equivalent && include) { + break; + } + const TSortableBatchPosition::TFoundPosition skipPos = SortHeap.MutableCurrent().SkipToLower(pos); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("pos", pos.DebugJson().GetStringRobust())("heap", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust()); + if (skipPos.IsEqual()) { + if (!include && !SortHeap.MutableCurrent().Next()) { + SortHeap.RemoveTop(); + } else { + SortHeap.UpdateTop(); + } + } else if (skipPos.IsLess()) { + SortHeap.RemoveTop(); + } else { + SortHeap.UpdateTop(); + } + } + } + void SetPossibleSameVersion(const bool value) { PossibleSameVersionFlag = value; } @@ -299,7 +328,7 @@ public: return SortHeap.Size(); } - bool GetSourcesCount() const { + ui32 GetSourcesCount() const { return SortHeap.Size(); } diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index bcf6fceda9..0b5fae38da 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -148,7 +148,7 @@ public: void Bootstrap(const TActorContext& ctx) { IndexedData = ReadMetadata->BuildReader(NOlap::TReadContext(Storages, Counters, true), ReadMetadata); - LOG_S_DEBUG("Starting read (" << IndexedData->DebugString() << ") at tablet " << TabletId); + LOG_S_DEBUG("Starting read (" << IndexedData->DebugString(false) << ") at tablet " << TabletId); bool earlyExit = false; if (Deadline != TInstant::Max()) { |