aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-22 12:29:41 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-22 12:29:41 +0300
commitcd3bc4dfb33095ad45720b6382c948d5a0f99bf9 (patch)
tree5eb59ad8ebfb728bd0ff47cf542359618a05c849
parent1b05fbaa2cdc845a2c2c7c602f91294a01544834 (diff)
downloadydb-cd3bc4dfb33095ad45720b6382c948d5a0f99bf9.tar.gz
early filter after compaction test
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp32
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp4
-rw-r--r--ydb/core/tx/columnshard/hooks/abstract/abstract.h18
-rw-r--r--ydb/core/tx/columnshard/hooks/testing/controller.cpp20
-rw-r--r--ydb/core/tx/columnshard/hooks/testing/controller.h11
7 files changed, 86 insertions, 12 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 1758e8b595..543f1daeed 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -1338,6 +1338,38 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(expectedYson, ysonResult);
}
+ Y_UNIT_TEST(CheckEarlyFilterOnEmptySelect) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+
+ TLocalHelper(kikimr).CreateTestOlapTable();
+ auto csController = NYDBTest::TControllers::RegisterCSController<NYDBTest::NColumnShard::TController>();
+ ui32 rowsCount = 0;
+ {
+ ui32 i = 0;
+ const ui32 rowsPack = 20;
+ const TInstant start = Now();
+ while (!csController->HasCompactions() && Now() - start < TDuration::Seconds(100)) {
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * rowsPack, rowsPack);
+ ++i;
+ rowsCount += rowsPack;
+ }
+ }
+ Sleep(TDuration::Seconds(10));
+ auto tableClient = kikimr.GetTableClient();
+ auto selectQuery = TString(R"(
+ SELECT * FROM `/Root/olapStore/olapTable`
+ WHERE uid='dsfdfsd'
+ LIMIT 10;
+ )");
+
+ auto rows = ExecuteScanQuery(tableClient, selectQuery);
+ Cerr << csController->GetFilteredRecordsCount().Val() << Endl;
+ Y_VERIFY(csController->GetFilteredRecordsCount().Val() * 10 <= rowsCount);
+ UNIT_ASSERT(rows.size() == 0);
+ }
+
Y_UNIT_TEST(ExtractRanges) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index e26bb9aac2..e8909808bf 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -3,6 +3,7 @@
#include "index_logic_logs.h"
#include "indexed_read_data.h"
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/formats/arrow/one_batch_input_stream.h>
#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
#include <ydb/library/conclusion/status.h>
@@ -464,7 +465,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
return {};
}
}
-
+ NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes);
Y_VERIFY(!changes->SwitchedPortions.empty());
return changes;
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 1c6951a0bf..d54f79f9b6 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -94,18 +94,8 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
}
continue;
}
-#if 0 // optimization
- auto deduped = SliceSortedBatches(slices, description);
- for (auto& batch : deduped) {
- if (batch && batch->num_rows()) {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey));
- out.push_back(batch);
- }
- }
-#else
auto batch = NArrow::CombineSortedBatches(slices, description);
out.push_back(batch);
-#endif
}
return out;
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index aaf404fd29..ad0335a9ae 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -1,5 +1,6 @@
#include "filter_assembler.h"
#include <ydb/core/tx/columnshard/engines/filter.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
namespace NKikimr::NOlap::NIndexedReader {
@@ -25,9 +26,12 @@ bool TAssembleFilter::DoExecuteImpl() {
if (AllowEarlyFilter) {
Filter = std::make_shared<NArrow::TColumnFilter>(Filter->CombineSequentialAnd(*earlyFilter));
if (!earlyFilter->Apply(batch)) {
+ NYDBTest::TControllers::GetColumnShardController()->OnAfterFilterAssembling(batch);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount);
FilteredBatch = nullptr;
return true;
+ } else {
+ NYDBTest::TControllers::GetColumnShardController()->OnAfterFilterAssembling(batch);
}
} else if (BatchesOrderPolicy->NeedNotAppliedEarlyFilter()) {
EarlyFilter = earlyFilter;
diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
index b0d30f67bb..48c8e25541 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
@@ -7,6 +7,12 @@
namespace NKikimr::NOlap::NIndexedReader {
class IOrderPolicy;
}
+namespace NKikimr::NOlap {
+class TColumnEngineChanges;
+}
+namespace arrow {
+class RecordBatch;
+}
namespace NKikimr::NYDBTest {
@@ -17,6 +23,12 @@ protected:
virtual bool DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> /*policy*/) {
return true;
}
+ virtual bool DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& /*batch*/) {
+ return true;
+ }
+ virtual bool DoOnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& /*changes*/) {
+ return true;
+ }
public:
using TPtr = std::shared_ptr<ICSController>;
virtual ~ICSController() = default;
@@ -24,6 +36,12 @@ public:
OnSortingPolicyCounter.Inc();
return DoOnSortingPolicy(policy);
}
+ bool OnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) {
+ return DoOnAfterFilterAssembling(batch);
+ }
+ bool OnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& changes) {
+ return DoOnStartCompaction(changes);
+ }
};
class TControllers {
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.cpp b/ydb/core/tx/columnshard/hooks/testing/controller.cpp
index 2ed500cae7..6c04ea7767 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.cpp
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.cpp
@@ -1,6 +1,8 @@
#include "controller.h"
#include <ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h>
#include <ydb/core/tx/columnshard/engines/reader/order_control/default.h>
+#include <ydb/core/tx/columnshard/engines/column_engine.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
namespace NKikimr::NYDBTest::NColumnShard {
@@ -19,4 +21,22 @@ bool TController::HasPKSortingOnly() const {
return SortingWithLimit.Val() && !AnySorting.Val();
}
+bool TController::DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) {
+ if (batch) {
+ FilteredRecordsCount.Add(batch->num_rows());
+ }
+ return true;
+}
+
+bool TController::DoOnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& changes) {
+ if (changes->CompactionInfo) {
+ if (changes->CompactionInfo->InGranule()) {
+ InternalCompactions.Inc();
+ } else {
+ SplitCompactions.Inc();
+ }
+ }
+ return true;
+}
+
}
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h
index bbcafab436..16db6d8e2c 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.h
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.h
@@ -7,10 +7,19 @@ class TController: public ICSController {
private:
YDB_READONLY(TAtomicCounter, SortingWithLimit, 0);
YDB_READONLY(TAtomicCounter, AnySorting, 0);
+ YDB_READONLY(TAtomicCounter, FilteredRecordsCount, 0);
+ YDB_READONLY(TAtomicCounter, InternalCompactions, 0);
+ YDB_READONLY(TAtomicCounter, SplitCompactions, 0);
protected:
- virtual bool DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> policy);
+ virtual bool DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> policy) override;
+ virtual bool DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) override;
+ virtual bool DoOnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& changes) override;
+
public:
bool HasPKSortingOnly() const;
+ bool HasCompactions() const {
+ return SplitCompactions.Val() + InternalCompactions.Val();
+ }
};
}