author     ivanmorozov <ivanmorozov@yandex-team.com>  2023-05-02 15:22:51 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>  2023-05-02 15:22:51 +0300
commit     4cb4a9b3b10be8abd9a00540c5e92ec3b99af140 (patch)
tree       c9fbf1f621d1363804411ed235546ee7db362443
parent     74fc0d19a74a14170ab2a1db5c7934fd067b096f (diff)
download   ydb-4cb4a9b3b10be8abd9a00540c5e92ec3b99af140.tar.gz
Merge items for limit usage (instead of a pessimistic additive mark)
-rw-r--r--  ydb/core/formats/arrow_filter.h                                      |  66
-rw-r--r--  ydb/core/formats/arrow_helpers.cpp                                   |  11
-rw-r--r--  ydb/core/formats/arrow_helpers.h                                     |   1
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.cpp                |  36
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.h                  |  11
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.h                       |   3
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt  |   1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt  |   1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt   |   1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt |   1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/batch.cpp                     |   4
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/batch.h                       |   5
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filling_context.cpp           |  24
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filling_context.h             |   4
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp          |  20
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filter_assembler.h            |   1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule.cpp                   |  23
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule.h                     |  19
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_controller.cpp          |  43
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_controller.h            |  33
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp        |   5
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_filter_merger.h          | 232
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_metadata.cpp             |  11
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_metadata.h               |   1
24 files changed, 478 insertions(+), 79 deletions(-)
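
Note: the limit accounting moves from a pessimistic per-batch "additive mark" (charging, or for unsortable batches even crediting, whole GetFilteredRecordsCount() values against CurrentItemsLimit) to a primary-key merge that charges the limit one row at a time. A minimal standalone sketch of that counting scheme, using plain std containers rather than the YDB classes and ignoring version columns and deduplication:

#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hedged illustration only: k-way merge over already-sorted sources where the
// row limit is decremented per emitted row, not per whole source batch.
std::vector<int64_t> MergeWithLimit(const std::vector<std::vector<int64_t>>& sources, uint32_t limit) {
    using THeapItem = std::pair<int64_t, size_t>;   // (current key, source index)
    std::priority_queue<THeapItem, std::vector<THeapItem>, std::greater<>> heap;
    std::vector<size_t> positions(sources.size(), 0);
    for (size_t i = 0; i < sources.size(); ++i) {
        if (!sources[i].empty()) {                  // each source is assumed sorted ascending
            heap.push({sources[i][0], i});
        }
    }
    std::vector<int64_t> out;
    while (!heap.empty() && limit) {
        auto [key, idx] = heap.top();
        heap.pop();
        out.push_back(key);
        --limit;                                    // one merged row costs one unit of the limit
        if (++positions[idx] < sources[idx].size()) {
            heap.push({sources[idx][positions[idx]], idx});
        }
    }
    return out;
}
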
diff --git a/ydb/core/formats/arrow_filter.h b/ydb/core/formats/arrow_filter.h
index a119ed59b1a..86c33949083 100644
--- a/ydb/core/formats/arrow_filter.h
+++ b/ydb/core/formats/arrow_filter.h
@@ -38,32 +38,45 @@ private:
return value;
}
-public:
-
- class TIterator {
+ template <class TIterator>
+ class TIteratorImpl {
private:
ui32 InternalPosition = 0;
- std::deque<ui32>::const_iterator It;
- std::deque<ui32>::const_iterator ItEnd;
+ ui32 CurrentRemainVolume = 0;
+ TIterator It;
+ TIterator ItEnd;
bool CurrentValue;
public:
- TIterator(const std::deque<ui32>& filter, const bool startValue)
- : It(filter.begin())
- , ItEnd(filter.end())
- , CurrentValue(startValue)
- {
+ TIteratorImpl(TIterator itBegin, TIterator itEnd, const bool startValue)
+ : It(itBegin)
+ , ItEnd(itEnd)
+ , CurrentValue(startValue) {
+ if (It != ItEnd) {
+ CurrentRemainVolume = *It;
+ }
+ }
+ bool GetCurrentAcceptance() const {
+ Y_VERIFY_DEBUG(CurrentRemainVolume);
+ return CurrentValue;
}
bool IsBatchForSkip(const ui32 size) const {
- return !CurrentValue && (*It - InternalPosition) >= size;
+ Y_VERIFY_DEBUG(CurrentRemainVolume);
+ return !CurrentValue && CurrentRemainVolume >= size;
}
bool Next(const ui32 size) {
+ if (CurrentRemainVolume > size) {
+ InternalPosition += size;
+ CurrentRemainVolume -= size;
+ return true;
+ }
ui32 sizeRemain = size;
while (It != ItEnd) {
if (*It - InternalPosition > sizeRemain) {
- InternalPosition += sizeRemain;
+ InternalPosition = sizeRemain;
+ CurrentRemainVolume = *It - InternalPosition - sizeRemain;
return true;
} else {
sizeRemain -= *It - InternalPosition;
@@ -72,12 +85,39 @@ public:
++It;
}
}
+ CurrentRemainVolume = 0;
return false;
}
};
+public:
+
+ using TIterator = TIteratorImpl<std::deque<ui32>::const_iterator>;
+ using TReverseIterator = TIteratorImpl<std::deque<ui32>::const_reverse_iterator>;
+
+ template <bool ForReverse>
+ class TIteratorSelector {
+
+ };
+
+ template <>
+ class TIteratorSelector<true> {
+ public:
+ using TIterator = TReverseIterator;
+ };
+
+ template <>
+ class TIteratorSelector<false> {
+ public:
+ using TIterator = TIterator;
+ };
+
TIterator GetIterator() const {
- return TIterator(Filter, GetStartValue());
+ return TIterator(Filter.cbegin(), Filter.cend(), GetStartValue());
+ }
+
+ TReverseIterator GetReverseIterator() const {
+ return TReverseIterator(Filter.crbegin(), Filter.crend(), CurrentValue);
}
TColumnFilter(std::vector<bool>&& values) {
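
Note: the rewritten TColumnFilter iterator above is templated over the underlying forward or reverse deque iterator and tracks the remaining volume of the current run. As context, a minimal standalone sketch of the same run-length cursor idea, assuming the filter stores run lengths of alternating accept/deny values starting from startValue; an illustration, not the YDB implementation:

#include <cstdint>
#include <deque>

// Hedged sketch of a run-length filter cursor: `runs` holds lengths of
// alternating boolean runs; Next(size) consumes `size` rows across runs.
class TRunLengthCursor {
    std::deque<uint32_t>::const_iterator It, ItEnd;
    uint32_t RemainInRun = 0;   // rows left in the current run
    bool CurrentValue;          // acceptance of the current run
public:
    TRunLengthCursor(const std::deque<uint32_t>& runs, bool startValue)
        : It(runs.begin()), ItEnd(runs.end()), CurrentValue(startValue) {
        if (It != ItEnd) {
            RemainInRun = *It;
        }
    }
    bool GetCurrentAcceptance() const { return CurrentValue; }
    // True if the next `size` rows are all rejected and can be skipped wholesale.
    bool IsBatchForSkip(uint32_t size) const { return !CurrentValue && RemainInRun >= size; }
    bool Next(uint32_t size) {
        while (It != ItEnd) {
            if (RemainInRun > size) {       // stay inside the current run
                RemainInRun -= size;
                return true;
            }
            size -= RemainInRun;            // cross into the next run
            CurrentValue = !CurrentValue;
            if (++It != ItEnd) {
                RemainInRun = *It;
                if (size == 0) {
                    return true;
                }
            }
        }
        RemainInRun = 0;                    // past the end of the filter
        return false;
    }
};
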
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 64f1f0f6c7b..456426c4df0 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -917,4 +917,15 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b
return true;
}
+int ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow) {
+ auto result = TRawReplaceKey(&x, xRow).CompareNotNull(TRawReplaceKey(&y, yRow));
+ if (result == std::partial_ordering::greater) {
+ return 1;
+ } else if (result == std::partial_ordering::less) {
+ return -1;
+ } else {
+ return 0;
+ }
+}
+
}
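
Note: ColumnsCompare wraps TRawReplaceKey::CompareNotNull and maps std::partial_ordering onto the usual -1/0/1 contract used by the merge heap. A standalone sketch of that row-versus-row comparison over plain columns (invented types; equal column counts assumed):

#include <cstdint>
#include <vector>

// Lexicographic comparison of row xRow in column set x against row yRow in
// column set y; returns -1/0/1 like NArrow::ColumnsCompare.
int CompareRows(const std::vector<std::vector<int64_t>>& x, uint32_t xRow,
                const std::vector<std::vector<int64_t>>& y, uint32_t yRow) {
    for (size_t col = 0; col < x.size(); ++col) {
        if (x[col][xRow] < y[col][yRow]) {
            return -1;
        }
        if (x[col][xRow] > y[col][yRow]) {
            return 1;
        }
    }
    return 0;   // all key columns equal
}
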
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 97b381e98eb..54ae4b58348 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -132,6 +132,7 @@ std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& ar
bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y);
int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
+int ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 4f4b18f5045..22e319727fa 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -232,7 +232,14 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
marksGranules.MakePrecedingMark(IndexInfo());
Y_VERIFY(!marksGranules.Empty());
- OutNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, IndexInfo());
+ auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, IndexInfo());
+ GranulesContext->AddNotIndexedBatches(outNotIndexed);
+ Y_VERIFY(outNotIndexed.size() <= 1);
+ if (outNotIndexed.size() == 1) {
+ auto it = outNotIndexed.find(0);
+ Y_VERIFY(it != outNotIndexed.end());
+ NotIndexedOutscopeBatch = it->second;
+ }
}
NotIndexed.clear();
ReadyNotIndexed = 0;
@@ -260,37 +267,26 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
out.reserve(ready.size() + 1);
// Prepend not indexed data (less then first granule) before granules for ASC sorting
- if (ReadMetadata->IsAscSorted() && OutNotIndexed.count(0)) {
+ if (ReadMetadata->IsAscSorted() && NotIndexedOutscopeBatch) {
out.push_back({});
- out.back().push_back(OutNotIndexed[0]);
- OutNotIndexed.erase(0);
+ out.back().push_back(NotIndexedOutscopeBatch);
+ NotIndexedOutscopeBatch = nullptr;
}
for (auto&& granule : ready) {
- bool canHaveDups = granule->IsDuplicationsAvailable();
std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = granule->GetReadyBatches();
- // Append not indexed data to granules
- auto itNotIndexed = OutNotIndexed.find(granule->GetGranuleId());
- if (itNotIndexed != OutNotIndexed.end()) {
- auto batch = itNotIndexed->second;
- if (batch && batch->num_rows()) { // TODO: check why it could be empty
- inGranule.push_back(batch);
- canHaveDups = true;
- }
- OutNotIndexed.erase(granule->GetGranuleId());
- }
if (inGranule.empty()) {
continue;
}
- if (canHaveDups) {
+ if (granule->IsDuplicationsAvailable()) {
for (auto& batch : inGranule) {
Y_VERIFY(batch->num_rows());
Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey));
}
#if 1 // optimization
- auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, BatchesToDedup);
+ auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, granule->GetBatchesToDedup());
out.emplace_back(std::move(deduped));
#else
out.push_back({});
@@ -302,10 +298,10 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
}
// Append not indexed data (less then first granule) after granules for DESC sorting
- if (GranulesContext->GetSortingPolicy()->ReadyForAddNotIndexedToEnd() && OutNotIndexed.count(0)) {
+ if (GranulesContext->GetSortingPolicy()->ReadyForAddNotIndexedToEnd() && NotIndexedOutscopeBatch) {
out.push_back({});
- out.back().push_back(OutNotIndexed[0]);
- OutNotIndexed.erase(0);
+ out.back().push_back(NotIndexedOutscopeBatch);
+ NotIndexedOutscopeBatch = nullptr;
}
return out;
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index e8dba569740..ffdc27fb5b1 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -28,11 +28,10 @@ private:
bool OnePhaseReadMode = false;
std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
- THashSet<const void*> BatchesToDedup;
THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch
THashSet<TBlobRange> IndexedBlobs;
ui32 ReadyNotIndexed{ 0 };
- THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append
+ std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch; // outscope granules batch
std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
public:
@@ -80,15 +79,9 @@ public:
}
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
- void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
+ void OnBatchReady(const NIndexedReader::TBatch& /*batchInfo*/, std::shared_ptr<arrow::RecordBatch> batch) {
if (batch && batch->num_rows()) {
ReadMetadata->ReadStats->SelectedRows += batch->num_rows();
- if (batchInfo.IsDuplicationsAvailable()) {
- Y_VERIFY(batchInfo.GetOwner().IsDuplicationsAvailable());
- BatchesToDedup.insert(batch.get());
- } else {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, IndexInfo().GetReplaceKey(), false));
- }
}
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index e3cd2e4244d..4efd4c181d1 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -63,8 +63,7 @@ struct TPortionInfo {
size_t NumRecords() const { return Records.size(); }
bool IsSortableInGranule() const {
- return Meta.Produced == TPortionMeta::COMPACTED
- || Meta.Produced == TPortionMeta::SPLIT_COMPACTED;
+ return !CanIntersectOthers();
}
bool AllowEarlyFilter() const {
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index ab853275e1b..d03e55eb670 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -28,4 +28,5 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index e0b9dfc7be9..503e7703de0 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -29,4 +29,5 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index e0b9dfc7be9..503e7703de0 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -29,4 +29,5 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index ab853275e1b..d03e55eb670 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -28,4 +28,5 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 976dd31dac5..6257c1788e4 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -133,13 +133,15 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) {
}
}
-bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, const ui32 originalRecordsCount) {
+bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch,
+ const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> futureFilter) {
Y_VERIFY(filter);
Y_VERIFY(!Filter);
Y_VERIFY(!FilterBatch);
Filter = filter;
FilterBatch = filterBatch;
OriginalRecordsCount = originalRecordsCount;
+ FutureFilter = futureFilter;
return Owner->OnFilterReady(*this);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index 4198c864543..25f6002a44d 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -29,6 +29,8 @@ private:
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch);
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch);
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, FutureFilter);
+
ui32 OriginalRecordsCount = 0;
YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
@@ -73,7 +75,8 @@ public:
return FilterBatch->num_rows();
}
}
- bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, const ui32 originalRecordsCount);
+ bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch,
+ const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> futureFilter);
void InitBatch(std::shared_ptr<arrow::RecordBatch> batch);
NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata);
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index 83293a384d1..ea1c4ee2f15 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -14,9 +14,11 @@ TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMe
SortingPolicy = InternalReading ? std::make_shared<TNonSorting>(ReadMetadata) : ReadMetadata->BuildSortingPolicy();
UsedColumns = ReadMetadata->GetUsedColumnIds();
- PostFilterColumns = ReadMetadata->GetUsedColumnIds();
EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds();
- for (auto&& i : EarlyFilterColumns) {
+ FilterStageColumns = SortingPolicy->GetFilterStageColumns();
+
+ PostFilterColumns = ReadMetadata->GetUsedColumnIds();
+ for (auto&& i : FilterStageColumns) {
PostFilterColumns.erase(i);
}
}
@@ -53,4 +55,22 @@ NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::Get
return Owner.GetTasksProcessor();
}
+void TGranulesFillingContext::AddNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>& batches) {
+ std::shared_ptr<arrow::RecordBatch> externalBatch;
+ for (auto it = batches.begin(); it != batches.end(); ++it) {
+ if (!it->first) {
+ externalBatch = it->second;
+ continue;
+ }
+ auto itGranule = Granules.find(it->first);
+ Y_VERIFY(itGranule != Granules.end());
+ itGranule->second.AddNotIndexedBatch(it->second);
+ }
+ THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> resultLocal;
+ if (externalBatch) {
+ resultLocal.emplace(0, externalBatch);
+ }
+ std::swap(batches, resultLocal);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index 2b71adcb028..cab84fc290c 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -21,6 +21,7 @@ private:
THashMap<ui64, NIndexedReader::TGranule> Granules;
YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns);
YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns);
+ std::set<ui32> FilterStageColumns;
std::set<ui32> UsedColumns;
YDB_READONLY_DEF(IOrderPolicy::TPtr, SortingPolicy);
YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters);
@@ -33,6 +34,7 @@ public:
NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const;
+ void AddNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>& batches);
TBatch& GetBatchInfo(const ui32 batchNo);
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
@@ -48,7 +50,7 @@ public:
void OnNewBatch(TBatch& batch) {
if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) {
- batch.ResetNoFilter(EarlyFilterColumns);
+ batch.ResetNoFilter(FilterStageColumns);
} else {
batch.ResetNoFilter(UsedColumns);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index bff7044c5b7..7531c0dd176 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -20,13 +20,17 @@ bool TAssembleFilter::DoExecuteImpl() {
FilteredBatch = nullptr;
return true;
}
- if (ReadMetadata->Program && AllowEarlyFilter) {
- auto filter = NOlap::EarlyFilter(batch, ReadMetadata->Program);
- Filter->CombineSequential(filter);
- if (!filter.Apply(batch)) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount);
- FilteredBatch = nullptr;
- return true;
+ if (ReadMetadata->Program) {
+ auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program));
+ if (AllowEarlyFilter) {
+ Filter->CombineSequential(*earlyFilter);
+ if (!earlyFilter->Apply(batch)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount);
+ FilteredBatch = nullptr;
+ return true;
+ }
+ } else {
+ EarlyFilter = earlyFilter;
}
}
@@ -51,7 +55,7 @@ bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const {
Y_VERIFY(OriginalCount);
owner.GetCounters().GetOriginalRowsCount()->Add(OriginalCount);
owner.GetCounters().GetAssembleFilterCount()->Add(1);
- batch.InitFilter(Filter, FilteredBatch, OriginalCount);
+ batch.InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter);
return true;
}
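
Note: with this change the assembler always computes the early (program) filter when a program is present, but only applies it in place when AllowEarlyFilter holds; otherwise the filter travels with the batch as a FutureFilter and is honoured later by the merge. A hedged sketch of that apply-now-or-defer decision with an invented stand-in type (the real code uses NArrow::TColumnFilter and CombineSequential):

#include <memory>
#include <vector>

// Invented stand-in for the column filter; only the AND-combine is sketched.
struct TBitFilter {
    std::vector<bool> Bits;
    void CombineAnd(const TBitFilter& other) {
        // assumes equal lengths
        for (size_t i = 0; i < Bits.size(); ++i) {
            Bits[i] = Bits[i] && other.Bits[i];
        }
    }
};

struct TAssembleResult {
    std::shared_ptr<TBitFilter> Filter;        // snapshot filter, possibly tightened in place
    std::shared_ptr<TBitFilter> FutureFilter;  // deferred program filter for the merge stage
};

TAssembleResult DecideEarlyFilter(std::shared_ptr<TBitFilter> snapshotFilter,
                                  std::shared_ptr<TBitFilter> programFilter,
                                  bool allowEarlyFilter) {
    TAssembleResult result{snapshotFilter, nullptr};
    if (!programFilter) {
        return result;
    }
    if (allowEarlyFilter) {
        result.Filter->CombineAnd(*programFilter);  // rows are dropped right away
    } else {
        result.FutureFilter = programFilter;        // rows survive for now, filter is kept
    }
    return result;
}
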
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
index c2f65c6d6dc..80ee2430c7b 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -16,6 +16,7 @@ namespace NKikimr::NOlap::NIndexedReader {
std::shared_ptr<arrow::RecordBatch> FilteredBatch;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
std::shared_ptr<NArrow::TColumnFilter> Filter;
+ std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
const ui32 BatchNo;
ui32 OriginalCount = 0;
bool AllowEarlyFilter = false;
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index 7302e6add52..58283b6051d 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -2,6 +2,7 @@
#include "filling_context.h"
#include <ydb/core/tx/columnshard/engines/portion_info.h>
#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+#include <ydb/core/tx/columnshard/engines/filter.h>
namespace NKikimr::NOlap::NIndexedReader {
@@ -14,7 +15,13 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
Y_VERIFY(!ReadyFlag);
Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo()));
if (batch && batch->num_rows()) {
- ReadyBatches.emplace_back(batch);
+ if (batchInfo.IsSortableInGranule()) {
+ SortableBatches.emplace_back(batch);
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false));
+ } else {
+ NonSortableBatches.emplace_back(batch);
+ BatchesToDedup.insert(batch.get());
+ }
}
Owner->OnBatchReady(batchInfo, batch);
if (WaitBatches.empty()) {
@@ -80,4 +87,18 @@ std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata:
return batches;
}
+void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
+ if (!batch || !batch->num_rows()) {
+ return;
+ }
+ Y_VERIFY(NonSortableBatches.empty());
+ Y_VERIFY(SortableBatches.empty());
+ Y_VERIFY(!NotIndexedBatch);
+ NotIndexedBatch = batch;
+ if (Owner->GetReadMetadata()->Program) {
+ NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program));
+ }
+ DuplicationsAvailableFlag = true;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index b7400339d30..cc6763a364d 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -14,19 +14,36 @@ class TGranulesFillingContext;
class TGranule {
private:
YDB_READONLY(ui64, GranuleId, 0);
- YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, ReadyBatches);
+ std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches;
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, NotIndexedBatch);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotIndexedBatchFutureFilter);
YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
YDB_READONLY_FLAG(Ready, false);
std::deque<TBatch> Batches;
std::set<ui32> WaitBatches;
std::set<ui32> GranuleBatchNumbers;
TGranulesFillingContext* Owner = nullptr;
+ YDB_READONLY_DEF(THashSet<const void*>, BatchesToDedup);
public:
TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
: GranuleId(granuleId)
, Owner(&owner) {
}
+ std::vector<std::shared_ptr<arrow::RecordBatch>> GetReadyBatches() const {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> result;
+ result.reserve(SortableBatches.size() + NonSortableBatches.size() + 1);
+ if (NotIndexedBatch) {
+ result.emplace_back(NotIndexedBatch);
+ }
+ result.insert(result.end(), NonSortableBatches.begin(), NonSortableBatches.end());
+ result.insert(result.end(), SortableBatches.begin(), SortableBatches.end());
+ return result;
+ }
+
+ void AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch);
+
const TGranulesFillingContext& GetOwner() const {
return *Owner;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
index 1c667356728..a20f94f1c95 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
@@ -46,28 +46,34 @@ bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule&
}
while (GranulesOutOrderForPortions.size()) {
auto it = OrderedBatches.find(GranulesOutOrderForPortions.front()->GetGranuleId());
+ auto g = GranulesOutOrderForPortions.front();
Y_VERIFY(it != OrderedBatches.end());
- while (it->second.size() && it->second.front()->IsFiltered() && CurrentItemsLimit) {
- auto b = it->second.front();
+ if (!it->second.GetStarted()) {
+ MergeStream.AddIndependentSource(g->GetNotIndexedBatch(), g->GetNotIndexedBatchFutureFilter());
+ it->second.SetStarted(true);
+ }
+ auto& batches = it->second.MutableBatches();
+ while (batches.size() && batches.front()->IsFiltered() && CurrentItemsLimit) {
+ auto b = batches.front();
if (b->IsSortableInGranule()) {
- if (CurrentItemsLimit <= b->GetFilteredRecordsCount()) {
- CurrentItemsLimit = 0;
- } else {
- CurrentItemsLimit -= b->GetFilteredRecordsCount();
- }
+ MergeStream.AddPoolSource(0, b->GetFilterBatch());
} else {
- CurrentItemsLimit += b->GetFilteredRecordsCount();
+ MergeStream.AddIndependentSource(b->GetFilterBatch(), b->GetFutureFilter());
}
OnBatchFilterInitialized(*b, context);
-
- it->second.pop_front();
+ batches.pop_front();
}
- if (!CurrentItemsLimit || it->second.empty()) {
- while (it->second.size()) {
- auto b = it->second.front();
+ if (MergeStream.IsValid()) {
+ while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.Next()) {
+ --CurrentItemsLimit;
+ }
+ }
+ if (!CurrentItemsLimit || batches.empty()) {
+ while (batches.size()) {
+ auto b = batches.front();
context.GetCounters().GetSkippedBytes()->Add(b->GetFetchBytes(context.GetPostFilterColumns()));
b->InitBatch(nullptr);
- it->second.pop_front();
+ batches.pop_front();
}
OrderedBatches.erase(it);
GranulesOutOrderForPortions.pop_front();
@@ -83,7 +89,7 @@ void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) {
for (ui64 granule : granulesOrder) {
TGranule& g = context.GetGranuleVerified(granule);
GranulesOutOrder.emplace_back(&g);
- Y_VERIFY(OrderedBatches.emplace(granule, g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata)).second);
+ Y_VERIFY(OrderedBatches.emplace(granule, TGranuleScanInfo(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata))).second);
}
GranulesOutOrderForPortions = GranulesOutOrder;
}
@@ -102,6 +108,13 @@ std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64,
return result;
}
+TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
+ :TBase(readMetadata)
+ , MergeStream(readMetadata->IndexInfo.GetReplaceKey(), readMetadata->IsDescSorted())
+{
+ CurrentItemsLimit = ReadMetadata->Limit;
+}
+
void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context) {
Y_VERIFY(!!batch.GetFilter());
if (!batch.GetFilteredRecordsCount()) {
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h
index ab14fef95ca..2f697219b66 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h
@@ -1,6 +1,7 @@
#pragma once
#include "granule.h"
#include "read_metadata.h"
+#include "read_filter_merger.h"
namespace NKikimr::NOlap::NIndexedReader {
@@ -21,6 +22,10 @@ public:
using TPtr = std::shared_ptr<IOrderPolicy>;
virtual ~IOrderPolicy() = default;
+ virtual std::set<ui32> GetFilterStageColumns() {
+ return ReadMetadata->GetEarlyFilterColumnIds();
+ }
+
IOrderPolicy(TReadMetadata::TConstPtr readMetadata)
: ReadMetadata(readMetadata)
{
@@ -84,26 +89,44 @@ public:
}
};
+class TGranuleScanInfo {
+private:
+ YDB_ACCESSOR(bool, Started, false);
+ YDB_ACCESSOR_DEF(std::deque<TBatch*>, Batches);
+public:
+ TGranuleScanInfo(std::deque<TBatch*>&& batches)
+ : Batches(std::move(batches))
+ {
+
+ }
+};
+
class TPKSortingWithLimit: public IOrderPolicy {
private:
using TBase = IOrderPolicy;
std::deque<TGranule*> GranulesOutOrder;
std::deque<TGranule*> GranulesOutOrderForPortions;
- THashMap<ui64, std::deque<TBatch*>> OrderedBatches;
+ THashMap<ui64, TGranuleScanInfo> OrderedBatches;
ui32 CurrentItemsLimit = 0;
+ TMergePartialStream MergeStream;
protected:
virtual void DoFill(TGranulesFillingContext& context) override;
virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;
public:
+ virtual std::set<ui32> GetFilterStageColumns() override {
+ std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds();
+ for (auto&& i : ReadMetadata->GetPKColumnIds()) {
+ result.emplace(i);
+ }
+ return result;
+ }
+
virtual bool CanInterrupt() const override {
return true;
}
- TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
- :TBase(readMetadata) {
- CurrentItemsLimit = ReadMetadata->Limit;
- }
+ TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata);
virtual bool ReadyForAddNotIndexedToEnd() const override {
return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
new file mode 100644
index 00000000000..7456cdc0b72
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -0,0 +1,5 @@
+#include "read_filter_merger.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
new file mode 100644
index 00000000000..d88dda1a689
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -0,0 +1,232 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/formats/arrow_filter.h>
+#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/tx/columnshard/engines/index_info.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <util/generic/hash.h>
+#include <set>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TMergePartialStream {
+private:
+ class TBatchIterator {
+ private:
+ YDB_ACCESSOR(i64, Position, 0);
+ YDB_OPT(ui32, PoolId);
+ std::shared_ptr<arrow::RecordBatch> Batch;
+
+ std::shared_ptr<NArrow::TColumnFilter> Filter;
+ std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
+ std::shared_ptr<NArrow::TColumnFilter::TReverseIterator> ReverseFilterIterator;
+
+ std::vector<std::shared_ptr<arrow::Array>> Columns;
+ std::vector<std::shared_ptr<arrow::Array>> VersionColumns;
+ int ReverseSortKff;
+ i64 RecordsCount;
+
+ i32 GetFirstPosition() const {
+ if (ReverseSortKff > 0) {
+ return 0;
+ } else {
+ return RecordsCount - 1;
+ }
+ }
+
+ i32 GetLastPosition() const {
+ if (ReverseSortKff > 0) {
+ return RecordsCount - 1;
+ } else {
+ return 0;
+ }
+ }
+
+ public:
+ TBatchIterator(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
+ std::shared_ptr<arrow::Schema> sortSchema, const bool reverseSort, const std::optional<ui32> poolId)
+ : PoolId(poolId)
+ , Batch(batch)
+ , Filter(filter)
+ , ReverseSortKff(reverseSort ? -1 : 1)
+ , RecordsCount(batch->num_rows()) {
+ if (Filter) {
+ if (reverseSort) {
+ ReverseFilterIterator = std::make_shared<NArrow::TColumnFilter::TReverseIterator>(Filter->GetReverseIterator());
+ } else {
+ FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator());
+ }
+ Y_VERIFY(Filter->Size() == RecordsCount);
+ }
+ Position = GetFirstPosition();
+ Y_UNUSED(Batch);
+ Y_VERIFY(batch->num_rows());
+ Y_VERIFY_DEBUG(batch->ValidateFull().ok());
+ for (auto&& i : sortSchema->fields()) {
+ auto c = batch->GetColumnByName(i->name());
+ Y_VERIFY(c);
+ Columns.emplace_back(c);
+ }
+ {
+ auto c = batch->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
+ Y_VERIFY(c);
+ VersionColumns.emplace_back(c);
+ }
+ {
+ auto c = batch->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
+ Y_VERIFY(c);
+ VersionColumns.emplace_back(c);
+ }
+ }
+
+ bool CheckNextBatch(const TBatchIterator& nextIterator) {
+ Y_VERIFY_DEBUG(nextIterator.Columns.size() == Columns.size());
+ return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, 0) * ReverseSortKff < 0;
+ }
+
+ int CompareNoVersion(const TBatchIterator& item) const {
+ Y_VERIFY_DEBUG(item.Columns.size() == Columns.size());
+ return NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position);
+ }
+
+ bool IsDeleted() const {
+ if (FilterIterator) {
+ return FilterIterator->GetCurrentAcceptance();
+ } else if (ReverseFilterIterator) {
+ return ReverseFilterIterator->GetCurrentAcceptance();
+ } else {
+ return false;
+ }
+ }
+
+ bool Next() {
+ bool result = false;
+ if (ReverseSortKff > 0) {
+ result = ++Position < RecordsCount;
+ } else {
+ result = --Position >= 0;
+ }
+ if (FilterIterator) {
+ Y_VERIFY(result == FilterIterator->Next(1));
+ } else if (ReverseFilterIterator) {
+ Y_VERIFY(result == ReverseFilterIterator->Next(1));
+ }
+ return result;
+ }
+
+ bool operator<(const TBatchIterator& item) const {
+ const int result = CompareNoVersion(item) * ReverseSortKff;
+ if (result == 0) {
+ return NArrow::ColumnsCompare(VersionColumns, Position, item.VersionColumns, item.Position) > 0;
+ } else {
+ return result > 0;
+ }
+ }
+ };
+
+ bool NextInHeap(const bool needPop) {
+ if (SortHeap.empty()) {
+ return false;
+ }
+ if (needPop) {
+ std::pop_heap(SortHeap.begin(), SortHeap.end());
+ }
+ if (SortHeap.back().Next()) {
+ std::push_heap(SortHeap.begin(), SortHeap.end());
+ } else if (!SortHeap.back().HasPoolId()) {
+ SortHeap.pop_back();
+ } else {
+ auto it = BatchPools.find(SortHeap.back().GetPoolIdUnsafe());
+ Y_VERIFY(it->second.size());
+ if (it->second.size() == 1) {
+ BatchPools.erase(it);
+ SortHeap.pop_back();
+ } else {
+ it->second.pop_front();
+ TBatchIterator newIterator(it->second.front(), nullptr, SortSchema, Reverse, SortHeap.back().GetPoolIdUnsafe());
+ SortHeap.back().CheckNextBatch(newIterator);
+ std::swap(SortHeap.back(), newIterator);
+ std::push_heap(SortHeap.begin(), SortHeap.end());
+ }
+ }
+ return SortHeap.size();
+ }
+
+ THashMap<ui32, std::deque<std::shared_ptr<arrow::RecordBatch>>> BatchPools;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> IndependentBatches;
+ std::vector<TBatchIterator> SortHeap;
+ std::shared_ptr<arrow::Schema> SortSchema;
+ const bool Reverse;
+public:
+ TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse)
+ : SortSchema(sortSchema)
+ , Reverse(reverse) {
+ Y_VERIFY(SortSchema->num_fields());
+ }
+
+ bool IsValid() const {
+ return SortHeap.size();
+ }
+
+ void AddIndependentSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
+ if (!batch || !batch->num_rows()) {
+ return;
+ }
+ Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema));
+ IndependentBatches.emplace_back(batch);
+ if (!filter || filter->IsTotalAllowFilter()) {
+ SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, {}));
+ } else if (filter->IsTotalDenyFilter()) {
+ return;
+ } else {
+ SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema, Reverse, {}));
+ }
+ std::push_heap(SortHeap.begin(), SortHeap.end());
+ }
+
+ bool HasRecordsInPool(const ui32 poolId) const {
+ auto it = BatchPools.find(poolId);
+ if (it == BatchPools.end()) {
+ return false;
+ }
+ return it->second.size();
+ }
+
+ void AddPoolSource(const ui32 poolId, std::shared_ptr<arrow::RecordBatch> batch) {
+ if (!batch || !batch->num_rows()) {
+ return;
+ }
+ auto it = BatchPools.find(poolId);
+ if (it == BatchPools.end()) {
+ it = BatchPools.emplace(poolId, std::deque<std::shared_ptr<arrow::RecordBatch>>()).first;
+ }
+ it->second.emplace_back(batch);
+ if (it->second.size() == 1) {
+ SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, poolId));
+ std::push_heap(SortHeap.begin(), SortHeap.end());
+ }
+ }
+
+
+
+ bool Next() {
+ while (SortHeap.size()) {
+ std::pop_heap(SortHeap.begin(), SortHeap.end());
+ TBatchIterator mainIterator = std::move(SortHeap.back());
+ SortHeap.pop_back();
+ while (SortHeap.size() && !mainIterator.CompareNoVersion(SortHeap.front())) {
+ NextInHeap(true);
+ }
+ const bool isDeleted = mainIterator.IsDeleted();
+ SortHeap.emplace_back(std::move(mainIterator));
+ NextInHeap(false);
+ if (!isDeleted) {
+ break;
+ }
+ }
+ return SortHeap.size();
+ }
+};
+
+
+}
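
Note: a hedged usage sketch of the new merger, based only on the interface added in this header (how the batches and the replace-key schema are obtained is assumed; in the commit itself the class is driven from TPKSortingWithLimit::DoOnFilterReady). Batches that are sortable inside their granule go into pool 0 and are consumed strictly in order, everything else is added as an independent source, and each successful Next() accounts one surviving row against the limit. All batches are assumed to be sorted by the replace key and to carry the plan-step/tx-id snapshot columns the iterator reads.

#include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h>

#include <memory>
#include <vector>

void MergeSketch(std::shared_ptr<arrow::Schema> replaceKeySchema,
                 const std::vector<std::shared_ptr<arrow::RecordBatch>>& sortableInGranule,
                 const std::vector<std::shared_ptr<arrow::RecordBatch>>& intersecting,
                 ui32 limit) {
    NKikimr::NOlap::NIndexedReader::TMergePartialStream merger(replaceKeySchema, /*reverse=*/false);
    for (auto&& batch : sortableInGranule) {
        merger.AddPoolSource(0, batch);              // pool 0: batches consumed one after another
    }
    for (auto&& batch : intersecting) {
        merger.AddIndependentSource(batch, nullptr); // no deferred (future) filter in this sketch
    }
    while (limit && merger.Next()) {                 // one surviving row per successful Next()
        --limit;
    }
}
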
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index eff5afd970a..14c771b45ca 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -42,6 +42,14 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
return result;
}
+std::set<ui32> TReadMetadata::GetPKColumnIds() const {
+ std::set<ui32> result;
+ for (auto&& i : IndexInfo.GetPrimaryKey()) {
+ Y_VERIFY(result.emplace(IndexInfo.GetColumnId(i.first)).second);
+ }
+ return result;
+}
+
std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
std::set<ui32> result;
if (PlanStep) {
@@ -55,6 +63,9 @@ std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", f->name());
result.emplace(IndexInfo.GetColumnId(f->name()));
}
+ for (auto&& i : IndexInfo.GetPrimaryKey()) {
+ Y_VERIFY(result.contains(IndexInfo.GetColumnId(i.first)));
+ }
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 7ea2898dadc..1f1a6f96e0c 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -117,6 +117,7 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
std::set<ui32> GetEarlyFilterColumnIds() const;
std::set<ui32> GetUsedColumnIds() const;
+ std::set<ui32> GetPKColumnIds() const;
bool Empty() const {
Y_VERIFY(SelectInfo);