diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-27 14:16:36 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-27 14:41:09 +0300 |
commit | 2ef56dd64f5583ceb3f492ce815ef94dc52f159a (patch) | |
tree | eb42e0e8526a6bf3f3e85aad05b0279a6d95adc9 | |
parent | 678dbd8edb092567800f622b601cb3f380212a3b (diff) | |
download | ydb-2ef56dd64f5583ceb3f492ce815ef94dc52f159a.tar.gz |
KIKIMR-19853: fix incorrect filters usage instead of incorrect method (actual filter) usage. through variative filter records count in different cases.
5 files changed, 9 insertions, 12 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp index 3698f32e4d..978d55f4bf 100644 --- a/ydb/core/formats/arrow/arrow_filter.cpp +++ b/ydb/core/formats/arrow/arrow_filter.cpp @@ -5,6 +5,7 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> #include <ydb/library/yverify_stream/yverify_stream.h> +#include <library/cpp/actors/core/log.h> namespace NKikimr::NArrow { @@ -521,7 +522,7 @@ TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) { return TIterator(reverse, expectedSize, LastValue); } else { - Y_ABORT_UNLESS(expectedSize == Size()); + AFL_VERIFY(expectedSize == Size())("expected", expectedSize)("size", Size())("reverse", reverse); return TIterator(reverse, Filter, GetStartValue(reverse)); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index f038d81d47..35a796b0ae 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -26,14 +26,6 @@ public: return (AppliedFilter && AppliedFilter->IsTotalDenyFilter()) || (NotAppliedEarlyFilter && NotAppliedEarlyFilter->IsTotalDenyFilter()); } - std::shared_ptr<NArrow::TColumnFilter> GetActualFilter() const { - if (NotAppliedEarlyFilter) { - return NotAppliedEarlyFilter; - } else { - return AppliedFilter; - } - } - TFilterStageData(std::shared_ptr<NArrow::TColumnFilter> appliedFilter, std::shared_ptr<NArrow::TColumnFilter> earlyFilter, std::shared_ptr<arrow::RecordBatch> batch) : TBase(batch) , AppliedFilter(appliedFilter) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp index ef0b6cdf87..ab011a68bb 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp @@ -42,9 +42,10 @@ bool TAssembleFilter::DoExecute() { Y_ABORT_UNLESS(AppliedFilter->Apply(addBatch)); Y_ABORT_UNLESS(NArrow::MergeBatchColumns({ batch, addBatch }, batch, BatchConstructor.GetSchemaColumnNames(), true)); } + AFL_VERIFY(AppliedFilter->Size() == OriginalCount)("original", OriginalCount)("af_count", AppliedFilter->Size()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data") ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter) - ("filter_columns", FilterColumnIds.size()); + ("filter_columns", FilterColumnIds.size())("af_count", AppliedFilter->Size())("ef_count", earlyFilter ? earlyFilter->Size() : 0); FilteredBatch = batch; return true; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 09c96dc331..dfc1fec7cc 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -105,7 +105,7 @@ void TFetchingInterval::ConstructResult() { } else { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "start_construct_result")("interval_idx", IntervalIdx); } - AFL_VERIFY(!ResultConstructionInProgress); + AFL_VERIFY(!ResultConstructionInProgress)("interval_idx", IntervalIdx); ResultConstructionInProgress = true; auto merger = Context->BuildMerger(); for (auto&& [_, i] : Sources) { @@ -125,6 +125,7 @@ void TFetchingInterval::ConstructResult() { } void TFetchingInterval::OnInitResourcesGuard(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "allocated")("interval_idx", IntervalIdx); AFL_VERIFY(guard); AFL_VERIFY(!ResourcesGuard); ResourcesGuard = guard; @@ -132,10 +133,12 @@ void TFetchingInterval::OnInitResourcesGuard(const std::shared_ptr<NResourceBrok } void TFetchingInterval::OnSourceFetchStageReady(const ui32 /*sourceIdx*/) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "fetched")("interval_idx", IntervalIdx); ConstructResult(); } void TFetchingInterval::OnSourceFilterStageReady(const ui32 /*sourceIdx*/) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "filtered")("interval_idx", IntervalIdx); ConstructResult(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index c39bb6d00d..663e08ec6c 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -109,7 +109,7 @@ void TPortionDataSource::DoStartFetchStage() { auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FETCHING"); readAction->SetIsBackgroundProcess(false); THashMap<TBlobRange, ui32> nullBlocks; - NeedFetchColumns(columnIds, readAction, nullBlocks, GetFilterStageData().GetActualFilter()); + NeedFetchColumns(columnIds, readAction, nullBlocks, GetFilterStageData().GetAppliedFilter()); if (readAction->GetExpectedBlobsCount()) { std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction}; auto constructor = std::make_shared<TFFColumnsTaskConstructor>(GetContext(), actions, std::move(nullBlocks), columnIds, *this, "ReaderFetcher"); |