diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-02 15:22:51 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-02 15:22:51 +0300 |
commit | 4cb4a9b3b10be8abd9a00540c5e92ec3b99af140 (patch) | |
tree | c9fbf1f621d1363804411ed235546ee7db362443 | |
parent | 74fc0d19a74a14170ab2a1db5c7934fd067b096f (diff) | |
download | ydb-4cb4a9b3b10be8abd9a00540c5e92ec3b99af140.tar.gz |
merge items for limit usage (instead of pessimistic additive mark)
24 files changed, 478 insertions, 79 deletions
diff --git a/ydb/core/formats/arrow_filter.h b/ydb/core/formats/arrow_filter.h index a119ed59b1a..86c33949083 100644 --- a/ydb/core/formats/arrow_filter.h +++ b/ydb/core/formats/arrow_filter.h @@ -38,32 +38,45 @@ private: return value; } -public: - - class TIterator { + template <class TIterator> + class TIteratorImpl { private: ui32 InternalPosition = 0; - std::deque<ui32>::const_iterator It; - std::deque<ui32>::const_iterator ItEnd; + ui32 CurrentRemainVolume = 0; + TIterator It; + TIterator ItEnd; bool CurrentValue; public: - TIterator(const std::deque<ui32>& filter, const bool startValue) - : It(filter.begin()) - , ItEnd(filter.end()) - , CurrentValue(startValue) - { + TIteratorImpl(TIterator itBegin, TIterator itEnd, const bool startValue) + : It(itBegin) + , ItEnd(itEnd) + , CurrentValue(startValue) { + if (It != ItEnd) { + CurrentRemainVolume = *It; + } + } + bool GetCurrentAcceptance() const { + Y_VERIFY_DEBUG(CurrentRemainVolume); + return CurrentValue; } bool IsBatchForSkip(const ui32 size) const { - return !CurrentValue && (*It - InternalPosition) >= size; + Y_VERIFY_DEBUG(CurrentRemainVolume); + return !CurrentValue && CurrentRemainVolume >= size; } bool Next(const ui32 size) { + if (CurrentRemainVolume > size) { + InternalPosition += size; + CurrentRemainVolume -= size; + return true; + } ui32 sizeRemain = size; while (It != ItEnd) { if (*It - InternalPosition > sizeRemain) { - InternalPosition += sizeRemain; + InternalPosition = sizeRemain; + CurrentRemainVolume = *It - InternalPosition - sizeRemain; return true; } else { sizeRemain -= *It - InternalPosition; @@ -72,12 +85,39 @@ public: ++It; } } + CurrentRemainVolume = 0; return false; } }; +public: + + using TIterator = TIteratorImpl<std::deque<ui32>::const_iterator>; + using TReverseIterator = TIteratorImpl<std::deque<ui32>::const_reverse_iterator>; + + template <bool ForReverse> + class TIteratorSelector { + + }; + + template <> + class TIteratorSelector<true> { + public: + using TIterator = 
TReverseIterator; + }; + + template <> + class TIteratorSelector<false> { + public: + using TIterator = TIterator; + }; + TIterator GetIterator() const { - return TIterator(Filter, GetStartValue()); + return TIterator(Filter.cbegin(), Filter.cend(), GetStartValue()); + } + + TReverseIterator GetReverseIterator() const { + return TReverseIterator(Filter.crbegin(), Filter.crend(), CurrentValue); } TColumnFilter(std::vector<bool>&& values) { diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 64f1f0f6c7b..456426c4df0 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -917,4 +917,15 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b return true; } +int ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow) { + auto result = TRawReplaceKey(&x, xRow).CompareNotNull(TRawReplaceKey(&y, yRow)); + if (result == std::partial_ordering::greater) { + return 1; + } else if (result == std::partial_ordering::less) { + return -1; + } else { + return 0; + } +} + } diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index 97b381e98eb..54ae4b58348 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -132,6 +132,7 @@ std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& ar bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x); int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y); int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y); +int ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow); bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y); bool ScalarLess(const arrow::Scalar& x, 
const arrow::Scalar& y); diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 4f4b18f5045..22e319727fa 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -232,7 +232,14 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR marksGranules.MakePrecedingMark(IndexInfo()); Y_VERIFY(!marksGranules.Empty()); - OutNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, IndexInfo()); + auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, IndexInfo()); + GranulesContext->AddNotIndexedBatches(outNotIndexed); + Y_VERIFY(outNotIndexed.size() <= 1); + if (outNotIndexed.size() == 1) { + auto it = outNotIndexed.find(0); + Y_VERIFY(it != outNotIndexed.end()); + NotIndexedOutscopeBatch = it->second; + } } NotIndexed.clear(); ReadyNotIndexed = 0; @@ -260,37 +267,26 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: out.reserve(ready.size() + 1); // Prepend not indexed data (less then first granule) before granules for ASC sorting - if (ReadMetadata->IsAscSorted() && OutNotIndexed.count(0)) { + if (ReadMetadata->IsAscSorted() && NotIndexedOutscopeBatch) { out.push_back({}); - out.back().push_back(OutNotIndexed[0]); - OutNotIndexed.erase(0); + out.back().push_back(NotIndexedOutscopeBatch); + NotIndexedOutscopeBatch = nullptr; } for (auto&& granule : ready) { - bool canHaveDups = granule->IsDuplicationsAvailable(); std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = granule->GetReadyBatches(); - // Append not indexed data to granules - auto itNotIndexed = OutNotIndexed.find(granule->GetGranuleId()); - if (itNotIndexed != OutNotIndexed.end()) { - auto batch = itNotIndexed->second; - if (batch && batch->num_rows()) { // TODO: check why it could be empty - inGranule.push_back(batch); - canHaveDups = true; - } - 
OutNotIndexed.erase(granule->GetGranuleId()); - } if (inGranule.empty()) { continue; } - if (canHaveDups) { + if (granule->IsDuplicationsAvailable()) { for (auto& batch : inGranule) { Y_VERIFY(batch->num_rows()); Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey)); } #if 1 // optimization - auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, BatchesToDedup); + auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, granule->GetBatchesToDedup()); out.emplace_back(std::move(deduped)); #else out.push_back({}); @@ -302,10 +298,10 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: } // Append not indexed data (less then first granule) after granules for DESC sorting - if (GranulesContext->GetSortingPolicy()->ReadyForAddNotIndexedToEnd() && OutNotIndexed.count(0)) { + if (GranulesContext->GetSortingPolicy()->ReadyForAddNotIndexedToEnd() && NotIndexedOutscopeBatch) { out.push_back({}); - out.back().push_back(OutNotIndexed[0]); - OutNotIndexed.erase(0); + out.back().push_back(NotIndexedOutscopeBatch); + NotIndexedOutscopeBatch = nullptr; } return out; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index e8dba569740..ffdc27fb5b1 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -28,11 +28,10 @@ private: bool OnePhaseReadMode = false; std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; - THashSet<const void*> BatchesToDedup; THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch THashSet<TBlobRange> IndexedBlobs; ui32 ReadyNotIndexed{ 0 }; - THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append + std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch; // outscope granules batch std::shared_ptr<NArrow::TSortDescription> 
SortReplaceDescription; public: @@ -80,15 +79,9 @@ public: } void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); - void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) { + void OnBatchReady(const NIndexedReader::TBatch& /*batchInfo*/, std::shared_ptr<arrow::RecordBatch> batch) { if (batch && batch->num_rows()) { ReadMetadata->ReadStats->SelectedRows += batch->num_rows(); - if (batchInfo.IsDuplicationsAvailable()) { - Y_VERIFY(batchInfo.GetOwner().IsDuplicationsAvailable()); - BatchesToDedup.insert(batch.get()); - } else { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, IndexInfo().GetReplaceKey(), false)); - } } } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index e3cd2e4244d..4efd4c181d1 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -63,8 +63,7 @@ struct TPortionInfo { size_t NumRecords() const { return Records.size(); } bool IsSortableInGranule() const { - return Meta.Produced == TPortionMeta::COMPACTED - || Meta.Produced == TPortionMeta::SPLIT_COMPACTED; + return !CanIntersectOthers(); } bool AllowEarlyFilter() const { diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index ab853275e1b..d03e55eb670 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -28,4 +28,5 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ) diff --git 
a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index e0b9dfc7be9..503e7703de0 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -29,4 +29,5 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index e0b9dfc7be9..503e7703de0 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -29,4 +29,5 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index ab853275e1b..d03e55eb670 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -28,4 +28,5 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 976dd31dac5..6257c1788e4 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -133,13 +133,15 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) { } } -bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, const ui32 originalRecordsCount) { +bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, + const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> futureFilter) { Y_VERIFY(filter); Y_VERIFY(!Filter); Y_VERIFY(!FilterBatch); Filter = filter; FilterBatch = filterBatch; OriginalRecordsCount = originalRecordsCount; + FutureFilter = futureFilter; return Owner->OnFilterReady(*this); } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index 4198c864543..25f6002a44d 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -29,6 +29,8 @@ private: YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch); YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch); YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter); + YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, FutureFilter); + ui32 OriginalRecordsCount = 0; YDB_FLAG_ACCESSOR(DuplicationsAvailable, false); @@ -73,7 +75,8 @@ public: return FilterBatch->num_rows(); } } - bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, 
const ui32 originalRecordsCount); + bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, + const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> futureFilter); void InitBatch(std::shared_ptr<arrow::RecordBatch> batch); NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata); diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index 83293a384d1..ea1c4ee2f15 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -14,9 +14,11 @@ TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMe SortingPolicy = InternalReading ? std::make_shared<TNonSorting>(ReadMetadata) : ReadMetadata->BuildSortingPolicy(); UsedColumns = ReadMetadata->GetUsedColumnIds(); - PostFilterColumns = ReadMetadata->GetUsedColumnIds(); EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(); - for (auto&& i : EarlyFilterColumns) { + FilterStageColumns = SortingPolicy->GetFilterStageColumns(); + + PostFilterColumns = ReadMetadata->GetUsedColumnIds(); + for (auto&& i : FilterStageColumns) { PostFilterColumns.erase(i); } } @@ -53,4 +55,22 @@ NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::Get return Owner.GetTasksProcessor(); } +void TGranulesFillingContext::AddNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>& batches) { + std::shared_ptr<arrow::RecordBatch> externalBatch; + for (auto it = batches.begin(); it != batches.end(); ++it) { + if (!it->first) { + externalBatch = it->second; + continue; + } + auto itGranule = Granules.find(it->first); + Y_VERIFY(itGranule != Granules.end()); + itGranule->second.AddNotIndexedBatch(it->second); + } + THashMap<ui64, 
std::shared_ptr<arrow::RecordBatch>> resultLocal; + if (externalBatch) { + resultLocal.emplace(0, externalBatch); + } + std::swap(batches, resultLocal); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index 2b71adcb028..cab84fc290c 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -21,6 +21,7 @@ private: THashMap<ui64, NIndexedReader::TGranule> Granules; YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns); YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns); + std::set<ui32> FilterStageColumns; std::set<ui32> UsedColumns; YDB_READONLY_DEF(IOrderPolicy::TPtr, SortingPolicy); YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters); @@ -33,6 +34,7 @@ public: NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const; + void AddNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>& batches); TBatch& GetBatchInfo(const ui32 batchNo); void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); @@ -48,7 +50,7 @@ public: void OnNewBatch(TBatch& batch) { if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) { - batch.ResetNoFilter(EarlyFilterColumns); + batch.ResetNoFilter(FilterStageColumns); } else { batch.ResetNoFilter(UsedColumns); } diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index bff7044c5b7..7531c0dd176 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -20,13 +20,17 @@ bool TAssembleFilter::DoExecuteImpl() { FilteredBatch = nullptr; return true; } - if (ReadMetadata->Program && AllowEarlyFilter) { - auto filter = NOlap::EarlyFilter(batch, ReadMetadata->Program); - Filter->CombineSequential(filter); - if (!filter.Apply(batch)) { - 
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount); - FilteredBatch = nullptr; - return true; + if (ReadMetadata->Program) { + auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program)); + if (AllowEarlyFilter) { + Filter->CombineSequential(*earlyFilter); + if (!earlyFilter->Apply(batch)) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount); + FilteredBatch = nullptr; + return true; + } + } else { + EarlyFilter = earlyFilter; } } @@ -51,7 +55,7 @@ bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const { Y_VERIFY(OriginalCount); owner.GetCounters().GetOriginalRowsCount()->Add(OriginalCount); owner.GetCounters().GetAssembleFilterCount()->Add(1); - batch.InitFilter(Filter, FilteredBatch, OriginalCount); + batch.InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h index c2f65c6d6dc..80ee2430c7b 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h @@ -16,6 +16,7 @@ namespace NKikimr::NOlap::NIndexedReader { std::shared_ptr<arrow::RecordBatch> FilteredBatch; NOlap::TReadMetadata::TConstPtr ReadMetadata; std::shared_ptr<NArrow::TColumnFilter> Filter; + std::shared_ptr<NArrow::TColumnFilter> EarlyFilter; const ui32 BatchNo; ui32 OriginalCount = 0; bool AllowEarlyFilter = false; diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index 7302e6add52..58283b6051d 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -2,6 +2,7 @@ #include "filling_context.h" #include <ydb/core/tx/columnshard/engines/portion_info.h> #include 
<ydb/core/tx/columnshard/engines/indexed_read_data.h> +#include <ydb/core/tx/columnshard/engines/filter.h> namespace NKikimr::NOlap::NIndexedReader { @@ -14,7 +15,13 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco Y_VERIFY(!ReadyFlag); Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo())); if (batch && batch->num_rows()) { - ReadyBatches.emplace_back(batch); + if (batchInfo.IsSortableInGranule()) { + SortableBatches.emplace_back(batch); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false)); + } else { + NonSortableBatches.emplace_back(batch); + BatchesToDedup.insert(batch.get()); + } } Owner->OnBatchReady(batchInfo, batch); if (WaitBatches.empty()) { @@ -80,4 +87,18 @@ std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata: return batches; } +void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { + if (!batch || !batch->num_rows()) { + return; + } + Y_VERIFY(NonSortableBatches.empty()); + Y_VERIFY(SortableBatches.empty()); + Y_VERIFY(!NotIndexedBatch); + NotIndexedBatch = batch; + if (Owner->GetReadMetadata()->Program) { + NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program)); + } + DuplicationsAvailableFlag = true; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index b7400339d30..cc6763a364d 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -14,19 +14,36 @@ class TGranulesFillingContext; class TGranule { private: YDB_READONLY(ui64, GranuleId, 0); - YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, ReadyBatches); + std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches; + std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches; + 
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, NotIndexedBatch); + YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotIndexedBatchFutureFilter); YDB_FLAG_ACCESSOR(DuplicationsAvailable, false); YDB_READONLY_FLAG(Ready, false); std::deque<TBatch> Batches; std::set<ui32> WaitBatches; std::set<ui32> GranuleBatchNumbers; TGranulesFillingContext* Owner = nullptr; + YDB_READONLY_DEF(THashSet<const void*>, BatchesToDedup); public: TGranule(const ui64 granuleId, TGranulesFillingContext& owner) : GranuleId(granuleId) , Owner(&owner) { } + std::vector<std::shared_ptr<arrow::RecordBatch>> GetReadyBatches() const { + std::vector<std::shared_ptr<arrow::RecordBatch>> result; + result.reserve(SortableBatches.size() + NonSortableBatches.size() + 1); + if (NotIndexedBatch) { + result.emplace_back(NotIndexedBatch); + } + result.insert(result.end(), NonSortableBatches.begin(), NonSortableBatches.end()); + result.insert(result.end(), SortableBatches.begin(), SortableBatches.end()); + return result; + } + + void AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch); + const TGranulesFillingContext& GetOwner() const { return *Owner; } diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp index 1c667356728..a20f94f1c95 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp @@ -46,28 +46,34 @@ bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& } while (GranulesOutOrderForPortions.size()) { auto it = OrderedBatches.find(GranulesOutOrderForPortions.front()->GetGranuleId()); + auto g = GranulesOutOrderForPortions.front(); Y_VERIFY(it != OrderedBatches.end()); - while (it->second.size() && it->second.front()->IsFiltered() && CurrentItemsLimit) { - auto b = it->second.front(); + if (!it->second.GetStarted()) { + MergeStream.AddIndependentSource(g->GetNotIndexedBatch(), 
g->GetNotIndexedBatchFutureFilter()); + it->second.SetStarted(true); + } + auto& batches = it->second.MutableBatches(); + while (batches.size() && batches.front()->IsFiltered() && CurrentItemsLimit) { + auto b = batches.front(); if (b->IsSortableInGranule()) { - if (CurrentItemsLimit <= b->GetFilteredRecordsCount()) { - CurrentItemsLimit = 0; - } else { - CurrentItemsLimit -= b->GetFilteredRecordsCount(); - } + MergeStream.AddPoolSource(0, b->GetFilterBatch()); } else { - CurrentItemsLimit += b->GetFilteredRecordsCount(); + MergeStream.AddIndependentSource(b->GetFilterBatch(), b->GetFutureFilter()); } OnBatchFilterInitialized(*b, context); - - it->second.pop_front(); + batches.pop_front(); } - if (!CurrentItemsLimit || it->second.empty()) { - while (it->second.size()) { - auto b = it->second.front(); + if (MergeStream.IsValid()) { + while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.Next()) { + --CurrentItemsLimit; + } + } + if (!CurrentItemsLimit || batches.empty()) { + while (batches.size()) { + auto b = batches.front(); context.GetCounters().GetSkippedBytes()->Add(b->GetFetchBytes(context.GetPostFilterColumns())); b->InitBatch(nullptr); - it->second.pop_front(); + batches.pop_front(); } OrderedBatches.erase(it); GranulesOutOrderForPortions.pop_front(); @@ -83,7 +89,7 @@ void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) { for (ui64 granule : granulesOrder) { TGranule& g = context.GetGranuleVerified(granule); GranulesOutOrder.emplace_back(&g); - Y_VERIFY(OrderedBatches.emplace(granule, g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata)).second); + Y_VERIFY(OrderedBatches.emplace(granule, TGranuleScanInfo(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata))).second); } GranulesOutOrderForPortions = GranulesOutOrder; } @@ -102,6 +108,13 @@ std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, return result; } 
+TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata) + :TBase(readMetadata) + , MergeStream(readMetadata->IndexInfo.GetReplaceKey(), readMetadata->IsDescSorted()) +{ + CurrentItemsLimit = ReadMetadata->Limit; +} + void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context) { Y_VERIFY(!!batch.GetFilter()); if (!batch.GetFilteredRecordsCount()) { diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h index ab14fef95ca..2f697219b66 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.h +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h @@ -1,6 +1,7 @@ #pragma once #include "granule.h" #include "read_metadata.h" +#include "read_filter_merger.h" namespace NKikimr::NOlap::NIndexedReader { @@ -21,6 +22,10 @@ public: using TPtr = std::shared_ptr<IOrderPolicy>; virtual ~IOrderPolicy() = default; + virtual std::set<ui32> GetFilterStageColumns() { + return ReadMetadata->GetEarlyFilterColumnIds(); + } + IOrderPolicy(TReadMetadata::TConstPtr readMetadata) : ReadMetadata(readMetadata) { @@ -84,26 +89,44 @@ public: } }; +class TGranuleScanInfo { +private: + YDB_ACCESSOR(bool, Started, false); + YDB_ACCESSOR_DEF(std::deque<TBatch*>, Batches); +public: + TGranuleScanInfo(std::deque<TBatch*>&& batches) + : Batches(std::move(batches)) + { + + } +}; + class TPKSortingWithLimit: public IOrderPolicy { private: using TBase = IOrderPolicy; std::deque<TGranule*> GranulesOutOrder; std::deque<TGranule*> GranulesOutOrderForPortions; - THashMap<ui64, std::deque<TBatch*>> OrderedBatches; + THashMap<ui64, TGranuleScanInfo> OrderedBatches; ui32 CurrentItemsLimit = 0; + TMergePartialStream MergeStream; protected: virtual void DoFill(TGranulesFillingContext& context) override; virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; virtual bool DoOnFilterReady(TBatch& 
batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;
 public:
+    virtual std::set<ui32> GetFilterStageColumns() override {
+        std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds();
+        for (auto&& i : ReadMetadata->GetPKColumnIds()) {
+            result.emplace(i);
+        }
+        return result;
+    }
+
     virtual bool CanInterrupt() const override {
         return true;
     }
-    TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
-        :TBase(readMetadata) {
-        CurrentItemsLimit = ReadMetadata->Limit;
-    }
+    TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata);
     virtual bool ReadyForAddNotIndexedToEnd() const override {
         return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
     }
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
new file mode 100644
index 00000000000..7456cdc0b72
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -0,0 +1,5 @@
+#include "read_filter_merger.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
new file mode 100644
index 00000000000..d88dda1a689
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -0,0 +1,257 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/formats/arrow_filter.h>
+#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/tx/columnshard/engines/index_info.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <util/generic/hash.h>
+#include <set>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+// Merges several record batches, each expected to be sorted by the sort schema, into one ordered stream.
+// The iterators over the batches are kept in a binary heap (SortHeap); Next() consumes one key per call.
+class TMergePartialStream {
+private:
+    // Cursor over the rows of a single batch; walks forward or backward depending on the sort direction.
+    class TBatchIterator {
+    private:
+        YDB_ACCESSOR(i64, Position, 0);
+        YDB_OPT(ui32, PoolId);
+        std::shared_ptr<arrow::RecordBatch> Batch;
+
+        std::shared_ptr<NArrow::TColumnFilter> Filter;
+        std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
+        std::shared_ptr<NArrow::TColumnFilter::TReverseIterator> ReverseFilterIterator;
+
+        std::vector<std::shared_ptr<arrow::Array>> Columns;
+        std::vector<std::shared_ptr<arrow::Array>> VersionColumns;
+        int ReverseSortKff;
+        i64 RecordsCount;
+
+        // Row index the iteration starts from. Returned as i64 (the type of Position and
+        // RecordsCount) so that the value is not silently narrowed.
+        i64 GetFirstPosition() const {
+            return (ReverseSortKff > 0) ? 0 : RecordsCount - 1;
+        }
+
+        // Row index the iteration finishes at.
+        i64 GetLastPosition() const {
+            return (ReverseSortKff > 0) ? RecordsCount - 1 : 0;
+        }
+
+    public:
+        TBatchIterator(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
+            std::shared_ptr<arrow::Schema> sortSchema, const bool reverseSort, const std::optional<ui32> poolId)
+            : PoolId(poolId)
+            , Batch(batch)
+            , Filter(filter)
+            , ReverseSortKff(reverseSort ? -1 : 1)
+            , RecordsCount(batch->num_rows()) {
+            if (Filter) {
+                if (reverseSort) {
+                    ReverseFilterIterator = std::make_shared<NArrow::TColumnFilter::TReverseIterator>(Filter->GetReverseIterator());
+                } else {
+                    FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator());
+                }
+                Y_VERIFY(Filter->Size() == RecordsCount);
+            }
+            Position = GetFirstPosition();
+            Y_UNUSED(Batch);
+            Y_VERIFY(batch->num_rows());
+            Y_VERIFY_DEBUG(batch->ValidateFull().ok());
+            // Sort key columns, in sort schema order.
+            for (auto&& i : sortSchema->fields()) {
+                auto c = batch->GetColumnByName(i->name());
+                Y_VERIFY(c);
+                Columns.emplace_back(c);
+            }
+            // Version columns (plan step, then tx id); operator< uses them to order rows with equal keys.
+            {
+                auto c = batch->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
+                Y_VERIFY(c);
+                VersionColumns.emplace_back(c);
+            }
+            {
+                auto c = batch->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
+                Y_VERIFY(c);
+                VersionColumns.emplace_back(c);
+            }
+        }
+
+        // Compares the last row of this batch (in iteration order) with the first row of nextIterator.
+        // The first row is taken via GetFirstPosition(): with reverse sort it is the last physical row, not row 0.
+        bool CheckNextBatch(const TBatchIterator& nextIterator) {
+            Y_VERIFY_DEBUG(nextIterator.Columns.size() == Columns.size());
+            return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, nextIterator.GetFirstPosition()) * ReverseSortKff < 0;
+        }
+
+        // Compares the current rows of two iterators by sort key columns only (version columns are ignored).
+        int CompareNoVersion(const TBatchIterator& item) const {
+            Y_VERIFY_DEBUG(item.Columns.size() == Columns.size());
+            return NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position);
+        }
+
+        // A row is deleted when the filter does NOT accept it. Without a filter nothing is deleted,
+        // which matches AddIndependentSource treating a total-allow filter as "no filter".
+        bool IsDeleted() const {
+            if (FilterIterator) {
+                return !FilterIterator->GetCurrentAcceptance();
+            } else if (ReverseFilterIterator) {
+                return !ReverseFilterIterator->GetCurrentAcceptance();
+            } else {
+                return false;
+            }
+        }
+
+        // Moves to the next row in iteration order; returns false when the batch is exhausted.
+        bool Next() {
+            bool result = false;
+            if (ReverseSortKff > 0) {
+                result = ++Position < RecordsCount;
+            } else {
+                result = --Position >= 0;
+            }
+            // The filter iterator is advanced in lockstep and must run out together with the batch.
+            if (FilterIterator) {
+                Y_VERIFY(result == FilterIterator->Next(1));
+            } else if (ReverseFilterIterator) {
+                Y_VERIFY(result == ReverseFilterIterator->Next(1));
+            }
+            return result;
+        }
+
+        // Ordering used by SortHeap: "less" means the current key comes later in iteration order;
+        // for equal keys, "less" means the version columns compare greater.
+        bool operator<(const TBatchIterator& item) const {
+            const int result = CompareNoVersion(item) * ReverseSortKff;
+            if (result == 0) {
+                return NArrow::ColumnsCompare(VersionColumns, Position, item.VersionColumns, item.Position) > 0;
+            } else {
+                return result > 0;
+            }
+        }
+    };
+
+    // Advances the iterator stored at SortHeap.back() (the heap top is moved there first when needPop is set)
+    // and restores the heap property. Returns true while the heap is not empty.
+    bool NextInHeap(const bool needPop) {
+        if (SortHeap.empty()) {
+            return false;
+        }
+        if (needPop) {
+            std::pop_heap(SortHeap.begin(), SortHeap.end());
+        }
+        if (SortHeap.back().Next()) {
+            std::push_heap(SortHeap.begin(), SortHeap.end());
+        } else if (!SortHeap.back().HasPoolId()) {
+            SortHeap.pop_back();
+        } else {
+            auto it = BatchPools.find(SortHeap.back().GetPoolIdUnsafe());
+            // A pooled iterator must have its pool registered; fail loudly instead of dereferencing end().
+            Y_VERIFY(it != BatchPools.end());
+            Y_VERIFY(it->second.size());
+            if (it->second.size() == 1) {
+                BatchPools.erase(it);
+                SortHeap.pop_back();
+            } else {
+                // The current batch is exhausted: switch to the next batch of the same pool.
+                it->second.pop_front();
+                TBatchIterator newIterator(it->second.front(), nullptr, SortSchema, Reverse, SortHeap.back().GetPoolIdUnsafe());
+                // NOTE(review): the result of this check is ignored; presumably it was meant to be asserted - confirm.
+                SortHeap.back().CheckNextBatch(newIterator);
+                std::swap(SortHeap.back(), newIterator);
+                std::push_heap(SortHeap.begin(), SortHeap.end());
+            }
+        }
+        return SortHeap.size();
+    }
+
+    THashMap<ui32, std::deque<std::shared_ptr<arrow::RecordBatch>>> BatchPools;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> IndependentBatches;
+    std::vector<TBatchIterator> SortHeap;
+    std::shared_ptr<arrow::Schema> SortSchema;
+    const bool Reverse;
+public:
+    TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse)
+        : SortSchema(sortSchema)
+        , Reverse(reverse) {
+        Y_VERIFY(SortSchema->num_fields());
+    }
+
+    // True while at least one iterator remains in the heap.
+    bool IsValid() const {
+        return SortHeap.size();
+    }
+
+    // Adds a standalone batch with an optional row filter. Null/empty batches and batches whose filter denies every row are skipped.
+    void AddIndependentSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
+        if (!batch || !batch->num_rows()) {
+            return;
+        }
+        Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema));
+        const bool acceptAll = !filter || filter->IsTotalAllowFilter();
+        if (!acceptAll && filter->IsTotalDenyFilter()) {
+            // No row of this batch can be emitted, so it is not stored in IndependentBatches either.
+            return;
+        }
+        if (acceptAll) {
+            filter = nullptr;
+        }
+        IndependentBatches.emplace_back(batch);
+        SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema, Reverse, {}));
+        std::push_heap(SortHeap.begin(), SortHeap.end());
+    }
+
+    // True when the pool with the given id exists and still holds at least one batch.
+    bool HasRecordsInPool(const ui32 poolId) const {
+        auto it = BatchPools.find(poolId);
+        if (it == BatchPools.end()) {
+            return false;
+        }
+        return it->second.size();
+    }
+
+    // Appends a batch to the pool poolId (the pool is created on first use). Only the first batch of a pool
+    // gets an iterator in the heap right away; later ones are picked up by NextInHeap() when the previous batch ends.
+    void AddPoolSource(const ui32 poolId, std::shared_ptr<arrow::RecordBatch> batch) {
+        if (!batch || !batch->num_rows()) {
+            return;
+        }
+        auto it = BatchPools.find(poolId);
+        if (it == BatchPools.end()) {
+            it = BatchPools.emplace(poolId, std::deque<std::shared_ptr<arrow::RecordBatch>>()).first;
+        }
+        it->second.emplace_back(batch);
+        if (it->second.size() == 1) {
+            SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, poolId));
+            std::push_heap(SortHeap.begin(), SortHeap.end());
+        }
+    }
+
+
+
+    // Consumes one key: takes the top iterator, advances every other iterator whose current key equals it,
+    // then advances the top iterator itself. Repeats while the consumed row was deleted by its filter.
+    // Returns true while the heap is not empty.
+    bool Next() {
+        while (SortHeap.size()) {
+            std::pop_heap(SortHeap.begin(), SortHeap.end());
+            TBatchIterator mainIterator = std::move(SortHeap.back());
+            SortHeap.pop_back();
+            while (SortHeap.size() && !mainIterator.CompareNoVersion(SortHeap.front())) {
+                NextInHeap(true);
+            }
+            const bool isDeleted = mainIterator.IsDeleted();
+            SortHeap.emplace_back(std::move(mainIterator));
+            NextInHeap(false);
+            if (!isDeleted) {
+                break;
+            }
+        }
+        return SortHeap.size();
+    }
+};
+
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index eff5afd970a..14c771b45ca 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -42,6 +42,14 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
     return result;
 }
 
+std::set<ui32> TReadMetadata::GetPKColumnIds() const {
+    std::set<ui32> result;
+    for (auto&& i : IndexInfo.GetPrimaryKey()) {
+        Y_VERIFY(result.emplace(IndexInfo.GetColumnId(i.first)).second);
+    }
+    return result;
+}
+
 std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
     std::set<ui32> result;
     if (PlanStep) {
@@ -55,6 +63,9 @@ std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
         AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", f->name());
         result.emplace(IndexInfo.GetColumnId(f->name()));
     }
+    for (auto&& i : IndexInfo.GetPrimaryKey()) {
+        Y_VERIFY(result.contains(IndexInfo.GetColumnId(i.first)));
+    }
     return result;
 }
 
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 7ea2898dadc..1f1a6f96e0c 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -117,6 +117,7 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
 
     std::set<ui32> GetEarlyFilterColumnIds() const;
     std::set<ui32> GetUsedColumnIds() const;
+    std::set<ui32> GetPKColumnIds() const;
 
     bool Empty() const {
         Y_VERIFY(SelectInfo); |