summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-05-09 20:56:34 +0300
committerivanmorozov <[email protected]>2023-05-09 20:56:34 +0300
commit397ebd37186df6996be059e4d7cbe28d0a5c3c26 (patch)
treea172b80d309f5cf00867251e06a9024d074ce124
parent520e5b3048293285df02d1d268d9b1d035bb803e (diff)
use pools and min/max sorting for merge
-rw-r--r--ydb/core/formats/arrow/arrow_filter.cpp207
-rw-r--r--ydb/core/formats/arrow/arrow_filter.h185
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp22
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.h4
-rw-r--r--ydb/core/formats/arrow/program.cpp2
-rw-r--r--ydb/core/formats/arrow/ut_arrow.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/filter.cpp50
-rw-r--r--ydb/core/tx/columnshard/engines/filter.h6
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h9
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h4
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/container.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/container.h2
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/filter.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/range.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp61
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.h13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.cpp66
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.h51
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp43
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp67
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.h273
26 files changed, 668 insertions, 441 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp
index f25780812a9..70fa1187b63 100644
--- a/ydb/core/formats/arrow/arrow_filter.cpp
+++ b/ydb/core/formats/arrow/arrow_filter.cpp
@@ -132,16 +132,10 @@ void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatc
}
}
-std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter() const {
+std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter(const ui32 expectedSize) const {
arrow::BooleanBuilder builder;
- auto res = builder.Reserve(Count);
- Y_VERIFY_OK(res);
- bool currentFilter = GetStartValue();
- for (auto&& i : Filter) {
- Y_VERIFY_OK(builder.AppendValues(i, currentFilter));
- currentFilter = !currentFilter;
- }
-
+ auto res = builder.Reserve(expectedSize);
+ Y_VERIFY_OK(builder.AppendValues(BuildSimpleFilter(expectedSize)));
std::shared_ptr<arrow::BooleanArray> out;
res = builder.Finish(&out);
Y_VERIFY_OK(res);
@@ -168,9 +162,11 @@ bool TColumnFilter::IsTotalDenyFilter() const {
return false;
}
-void TColumnFilter::Reset(const ui32 /*count*/) {
+void TColumnFilter::Reset(const ui32 count) {
Count = 0;
+ FilterPlain.reset();
Filter.clear();
+ Filter.reserve(count / 4);
}
void TColumnFilter::Add(const bool value, const ui32 count) {
@@ -218,33 +214,33 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(const arrow::D
break;
}
- NArrow::TColumnFilter result;
- result.Reset(cmps.size());
+ std::vector<bool> bits;
+ bits.reserve(cmps.size());
switch (compareType) {
case ECompareType::LESS:
for (size_t i = 0; i < cmps.size(); ++i) {
- result.Add(cmps[i] < ECompareResult::BORDER);
+ bits.emplace_back(cmps[i] < ECompareResult::BORDER);
}
break;
case ECompareType::LESS_OR_EQUAL:
for (size_t i = 0; i < cmps.size(); ++i) {
- result.Add(cmps[i] <= ECompareResult::BORDER);
+ bits.emplace_back(cmps[i] <= ECompareResult::BORDER);
}
break;
case ECompareType::GREATER:
for (size_t i = 0; i < cmps.size(); ++i) {
- result.Add(cmps[i] > ECompareResult::BORDER);
+ bits.emplace_back(cmps[i] > ECompareResult::BORDER);
}
break;
case ECompareType::GREATER_OR_EQUAL:
for (size_t i = 0; i < cmps.size(); ++i) {
- result.Add(cmps[i] >= ECompareResult::BORDER);
+ bits.emplace_back(cmps[i] >= ECompareResult::BORDER);
}
break;
}
- return result;
+ return NArrow::TColumnFilter(std::move(bits));
}
bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) {
@@ -259,29 +255,159 @@ bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) {
if (IsTotalAllowFilter()) {
return true;
}
- auto res = arrow::compute::Filter(batch, BuildArrowFilter());
+ auto res = arrow::compute::Filter(batch, BuildArrowFilter(batch->num_rows()));
Y_VERIFY_S(res.ok(), res.status().message());
Y_VERIFY((*res).kind() == arrow::Datum::RECORD_BATCH);
batch = (*res).record_batch();
return batch->num_rows();
}
-void TColumnFilter::CombineSequential(const TColumnFilter& extFilter) {
+const std::vector<bool>& TColumnFilter::BuildSimpleFilter(const ui32 expectedSize) const {
+ if (!FilterPlain) {
+ Y_VERIFY(expectedSize == Count || !Count);
+ std::vector<bool> result;
+ if (Count) {
+ result.resize(Count, true);
+ bool currentValue = GetStartValue();
+ ui32 currentPosition = 0;
+ for (auto&& i : Filter) {
+ if (!currentValue) {
+ memset(&result[currentPosition], 0, sizeof(bool) * i);
+ }
+ currentPosition += i;
+ currentValue = !currentValue;
+ }
+ } else {
+ result.resize(expectedSize, DefaultFilterValue);
+ }
+ FilterPlain = std::move(result);
+ }
+ Y_VERIFY(FilterPlain->size() == expectedSize);
+ return *FilterPlain;
+}
+
+class TMergePolicyAnd {
+private:
+public:
+ static bool Calc(const bool a, const bool b) {
+ return a && b;
+ }
+ static TColumnFilter MergeWithSimple(const TColumnFilter& filter, const bool simpleValue) {
+ if (simpleValue) {
+ return filter;
+ } else {
+ return TColumnFilter::BuildStopFilter();
+ }
+ }
+};
+
+class TMergePolicyOr {
+private:
+public:
+ static bool Calc(const bool a, const bool b) {
+ return a || b;
+ }
+ static TColumnFilter MergeWithSimple(const TColumnFilter& filter, const bool simpleValue) {
+ if (simpleValue) {
+ return TColumnFilter::BuildAllowFilter();
+ } else {
+ return filter;
+ }
+ }
+};
+
+class TColumnFilter::TMergerImpl {
+private:
+ const TColumnFilter& Filter1;
+ const TColumnFilter& Filter2;
+public:
+ TMergerImpl(const TColumnFilter& filter1, const TColumnFilter& filter2)
+ : Filter1(filter1)
+ , Filter2(filter2)
+ {
+
+ }
+
+ template <class TMergePolicy>
+ TColumnFilter Merge() const {
+ if (Filter1.empty() && Filter2.empty()) {
+ return TColumnFilter(TMergePolicy::Calc(Filter1.DefaultFilterValue, Filter2.DefaultFilterValue));
+ } else if (Filter1.empty()) {
+ return TMergePolicy::MergeWithSimple(Filter2, Filter1.DefaultFilterValue);
+ } else if (Filter2.empty()) {
+ return TMergePolicy::MergeWithSimple(Filter1, Filter2.DefaultFilterValue);
+ } else {
+ Y_VERIFY(Filter1.Count == Filter2.Count);
+ auto it1 = Filter1.Filter.cbegin();
+ auto it2 = Filter2.Filter.cbegin();
+
+ std::vector<ui32> resultFilter;
+ resultFilter.reserve(Filter1.Filter.size() + Filter2.Filter.size());
+ ui32 pos1 = 0;
+ ui32 pos2 = 0;
+ bool curValue1 = Filter1.GetStartValue();
+ bool curValue2 = Filter2.GetStartValue();
+ bool curCurrent = false;
+ ui32 count = 0;
+
+ while (it1 != Filter1.Filter.end() && it2 != Filter2.Filter.cend()) {
+ const ui32 delta = TColumnFilter::CrossSize(pos2, pos2 + *it2, pos1, pos1 + *it1);
+ if (delta) {
+ if (!count || curCurrent != TMergePolicy::Calc(curValue1, curValue2)) {
+ resultFilter.emplace_back(delta);
+ curCurrent = TMergePolicy::Calc(curValue1, curValue2);
+ } else {
+ resultFilter.back() += delta;
+ }
+ count += delta;
+ }
+ if (pos1 + *it1 < pos2 + *it2) {
+ pos1 += *it1;
+ curValue1 = !curValue1;
+ ++it1;
+ } else if (pos1 + *it1 > pos2 + *it2) {
+ pos2 += *it2;
+ curValue2 = !curValue2;
+ ++it2;
+ } else {
+ curValue2 = !curValue2;
+ curValue1 = !curValue1;
+ ++it1;
+ ++it2;
+ }
+ }
+ Y_VERIFY(it1 == Filter1.Filter.end() && it2 == Filter2.Filter.cend());
+ TColumnFilter result = TColumnFilter::BuildAllowFilter();
+ std::swap(resultFilter, result.Filter);
+ std::swap(curCurrent, result.CurrentValue);
+ std::swap(count, result.Count);
+ return result;
+ }
+ }
+
+};
+
+TColumnFilter TColumnFilter::And(const TColumnFilter& extFilter) const {
+ FilterPlain.reset();
+ return TMergerImpl(*this, extFilter).Merge<TMergePolicyAnd>();
+}
+
+TColumnFilter TColumnFilter::Or(const TColumnFilter& extFilter) const {
+ FilterPlain.reset();
+ return TMergerImpl(*this, extFilter).Merge<TMergePolicyOr>();
+}
+
+TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter) const {
if (Filter.empty()) {
- DefaultFilterValue = DefaultFilterValue && extFilter.DefaultFilterValue;
- Filter = extFilter.Filter;
- Count = extFilter.Count;
+ return TMergePolicyAnd::MergeWithSimple(extFilter, DefaultFilterValue);
} else if (extFilter.Filter.empty()) {
- if (!extFilter.DefaultFilterValue) {
- DefaultFilterValue = DefaultFilterValue && extFilter.DefaultFilterValue;
- Filter.clear();
- Count = 0;
- }
+ return TMergePolicyAnd::MergeWithSimple(*this, extFilter.DefaultFilterValue);
} else {
auto itSelf = Filter.begin();
auto itExt = extFilter.Filter.cbegin();
- std::deque<ui32> result;
+ std::vector<ui32> resultFilter;
+ resultFilter.reserve(Filter.size() + extFilter.Filter.size());
ui32 selfPos = 0;
ui32 extPos = 0;
bool curSelf = GetStartValue();
@@ -294,10 +420,10 @@ void TColumnFilter::CombineSequential(const TColumnFilter& extFilter) {
const ui32 delta = curSelf ? CrossSize(extPos, extPos + *itExt, selfPos, selfPos + *itSelf) : *itSelf;
if (delta) {
if (!count || curCurrent != (curSelf && curExt)) {
- result.emplace_back(delta);
+ resultFilter.emplace_back(delta);
curCurrent = curSelf && curExt;
} else {
- result.back() += delta;
+ resultFilter.back() += delta;
}
count += delta;
}
@@ -322,23 +448,12 @@ void TColumnFilter::CombineSequential(const TColumnFilter& extFilter) {
}
}
Y_VERIFY(itSelf == Filter.end() && itExt == extFilter.Filter.cend());
- std::swap(result, Filter);
- std::swap(curCurrent, CurrentValue);
- std::swap(count, Count);
- }
-}
-
-std::vector<bool> TColumnFilter::BuildSimpleFilter() const {
- std::vector<bool> result;
- result.reserve(Count);
- bool currentValue = GetStartValue();
- for (auto&& i : Filter) {
- for (ui32 idx = 0; idx < i; ++idx) {
- result.emplace_back(currentValue);
- }
- currentValue = !currentValue;
+ TColumnFilter result = TColumnFilter::BuildAllowFilter();
+ std::swap(resultFilter, result.Filter);
+ std::swap(curCurrent, result.CurrentValue);
+ std::swap(count, result.Count);
+ return result;
}
- return result;
}
}
diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h
index b7df1d633b8..aeb4993d0b2 100644
--- a/ydb/core/formats/arrow/arrow_filter.h
+++ b/ydb/core/formats/arrow/arrow_filter.h
@@ -20,39 +20,58 @@ private:
bool DefaultFilterValue = true;
bool CurrentValue = true;
ui32 Count = 0;
- std::deque<ui32> Filter;
+ std::vector<ui32> Filter;
+ mutable std::optional<std::vector<bool>> FilterPlain;
TColumnFilter(const bool defaultFilterValue)
: DefaultFilterValue(defaultFilterValue)
{
}
- bool GetStartValue() const {
+ bool GetStartValue(const bool reverse = false) const {
if (Filter.empty()) {
return DefaultFilterValue;
}
- bool value = CurrentValue;
- if (Filter.size() % 2 == 0) {
- value = !value;
+ if (reverse) {
+ return CurrentValue;
+ } else {
+ if (Filter.size() % 2 == 0) {
+ return !CurrentValue;
+ } else {
+ return CurrentValue;
+ }
}
- return value;
}
- template <class TIterator>
- class TIteratorImpl {
+ static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2);
+ class TMergerImpl;
+ void Add(const bool value, const ui32 count = 1);
+ void Reset(const ui32 count);
+public:
+
+ class TIterator {
private:
ui32 InternalPosition = 0;
ui32 CurrentRemainVolume = 0;
- TIterator It;
- TIterator ItEnd;
+ const std::vector<ui32>& Filter;
+ i32 Position = 0;
bool CurrentValue;
+ const i32 FinishPosition;
+ const i32 DeltaPosition;
public:
- TIteratorImpl(TIterator itBegin, TIterator itEnd, const bool startValue)
- : It(itBegin)
- , ItEnd(itEnd)
- , CurrentValue(startValue) {
- if (It != ItEnd) {
- CurrentRemainVolume = *It;
+ TIterator(const bool reverse, const std::vector<ui32>& filter, const bool startValue)
+ : Filter(filter)
+ , CurrentValue(startValue)
+ , FinishPosition(reverse ? -1 : Filter.size())
+ , DeltaPosition(reverse ? -1 : 1)
+ {
+ if (!Filter.size()) {
+ Position = FinishPosition;
+ } else {
+ if (reverse) {
+ Position = Filter.size() - 1;
+ }
+ CurrentRemainVolume = Filter[Position];
}
}
@@ -67,22 +86,24 @@ private:
}
bool Next(const ui32 size) {
+ Y_VERIFY(size);
if (CurrentRemainVolume > size) {
InternalPosition += size;
CurrentRemainVolume -= size;
return true;
}
ui32 sizeRemain = size;
- while (It != ItEnd) {
- if (*It - InternalPosition > sizeRemain) {
+ while (Position != FinishPosition) {
+ const ui32 currentVolume = Filter[Position];
+ if (currentVolume - InternalPosition > sizeRemain) {
InternalPosition = sizeRemain;
- CurrentRemainVolume = *It - InternalPosition - sizeRemain;
+ CurrentRemainVolume = currentVolume - InternalPosition - sizeRemain;
return true;
} else {
- sizeRemain -= *It - InternalPosition;
+ sizeRemain -= currentVolume - InternalPosition;
InternalPosition = 0;
CurrentValue = !CurrentValue;
- ++It;
+ Position += DeltaPosition;
}
}
CurrentRemainVolume = 0;
@@ -90,44 +111,22 @@ private:
}
};
- static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2);
-public:
-
- using TIterator = TIteratorImpl<std::deque<ui32>::const_iterator>;
- using TReverseIterator = TIteratorImpl<std::deque<ui32>::const_reverse_iterator>;
-
- template <bool ForReverse>
- class TIteratorSelector {
-
- };
-
- template <>
- class TIteratorSelector<true> {
- public:
- using TIterator = TReverseIterator;
- };
-
- template <>
- class TIteratorSelector<false> {
- public:
- using TIterator = TIterator;
- };
-
- TIterator GetIterator() const {
- return TIterator(Filter.cbegin(), Filter.cend(), GetStartValue());
+ TIterator GetIterator(const bool reverse) const {
+ return TIterator(reverse, Filter, GetStartValue(reverse));
}
- TReverseIterator GetReverseIterator() const {
- return TReverseIterator(Filter.crbegin(), Filter.crend(), CurrentValue);
+ bool empty() const {
+ return Filter.empty();
}
TColumnFilter(std::vector<bool>&& values) {
const ui32 count = values.size();
- Reset(count, std::move(values));
+ Reset(count, values);
+ FilterPlain = std::move(values);
}
template <class TGetter>
- void Reset(const ui32 count, TGetter&& getter) {
+ void Reset(const ui32 count, const TGetter& getter) {
Reset(count);
if (!count) {
return;
@@ -149,11 +148,11 @@ public:
return Count;
}
- std::vector<bool> BuildSimpleFilter() const;
+ const std::vector<bool>& BuildSimpleFilter(const ui32 expectedSize) const;
TColumnFilter() = default;
- std::shared_ptr<arrow::BooleanArray> BuildArrowFilter() const;
+ std::shared_ptr<arrow::BooleanArray> BuildArrowFilter(const ui32 expectedSize) const;
bool IsTotalAllowFilter() const;
@@ -167,83 +166,8 @@ public:
return TColumnFilter(true);
}
- void Reset(const ui32 count);
-
- void Add(const bool value, const ui32 count = 1);
-
- template <class TCalcer>
- void Merge(const TColumnFilter& extFilter, const TCalcer actor) {
- if (Filter.empty() && extFilter.Filter.empty()) {
- DefaultFilterValue = (extFilter.DefaultFilterValue && DefaultFilterValue);
- } else if (Filter.empty()) {
- if (DefaultFilterValue) {
- Filter = extFilter.Filter;
- Count = extFilter.Count;
- CurrentValue = extFilter.CurrentValue;
- }
- } else if (extFilter.Filter.empty()) {
- if (!extFilter.DefaultFilterValue) {
- DefaultFilterValue = false;
- Filter.clear();
- Count = 0;
- }
- } else {
- Y_VERIFY(extFilter.Count == Count);
- auto itSelf = Filter.begin();
- auto itExt = extFilter.Filter.cbegin();
-
- std::deque<ui32> result;
- ui32 selfPos = 0;
- ui32 extPos = 0;
- bool curSelf = GetStartValue();
- bool curExt = extFilter.GetStartValue();
- bool curCurrent = false;
- ui32 count = 0;
-
- while (itSelf != Filter.end() && itExt != extFilter.Filter.cend()) {
- const ui32 delta = CrossSize(extPos, extPos + *itExt, selfPos, selfPos + *itSelf);
- if (delta) {
- if (!count || curCurrent != actor(curSelf, curExt)) {
- result.emplace_back(delta);
- curCurrent = actor(curSelf, curExt);
- } else {
- result.back() += delta;
- }
- count += delta;
- }
- if (selfPos + *itSelf < extPos + *itExt) {
- selfPos += *itSelf;
- curSelf = !curSelf;
- ++itSelf;
- } else if (selfPos + *itSelf > extPos + *itExt) {
- extPos += *itExt;
- curExt = !curExt;
- ++itExt;
- } else {
- curExt = !curExt;
- curSelf = !curSelf;
- ++itSelf;
- ++itExt;
- }
- }
- Y_VERIFY(itSelf == Filter.end() && itExt == extFilter.Filter.cend());
- std::swap(result, Filter);
- std::swap(curCurrent, CurrentValue);
- std::swap(count, Count);
- }
- }
-
-
- void And(const TColumnFilter& extFilter) {
- return Merge(extFilter, [](const bool selfBool, const bool extBool) {
- return selfBool && extBool;
- });
- }
- void Or(const TColumnFilter& extFilter) {
- return Merge(extFilter, [](const bool selfBool, const bool extBool) {
- return selfBool || extBool;
- });
- }
+ TColumnFilter And(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
+ TColumnFilter Or(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
// It makes a filter using composite predicate
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border,
@@ -251,7 +175,8 @@ public:
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch);
- void CombineSequential(const TColumnFilter& extFilter);
+    // Combines filters by 'and' operator (extFilter's count equals the number of true positions in self, so that extFilter patches exactly those positions)
+ TColumnFilter CombineSequentialAnd(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
};
}
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 456426c4df0..04d5fbd7ce2 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -917,15 +917,21 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b
return true;
}
-int ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow) {
- auto result = TRawReplaceKey(&x, xRow).CompareNotNull(TRawReplaceKey(&y, yRow));
- if (result == std::partial_ordering::greater) {
- return 1;
- } else if (result == std::partial_ordering::less) {
- return -1;
- } else {
- return 0;
+std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow) {
+ return TRawReplaceKey(&x, xRow).CompareNotNull(TRawReplaceKey(&y, yRow));
+}
+
+std::shared_ptr<arrow::RecordBatch> BuildSingleRecordBatch(const std::shared_ptr<arrow::Schema> schema, const std::vector<std::shared_ptr<arrow::Scalar>>& recordData) {
+ std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders = MakeBuilders(schema, 1);
+ Y_VERIFY(builders.size() == recordData.size());
+ for (ui32 i = 0; i < recordData.size(); ++i) {
+ Y_VERIFY(recordData[i]);
+ Y_VERIFY_OK(builders[i]->AppendScalar(*recordData[i]));
}
+
+ auto arrays = NArrow::Finish(std::move(builders));
+ Y_VERIFY(arrays.size() == builders.size());
+ return arrow::RecordBatch::Make(schema, 1, arrays);
}
}
diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h
index 39cf5079fad..c6bebcce3d6 100644
--- a/ydb/core/formats/arrow/arrow_helpers.h
+++ b/ydb/core/formats/arrow/arrow_helpers.h
@@ -133,10 +133,12 @@ std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& ar
bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y);
int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
-int ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
+std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
+std::shared_ptr<arrow::RecordBatch> BuildSingleRecordBatch(const std::shared_ptr<arrow::Schema> schema, const std::vector<std::shared_ptr<arrow::Scalar>>& recordData);
+
inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) {
return column->null_bitmap_data();
}
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp
index e82fdbf8899..6c1cfe875d4 100644
--- a/ydb/core/formats/arrow/program.cpp
+++ b/ydb/core/formats/arrow/program.cpp
@@ -659,7 +659,7 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const {
}
}
- auto filter = bits.BuildArrowFilter();
+ auto filter = bits.BuildArrowFilter(batch.Rows);
for (int64_t i = 0; i < batch.Schema->num_fields(); ++i) {
bool needed = (allColumns || neededColumns.contains(batch.Schema->field(i)->name()));
if (batch.Datums[i].is_array() && needed) {
diff --git a/ydb/core/formats/arrow/ut_arrow.cpp b/ydb/core/formats/arrow/ut_arrow.cpp
index f0d7eb19dd3..d1d77b1578b 100644
--- a/ydb/core/formats/arrow/ut_arrow.cpp
+++ b/ydb/core/formats/arrow/ut_arrow.cpp
@@ -633,10 +633,10 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
const NArrow::TColumnFilter gt = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER);
const NArrow::TColumnFilter ge = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER_OR_EQUAL);
- UNIT_ASSERT(CheckFilter(lt.BuildSimpleFilter(), 234, true));
- UNIT_ASSERT(CheckFilter(le.BuildSimpleFilter(), 235, true));
- UNIT_ASSERT(CheckFilter(gt.BuildSimpleFilter(), 235, false));
- UNIT_ASSERT(CheckFilter(ge.BuildSimpleFilter(), 234, false));
+ UNIT_ASSERT(CheckFilter(lt.BuildSimpleFilter(table->num_rows()), 234, true));
+ UNIT_ASSERT(CheckFilter(le.BuildSimpleFilter(table->num_rows()), 235, true));
+ UNIT_ASSERT(CheckFilter(gt.BuildSimpleFilter(table->num_rows()), 235, false));
+ UNIT_ASSERT(CheckFilter(ge.BuildSimpleFilter(table->num_rows()), 234, false));
}
Y_UNIT_TEST(SortWithCompositeKey) {
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 4b30d7a5ff6..8595380343d 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -435,13 +435,11 @@ private:
void NextReadMetadata() {
auto g = Stats.MakeGuard("NextReadMetadata");
- ScanIterator.reset();
- ++ReadMetadataIndex;
-
- if (ReadMetadataIndex == ReadMetadataRanges.size()) {
+ if (++ReadMetadataIndex == ReadMetadataRanges.size()) {
// Send empty batch with "finished" flag
MakeResult();
SendResult(false, true);
+ ScanIterator.reset();
return Finish();
}
diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp
index 2285dcd3e4b..3f375626694 100644
--- a/ydb/core/tx/columnshard/engines/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/filter.cpp
@@ -44,45 +44,12 @@ NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatc
return result;
}
-NArrow::TColumnFilter MakeReplaceFilter(const std::shared_ptr<arrow::RecordBatch>& batch,
- THashSet<NArrow::TReplaceKey>& keys) {
- NArrow::TColumnFilter bits;
- bits.Reset(batch->num_rows());
-
- auto columns = std::make_shared<NArrow::TArrayVec>(batch->columns());
-
- for (int i = 0; i < batch->num_rows(); ++i) {
- NArrow::TReplaceKey key(columns, i);
- bits.Add(keys.emplace(key).second);
- }
- return bits;
-}
-
-NArrow::TColumnFilter MakeReplaceFilterLastWins(const std::shared_ptr<arrow::RecordBatch>& batch,
- THashSet<NArrow::TReplaceKey>& keys) {
- if (!batch->num_rows()) {
- return {};
- }
-
- NArrow::TColumnFilter result;
- result.Reset(batch->num_rows());
-
- auto columns = std::make_shared<NArrow::TArrayVec>(batch->columns());
-
- for (int i = batch->num_rows() - 1; i >= 0; --i) {
- NArrow::TReplaceKey key(columns, i);
- result.Add(keys.emplace(key).second);
- }
-
- return result;
-}
-
NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata) {
Y_VERIFY(portion);
NArrow::TColumnFilter result = readMetadata.GetPKRangesFilter().BuildFilter(portion);
if (readMetadata.GetSnapshot().GetPlanStep()) {
auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
- result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot()));
+ result = result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot()));
}
return result;
@@ -96,19 +63,4 @@ NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& bat
return ssa->MakeEarlyFilter(batch, NArrow::GetCustomExecContext());
}
-void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch,
- const std::shared_ptr<arrow::Schema>& replaceSchema, bool lastWins) {
- THashSet<NArrow::TReplaceKey> replaces;
-
- auto keyBatch = NArrow::ExtractColumns(batch, replaceSchema);
-
- NArrow::TColumnFilter bits;
- if (lastWins) {
- bits = MakeReplaceFilterLastWins(keyBatch, replaces);
- } else {
- bits = MakeReplaceFilter(keyBatch, replaces);
- }
- Y_VERIFY(bits.Apply(batch));
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h
index f9628fe90bc..9d5c412d7ec 100644
--- a/ydb/core/tx/columnshard/engines/filter.h
+++ b/ydb/core/tx/columnshard/engines/filter.h
@@ -10,12 +10,6 @@ NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatc
const std::shared_ptr<arrow::Schema>& snapSchema,
const TSnapshot& snapshot);
-NArrow::TColumnFilter MakeReplaceFilter(const std::shared_ptr<arrow::RecordBatch>& batch, THashSet<NArrow::TReplaceKey>& keys);
-NArrow::TColumnFilter MakeReplaceFilterLastWins(const std::shared_ptr<arrow::RecordBatch>& batch, THashSet<NArrow::TReplaceKey>& keys);
-
-void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch,
- const std::shared_ptr<arrow::Schema>& replaceSchema, bool lastWins = false);
-
struct TReadMetadata;
NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 806d39bb09d..0ee36f922ed 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -146,7 +146,14 @@ public:
void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; }
const TCompression& GetDefaultCompression() const { return DefaultCompression; }
-
+ static const std::vector<std::string>& GetSpecialColumnNames() {
+ static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) };
+ return result;
+ }
+ static const std::vector<ui32>& GetSpecialColumnIds() {
+ static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID };
+ return result;
+ }
private:
ui32 Id;
TString Name;
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 3e12e32006b..ec970378229 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -109,10 +109,6 @@ struct TPortionInfo {
bool CanIntersectOthers() const { return !Valid() || IsInserted(); }
size_t NumRecords() const { return Records.size(); }
- bool IsSortableInGranule() const {
- return !CanIntersectOthers();
- }
-
bool AllowEarlyFilter() const {
return Meta.Produced == TPortionMeta::COMPACTED
|| Meta.Produced == TPortionMeta::SPLIT_COMPACTED;
diff --git a/ydb/core/tx/columnshard/engines/predicate/container.cpp b/ydb/core/tx/columnshard/engines/predicate/container.cpp
index c5e1140bcfb..85b8ed827dc 100644
--- a/ydb/core/tx/columnshard/engines/predicate/container.cpp
+++ b/ydb/core/tx/columnshard/engines/predicate/container.cpp
@@ -2,7 +2,7 @@
#include <library/cpp/actors/core/log.h>
namespace NKikimr::NOlap {
-int TPredicateContainer::ComparePredicatesSamePrefix(const NOlap::TPredicate& l, const NOlap::TPredicate& r) {
+std::partial_ordering TPredicateContainer::ComparePredicatesSamePrefix(const NOlap::TPredicate& l, const NOlap::TPredicate& r) {
Y_VERIFY(l.Batch);
Y_VERIFY(r.Batch);
Y_VERIFY(l.Batch->num_columns());
@@ -99,10 +99,10 @@ bool TPredicateContainer::CrossRanges(const TPredicateContainer& ext) {
if (IsForwardInterval() == ext.IsForwardInterval()) {
return true;
}
- const int result = ComparePredicatesSamePrefix(*Object, *ext.Object);
- if (result < 0) {
+ const std::partial_ordering result = ComparePredicatesSamePrefix(*Object, *ext.Object);
+ if (result == std::partial_ordering::less) {
return IsForwardInterval();
- } else if (result > 0) {
+ } else if (result == std::partial_ordering::greater) {
return ext.IsForwardInterval();
} else if (Object->Batch->num_columns() == ext.Object->Batch->num_columns()) {
return IsInclude() && ext.IsInclude();
diff --git a/ydb/core/tx/columnshard/engines/predicate/container.h b/ydb/core/tx/columnshard/engines/predicate/container.h
index 51b0e66ba93..46ef03665de 100644
--- a/ydb/core/tx/columnshard/engines/predicate/container.h
+++ b/ydb/core/tx/columnshard/engines/predicate/container.h
@@ -23,7 +23,7 @@ private:
: CompareType(compareType) {
}
- static int ComparePredicatesSamePrefix(const NOlap::TPredicate& l, const NOlap::TPredicate& r);
+ static std::partial_ordering ComparePredicatesSamePrefix(const NOlap::TPredicate& l, const NOlap::TPredicate& r);
public:
diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.cpp b/ydb/core/tx/columnshard/engines/predicate/filter.cpp
index e1a798eafbe..4efd14a7996 100644
--- a/ydb/core/tx/columnshard/engines/predicate/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/predicate/filter.cpp
@@ -9,7 +9,7 @@ NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(std::shared_ptr<arro
}
NArrow::TColumnFilter result = SortedRanges.front().BuildFilter(data);
for (ui32 i = 1; i < SortedRanges.size(); ++i) {
- result.Or(SortedRanges[i].BuildFilter(data));
+ result = result.Or(SortedRanges[i].BuildFilter(data));
}
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp
index f9decb2189f..acd16bea9a8 100644
--- a/ydb/core/tx/columnshard/engines/predicate/range.cpp
+++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp
@@ -36,8 +36,7 @@ std::set<std::string> TPKRangeFilter::GetColumnNames() const {
NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const {
NArrow::TColumnFilter result = PredicateTo.BuildFilter(data);
- result.And(PredicateFrom.BuildFilter(data));
- return result;
+ return result.And(PredicateFrom.BuildFilter(data));
}
bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const {
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 6b8b4988f27..912ce96f24f 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -113,7 +113,7 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) {
for (auto&& columnInfo : orderedObjects) {
ui32 expected = 0;
- auto it = FetchedInfo.GetFilter()->GetIterator();
+ auto it = FetchedInfo.GetFilter()->GetIterator(false);
bool undefinedShift = false;
bool itFinished = false;
for (auto&& [chunk, rec] : columnInfo.second) {
@@ -163,4 +163,63 @@ ui64 TBatch::GetUsefulBytes(const ui64 bytes) const {
return bytes * FetchedInfo.GetUsefulDataKff();
}
+std::shared_ptr<TSortableBatchPosition> TBatch::GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const {
+ if (!FirstPK || !LastPK) {
+ std::shared_ptr<TSortableBatchPosition> from;
+ std::shared_ptr<TSortableBatchPosition> to;
+ GetPKBorders(reverse, indexInfo, from, to);
+ }
+ if (reverse) {
+ return *ReverseFirstPK;
+ } else {
+ return *FirstPK;
+ }
+}
+
+void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const {
+ from = nullptr;
+ to = nullptr;
+ if (!FirstPK || !LastPK) {
+ std::vector<std::shared_ptr<arrow::Scalar>> minRecord;
+ std::vector<std::shared_ptr<arrow::Scalar>> maxRecord;
+ for (auto&& i : indexInfo.GetReplaceKey()->fields()) {
+ const ui32 columnId = indexInfo.GetColumnId(i->name());
+ std::shared_ptr<arrow::Scalar> minScalar;
+ std::shared_ptr<arrow::Scalar> maxScalar;
+ PortionInfo->MinMaxValue(columnId, minScalar, maxScalar);
+ if (!FirstPK && !minScalar) {
+ FirstPK = nullptr;
+ ReverseLastPK = nullptr;
+ } else {
+ minRecord.emplace_back(minScalar);
+ }
+ if (!LastPK && !maxScalar) {
+ LastPK = nullptr;
+ ReverseFirstPK = nullptr;
+ } else {
+ maxRecord.emplace_back(maxScalar);
+ }
+ }
+ if (!FirstPK) {
+ auto batch = NArrow::BuildSingleRecordBatch(indexInfo.GetReplaceKey(), minRecord);
+ Y_VERIFY(batch);
+ FirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexInfo.GetReplaceKey()->field_names(), false);
+ ReverseLastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexInfo.GetReplaceKey()->field_names(), true);
+ }
+ if (!LastPK) {
+ auto batch = NArrow::BuildSingleRecordBatch(indexInfo.GetReplaceKey(), maxRecord);
+ Y_VERIFY(batch);
+ LastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexInfo.GetReplaceKey()->field_names(), false);
+ ReverseFirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexInfo.GetReplaceKey()->field_names(), true);
+ }
+ }
+ if (reverse) {
+ from = *ReverseFirstPK;
+ to = *ReverseLastPK;
+ } else {
+ from = *FirstPK;
+ to = *LastPK;
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index 583466d7867..86b767c88a9 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -1,6 +1,7 @@
#pragma once
#include "common.h"
#include "conveyor_task.h"
+#include "read_filter_merger.h"
#include <ydb/core/formats/arrow/arrow_filter.h>
#include <ydb/core/tx/columnshard/blob.h>
@@ -77,7 +78,11 @@ private:
YDB_READONLY(ui64, FetchedBytes, 0);
THashSet<TBlobRange> WaitIndexed;
-
+ mutable std::optional<std::shared_ptr<TSortableBatchPosition>> FirstPK;
+ mutable std::optional<std::shared_ptr<TSortableBatchPosition>> LastPK;
+ mutable std::optional<std::shared_ptr<TSortableBatchPosition>> ReverseFirstPK;
+ mutable std::optional<std::shared_ptr<TSortableBatchPosition>> ReverseLastPK;
+
YDB_READONLY_FLAG(DuplicationsAvailable, false);
YDB_READONLY_DEF(TBatchFetchedInfo, FetchedInfo);
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
@@ -90,6 +95,9 @@ private:
ui64 GetUsefulBytes(const ui64 bytes) const;
public:
+ std::shared_ptr<TSortableBatchPosition> GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const;
+ void GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const;
+
bool AllowEarlyFilter() const {
return PortionInfo->AllowEarlyFilter();
}
@@ -105,9 +113,6 @@ public:
return GetUsefulBytes(FetchedBytes);
}
- bool IsSortableInGranule() const {
- return PortionInfo->IsSortableInGranule();
- }
TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo);
bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData);
bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index 7d2b7e15487..2b7cb49bbed 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -16,6 +16,7 @@ TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMe
UsedColumns = ReadMetadata->GetUsedColumnIds();
EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds();
FilterStageColumns = SortingPolicy->GetFilterStageColumns();
+ PKColumnNames = ReadMetadata->GetReplaceKey()->field_names();
PostFilterColumns = ReadMetadata->GetUsedColumnIds();
for (auto&& i : FilterStageColumns) {
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index d57a6c0b9d7..1ee7bd46b90 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -12,6 +12,7 @@ namespace NKikimr::NOlap::NIndexedReader {
class TGranulesFillingContext {
private:
+ YDB_READONLY_DEF(std::vector<std::string>, PKColumnNames);
bool AbortedFlag = false;
TReadMetadata::TConstPtr ReadMetadata;
const bool InternalReading = false;
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index 8f9079d3c30..24a29917d86 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -23,7 +23,7 @@ bool TAssembleFilter::DoExecuteImpl() {
if (ReadMetadata->Program) {
if (AllowEarlyFilter) {
auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program));
- Filter->CombineSequential(*earlyFilter);
+ Filter = std::make_shared<NArrow::TColumnFilter>(Filter->CombineSequentialAnd(*earlyFilter));
if (!earlyFilter->Apply(batch)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount);
FilteredBatch = nullptr;
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index 8ad6a8b871a..ff24281f35c 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -17,11 +17,7 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
Y_VERIFY(!ReadyFlag);
Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchAddress().GetBatchGranuleIdx()));
if (batch && batch->num_rows()) {
- if (batchInfo.IsSortableInGranule()) {
- SortableBatches.emplace_back(batch);
- } else {
- NonSortableBatches.emplace_back(batch);
- }
+ RecordBatches.emplace_back(batch);
auto& indexInfo = Owner->GetReadMetadata()->GetIndexInfo();
if (!batchInfo.IsDuplicationsAvailable()) {
@@ -62,35 +58,40 @@ bool TGranule::OnFilterReady(TBatch& batchInfo) {
return Owner->GetSortingPolicy()->OnFilterReady(batchInfo, *this, *Owner);
}
-std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata) {
- std::deque<TBatch*> batches;
+std::deque<TGranule::TBatchForMerge> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata) {
+ std::deque<TBatchForMerge> batches;
for (auto&& i : Batches) {
- batches.emplace_back(&i);
+ std::shared_ptr<TSortableBatchPosition> from;
+ std::shared_ptr<TSortableBatchPosition> to;
+ i.GetPKBorders(reverse, readMetadata->GetIndexInfo(), from, to);
+ batches.emplace_back(TBatchForMerge(&i, from, to));
}
- const int reverseKff = reverse ? -1 : 0;
- auto& indexInfo = readMetadata->GetIndexInfo();
- const auto pred = [reverseKff, indexInfo](const TBatch* l, const TBatch* r) {
- if (l->IsSortableInGranule() && r->IsSortableInGranule()) {
- return l->GetPortionInfo().CompareMinByPk(r->GetPortionInfo(), indexInfo) * reverseKff < 0;
- } else if (l->IsSortableInGranule()) {
- return false;
- } else if (r->IsSortableInGranule()) {
- return true;
- } else {
- return false;
+ std::sort(batches.begin(), batches.end());
+ ui32 currentPoolId = 0;
+ std::map<TSortableBatchPosition, ui32> poolIds;
+ for (auto&& i : batches) {
+ if (!i.GetFrom() && !i.GetTo()) {
+ continue;
}
- };
- std::sort(batches.begin(), batches.end(), pred);
- bool nonCompactedSerial = true;
- for (ui32 i = 0; i + 1 < batches.size(); ++i) {
- if (batches[i]->IsSortableInGranule()) {
- auto& l = *batches[i];
- auto& r = *batches[i + 1];
- Y_VERIFY(r.IsSortableInGranule());
- Y_VERIFY(l.GetPortionInfo().CompareSelfMaxItemMinByPk(r.GetPortionInfo(), indexInfo) * reverseKff <= 0);
- nonCompactedSerial = false;
- } else {
- Y_VERIFY(nonCompactedSerial);
+ if (i.GetFrom()) {
+ auto it = poolIds.rbegin();
+ for (; it != poolIds.rend(); ++it) {
+ if (it->first.Compare(*i.GetFrom()) < 0) {
+ break;
+ }
+ }
+ if (it != poolIds.rend()) {
+ i.SetPoolId(it->second);
+ if (i.GetTo()) {
+ poolIds.erase(it->first);
+ poolIds.emplace(*i.GetTo(), *i.GetPoolId());
+ } else {
+ poolIds.erase(it->first);
+ }
+ } else if (i.GetTo()) {
+ i.SetPoolId(++currentPoolId);
+ poolIds.emplace(*i.GetTo(), *i.GetPoolId());
+ }
}
}
return batches;
@@ -107,6 +108,9 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
if (batch && batch->num_rows()) {
Y_VERIFY(!NotIndexedBatch);
NotIndexedBatch = batch;
+ if (NotIndexedBatch) {
+ RecordBatches.emplace_back(NotIndexedBatch);
+ }
if (Owner->GetReadMetadata()->Program) {
NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program));
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index 442af41ed99..4c8f12b4e73 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -20,8 +20,7 @@ private:
std::shared_ptr<arrow::RecordBatch> NotIndexedBatch;
std::shared_ptr<NArrow::TColumnFilter> NotIndexedBatchFutureFilter;
- std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches;
- std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> RecordBatches;
bool DuplicationsAvailableFlag = false;
bool ReadyFlag = false;
std::deque<TBatch> Batches;
@@ -71,14 +70,7 @@ public:
}
std::vector<std::shared_ptr<arrow::RecordBatch>> GetReadyBatches() const {
- std::vector<std::shared_ptr<arrow::RecordBatch>> result;
- result.reserve(SortableBatches.size() + NonSortableBatches.size() + 1);
- if (NotIndexedBatch) {
- result.emplace_back(NotIndexedBatch);
- }
- result.insert(result.end(), NonSortableBatches.begin(), NonSortableBatches.end());
- result.insert(result.end(), SortableBatches.begin(), SortableBatches.end());
- return result;
+ return RecordBatches;
}
TBatch& GetBatchInfo(const ui32 batchIdx) {
@@ -91,7 +83,44 @@ public:
const TGranulesFillingContext& GetOwner() const {
return *Owner;
}
- std::deque<TBatch*> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata);
+
+ class TBatchForMerge {
+ private:
+ TBatch* Batch = nullptr;
+ YDB_ACCESSOR_DEF(std::optional<ui32>, PoolId);
+ YDB_ACCESSOR_DEF(std::shared_ptr<TSortableBatchPosition>, From);
+ YDB_ACCESSOR_DEF(std::shared_ptr<TSortableBatchPosition>, To);
+ public:
+ TBatch* operator->() {
+ return Batch;
+ }
+
+ TBatch& operator*() {
+ return *Batch;
+ }
+
+ TBatchForMerge(TBatch* batch, std::shared_ptr<TSortableBatchPosition> from, std::shared_ptr<TSortableBatchPosition> to)
+ : Batch(batch)
+ , From(from)
+ , To(to)
+ {
+ Y_VERIFY(Batch);
+ }
+
+ bool operator<(const TBatchForMerge& item) const {
+ if (!From && !item.From) {
+ return false;
+ } else if (From && item.From) {
+ return From->Compare(*item.From) < 0;
+ } else if (!From) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
+
+ std::deque<TBatchForMerge> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata);
const std::set<ui32>& GetEarlyFilterColumns() const;
void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
index ead178faca5..e183f7b53e2 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
@@ -26,25 +26,34 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont
}
auto& batches = g.GetOrderedBatches();
while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) {
- TBatch* b = batches.front();
- std::optional<ui32> batchPoolId;
- if (b->IsSortableInGranule()) {
- ++CountSorted;
- batchPoolId = 0;
+ TGranule::TBatchForMerge& b = batches.front();
+ if (!b.GetPoolId()) {
+ ++CountNotSortedPortions;
} else {
- ++CountNotSorted;
+ ++CountBatchesByPools[*b.GetPoolId()];
}
- MergeStream.AddPoolSource(batchPoolId, b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter());
+ ++CountProcessedBatches;
+ MergeStream.AddPoolSource(b.GetPoolId(), b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter());
OnBatchFilterInitialized(*b, context);
batches.pop_front();
- while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.DrainCurrent()) {
- if (!--CurrentItemsLimit) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "stop_on_limit")
- ("limit", ReadMetadata->Limit)("sorted_count", CountSorted)("unsorted_count", CountNotSorted)("granules_count", CountProcessedGranules);
+ if (batches.size()) {
+ auto nextBatchControlPoint = batches.front()->GetFirstPK(ReadMetadata->IsDescSorted(), ReadMetadata->GetIndexInfo());
+ if (!nextBatchControlPoint) {
+ continue;
}
+ MergeStream.PutControlPoint(nextBatchControlPoint);
+ }
+ while (CurrentItemsLimit && MergeStream.DrainCurrent()) {
+ --CurrentItemsLimit;
+ }
+ if (MergeStream.ControlPointEnriched()) {
+ MergeStream.RemoveControlPoint();
+ } else if (batches.size()) {
+ Y_VERIFY(!CurrentItemsLimit);
}
}
if (batches.empty()) {
+ Y_VERIFY(MergeStream.IsEmpty() || !CurrentItemsLimit);
GranulesOutOrderForPortions.pop_front();
} else {
break;
@@ -57,11 +66,23 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont
while (batches.size()) {
auto b = batches.front();
context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns()));
+ ++CountSkippedBatches;
b->InitBatch(nullptr);
batches.pop_front();
}
+ ++CountSkippedGranules;
GranulesOutOrderForPortions.pop_front();
}
+ if (GranulesOutOrderForPortions.empty()) {
+ if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "stop_on_limit")
+ ("limit", ReadMetadata->Limit)("limit_reached", !CurrentItemsLimit)
+ ("processed_batches", CountProcessedBatches)("processed_granules", CountProcessedGranules)
+ ("skipped_batches", CountSkippedBatches)("skipped_granules", CountSkippedGranules)
+ ("pools_count", CountBatchesByPools.size())("bad_pool_size", CountNotSortedPortions)
+ ;
+ }
+ }
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
index 36ec23b9c94..79c22238974 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
@@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NIndexedReader {
class TGranuleOrdered {
private:
bool StartedFlag = false;
- std::deque<TBatch*> OrderedBatches;
+ std::deque<TGranule::TBatchForMerge> OrderedBatches;
TGranule* Granule = nullptr;
public:
bool Start() {
@@ -20,13 +20,13 @@ public:
}
- TGranuleOrdered(std::deque<TBatch*>&& orderedBatches, TGranule* granule)
+ TGranuleOrdered(std::deque<TGranule::TBatchForMerge>&& orderedBatches, TGranule* granule)
: OrderedBatches(std::move(orderedBatches))
, Granule(granule)
{
}
- std::deque<TBatch*>& GetOrderedBatches() noexcept {
+ std::deque<TGranule::TBatchForMerge>& GetOrderedBatches() noexcept {
return OrderedBatches;
}
@@ -41,9 +41,12 @@ private:
std::deque<TGranule*> GranulesOutOrder;
std::deque<TGranuleOrdered> GranulesOutOrderForPortions;
ui32 CurrentItemsLimit = 0;
+ THashMap<ui32, ui32> CountBatchesByPools;
ui32 CountProcessedGranules = 0;
- ui32 CountNotSorted = 0;
- ui32 CountSorted = 0;
+ ui32 CountSkippedBatches = 0;
+ ui32 CountProcessedBatches = 0;
+ ui32 CountNotSortedPortions = 0;
+ ui32 CountSkippedGranules = 0;
TMergePartialStream MergeStream;
protected:
virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override;
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
index 7456cdc0b72..9540904a4cf 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -2,4 +2,71 @@
namespace NKikimr::NOlap::NIndexedReader {
+bool TSortableBatchPosition::IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const {
+ if (Fields.size() != (size_t)schema->num_fields()) {
+ return false;
+ }
+ for (ui32 i = 0; i < Fields.size(); ++i) {
+ if (Fields[i]->type() != schema->field(i)->type()) {
+ return false;
+ }
+ if (Fields[i]->name() != schema->field(i)->name()) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) {
+ Y_VERIFY(point);
+ Y_VERIFY(point->IsSameSchema(SortSchema));
+ Y_VERIFY(++ControlPoints == 1);
+ SortHeap.emplace_back(TBatchIterator(*point));
+ if (SortHeap.size() > 1) {
+ Y_VERIFY(SortHeap.front().GetKeyColumns().Compare(SortHeap.back().GetKeyColumns()) != std::partial_ordering::greater);
+ }
+ std::push_heap(SortHeap.begin(), SortHeap.end());
+}
+
+void TMergePartialStream::AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
+ if (!batch || !batch->num_rows()) {
+ return;
+ }
+ Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema));
+ if (!poolId) {
+ IndependentBatches.emplace_back(batch);
+ AddNewToHeap(poolId, batch, filter, true);
+ } else {
+ auto it = BatchPools.find(*poolId);
+ if (it == BatchPools.end()) {
+ it = BatchPools.emplace(*poolId, std::deque<TIteratorData>()).first;
+ }
+ it->second.emplace_back(batch, filter);
+ if (it->second.size() == 1) {
+ AddNewToHeap(poolId, batch, filter, true);
+ }
+ }
+}
+
+void TMergePartialStream::AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) {
+ if (!filter || filter->IsTotalAllowFilter()) {
+ SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema->field_names(), Reverse, poolId));
+ } else if (filter->IsTotalDenyFilter()) {
+ return;
+ } else {
+ SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema->field_names(), Reverse, poolId));
+ }
+ if (restoreHeap) {
+ std::push_heap(SortHeap.begin(), SortHeap.end());
+ }
+}
+
+void TMergePartialStream::RemoveControlPoint() {
+ Y_VERIFY(ControlPoints == 1);
+ Y_VERIFY(ControlPointEnriched());
+ Y_VERIFY(-- ControlPoints == 0);
+ std::pop_heap(SortHeap.begin(), SortHeap.end());
+ SortHeap.pop_back();
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index 1c481b528c4..ef5de23549f 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -9,22 +9,87 @@
namespace NKikimr::NOlap::NIndexedReader {
+class TSortableBatchPosition {
+protected:
+ i64 Position = 0;
+ i64 RecordsCount = 0;
+ bool ReverseSort = false;
+ std::vector<std::shared_ptr<arrow::Array>> Columns;
+ std::vector<std::shared_ptr<arrow::Field>> Fields;
+ std::shared_ptr<arrow::RecordBatch> Batch;
+public:
+ TSortableBatchPosition() = default;
+
+ bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const;
+
+ TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& columns, const bool reverseSort)
+ : Position(position)
+ , RecordsCount(batch->num_rows())
+ , ReverseSort(reverseSort)
+ , Batch(batch)
+ {
+ Y_VERIFY(batch->num_rows());
+ Y_VERIFY_DEBUG(batch->ValidateFull().ok());
+ for (auto&& i : columns) {
+ auto c = batch->GetColumnByName(i);
+ Y_VERIFY(c);
+ Columns.emplace_back(c);
+ auto f = batch->schema()->GetFieldByName(i);
+ Fields.emplace_back(f);
+ }
+ Y_VERIFY(Columns.size());
+ }
+
+ std::partial_ordering Compare(const TSortableBatchPosition& item) const {
+ Y_VERIFY(item.ReverseSort == ReverseSort);
+ Y_VERIFY_DEBUG(item.Columns.size() == Columns.size());
+ const auto directResult = NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position);
+ if (ReverseSort) {
+ if (directResult == std::partial_ordering::less) {
+ return std::partial_ordering::greater;
+ } else if (directResult == std::partial_ordering::greater) {
+ return std::partial_ordering::less;
+ } else {
+ return std::partial_ordering::equivalent;
+ }
+ } else {
+ return directResult;
+ }
+ }
+
+ bool operator<(const TSortableBatchPosition& item) const {
+ return Compare(item) == std::partial_ordering::less;
+ }
+
+ bool NextPosition(const i64 delta) {
+ return InitPosition(Position + delta);
+ }
+
+ bool InitPosition(const i64 position) {
+ if (position < RecordsCount && position >= 0) {
+ Position = position;
+ return true;
+ } else {
+ return false;
+ }
+
+ }
+
+};
+
class TMergePartialStream {
private:
class TBatchIterator {
private:
- YDB_ACCESSOR(i64, Position, 0);
+ bool ControlPointFlag;
+ TSortableBatchPosition KeyColumns;
+ TSortableBatchPosition VersionColumns;
+ i64 RecordsCount;
+ int ReverseSortKff;
YDB_OPT(ui32, PoolId);
- std::shared_ptr<arrow::RecordBatch> Batch;
std::shared_ptr<NArrow::TColumnFilter> Filter;
std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
- std::shared_ptr<NArrow::TColumnFilter::TReverseIterator> ReverseFilterIterator;
-
- std::vector<std::shared_ptr<arrow::Array>> Columns;
- std::vector<std::shared_ptr<arrow::Array>> VersionColumns;
- int ReverseSortKff;
- i64 RecordsCount;
i32 GetFirstPosition() const {
if (ReverseSortKff > 0) {
@@ -34,123 +99,110 @@ private:
}
}
- i32 GetLastPosition() const {
- if (ReverseSortKff > 0) {
- return RecordsCount - 1;
- } else {
- return 0;
- }
+ public:
+ bool IsControlPoint() const {
+ return ControlPointFlag;
+ }
+
+ const TSortableBatchPosition& GetKeyColumns() const {
+ return KeyColumns;
+ }
+
+ TBatchIterator(const TSortableBatchPosition& keyColumns)
+ : ControlPointFlag(true)
+ , KeyColumns(keyColumns)
+ {
+
}
- public:
TBatchIterator(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
- std::shared_ptr<arrow::Schema> sortSchema, const bool reverseSort, const std::optional<ui32> poolId)
- : PoolId(poolId)
- , Batch(batch)
+ const std::vector<std::string>& keyColumns, const bool reverseSort, const std::optional<ui32> poolId)
+ : ControlPointFlag(false)
+ , KeyColumns(batch, 0, keyColumns, reverseSort)
+ , VersionColumns(batch, 0, TIndexInfo::GetSpecialColumnNames(), false)
+ , RecordsCount(batch->num_rows())
+ , ReverseSortKff(reverseSort ? 1 : -1)
+ , PoolId(poolId)
, Filter(filter)
- , ReverseSortKff(reverseSort ? -1 : 1)
- , RecordsCount(batch->num_rows()) {
+ {
+ Y_VERIFY(KeyColumns.InitPosition(GetFirstPosition()));
+ Y_VERIFY(VersionColumns.InitPosition(GetFirstPosition()));
if (Filter) {
- if (reverseSort) {
- ReverseFilterIterator = std::make_shared<NArrow::TColumnFilter::TReverseIterator>(Filter->GetReverseIterator());
- } else {
- FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator());
- }
+ FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort));
Y_VERIFY(Filter->Size() == RecordsCount);
}
- Position = GetFirstPosition();
- Y_UNUSED(Batch);
- Y_VERIFY(batch->num_rows());
- Y_VERIFY_DEBUG(batch->ValidateFull().ok());
- for (auto&& i : sortSchema->fields()) {
- auto c = batch->GetColumnByName(i->name());
- Y_VERIFY(c);
- Columns.emplace_back(c);
- }
- {
- auto c = batch->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
- Y_VERIFY(c);
- VersionColumns.emplace_back(c);
- }
- {
- auto c = batch->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
- Y_VERIFY(c);
- VersionColumns.emplace_back(c);
- }
}
bool CheckNextBatch(const TBatchIterator& nextIterator) {
- Y_VERIFY_DEBUG(nextIterator.Columns.size() == Columns.size());
- return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, nextIterator.GetFirstPosition()) * ReverseSortKff < 0;
+ return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less;
}
class TPosition {
private:
- const TBatchIterator* Owner;
- ui32 Position = 0;
- bool DeletedFlag = false;
+ TSortableBatchPosition KeyColumns;
+ TSortableBatchPosition VersionColumns;
+ bool DeletedFlag;
+ bool ControlPointFlag;
public:
+ const TSortableBatchPosition& GetKeyColumns() const {
+ return KeyColumns;
+ }
+
+ bool IsControlPoint() const {
+ return ControlPointFlag;
+ }
+
bool IsDeleted() const {
return DeletedFlag;
}
void TakeIfMoreActual(const TBatchIterator& anotherIterator) {
- if (NArrow::ColumnsCompare(Owner->VersionColumns, Position, anotherIterator.VersionColumns, anotherIterator.Position) < 0) {
- Owner = &anotherIterator;
- Position = anotherIterator.Position;
- DeletedFlag = Owner->IsDeleted();
+ Y_VERIFY_DEBUG(KeyColumns.Compare(anotherIterator.KeyColumns) == std::partial_ordering::equivalent);
+ if (VersionColumns.Compare(anotherIterator.VersionColumns) == std::partial_ordering::less) {
+ DeletedFlag = anotherIterator.IsDeleted();
+ ControlPointFlag = anotherIterator.IsControlPoint();
}
}
TPosition(const TBatchIterator& owner)
- : Owner(&owner)
- , Position(Owner->Position)
+ : KeyColumns(owner.KeyColumns)
+ , VersionColumns(owner.VersionColumns)
+ , DeletedFlag(owner.IsDeleted())
+ , ControlPointFlag(owner.IsControlPoint())
{
- DeletedFlag = Owner->IsDeleted();
- }
-
- int CompareNoVersion(const TBatchIterator& item) const {
- Y_VERIFY_DEBUG(item.Columns.size() == Owner->Columns.size());
- return NArrow::ColumnsCompare(Owner->Columns, Position, item.Columns, item.Position);
}
};
- int CompareNoVersion(const TBatchIterator& item) const {
- Y_VERIFY_DEBUG(item.Columns.size() == Columns.size());
- return NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position);
- }
-
bool IsDeleted() const {
- if (FilterIterator) {
- return FilterIterator->GetCurrentAcceptance();
- } else if (ReverseFilterIterator) {
- return ReverseFilterIterator->GetCurrentAcceptance();
- } else {
+ if (!FilterIterator) {
return false;
}
+ return FilterIterator->GetCurrentAcceptance();
}
bool Next() {
- bool result = false;
- if (ReverseSortKff > 0) {
- result = ++Position < RecordsCount;
- } else {
- result = --Position >= 0;
- }
+ const bool result = KeyColumns.NextPosition(ReverseSortKff) && VersionColumns.NextPosition(ReverseSortKff);
if (FilterIterator) {
Y_VERIFY(result == FilterIterator->Next(1));
- } else if (ReverseFilterIterator) {
- Y_VERIFY(result == ReverseFilterIterator->Next(1));
}
return result;
}
bool operator<(const TBatchIterator& item) const {
- const int result = CompareNoVersion(item) * ReverseSortKff;
- if (result == 0) {
- return NArrow::ColumnsCompare(VersionColumns, Position, item.VersionColumns, item.Position) < 0;
+ const std::partial_ordering result = KeyColumns.Compare(item.KeyColumns);
+ if (result == std::partial_ordering::equivalent) {
+ if (IsControlPoint() && item.IsControlPoint()) {
+ return false;
+ } else if (IsControlPoint()) {
+ return false;
+ } else if (item.IsControlPoint()) {
+ return true;
+ }
+ //no inversion needed here because we want the maximal version first (reverse ordering is not applied to VersionColumns)
+ return VersionColumns.Compare(item.VersionColumns) == std::partial_ordering::less;
} else {
- return result > 0;
+ //comparison is inverted because we use a max-heap but need the minimal element when not reversed (reverse ordering is already applied in KeyColumns)
+ return result == std::partial_ordering::greater;
}
}
};
@@ -189,7 +241,7 @@ private:
it->second.pop_front();
TBatchIterator oldIterator = std::move(SortHeap.back());
SortHeap.pop_back();
- AddToHeap(SortHeap.back().GetPoolIdUnsafe(), it->second.front().GetBatch(), it->second.front().GetFilter(), false);
+ AddNewToHeap(SortHeap.back().GetPoolIdUnsafe(), it->second.front().GetBatch(), it->second.front().GetFilter(), false);
oldIterator.CheckNextBatch(SortHeap.back());
std::push_heap(SortHeap.begin(), SortHeap.end());
}
@@ -202,33 +254,27 @@ private:
std::vector<TBatchIterator> SortHeap;
std::shared_ptr<arrow::Schema> SortSchema;
const bool Reverse;
+ ui32 ControlPoints = 0;
TBatchIterator::TPosition DrainCurrentPosition() {
Y_VERIFY(SortHeap.size());
auto position = TBatchIterator::TPosition(SortHeap.front());
+ if (SortHeap.front().IsControlPoint()) {
+ return position;
+ }
bool isFirst = true;
- while (SortHeap.size() && (isFirst || !position.CompareNoVersion(SortHeap.front()))) {
+ while (SortHeap.size() && (isFirst || position.GetKeyColumns().Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) {
if (!isFirst) {
position.TakeIfMoreActual(SortHeap.front());
}
+ Y_VERIFY(!SortHeap.front().IsControlPoint());
NextInHeap(true);
isFirst = false;
}
return position;
}
- void AddToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) {
- if (!filter || filter->IsTotalAllowFilter()) {
- SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, poolId));
- } else if (filter->IsTotalDenyFilter()) {
- return;
- } else {
- SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema, Reverse, poolId));
- }
- if (restoreHeap) {
- std::push_heap(SortHeap.begin(), SortHeap.end());
- }
- }
+ void AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap);
public:
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse)
: SortSchema(sortSchema)
@@ -248,24 +294,18 @@ public:
return it->second.size();
}
- void AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
- if (!batch || !batch->num_rows()) {
- return;
- }
- Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema));
- if (!poolId) {
- IndependentBatches.emplace_back(batch);
- AddToHeap(poolId, batch, filter, true);
- } else {
- auto it = BatchPools.find(*poolId);
- if (it == BatchPools.end()) {
- it = BatchPools.emplace(*poolId, std::deque<TIteratorData>()).first;
- }
- it->second.emplace_back(batch, filter);
- if (it->second.size() == 1) {
- AddToHeap(poolId, batch, filter, true);
- }
- }
+ void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point);
+
+ void RemoveControlPoint();
+
+ bool ControlPointEnriched() const {
+ return SortHeap.size() && SortHeap.front().IsControlPoint();
+ }
+
+ void AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
+
+ bool IsEmpty() const {
+ return SortHeap.empty();
}
bool DrainCurrent() {
@@ -274,6 +314,9 @@ public:
}
while (SortHeap.size()) {
auto currentPosition = DrainCurrentPosition();
+ if (currentPosition.IsControlPoint()) {
+ return false;
+ }
if (currentPosition.IsDeleted()) {
continue;
}