diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-22 12:29:41 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-22 12:29:41 +0300 |
commit | cd3bc4dfb33095ad45720b6382c948d5a0f99bf9 (patch) | |
tree | 5eb59ad8ebfb728bd0ff47cf542359618a05c849 | |
parent | 1b05fbaa2cdc845a2c2c7c602f91294a01544834 (diff) | |
download | ydb-cd3bc4dfb33095ad45720b6382c948d5a0f99bf9.tar.gz |
early filter after compaction test
7 files changed, 86 insertions, 12 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 1758e8b595..543f1daeed 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1338,6 +1338,38 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(expectedYson, ysonResult); } + Y_UNIT_TEST(CheckEarlyFilterOnEmptySelect) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + + TLocalHelper(kikimr).CreateTestOlapTable(); + auto csController = NYDBTest::TControllers::RegisterCSController<NYDBTest::NColumnShard::TController>(); + ui32 rowsCount = 0; + { + ui32 i = 0; + const ui32 rowsPack = 20; + const TInstant start = Now(); + while (!csController->HasCompactions() && Now() - start < TDuration::Seconds(100)) { + WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * rowsPack, rowsPack); + ++i; + rowsCount += rowsPack; + } + } + Sleep(TDuration::Seconds(10)); + auto tableClient = kikimr.GetTableClient(); + auto selectQuery = TString(R"( + SELECT * FROM `/Root/olapStore/olapTable` + WHERE uid='dsfdfsd' + LIMIT 10; + )"); + + auto rows = ExecuteScanQuery(tableClient, selectQuery); + Cerr << csController->GetFilteredRecordsCount().Val() << Endl; + Y_VERIFY(csController->GetFilteredRecordsCount().Val() * 10 <= rowsCount); + UNIT_ASSERT(rows.size() == 0); + } + Y_UNIT_TEST(ExtractRanges) { auto settings = TKikimrSettings() .SetWithSampleTables(false); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index e26bb9aac2..e8909808bf 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -3,6 +3,7 @@ #include "index_logic_logs.h" #include "indexed_read_data.h" +#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/formats/arrow/one_batch_input_stream.h> #include <ydb/core/formats/arrow/merging_sorted_input_stream.h> #include <ydb/library/conclusion/status.h> @@ -464,7 +465,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: return {}; } } - + NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes); Y_VERIFY(!changes->SwitchedPortions.empty()); return changes; } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 1c6951a0bf..d54f79f9b6 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -94,18 +94,8 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v } continue; } -#if 0 // optimization - auto deduped = SliceSortedBatches(slices, description); - for (auto& batch : deduped) { - if (batch && batch->num_rows()) { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); - out.push_back(batch); - } - } -#else auto batch = NArrow::CombineSortedBatches(slices, description); out.push_back(batch); -#endif } return out; diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index aaf404fd29..ad0335a9ae 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -1,5 +1,6 @@ #include "filter_assembler.h" #include <ydb/core/tx/columnshard/engines/filter.h> +#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> namespace NKikimr::NOlap::NIndexedReader { @@ -25,9 +26,12 @@ bool TAssembleFilter::DoExecuteImpl() { if (AllowEarlyFilter) { Filter = std::make_shared<NArrow::TColumnFilter>(Filter->CombineSequentialAnd(*earlyFilter)); if (!earlyFilter->Apply(batch)) { + NYDBTest::TControllers::GetColumnShardController()->OnAfterFilterAssembling(batch); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount); FilteredBatch = nullptr; return true; + } else { + NYDBTest::TControllers::GetColumnShardController()->OnAfterFilterAssembling(batch); } } else if (BatchesOrderPolicy->NeedNotAppliedEarlyFilter()) { EarlyFilter = earlyFilter; diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index b0d30f67bb..48c8e25541 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -7,6 +7,12 @@ namespace NKikimr::NOlap::NIndexedReader { class IOrderPolicy; } +namespace NKikimr::NOlap { +class TColumnEngineChanges; +} +namespace arrow { +class RecordBatch; +} namespace NKikimr::NYDBTest { @@ -17,6 +23,12 @@ protected: virtual bool DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> /*policy*/) { return true; } + virtual bool DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& /*batch*/) { + return true; + } + virtual bool DoOnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& /*changes*/) { + return true; + } public: using TPtr = std::shared_ptr<ICSController>; virtual ~ICSController() = default; @@ -24,6 +36,12 @@ public: OnSortingPolicyCounter.Inc(); return DoOnSortingPolicy(policy); } + bool OnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) { + return DoOnAfterFilterAssembling(batch); + } + bool OnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& changes) { + return DoOnStartCompaction(changes); + } }; class TControllers { diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.cpp b/ydb/core/tx/columnshard/hooks/testing/controller.cpp index 2ed500cae7..6c04ea7767 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.cpp +++ b/ydb/core/tx/columnshard/hooks/testing/controller.cpp @@ -1,6 +1,8 @@ #include "controller.h" #include <ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h> #include <ydb/core/tx/columnshard/engines/reader/order_control/default.h> +#include <ydb/core/tx/columnshard/engines/column_engine.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> namespace NKikimr::NYDBTest::NColumnShard { @@ -19,4 +21,22 @@ bool TController::HasPKSortingOnly() const { return SortingWithLimit.Val() && !AnySorting.Val(); } +bool TController::DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) { + if (batch) { + FilteredRecordsCount.Add(batch->num_rows()); + } + return true; +} + +bool TController::DoOnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& changes) { + if (changes->CompactionInfo) { + if (changes->CompactionInfo->InGranule()) { + InternalCompactions.Inc(); + } else { + SplitCompactions.Inc(); + } + } + return true; +} + } diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index bbcafab436..16db6d8e2c 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -7,10 +7,19 @@ class TController: public ICSController { private: YDB_READONLY(TAtomicCounter, SortingWithLimit, 0); YDB_READONLY(TAtomicCounter, AnySorting, 0); + YDB_READONLY(TAtomicCounter, FilteredRecordsCount, 0); + YDB_READONLY(TAtomicCounter, InternalCompactions, 0); + YDB_READONLY(TAtomicCounter, SplitCompactions, 0); protected: - virtual bool DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> policy); + virtual bool DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> policy) override; + virtual bool DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) override; + virtual bool DoOnStartCompaction(const std::shared_ptr<NOlap::TColumnEngineChanges>& changes) override; + public: bool HasPKSortingOnly() const; + bool HasCompactions() const { + return SplitCompactions.Val() + InternalCompactions.Val(); + } }; } |