| field | value | date |
|---|---|---|
| author | ivanmorozov <[email protected]> | 2023-05-09 20:56:34 +0300 |
| committer | ivanmorozov <[email protected]> | 2023-05-09 20:56:34 +0300 |
| commit | 397ebd37186df6996be059e4d7cbe28d0a5c3c26 (patch) | |
| tree | a172b80d309f5cf00867251e06a9024d074ce124 | |
| parent | 520e5b3048293285df02d1d268d9b1d035bb803e (diff) | |
use pools and min/max sorting for merge
26 files changed, 668 insertions, 441 deletions
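The core of the patch is a rewrite of `TColumnFilter`: the boolean filter stays run-length encoded (a `std::vector<ui32>` of run lengths plus the value of the last run, with a cached plain `FilterPlain` bitmap), and `And`/`Or`/`CombineSequentialAnd` become const methods that return a new filter built by walking the two run sequences in lockstep (`Y_WARN_UNUSED_RESULT`) instead of mutating in place. Below is a minimal standalone sketch of that run-merge idea — hypothetical names, not the YDB API; it assumes both inputs are non-empty and cover the same row count, whereas the real `TMergerImpl` also short-circuits constant filters via `TMergePolicyAnd`/`TMergePolicyOr`:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Run-length encoded boolean filter: runs[0] rows of startValue,
// then runs[1] rows of !startValue, and so on.
struct RleFilter {
    bool startValue = true;
    std::vector<uint32_t> runs;
    uint32_t count = 0;

    static RleFilter And(const RleFilter& a, const RleFilter& b) {
        assert(a.count == b.count && !a.runs.empty() && !b.runs.empty());
        RleFilter out;
        out.count = a.count;
        bool lastVal = false;
        auto emit = [&](bool v, uint32_t len) {  // append a run, merging equal neighbors
            if (!len) {
                return;
            }
            if (!out.runs.empty() && v == lastVal) {
                out.runs.back() += len;
            } else {
                if (out.runs.empty()) {
                    out.startValue = v;
                }
                out.runs.push_back(len);
                lastVal = v;
            }
        };
        size_t ia = 0, ib = 0;
        uint32_t pos = 0, endA = a.runs[0], endB = b.runs[0];
        bool va = a.startValue, vb = b.startValue;
        while (pos < a.count) {
            const uint32_t end = std::min(endA, endB);
            emit(va && vb, end - pos);  // overlap of the two current runs
            pos = end;
            if (pos == endA && ia + 1 < a.runs.size()) { va = !va; endA += a.runs[++ia]; }
            if (pos == endB && ib + 1 < b.runs.size()) { vb = !vb; endB += b.runs[++ib]; }
        }
        return out;
    }
};

int main() {
    // T T T F F  AND  F T T T T  ->  F T T F F
    RleFilter a{true, {3, 2}, 5};
    RleFilter b{false, {1, 4}, 5};
    const RleFilter r = RleFilter::And(a, b);
    assert(!r.startValue && r.runs == std::vector<uint32_t>({1, 2, 2}));
    return 0;
}
```

Swapping `va && vb` for `va || vb` gives the `Or` variant; the committed code factors exactly that choice out into a merge-policy template parameter.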
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp index f25780812a9..70fa1187b63 100644 --- a/ydb/core/formats/arrow/arrow_filter.cpp +++ b/ydb/core/formats/arrow/arrow_filter.cpp @@ -132,16 +132,10 @@ void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatc } } -std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter() const { +std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter(const ui32 expectedSize) const { arrow::BooleanBuilder builder; - auto res = builder.Reserve(Count); - Y_VERIFY_OK(res); - bool currentFilter = GetStartValue(); - for (auto&& i : Filter) { - Y_VERIFY_OK(builder.AppendValues(i, currentFilter)); - currentFilter = !currentFilter; - } - + auto res = builder.Reserve(expectedSize); + Y_VERIFY_OK(builder.AppendValues(BuildSimpleFilter(expectedSize))); std::shared_ptr<arrow::BooleanArray> out; res = builder.Finish(&out); Y_VERIFY_OK(res); @@ -168,9 +162,11 @@ bool TColumnFilter::IsTotalDenyFilter() const { return false; } -void TColumnFilter::Reset(const ui32 /*count*/) { +void TColumnFilter::Reset(const ui32 count) { Count = 0; + FilterPlain.reset(); Filter.clear(); + Filter.reserve(count / 4); } void TColumnFilter::Add(const bool value, const ui32 count) { @@ -218,33 +214,33 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(const arrow::D break; } - NArrow::TColumnFilter result; - result.Reset(cmps.size()); + std::vector<bool> bits; + bits.reserve(cmps.size()); switch (compareType) { case ECompareType::LESS: for (size_t i = 0; i < cmps.size(); ++i) { - result.Add(cmps[i] < ECompareResult::BORDER); + bits.emplace_back(cmps[i] < ECompareResult::BORDER); } break; case ECompareType::LESS_OR_EQUAL: for (size_t i = 0; i < cmps.size(); ++i) { - result.Add(cmps[i] <= ECompareResult::BORDER); + bits.emplace_back(cmps[i] <= ECompareResult::BORDER); } break; case ECompareType::GREATER: for (size_t i = 0; i < cmps.size(); ++i) { - result.Add(cmps[i] > ECompareResult::BORDER); + bits.emplace_back(cmps[i] > ECompareResult::BORDER); } break; case ECompareType::GREATER_OR_EQUAL: for (size_t i = 0; i < cmps.size(); ++i) { - result.Add(cmps[i] >= ECompareResult::BORDER); + bits.emplace_back(cmps[i] >= ECompareResult::BORDER); } break; } - return result; + return NArrow::TColumnFilter(std::move(bits)); } bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) { @@ -259,29 +255,159 @@ bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) { if (IsTotalAllowFilter()) { return true; } - auto res = arrow::compute::Filter(batch, BuildArrowFilter()); + auto res = arrow::compute::Filter(batch, BuildArrowFilter(batch->num_rows())); Y_VERIFY_S(res.ok(), res.status().message()); Y_VERIFY((*res).kind() == arrow::Datum::RECORD_BATCH); batch = (*res).record_batch(); return batch->num_rows(); } -void TColumnFilter::CombineSequential(const TColumnFilter& extFilter) { +const std::vector<bool>& TColumnFilter::BuildSimpleFilter(const ui32 expectedSize) const { + if (!FilterPlain) { + Y_VERIFY(expectedSize == Count || !Count); + std::vector<bool> result; + if (Count) { + result.resize(Count, true); + bool currentValue = GetStartValue(); + ui32 currentPosition = 0; + for (auto&& i : Filter) { + if (!currentValue) { + memset(&result[currentPosition], 0, sizeof(bool) * i); + } + currentPosition += i; + currentValue = !currentValue; + } + } else { + result.resize(expectedSize, DefaultFilterValue); + } + FilterPlain = std::move(result); + } + 
Y_VERIFY(FilterPlain->size() == expectedSize); + return *FilterPlain; +} + +class TMergePolicyAnd { +private: +public: + static bool Calc(const bool a, const bool b) { + return a && b; + } + static TColumnFilter MergeWithSimple(const TColumnFilter& filter, const bool simpleValue) { + if (simpleValue) { + return filter; + } else { + return TColumnFilter::BuildStopFilter(); + } + } +}; + +class TMergePolicyOr { +private: +public: + static bool Calc(const bool a, const bool b) { + return a || b; + } + static TColumnFilter MergeWithSimple(const TColumnFilter& filter, const bool simpleValue) { + if (simpleValue) { + return TColumnFilter::BuildAllowFilter(); + } else { + return filter; + } + } +}; + +class TColumnFilter::TMergerImpl { +private: + const TColumnFilter& Filter1; + const TColumnFilter& Filter2; +public: + TMergerImpl(const TColumnFilter& filter1, const TColumnFilter& filter2) + : Filter1(filter1) + , Filter2(filter2) + { + + } + + template <class TMergePolicy> + TColumnFilter Merge() const { + if (Filter1.empty() && Filter2.empty()) { + return TColumnFilter(TMergePolicy::Calc(Filter1.DefaultFilterValue, Filter2.DefaultFilterValue)); + } else if (Filter1.empty()) { + return TMergePolicy::MergeWithSimple(Filter2, Filter1.DefaultFilterValue); + } else if (Filter2.empty()) { + return TMergePolicy::MergeWithSimple(Filter1, Filter2.DefaultFilterValue); + } else { + Y_VERIFY(Filter1.Count == Filter2.Count); + auto it1 = Filter1.Filter.cbegin(); + auto it2 = Filter2.Filter.cbegin(); + + std::vector<ui32> resultFilter; + resultFilter.reserve(Filter1.Filter.size() + Filter2.Filter.size()); + ui32 pos1 = 0; + ui32 pos2 = 0; + bool curValue1 = Filter1.GetStartValue(); + bool curValue2 = Filter2.GetStartValue(); + bool curCurrent = false; + ui32 count = 0; + + while (it1 != Filter1.Filter.end() && it2 != Filter2.Filter.cend()) { + const ui32 delta = TColumnFilter::CrossSize(pos2, pos2 + *it2, pos1, pos1 + *it1); + if (delta) { + if (!count || curCurrent != TMergePolicy::Calc(curValue1, curValue2)) { + resultFilter.emplace_back(delta); + curCurrent = TMergePolicy::Calc(curValue1, curValue2); + } else { + resultFilter.back() += delta; + } + count += delta; + } + if (pos1 + *it1 < pos2 + *it2) { + pos1 += *it1; + curValue1 = !curValue1; + ++it1; + } else if (pos1 + *it1 > pos2 + *it2) { + pos2 += *it2; + curValue2 = !curValue2; + ++it2; + } else { + curValue2 = !curValue2; + curValue1 = !curValue1; + ++it1; + ++it2; + } + } + Y_VERIFY(it1 == Filter1.Filter.end() && it2 == Filter2.Filter.cend()); + TColumnFilter result = TColumnFilter::BuildAllowFilter(); + std::swap(resultFilter, result.Filter); + std::swap(curCurrent, result.CurrentValue); + std::swap(count, result.Count); + return result; + } + } + +}; + +TColumnFilter TColumnFilter::And(const TColumnFilter& extFilter) const { + FilterPlain.reset(); + return TMergerImpl(*this, extFilter).Merge<TMergePolicyAnd>(); +} + +TColumnFilter TColumnFilter::Or(const TColumnFilter& extFilter) const { + FilterPlain.reset(); + return TMergerImpl(*this, extFilter).Merge<TMergePolicyOr>(); +} + +TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter) const { if (Filter.empty()) { - DefaultFilterValue = DefaultFilterValue && extFilter.DefaultFilterValue; - Filter = extFilter.Filter; - Count = extFilter.Count; + return TMergePolicyAnd::MergeWithSimple(extFilter, DefaultFilterValue); } else if (extFilter.Filter.empty()) { - if (!extFilter.DefaultFilterValue) { - DefaultFilterValue = DefaultFilterValue && extFilter.DefaultFilterValue; - 
Filter.clear(); - Count = 0; - } + return TMergePolicyAnd::MergeWithSimple(*this, extFilter.DefaultFilterValue); } else { auto itSelf = Filter.begin(); auto itExt = extFilter.Filter.cbegin(); - std::deque<ui32> result; + std::vector<ui32> resultFilter; + resultFilter.reserve(Filter.size() + extFilter.Filter.size()); ui32 selfPos = 0; ui32 extPos = 0; bool curSelf = GetStartValue(); @@ -294,10 +420,10 @@ void TColumnFilter::CombineSequential(const TColumnFilter& extFilter) { const ui32 delta = curSelf ? CrossSize(extPos, extPos + *itExt, selfPos, selfPos + *itSelf) : *itSelf; if (delta) { if (!count || curCurrent != (curSelf && curExt)) { - result.emplace_back(delta); + resultFilter.emplace_back(delta); curCurrent = curSelf && curExt; } else { - result.back() += delta; + resultFilter.back() += delta; } count += delta; } @@ -322,23 +448,12 @@ void TColumnFilter::CombineSequential(const TColumnFilter& extFilter) { } } Y_VERIFY(itSelf == Filter.end() && itExt == extFilter.Filter.cend()); - std::swap(result, Filter); - std::swap(curCurrent, CurrentValue); - std::swap(count, Count); - } -} - -std::vector<bool> TColumnFilter::BuildSimpleFilter() const { - std::vector<bool> result; - result.reserve(Count); - bool currentValue = GetStartValue(); - for (auto&& i : Filter) { - for (ui32 idx = 0; idx < i; ++idx) { - result.emplace_back(currentValue); - } - currentValue = !currentValue; + TColumnFilter result = TColumnFilter::BuildAllowFilter(); + std::swap(resultFilter, result.Filter); + std::swap(curCurrent, result.CurrentValue); + std::swap(count, result.Count); + return result; } - return result; } } diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h index b7df1d633b8..aeb4993d0b2 100644 --- a/ydb/core/formats/arrow/arrow_filter.h +++ b/ydb/core/formats/arrow/arrow_filter.h @@ -20,39 +20,58 @@ private: bool DefaultFilterValue = true; bool CurrentValue = true; ui32 Count = 0; - std::deque<ui32> Filter; + std::vector<ui32> Filter; + mutable std::optional<std::vector<bool>> FilterPlain; TColumnFilter(const bool defaultFilterValue) : DefaultFilterValue(defaultFilterValue) { } - bool GetStartValue() const { + bool GetStartValue(const bool reverse = false) const { if (Filter.empty()) { return DefaultFilterValue; } - bool value = CurrentValue; - if (Filter.size() % 2 == 0) { - value = !value; + if (reverse) { + return CurrentValue; + } else { + if (Filter.size() % 2 == 0) { + return !CurrentValue; + } else { + return CurrentValue; + } } - return value; } - template <class TIterator> - class TIteratorImpl { + static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2); + class TMergerImpl; + void Add(const bool value, const ui32 count = 1); + void Reset(const ui32 count); +public: + + class TIterator { private: ui32 InternalPosition = 0; ui32 CurrentRemainVolume = 0; - TIterator It; - TIterator ItEnd; + const std::vector<ui32>& Filter; + i32 Position = 0; bool CurrentValue; + const i32 FinishPosition; + const i32 DeltaPosition; public: - TIteratorImpl(TIterator itBegin, TIterator itEnd, const bool startValue) - : It(itBegin) - , ItEnd(itEnd) - , CurrentValue(startValue) { - if (It != ItEnd) { - CurrentRemainVolume = *It; + TIterator(const bool reverse, const std::vector<ui32>& filter, const bool startValue) + : Filter(filter) + , CurrentValue(startValue) + , FinishPosition(reverse ? -1 : Filter.size()) + , DeltaPosition(reverse ? 
-1 : 1) + { + if (!Filter.size()) { + Position = FinishPosition; + } else { + if (reverse) { + Position = Filter.size() - 1; + } + CurrentRemainVolume = Filter[Position]; } } @@ -67,22 +86,24 @@ private: } bool Next(const ui32 size) { + Y_VERIFY(size); if (CurrentRemainVolume > size) { InternalPosition += size; CurrentRemainVolume -= size; return true; } ui32 sizeRemain = size; - while (It != ItEnd) { - if (*It - InternalPosition > sizeRemain) { + while (Position != FinishPosition) { + const ui32 currentVolume = Filter[Position]; + if (currentVolume - InternalPosition > sizeRemain) { InternalPosition = sizeRemain; - CurrentRemainVolume = *It - InternalPosition - sizeRemain; + CurrentRemainVolume = currentVolume - InternalPosition - sizeRemain; return true; } else { - sizeRemain -= *It - InternalPosition; + sizeRemain -= currentVolume - InternalPosition; InternalPosition = 0; CurrentValue = !CurrentValue; - ++It; + Position += DeltaPosition; } } CurrentRemainVolume = 0; @@ -90,44 +111,22 @@ private: } }; - static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2); -public: - - using TIterator = TIteratorImpl<std::deque<ui32>::const_iterator>; - using TReverseIterator = TIteratorImpl<std::deque<ui32>::const_reverse_iterator>; - - template <bool ForReverse> - class TIteratorSelector { - - }; - - template <> - class TIteratorSelector<true> { - public: - using TIterator = TReverseIterator; - }; - - template <> - class TIteratorSelector<false> { - public: - using TIterator = TIterator; - }; - - TIterator GetIterator() const { - return TIterator(Filter.cbegin(), Filter.cend(), GetStartValue()); + TIterator GetIterator(const bool reverse) const { + return TIterator(reverse, Filter, GetStartValue(reverse)); } - TReverseIterator GetReverseIterator() const { - return TReverseIterator(Filter.crbegin(), Filter.crend(), CurrentValue); + bool empty() const { + return Filter.empty(); } TColumnFilter(std::vector<bool>&& values) { const ui32 count = values.size(); - Reset(count, std::move(values)); + Reset(count, values); + FilterPlain = std::move(values); } template <class TGetter> - void Reset(const ui32 count, TGetter&& getter) { + void Reset(const ui32 count, const TGetter& getter) { Reset(count); if (!count) { return; @@ -149,11 +148,11 @@ public: return Count; } - std::vector<bool> BuildSimpleFilter() const; + const std::vector<bool>& BuildSimpleFilter(const ui32 expectedSize) const; TColumnFilter() = default; - std::shared_ptr<arrow::BooleanArray> BuildArrowFilter() const; + std::shared_ptr<arrow::BooleanArray> BuildArrowFilter(const ui32 expectedSize) const; bool IsTotalAllowFilter() const; @@ -167,83 +166,8 @@ public: return TColumnFilter(true); } - void Reset(const ui32 count); - - void Add(const bool value, const ui32 count = 1); - - template <class TCalcer> - void Merge(const TColumnFilter& extFilter, const TCalcer actor) { - if (Filter.empty() && extFilter.Filter.empty()) { - DefaultFilterValue = (extFilter.DefaultFilterValue && DefaultFilterValue); - } else if (Filter.empty()) { - if (DefaultFilterValue) { - Filter = extFilter.Filter; - Count = extFilter.Count; - CurrentValue = extFilter.CurrentValue; - } - } else if (extFilter.Filter.empty()) { - if (!extFilter.DefaultFilterValue) { - DefaultFilterValue = false; - Filter.clear(); - Count = 0; - } - } else { - Y_VERIFY(extFilter.Count == Count); - auto itSelf = Filter.begin(); - auto itExt = extFilter.Filter.cbegin(); - - std::deque<ui32> result; - ui32 selfPos = 0; - ui32 extPos = 0; - bool curSelf = GetStartValue(); - 
bool curExt = extFilter.GetStartValue(); - bool curCurrent = false; - ui32 count = 0; - - while (itSelf != Filter.end() && itExt != extFilter.Filter.cend()) { - const ui32 delta = CrossSize(extPos, extPos + *itExt, selfPos, selfPos + *itSelf); - if (delta) { - if (!count || curCurrent != actor(curSelf, curExt)) { - result.emplace_back(delta); - curCurrent = actor(curSelf, curExt); - } else { - result.back() += delta; - } - count += delta; - } - if (selfPos + *itSelf < extPos + *itExt) { - selfPos += *itSelf; - curSelf = !curSelf; - ++itSelf; - } else if (selfPos + *itSelf > extPos + *itExt) { - extPos += *itExt; - curExt = !curExt; - ++itExt; - } else { - curExt = !curExt; - curSelf = !curSelf; - ++itSelf; - ++itExt; - } - } - Y_VERIFY(itSelf == Filter.end() && itExt == extFilter.Filter.cend()); - std::swap(result, Filter); - std::swap(curCurrent, CurrentValue); - std::swap(count, Count); - } - } - - - void And(const TColumnFilter& extFilter) { - return Merge(extFilter, [](const bool selfBool, const bool extBool) { - return selfBool && extBool; - }); - } - void Or(const TColumnFilter& extFilter) { - return Merge(extFilter, [](const bool selfBool, const bool extBool) { - return selfBool || extBool; - }); - } + TColumnFilter And(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT; + TColumnFilter Or(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT; // It makes a filter using composite predicate static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, @@ -251,7 +175,8 @@ public: bool Apply(std::shared_ptr<arrow::RecordBatch>& batch); - void CombineSequential(const TColumnFilter& extFilter); + // Combines filters by 'and' operator (extFilter count is true positions count in self, thought extFitler patch exactly that positions) + TColumnFilter CombineSequentialAnd(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT; }; } diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 456426c4df0..04d5fbd7ce2 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -917,15 +917,21 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b return true; } -int ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow) { - auto result = TRawReplaceKey(&x, xRow).CompareNotNull(TRawReplaceKey(&y, yRow)); - if (result == std::partial_ordering::greater) { - return 1; - } else if (result == std::partial_ordering::less) { - return -1; - } else { - return 0; +std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow) { + return TRawReplaceKey(&x, xRow).CompareNotNull(TRawReplaceKey(&y, yRow)); +} + +std::shared_ptr<arrow::RecordBatch> BuildSingleRecordBatch(const std::shared_ptr<arrow::Schema> schema, const std::vector<std::shared_ptr<arrow::Scalar>>& recordData) { + std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders = MakeBuilders(schema, 1); + Y_VERIFY(builders.size() == recordData.size()); + for (ui32 i = 0; i < recordData.size(); ++i) { + Y_VERIFY(recordData[i]); + Y_VERIFY_OK(builders[i]->AppendScalar(*recordData[i])); } + + auto arrays = NArrow::Finish(std::move(builders)); + Y_VERIFY(arrays.size() == builders.size()); + return arrow::RecordBatch::Make(schema, 1, arrays); } } diff --git 
a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index 39cf5079fad..c6bebcce3d6 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -133,10 +133,12 @@ std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& ar bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x); int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y); int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y); -int ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow); +std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow); bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y); bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y); +std::shared_ptr<arrow::RecordBatch> BuildSingleRecordBatch(const std::shared_ptr<arrow::Schema> schema, const std::vector<std::shared_ptr<arrow::Scalar>>& recordData); + inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) { return column->null_bitmap_data(); } diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index e82fdbf8899..6c1cfe875d4 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -659,7 +659,7 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const { } } - auto filter = bits.BuildArrowFilter(); + auto filter = bits.BuildArrowFilter(batch.Rows); for (int64_t i = 0; i < batch.Schema->num_fields(); ++i) { bool needed = (allColumns || neededColumns.contains(batch.Schema->field(i)->name())); if (batch.Datums[i].is_array() && needed) { diff --git a/ydb/core/formats/arrow/ut_arrow.cpp b/ydb/core/formats/arrow/ut_arrow.cpp index f0d7eb19dd3..d1d77b1578b 100644 --- a/ydb/core/formats/arrow/ut_arrow.cpp +++ b/ydb/core/formats/arrow/ut_arrow.cpp @@ -633,10 +633,10 @@ Y_UNIT_TEST_SUITE(ArrowTest) { const NArrow::TColumnFilter gt = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER); const NArrow::TColumnFilter ge = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER_OR_EQUAL); - UNIT_ASSERT(CheckFilter(lt.BuildSimpleFilter(), 234, true)); - UNIT_ASSERT(CheckFilter(le.BuildSimpleFilter(), 235, true)); - UNIT_ASSERT(CheckFilter(gt.BuildSimpleFilter(), 235, false)); - UNIT_ASSERT(CheckFilter(ge.BuildSimpleFilter(), 234, false)); + UNIT_ASSERT(CheckFilter(lt.BuildSimpleFilter(table->num_rows()), 234, true)); + UNIT_ASSERT(CheckFilter(le.BuildSimpleFilter(table->num_rows()), 235, true)); + UNIT_ASSERT(CheckFilter(gt.BuildSimpleFilter(table->num_rows()), 235, false)); + UNIT_ASSERT(CheckFilter(ge.BuildSimpleFilter(table->num_rows()), 234, false)); } Y_UNIT_TEST(SortWithCompositeKey) { diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 4b30d7a5ff6..8595380343d 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -435,13 +435,11 @@ private: void NextReadMetadata() { auto g = Stats.MakeGuard("NextReadMetadata"); - ScanIterator.reset(); - ++ReadMetadataIndex; - - if (ReadMetadataIndex == ReadMetadataRanges.size()) { + if (++ReadMetadataIndex == ReadMetadataRanges.size()) { // Send 
empty batch with "finished" flag MakeResult(); SendResult(false, true); + ScanIterator.reset(); return Finish(); } diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp index 2285dcd3e4b..3f375626694 100644 --- a/ydb/core/tx/columnshard/engines/filter.cpp +++ b/ydb/core/tx/columnshard/engines/filter.cpp @@ -44,45 +44,12 @@ NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatc return result; } -NArrow::TColumnFilter MakeReplaceFilter(const std::shared_ptr<arrow::RecordBatch>& batch, - THashSet<NArrow::TReplaceKey>& keys) { - NArrow::TColumnFilter bits; - bits.Reset(batch->num_rows()); - - auto columns = std::make_shared<NArrow::TArrayVec>(batch->columns()); - - for (int i = 0; i < batch->num_rows(); ++i) { - NArrow::TReplaceKey key(columns, i); - bits.Add(keys.emplace(key).second); - } - return bits; -} - -NArrow::TColumnFilter MakeReplaceFilterLastWins(const std::shared_ptr<arrow::RecordBatch>& batch, - THashSet<NArrow::TReplaceKey>& keys) { - if (!batch->num_rows()) { - return {}; - } - - NArrow::TColumnFilter result; - result.Reset(batch->num_rows()); - - auto columns = std::make_shared<NArrow::TArrayVec>(batch->columns()); - - for (int i = batch->num_rows() - 1; i >= 0; --i) { - NArrow::TReplaceKey key(columns, i); - result.Add(keys.emplace(key).second); - } - - return result; -} - NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata) { Y_VERIFY(portion); NArrow::TColumnFilter result = readMetadata.GetPKRangesFilter().BuildFilter(portion); if (readMetadata.GetSnapshot().GetPlanStep()) { auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); - result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot())); + result = result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot())); } return result; @@ -96,19 +63,4 @@ NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& bat return ssa->MakeEarlyFilter(batch, NArrow::GetCustomExecContext()); } -void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch, - const std::shared_ptr<arrow::Schema>& replaceSchema, bool lastWins) { - THashSet<NArrow::TReplaceKey> replaces; - - auto keyBatch = NArrow::ExtractColumns(batch, replaceSchema); - - NArrow::TColumnFilter bits; - if (lastWins) { - bits = MakeReplaceFilterLastWins(keyBatch, replaces); - } else { - bits = MakeReplaceFilter(keyBatch, replaces); - } - Y_VERIFY(bits.Apply(batch)); -} - } diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h index f9628fe90bc..9d5c412d7ec 100644 --- a/ydb/core/tx/columnshard/engines/filter.h +++ b/ydb/core/tx/columnshard/engines/filter.h @@ -10,12 +10,6 @@ NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatc const std::shared_ptr<arrow::Schema>& snapSchema, const TSnapshot& snapshot); -NArrow::TColumnFilter MakeReplaceFilter(const std::shared_ptr<arrow::RecordBatch>& batch, THashSet<NArrow::TReplaceKey>& keys); -NArrow::TColumnFilter MakeReplaceFilterLastWins(const std::shared_ptr<arrow::RecordBatch>& batch, THashSet<NArrow::TReplaceKey>& keys); - -void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch, - const std::shared_ptr<arrow::Schema>& replaceSchema, bool lastWins = false); - struct TReadMetadata; NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata); NArrow::TColumnFilter FilterNotIndexed(const 
std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata); diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 806d39bb09d..0ee36f922ed 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -146,7 +146,14 @@ public: void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; } const TCompression& GetDefaultCompression() const { return DefaultCompression; } - + static const std::vector<std::string>& GetSpecialColumnNames() { + static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) }; + return result; + } + static const std::vector<ui32>& GetSpecialColumnIds() { + static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; + return result; + } private: ui32 Id; TString Name; diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 3e12e32006b..ec970378229 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -109,10 +109,6 @@ struct TPortionInfo { bool CanIntersectOthers() const { return !Valid() || IsInserted(); } size_t NumRecords() const { return Records.size(); } - bool IsSortableInGranule() const { - return !CanIntersectOthers(); - } - bool AllowEarlyFilter() const { return Meta.Produced == TPortionMeta::COMPACTED || Meta.Produced == TPortionMeta::SPLIT_COMPACTED; diff --git a/ydb/core/tx/columnshard/engines/predicate/container.cpp b/ydb/core/tx/columnshard/engines/predicate/container.cpp index c5e1140bcfb..85b8ed827dc 100644 --- a/ydb/core/tx/columnshard/engines/predicate/container.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/container.cpp @@ -2,7 +2,7 @@ #include <library/cpp/actors/core/log.h> namespace NKikimr::NOlap { -int TPredicateContainer::ComparePredicatesSamePrefix(const NOlap::TPredicate& l, const NOlap::TPredicate& r) { +std::partial_ordering TPredicateContainer::ComparePredicatesSamePrefix(const NOlap::TPredicate& l, const NOlap::TPredicate& r) { Y_VERIFY(l.Batch); Y_VERIFY(r.Batch); Y_VERIFY(l.Batch->num_columns()); @@ -99,10 +99,10 @@ bool TPredicateContainer::CrossRanges(const TPredicateContainer& ext) { if (IsForwardInterval() == ext.IsForwardInterval()) { return true; } - const int result = ComparePredicatesSamePrefix(*Object, *ext.Object); - if (result < 0) { + const std::partial_ordering result = ComparePredicatesSamePrefix(*Object, *ext.Object); + if (result == std::partial_ordering::less) { return IsForwardInterval(); - } else if (result > 0) { + } else if (result == std::partial_ordering::greater) { return ext.IsForwardInterval(); } else if (Object->Batch->num_columns() == ext.Object->Batch->num_columns()) { return IsInclude() && ext.IsInclude(); diff --git a/ydb/core/tx/columnshard/engines/predicate/container.h b/ydb/core/tx/columnshard/engines/predicate/container.h index 51b0e66ba93..46ef03665de 100644 --- a/ydb/core/tx/columnshard/engines/predicate/container.h +++ b/ydb/core/tx/columnshard/engines/predicate/container.h @@ -23,7 +23,7 @@ private: : CompareType(compareType) { } - static int ComparePredicatesSamePrefix(const NOlap::TPredicate& l, const NOlap::TPredicate& r); + static std::partial_ordering ComparePredicatesSamePrefix(const NOlap::TPredicate& l, const NOlap::TPredicate& r); public: diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.cpp 
b/ydb/core/tx/columnshard/engines/predicate/filter.cpp index e1a798eafbe..4efd14a7996 100644 --- a/ydb/core/tx/columnshard/engines/predicate/filter.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/filter.cpp @@ -9,7 +9,7 @@ NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(std::shared_ptr<arro } NArrow::TColumnFilter result = SortedRanges.front().BuildFilter(data); for (ui32 i = 1; i < SortedRanges.size(); ++i) { - result.Or(SortedRanges[i].BuildFilter(data)); + result = result.Or(SortedRanges[i].BuildFilter(data)); } return result; } diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp index f9decb2189f..acd16bea9a8 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp @@ -36,8 +36,7 @@ std::set<std::string> TPKRangeFilter::GetColumnNames() const { NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const { NArrow::TColumnFilter result = PredicateTo.BuildFilter(data); - result.And(PredicateFrom.BuildFilter(data)); - return result; + return result.And(PredicateFrom.BuildFilter(data)); } bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const { diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 6b8b4988f27..912ce96f24f 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -113,7 +113,7 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) { for (auto&& columnInfo : orderedObjects) { ui32 expected = 0; - auto it = FetchedInfo.GetFilter()->GetIterator(); + auto it = FetchedInfo.GetFilter()->GetIterator(false); bool undefinedShift = false; bool itFinished = false; for (auto&& [chunk, rec] : columnInfo.second) { @@ -163,4 +163,63 @@ ui64 TBatch::GetUsefulBytes(const ui64 bytes) const { return bytes * FetchedInfo.GetUsefulDataKff(); } +std::shared_ptr<TSortableBatchPosition> TBatch::GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const { + if (!FirstPK || !LastPK) { + std::shared_ptr<TSortableBatchPosition> from; + std::shared_ptr<TSortableBatchPosition> to; + GetPKBorders(reverse, indexInfo, from, to); + } + if (reverse) { + return *ReverseFirstPK; + } else { + return *FirstPK; + } +} + +void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const { + from = nullptr; + to = nullptr; + if (!FirstPK || !LastPK) { + std::vector<std::shared_ptr<arrow::Scalar>> minRecord; + std::vector<std::shared_ptr<arrow::Scalar>> maxRecord; + for (auto&& i : indexInfo.GetReplaceKey()->fields()) { + const ui32 columnId = indexInfo.GetColumnId(i->name()); + std::shared_ptr<arrow::Scalar> minScalar; + std::shared_ptr<arrow::Scalar> maxScalar; + PortionInfo->MinMaxValue(columnId, minScalar, maxScalar); + if (!FirstPK && !minScalar) { + FirstPK = nullptr; + ReverseLastPK = nullptr; + } else { + minRecord.emplace_back(minScalar); + } + if (!LastPK && !maxScalar) { + LastPK = nullptr; + ReverseFirstPK = nullptr; + } else { + maxRecord.emplace_back(maxScalar); + } + } + if (!FirstPK) { + auto batch = NArrow::BuildSingleRecordBatch(indexInfo.GetReplaceKey(), minRecord); + Y_VERIFY(batch); + FirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexInfo.GetReplaceKey()->field_names(), false); + ReverseLastPK = 
std::make_shared<TSortableBatchPosition>(batch, 0, indexInfo.GetReplaceKey()->field_names(), true); + } + if (!LastPK) { + auto batch = NArrow::BuildSingleRecordBatch(indexInfo.GetReplaceKey(), maxRecord); + Y_VERIFY(batch); + LastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexInfo.GetReplaceKey()->field_names(), false); + ReverseFirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexInfo.GetReplaceKey()->field_names(), true); + } + } + if (reverse) { + from = *ReverseFirstPK; + to = *ReverseLastPK; + } else { + from = *FirstPK; + to = *LastPK; + } +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index 583466d7867..86b767c88a9 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -1,6 +1,7 @@ #pragma once #include "common.h" #include "conveyor_task.h" +#include "read_filter_merger.h" #include <ydb/core/formats/arrow/arrow_filter.h> #include <ydb/core/tx/columnshard/blob.h> @@ -77,7 +78,11 @@ private: YDB_READONLY(ui64, FetchedBytes, 0); THashSet<TBlobRange> WaitIndexed; - + mutable std::optional<std::shared_ptr<TSortableBatchPosition>> FirstPK; + mutable std::optional<std::shared_ptr<TSortableBatchPosition>> LastPK; + mutable std::optional<std::shared_ptr<TSortableBatchPosition>> ReverseFirstPK; + mutable std::optional<std::shared_ptr<TSortableBatchPosition>> ReverseLastPK; + YDB_READONLY_FLAG(DuplicationsAvailable, false); YDB_READONLY_DEF(TBatchFetchedInfo, FetchedInfo); THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data; @@ -90,6 +95,9 @@ private: ui64 GetUsefulBytes(const ui64 bytes) const; public: + std::shared_ptr<TSortableBatchPosition> GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const; + void GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const; + bool AllowEarlyFilter() const { return PortionInfo->AllowEarlyFilter(); } @@ -105,9 +113,6 @@ public: return GetUsefulBytes(FetchedBytes); } - bool IsSortableInGranule() const { - return PortionInfo->IsSortableInGranule(); - } TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo); bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData); bool AskedColumnsAlready(const std::set<ui32>& columnIds) const; diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index 7d2b7e15487..2b7cb49bbed 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -16,6 +16,7 @@ TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMe UsedColumns = ReadMetadata->GetUsedColumnIds(); EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(); FilterStageColumns = SortingPolicy->GetFilterStageColumns(); + PKColumnNames = ReadMetadata->GetReplaceKey()->field_names(); PostFilterColumns = ReadMetadata->GetUsedColumnIds(); for (auto&& i : FilterStageColumns) { diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index d57a6c0b9d7..1ee7bd46b90 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -12,6 +12,7 @@ namespace NKikimr::NOlap::NIndexedReader { class TGranulesFillingContext { private: + 
YDB_READONLY_DEF(std::vector<std::string>, PKColumnNames); bool AbortedFlag = false; TReadMetadata::TConstPtr ReadMetadata; const bool InternalReading = false; diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index 8f9079d3c30..24a29917d86 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -23,7 +23,7 @@ bool TAssembleFilter::DoExecuteImpl() { if (ReadMetadata->Program) { if (AllowEarlyFilter) { auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program)); - Filter->CombineSequential(*earlyFilter); + Filter = std::make_shared<NArrow::TColumnFilter>(Filter->CombineSequentialAnd(*earlyFilter)); if (!earlyFilter->Apply(batch)) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount); FilteredBatch = nullptr; diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index 8ad6a8b871a..ff24281f35c 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -17,11 +17,7 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco Y_VERIFY(!ReadyFlag); Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchAddress().GetBatchGranuleIdx())); if (batch && batch->num_rows()) { - if (batchInfo.IsSortableInGranule()) { - SortableBatches.emplace_back(batch); - } else { - NonSortableBatches.emplace_back(batch); - } + RecordBatches.emplace_back(batch); auto& indexInfo = Owner->GetReadMetadata()->GetIndexInfo(); if (!batchInfo.IsDuplicationsAvailable()) { @@ -62,35 +58,40 @@ bool TGranule::OnFilterReady(TBatch& batchInfo) { return Owner->GetSortingPolicy()->OnFilterReady(batchInfo, *this, *Owner); } -std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata) { - std::deque<TBatch*> batches; +std::deque<TGranule::TBatchForMerge> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata) { + std::deque<TBatchForMerge> batches; for (auto&& i : Batches) { - batches.emplace_back(&i); + std::shared_ptr<TSortableBatchPosition> from; + std::shared_ptr<TSortableBatchPosition> to; + i.GetPKBorders(reverse, readMetadata->GetIndexInfo(), from, to); + batches.emplace_back(TBatchForMerge(&i, from, to)); } - const int reverseKff = reverse ? 
-1 : 0; - auto& indexInfo = readMetadata->GetIndexInfo(); - const auto pred = [reverseKff, indexInfo](const TBatch* l, const TBatch* r) { - if (l->IsSortableInGranule() && r->IsSortableInGranule()) { - return l->GetPortionInfo().CompareMinByPk(r->GetPortionInfo(), indexInfo) * reverseKff < 0; - } else if (l->IsSortableInGranule()) { - return false; - } else if (r->IsSortableInGranule()) { - return true; - } else { - return false; + std::sort(batches.begin(), batches.end()); + ui32 currentPoolId = 0; + std::map<TSortableBatchPosition, ui32> poolIds; + for (auto&& i : batches) { + if (!i.GetFrom() && !i.GetTo()) { + continue; } - }; - std::sort(batches.begin(), batches.end(), pred); - bool nonCompactedSerial = true; - for (ui32 i = 0; i + 1 < batches.size(); ++i) { - if (batches[i]->IsSortableInGranule()) { - auto& l = *batches[i]; - auto& r = *batches[i + 1]; - Y_VERIFY(r.IsSortableInGranule()); - Y_VERIFY(l.GetPortionInfo().CompareSelfMaxItemMinByPk(r.GetPortionInfo(), indexInfo) * reverseKff <= 0); - nonCompactedSerial = false; - } else { - Y_VERIFY(nonCompactedSerial); + if (i.GetFrom()) { + auto it = poolIds.rbegin(); + for (; it != poolIds.rend(); ++it) { + if (it->first.Compare(*i.GetFrom()) < 0) { + break; + } + } + if (it != poolIds.rend()) { + i.SetPoolId(it->second); + if (i.GetTo()) { + poolIds.erase(it->first); + poolIds.emplace(*i.GetTo(), *i.GetPoolId()); + } else { + poolIds.erase(it->first); + } + } else if (i.GetTo()) { + i.SetPoolId(++currentPoolId); + poolIds.emplace(*i.GetTo(), *i.GetPoolId()); + } } } return batches; @@ -107,6 +108,9 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { if (batch && batch->num_rows()) { Y_VERIFY(!NotIndexedBatch); NotIndexedBatch = batch; + if (NotIndexedBatch) { + RecordBatches.emplace_back(NotIndexedBatch); + } if (Owner->GetReadMetadata()->Program) { NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program)); } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index 442af41ed99..4c8f12b4e73 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -20,8 +20,7 @@ private: std::shared_ptr<arrow::RecordBatch> NotIndexedBatch; std::shared_ptr<NArrow::TColumnFilter> NotIndexedBatchFutureFilter; - std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches; - std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches; + std::vector<std::shared_ptr<arrow::RecordBatch>> RecordBatches; bool DuplicationsAvailableFlag = false; bool ReadyFlag = false; std::deque<TBatch> Batches; @@ -71,14 +70,7 @@ public: } std::vector<std::shared_ptr<arrow::RecordBatch>> GetReadyBatches() const { - std::vector<std::shared_ptr<arrow::RecordBatch>> result; - result.reserve(SortableBatches.size() + NonSortableBatches.size() + 1); - if (NotIndexedBatch) { - result.emplace_back(NotIndexedBatch); - } - result.insert(result.end(), NonSortableBatches.begin(), NonSortableBatches.end()); - result.insert(result.end(), SortableBatches.begin(), SortableBatches.end()); - return result; + return RecordBatches; } TBatch& GetBatchInfo(const ui32 batchIdx) { @@ -91,7 +83,44 @@ public: const TGranulesFillingContext& GetOwner() const { return *Owner; } - std::deque<TBatch*> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata); + + class TBatchForMerge { + private: + TBatch* Batch = nullptr; + 
YDB_ACCESSOR_DEF(std::optional<ui32>, PoolId); + YDB_ACCESSOR_DEF(std::shared_ptr<TSortableBatchPosition>, From); + YDB_ACCESSOR_DEF(std::shared_ptr<TSortableBatchPosition>, To); + public: + TBatch* operator->() { + return Batch; + } + + TBatch& operator*() { + return *Batch; + } + + TBatchForMerge(TBatch* batch, std::shared_ptr<TSortableBatchPosition> from, std::shared_ptr<TSortableBatchPosition> to) + : Batch(batch) + , From(from) + , To(to) + { + Y_VERIFY(Batch); + } + + bool operator<(const TBatchForMerge& item) const { + if (!From && !item.From) { + return false; + } else if (From && item.From) { + return From->Compare(*item.From) < 0; + } else if (!From) { + return true; + } else { + return false; + } + } + }; + + std::deque<TBatchForMerge> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata); const std::set<ui32>& GetEarlyFilterColumns() const; void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp index ead178faca5..e183f7b53e2 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp @@ -26,25 +26,34 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont } auto& batches = g.GetOrderedBatches(); while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) { - TBatch* b = batches.front(); - std::optional<ui32> batchPoolId; - if (b->IsSortableInGranule()) { - ++CountSorted; - batchPoolId = 0; + TGranule::TBatchForMerge& b = batches.front(); + if (!b.GetPoolId()) { + ++CountNotSortedPortions; } else { - ++CountNotSorted; + ++CountBatchesByPools[*b.GetPoolId()]; } - MergeStream.AddPoolSource(batchPoolId, b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter()); + ++CountProcessedBatches; + MergeStream.AddPoolSource(b.GetPoolId(), b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter()); OnBatchFilterInitialized(*b, context); batches.pop_front(); - while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.DrainCurrent()) { - if (!--CurrentItemsLimit) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "stop_on_limit") - ("limit", ReadMetadata->Limit)("sorted_count", CountSorted)("unsorted_count", CountNotSorted)("granules_count", CountProcessedGranules); + if (batches.size()) { + auto nextBatchControlPoint = batches.front()->GetFirstPK(ReadMetadata->IsDescSorted(), ReadMetadata->GetIndexInfo()); + if (!nextBatchControlPoint) { + continue; } + MergeStream.PutControlPoint(nextBatchControlPoint); + } + while (CurrentItemsLimit && MergeStream.DrainCurrent()) { + --CurrentItemsLimit; + } + if (MergeStream.ControlPointEnriched()) { + MergeStream.RemoveControlPoint(); + } else if (batches.size()) { + Y_VERIFY(!CurrentItemsLimit); } } if (batches.empty()) { + Y_VERIFY(MergeStream.IsEmpty() || !CurrentItemsLimit); GranulesOutOrderForPortions.pop_front(); } else { break; @@ -57,11 +66,23 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont while (batches.size()) { auto b = batches.front(); context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns())); + ++CountSkippedBatches; b->InitBatch(nullptr); batches.pop_front(); } + ++CountSkippedGranules; 
GranulesOutOrderForPortions.pop_front(); } + if (GranulesOutOrderForPortions.empty()) { + if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN)) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "stop_on_limit") + ("limit", ReadMetadata->Limit)("limit_reached", !CurrentItemsLimit) + ("processed_batches", CountProcessedBatches)("processed_granules", CountProcessedGranules) + ("skipped_batches", CountSkippedBatches)("skipped_granules", CountSkippedGranules) + ("pools_count", CountBatchesByPools.size())("bad_pool_size", CountNotSortedPortions) + ; + } + } return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h index 36ec23b9c94..79c22238974 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h +++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h @@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NIndexedReader { class TGranuleOrdered { private: bool StartedFlag = false; - std::deque<TBatch*> OrderedBatches; + std::deque<TGranule::TBatchForMerge> OrderedBatches; TGranule* Granule = nullptr; public: bool Start() { @@ -20,13 +20,13 @@ public: } - TGranuleOrdered(std::deque<TBatch*>&& orderedBatches, TGranule* granule) + TGranuleOrdered(std::deque<TGranule::TBatchForMerge>&& orderedBatches, TGranule* granule) : OrderedBatches(std::move(orderedBatches)) , Granule(granule) { } - std::deque<TBatch*>& GetOrderedBatches() noexcept { + std::deque<TGranule::TBatchForMerge>& GetOrderedBatches() noexcept { return OrderedBatches; } @@ -41,9 +41,12 @@ private: std::deque<TGranule*> GranulesOutOrder; std::deque<TGranuleOrdered> GranulesOutOrderForPortions; ui32 CurrentItemsLimit = 0; + THashMap<ui32, ui32> CountBatchesByPools; ui32 CountProcessedGranules = 0; - ui32 CountNotSorted = 0; - ui32 CountSorted = 0; + ui32 CountSkippedBatches = 0; + ui32 CountProcessedBatches = 0; + ui32 CountNotSortedPortions = 0; + ui32 CountSkippedGranules = 0; TMergePartialStream MergeStream; protected: virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override; diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp index 7456cdc0b72..9540904a4cf 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp @@ -2,4 +2,71 @@ namespace NKikimr::NOlap::NIndexedReader { +bool TSortableBatchPosition::IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const { + if (Fields.size() != (size_t)schema->num_fields()) { + return false; + } + for (ui32 i = 0; i < Fields.size(); ++i) { + if (Fields[i]->type() != schema->field(i)->type()) { + return false; + } + if (Fields[i]->name() != schema->field(i)->name()) { + return false; + } + } + return true; +} + +void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) { + Y_VERIFY(point); + Y_VERIFY(point->IsSameSchema(SortSchema)); + Y_VERIFY(++ControlPoints == 1); + SortHeap.emplace_back(TBatchIterator(*point)); + if (SortHeap.size() > 1) { + Y_VERIFY(SortHeap.front().GetKeyColumns().Compare(SortHeap.back().GetKeyColumns()) != std::partial_ordering::greater); + } + std::push_heap(SortHeap.begin(), SortHeap.end()); +} + +void TMergePartialStream::AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) { + if (!batch 
|| !batch->num_rows()) { + return; + } + Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema)); + if (!poolId) { + IndependentBatches.emplace_back(batch); + AddNewToHeap(poolId, batch, filter, true); + } else { + auto it = BatchPools.find(*poolId); + if (it == BatchPools.end()) { + it = BatchPools.emplace(*poolId, std::deque<TIteratorData>()).first; + } + it->second.emplace_back(batch, filter); + if (it->second.size() == 1) { + AddNewToHeap(poolId, batch, filter, true); + } + } +} + +void TMergePartialStream::AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) { + if (!filter || filter->IsTotalAllowFilter()) { + SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema->field_names(), Reverse, poolId)); + } else if (filter->IsTotalDenyFilter()) { + return; + } else { + SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema->field_names(), Reverse, poolId)); + } + if (restoreHeap) { + std::push_heap(SortHeap.begin(), SortHeap.end()); + } +} + +void TMergePartialStream::RemoveControlPoint() { + Y_VERIFY(ControlPoints == 1); + Y_VERIFY(ControlPointEnriched()); + Y_VERIFY(-- ControlPoints == 0); + std::pop_heap(SortHeap.begin(), SortHeap.end()); + SortHeap.pop_back(); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index 1c481b528c4..ef5de23549f 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -9,22 +9,87 @@ namespace NKikimr::NOlap::NIndexedReader { +class TSortableBatchPosition { +protected: + i64 Position = 0; + i64 RecordsCount = 0; + bool ReverseSort = false; + std::vector<std::shared_ptr<arrow::Array>> Columns; + std::vector<std::shared_ptr<arrow::Field>> Fields; + std::shared_ptr<arrow::RecordBatch> Batch; +public: + TSortableBatchPosition() = default; + + bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const; + + TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& columns, const bool reverseSort) + : Position(position) + , RecordsCount(batch->num_rows()) + , ReverseSort(reverseSort) + , Batch(batch) + { + Y_VERIFY(batch->num_rows()); + Y_VERIFY_DEBUG(batch->ValidateFull().ok()); + for (auto&& i : columns) { + auto c = batch->GetColumnByName(i); + Y_VERIFY(c); + Columns.emplace_back(c); + auto f = batch->schema()->GetFieldByName(i); + Fields.emplace_back(f); + } + Y_VERIFY(Columns.size()); + } + + std::partial_ordering Compare(const TSortableBatchPosition& item) const { + Y_VERIFY(item.ReverseSort == ReverseSort); + Y_VERIFY_DEBUG(item.Columns.size() == Columns.size()); + const auto directResult = NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position); + if (ReverseSort) { + if (directResult == std::partial_ordering::less) { + return std::partial_ordering::greater; + } else if (directResult == std::partial_ordering::greater) { + return std::partial_ordering::less; + } else { + return std::partial_ordering::equivalent; + } + } else { + return directResult; + } + } + + bool operator<(const TSortableBatchPosition& item) const { + return Compare(item) == std::partial_ordering::less; + } + + bool NextPosition(const i64 delta) { + return InitPosition(Position + delta); + } + + bool InitPosition(const i64 position) { + if (position < RecordsCount && position >= 0) { + Position = position; + 
return true; + } else { + return false; + } + + } + +}; + class TMergePartialStream { private: class TBatchIterator { private: - YDB_ACCESSOR(i64, Position, 0); + bool ControlPointFlag; + TSortableBatchPosition KeyColumns; + TSortableBatchPosition VersionColumns; + i64 RecordsCount; + int ReverseSortKff; YDB_OPT(ui32, PoolId); - std::shared_ptr<arrow::RecordBatch> Batch; std::shared_ptr<NArrow::TColumnFilter> Filter; std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator; - std::shared_ptr<NArrow::TColumnFilter::TReverseIterator> ReverseFilterIterator; - - std::vector<std::shared_ptr<arrow::Array>> Columns; - std::vector<std::shared_ptr<arrow::Array>> VersionColumns; - int ReverseSortKff; - i64 RecordsCount; i32 GetFirstPosition() const { if (ReverseSortKff > 0) { @@ -34,123 +99,110 @@ private: } } - i32 GetLastPosition() const { - if (ReverseSortKff > 0) { - return RecordsCount - 1; - } else { - return 0; - } + public: + bool IsControlPoint() const { + return ControlPointFlag; + } + + const TSortableBatchPosition& GetKeyColumns() const { + return KeyColumns; + } + + TBatchIterator(const TSortableBatchPosition& keyColumns) + : ControlPointFlag(true) + , KeyColumns(keyColumns) + { + } - public: TBatchIterator(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, - std::shared_ptr<arrow::Schema> sortSchema, const bool reverseSort, const std::optional<ui32> poolId) - : PoolId(poolId) - , Batch(batch) + const std::vector<std::string>& keyColumns, const bool reverseSort, const std::optional<ui32> poolId) + : ControlPointFlag(false) + , KeyColumns(batch, 0, keyColumns, reverseSort) + , VersionColumns(batch, 0, TIndexInfo::GetSpecialColumnNames(), false) + , RecordsCount(batch->num_rows()) + , ReverseSortKff(reverseSort ? 1 : -1) + , PoolId(poolId) , Filter(filter) - , ReverseSortKff(reverseSort ? 
-1 : 1) - , RecordsCount(batch->num_rows()) { + { + Y_VERIFY(KeyColumns.InitPosition(GetFirstPosition())); + Y_VERIFY(VersionColumns.InitPosition(GetFirstPosition())); if (Filter) { - if (reverseSort) { - ReverseFilterIterator = std::make_shared<NArrow::TColumnFilter::TReverseIterator>(Filter->GetReverseIterator()); - } else { - FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator()); - } + FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort)); Y_VERIFY(Filter->Size() == RecordsCount); } - Position = GetFirstPosition(); - Y_UNUSED(Batch); - Y_VERIFY(batch->num_rows()); - Y_VERIFY_DEBUG(batch->ValidateFull().ok()); - for (auto&& i : sortSchema->fields()) { - auto c = batch->GetColumnByName(i->name()); - Y_VERIFY(c); - Columns.emplace_back(c); - } - { - auto c = batch->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); - Y_VERIFY(c); - VersionColumns.emplace_back(c); - } - { - auto c = batch->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); - Y_VERIFY(c); - VersionColumns.emplace_back(c); - } } bool CheckNextBatch(const TBatchIterator& nextIterator) { - Y_VERIFY_DEBUG(nextIterator.Columns.size() == Columns.size()); - return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, nextIterator.GetFirstPosition()) * ReverseSortKff < 0; + return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less; } class TPosition { private: - const TBatchIterator* Owner; - ui32 Position = 0; - bool DeletedFlag = false; + TSortableBatchPosition KeyColumns; + TSortableBatchPosition VersionColumns; + bool DeletedFlag; + bool ControlPointFlag; public: + const TSortableBatchPosition& GetKeyColumns() const { + return KeyColumns; + } + + bool IsControlPoint() const { + return ControlPointFlag; + } + bool IsDeleted() const { return DeletedFlag; } void TakeIfMoreActual(const TBatchIterator& anotherIterator) { - if (NArrow::ColumnsCompare(Owner->VersionColumns, Position, anotherIterator.VersionColumns, anotherIterator.Position) < 0) { - Owner = &anotherIterator; - Position = anotherIterator.Position; - DeletedFlag = Owner->IsDeleted(); + Y_VERIFY_DEBUG(KeyColumns.Compare(anotherIterator.KeyColumns) == std::partial_ordering::equivalent); + if (VersionColumns.Compare(anotherIterator.VersionColumns) == std::partial_ordering::less) { + DeletedFlag = anotherIterator.IsDeleted(); + ControlPointFlag = anotherIterator.IsControlPoint(); } } TPosition(const TBatchIterator& owner) - : Owner(&owner) - , Position(Owner->Position) + : KeyColumns(owner.KeyColumns) + , VersionColumns(owner.VersionColumns) + , DeletedFlag(owner.IsDeleted()) + , ControlPointFlag(owner.IsControlPoint()) { - DeletedFlag = Owner->IsDeleted(); - } - - int CompareNoVersion(const TBatchIterator& item) const { - Y_VERIFY_DEBUG(item.Columns.size() == Owner->Columns.size()); - return NArrow::ColumnsCompare(Owner->Columns, Position, item.Columns, item.Position); } }; - int CompareNoVersion(const TBatchIterator& item) const { - Y_VERIFY_DEBUG(item.Columns.size() == Columns.size()); - return NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position); - } - bool IsDeleted() const { - if (FilterIterator) { - return FilterIterator->GetCurrentAcceptance(); - } else if (ReverseFilterIterator) { - return ReverseFilterIterator->GetCurrentAcceptance(); - } else { + if (!FilterIterator) { return false; } + return FilterIterator->GetCurrentAcceptance(); } bool Next() { - bool result = false; - if (ReverseSortKff > 0) { - result = ++Position < 
RecordsCount; - } else { - result = --Position >= 0; - } + const bool result = KeyColumns.NextPosition(ReverseSortKff) && VersionColumns.NextPosition(ReverseSortKff); if (FilterIterator) { Y_VERIFY(result == FilterIterator->Next(1)); - } else if (ReverseFilterIterator) { - Y_VERIFY(result == ReverseFilterIterator->Next(1)); } return result; } bool operator<(const TBatchIterator& item) const { - const int result = CompareNoVersion(item) * ReverseSortKff; - if (result == 0) { - return NArrow::ColumnsCompare(VersionColumns, Position, item.VersionColumns, item.Position) < 0; + const std::partial_ordering result = KeyColumns.Compare(item.KeyColumns); + if (result == std::partial_ordering::equivalent) { + if (IsControlPoint() && item.IsControlPoint()) { + return false; + } else if (IsControlPoint()) { + return false; + } else if (item.IsControlPoint()) { + return true; + } + //don't need inverse through we need maximal version at first (reverse analytic not included in VersionColumns) + return VersionColumns.Compare(item.VersionColumns) == std::partial_ordering::less; } else { - return result > 0; + //inverse logic through we use max heap, but need minimal element if not reverse (reverse analytic included in KeyColumns) + return result == std::partial_ordering::greater; } } }; @@ -189,7 +241,7 @@ private: it->second.pop_front(); TBatchIterator oldIterator = std::move(SortHeap.back()); SortHeap.pop_back(); - AddToHeap(SortHeap.back().GetPoolIdUnsafe(), it->second.front().GetBatch(), it->second.front().GetFilter(), false); + AddNewToHeap(SortHeap.back().GetPoolIdUnsafe(), it->second.front().GetBatch(), it->second.front().GetFilter(), false); oldIterator.CheckNextBatch(SortHeap.back()); std::push_heap(SortHeap.begin(), SortHeap.end()); } @@ -202,33 +254,27 @@ private: std::vector<TBatchIterator> SortHeap; std::shared_ptr<arrow::Schema> SortSchema; const bool Reverse; + ui32 ControlPoints = 0; TBatchIterator::TPosition DrainCurrentPosition() { Y_VERIFY(SortHeap.size()); auto position = TBatchIterator::TPosition(SortHeap.front()); + if (SortHeap.front().IsControlPoint()) { + return position; + } bool isFirst = true; - while (SortHeap.size() && (isFirst || !position.CompareNoVersion(SortHeap.front()))) { + while (SortHeap.size() && (isFirst || position.GetKeyColumns().Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) { if (!isFirst) { position.TakeIfMoreActual(SortHeap.front()); } + Y_VERIFY(!SortHeap.front().IsControlPoint()); NextInHeap(true); isFirst = false; } return position; } - void AddToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) { - if (!filter || filter->IsTotalAllowFilter()) { - SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, poolId)); - } else if (filter->IsTotalDenyFilter()) { - return; - } else { - SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema, Reverse, poolId)); - } - if (restoreHeap) { - std::push_heap(SortHeap.begin(), SortHeap.end()); - } - } + void AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap); public: TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse) : SortSchema(sortSchema) @@ -248,24 +294,18 @@ public: return it->second.size(); } - void AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, 
std::shared_ptr<NArrow::TColumnFilter> filter) { - if (!batch || !batch->num_rows()) { - return; - } - Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema)); - if (!poolId) { - IndependentBatches.emplace_back(batch); - AddToHeap(poolId, batch, filter, true); - } else { - auto it = BatchPools.find(*poolId); - if (it == BatchPools.end()) { - it = BatchPools.emplace(*poolId, std::deque<TIteratorData>()).first; - } - it->second.emplace_back(batch, filter); - if (it->second.size() == 1) { - AddToHeap(poolId, batch, filter, true); - } - } + void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point); + + void RemoveControlPoint(); + + bool ControlPointEnriched() const { + return SortHeap.size() && SortHeap.front().IsControlPoint(); + } + + void AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter); + + bool IsEmpty() const { + return SortHeap.empty(); } bool DrainCurrent() { @@ -274,6 +314,9 @@ public: } while (SortHeap.size()) { auto currentPosition = DrainCurrentPosition(); + if (currentPosition.IsControlPoint()) { + return false; + } if (currentPosition.IsDeleted()) { continue; }
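The new pool assignment in `SortBatchesByPK` is a greedy interval chaining: batches are sorted by their min PK border, and a batch whose range starts after some open pool's current max PK joins that pool, so batches inside one pool are pairwise disjoint and strictly ordered. A simplified sketch of that assignment, with plain ints standing in for `TSortableBatchPosition` — a hypothetical reduction, not the YDB code; the real version also handles batches with missing `from`/`to` borders:

```cpp
#include <cstdio>
#include <map>
#include <optional>
#include <vector>

// One batch of a granule with its PK range (ints stand in for PK positions).
struct Batch {
    int from = 0;
    int to = 0;
    std::optional<int> poolId;
};

// Greedy pool assignment: reuse the pool whose current max PK is the largest
// one still below this batch's min PK; otherwise open a new pool.
void AssignPools(std::vector<Batch>& batches) {    // batches pre-sorted by 'from'
    std::map<int, int> poolByMax;                  // pool's current max PK -> pool id
    int nextPool = 0;
    for (auto& b : batches) {
        auto it = poolByMax.rbegin();
        while (it != poolByMax.rend() && it->first >= b.from) {
            ++it;                                  // this pool still overlaps the batch
        }
        if (it != poolByMax.rend()) {
            b.poolId = it->second;                 // chain the batch onto that pool
            const int id = it->second;
            poolByMax.erase(std::next(it).base()); // erase via the matching forward iterator
            poolByMax.emplace(b.to, id);
        } else {
            b.poolId = ++nextPool;                 // overlaps every open pool: new pool
            poolByMax.emplace(b.to, *b.poolId);
        }
    }
}

int main() {
    std::vector<Batch> batches = {{1, 5}, {2, 8}, {6, 9}, {10, 12}};
    AssignPools(batches);
    for (const auto& b : batches) {                // prints pools 1, 2, 1, 1
        std::printf("[%d, %d] -> pool %d\n", b.from, b.to, *b.poolId);
    }
    return 0;
}
```

Because batches sharing a pool never overlap, `TMergePartialStream` keeps only one iterator per pool on the sort heap (the per-pool deque feeds the next batch in when the current one drains), and the control points pushed before each next batch's first PK let `TPKSortingWithLimit` stop draining as soon as the row limit is reached.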