aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-10-27 14:16:36 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-10-27 14:41:09 +0300
commit 2ef56dd64f5583ceb3f492ce815ef94dc52f159a (patch)
tree eb42e0e8526a6bf3f3e85aad05b0279a6d95adc9
parent 678dbd8edb092567800f622b601cb3f380212a3b (diff)
download ydb-2ef56dd64f5583ceb3f492ce815ef94dc52f159a.tar.gz
KIKIMR-19853: fix incorrect filter usage: use the applied filter instead of the incorrect GetActualFilter() method (the "actual filter"), because the filter's record count varies between cases.
-rw-r--r-- ydb/core/formats/arrow/arrow_filter.cpp 3
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h 8
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp 3
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp 2
5 files changed, 9 insertions, 12 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp
index 3698f32e4d..978d55f4bf 100644
--- a/ydb/core/formats/arrow/arrow_filter.cpp
+++ b/ydb/core/formats/arrow/arrow_filter.cpp
@@ -5,6 +5,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
+#include <library/cpp/actors/core/log.h>
namespace NKikimr::NArrow {
@@ -521,7 +522,7 @@ TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui
if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) {
return TIterator(reverse, expectedSize, LastValue);
} else {
- Y_ABORT_UNLESS(expectedSize == Size());
+ AFL_VERIFY(expectedSize == Size())("expected", expectedSize)("size", Size())("reverse", reverse);
return TIterator(reverse, Filter, GetStartValue(reverse));
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
index f038d81d47..35a796b0ae 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
@@ -26,14 +26,6 @@ public:
return (AppliedFilter && AppliedFilter->IsTotalDenyFilter()) || (NotAppliedEarlyFilter && NotAppliedEarlyFilter->IsTotalDenyFilter());
}
- std::shared_ptr<NArrow::TColumnFilter> GetActualFilter() const {
- if (NotAppliedEarlyFilter) {
- return NotAppliedEarlyFilter;
- } else {
- return AppliedFilter;
- }
- }
-
TFilterStageData(std::shared_ptr<NArrow::TColumnFilter> appliedFilter, std::shared_ptr<NArrow::TColumnFilter> earlyFilter, std::shared_ptr<arrow::RecordBatch> batch)
: TBase(batch)
, AppliedFilter(appliedFilter)
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
index ef0b6cdf87..ab011a68bb 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
@@ -42,9 +42,10 @@ bool TAssembleFilter::DoExecute() {
Y_ABORT_UNLESS(AppliedFilter->Apply(addBatch));
Y_ABORT_UNLESS(NArrow::MergeBatchColumns({ batch, addBatch }, batch, BatchConstructor.GetSchemaColumnNames(), true));
}
+ AFL_VERIFY(AppliedFilter->Size() == OriginalCount)("original", OriginalCount)("af_count", AppliedFilter->Size());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")
("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter)
- ("filter_columns", FilterColumnIds.size());
+ ("filter_columns", FilterColumnIds.size())("af_count", AppliedFilter->Size())("ef_count", earlyFilter ? earlyFilter->Size() : 0);
FilteredBatch = batch;
return true;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
index 09c96dc331..dfc1fec7cc 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
@@ -105,7 +105,7 @@ void TFetchingInterval::ConstructResult() {
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "start_construct_result")("interval_idx", IntervalIdx);
}
- AFL_VERIFY(!ResultConstructionInProgress);
+ AFL_VERIFY(!ResultConstructionInProgress)("interval_idx", IntervalIdx);
ResultConstructionInProgress = true;
auto merger = Context->BuildMerger();
for (auto&& [_, i] : Sources) {
@@ -125,6 +125,7 @@ void TFetchingInterval::ConstructResult() {
}
void TFetchingInterval::OnInitResourcesGuard(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "allocated")("interval_idx", IntervalIdx);
AFL_VERIFY(guard);
AFL_VERIFY(!ResourcesGuard);
ResourcesGuard = guard;
@@ -132,10 +133,12 @@ void TFetchingInterval::OnInitResourcesGuard(const std::shared_ptr<NResourceBrok
}
void TFetchingInterval::OnSourceFetchStageReady(const ui32 /*sourceIdx*/) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "fetched")("interval_idx", IntervalIdx);
ConstructResult();
}
void TFetchingInterval::OnSourceFilterStageReady(const ui32 /*sourceIdx*/) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "filtered")("interval_idx", IntervalIdx);
ConstructResult();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index c39bb6d00d..663e08ec6c 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -109,7 +109,7 @@ void TPortionDataSource::DoStartFetchStage() {
auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FETCHING");
readAction->SetIsBackgroundProcess(false);
THashMap<TBlobRange, ui32> nullBlocks;
- NeedFetchColumns(columnIds, readAction, nullBlocks, GetFilterStageData().GetActualFilter());
+ NeedFetchColumns(columnIds, readAction, nullBlocks, GetFilterStageData().GetAppliedFilter());
if (readAction->GetExpectedBlobsCount()) {
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
auto constructor = std::make_shared<TFFColumnsTaskConstructor>(GetContext(), actions, std::move(nullBlocks), columnIds, *this, "ReaderFetcher");