diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-08 15:22:01 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-08 15:22:01 +0300 |
commit | 12a3e9b5a765cf68340fa6286661deec892ce3a9 (patch) | |
tree | 17ceb214cca7f6114d52da6e7228551f88df7841 | |
parent | c70598f69da8a7004b8a2a0e2120b62c2584ce2c (diff) | |
download | ydb-12a3e9b5a765cf68340fa6286661deec892ce3a9.tar.gz |
improve order filter usage for pk sorting
24 files changed, 519 insertions, 360 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index 193ce336fc..a47869de0c 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(order_control) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency @@ -24,6 +25,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC ydb-core-protos core-formats-arrow columnshard-engines-predicate + engines-reader-order_control tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE @@ -34,7 +36,6 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index 77faafc8c1..4e5ec18491 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(order_control) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency @@ -25,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC ydb-core-protos core-formats-arrow columnshard-engines-predicate + engines-reader-order_control tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE @@ -35,7 +37,6 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index 77faafc8c1..4e5ec18491 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(order_control) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency @@ -25,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC ydb-core-protos core-formats-arrow columnshard-engines-predicate + engines-reader-order_control tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE @@ -35,7 +37,6 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index 193ce336fc..a47869de0c 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(order_control) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency @@ -24,6 +25,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC ydb-core-protos core-formats-arrow columnshard-engines-predicate + engines-reader-order_control tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE @@ -34,7 +36,6 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index f6efac5e72..6b8b4988f2 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -163,12 +163,4 @@ ui64 TBatch::GetUsefulBytes(const ui64 bytes) const { return bytes * FetchedInfo.GetUsefulDataKff(); } -std::optional<ui32> TBatch::GetMergePoolId() const { - if (IsSortableInGranule()) { - return Granule; - } else { - return {}; - } -} - } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index 05f2e42ae5..583466d786 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -96,7 +96,6 @@ public: const TBatchAddress& GetBatchAddress() const { return BatchAddress; } - std::optional<ui32> GetMergePoolId() const; ui64 GetUsefulWaitingBytes() const { return GetUsefulBytes(WaitingBytes); diff --git 
a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index eed248d5c9..7d2b7e1548 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -1,5 +1,6 @@ #include "filling_context.h" #include "filling_context.h" +#include "order_control/not_sorted.h" #include <ydb/core/tx/columnshard/engines/indexed_read_data.h> namespace NKikimr::NOlap::NIndexedReader { diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index d70752a41e..d57a6c0b9d 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -1,7 +1,7 @@ #pragma once #include "conveyor_task.h" #include "granule.h" -#include "order_controller.h" +#include "order_control/abstract.h" #include <util/generic/hash.h> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..c1aa18f423 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(engines-reader-order_control) +target_link_libraries(engines-reader-order_control PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(engines-reader-order_control PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp +) diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..5f20dc37c5 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(engines-reader-order_control) +target_link_libraries(engines-reader-order_control PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(engines-reader-order_control PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp +) diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..5f20dc37c5 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(engines-reader-order_control) +target_link_libraries(engines-reader-order_control PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(engines-reader-order_control PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp +) diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..c1aa18f423 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(engines-reader-order_control) +target_link_libraries(engines-reader-order_control PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(engines-reader-order_control PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp +) diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp new file mode 100644 index 0000000000..b565d57306 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp @@ -0,0 +1,48 @@ +#include "abstract.h" +#include <ydb/core/tx/columnshard/engines/reader/filling_context.h> + +namespace NKikimr::NOlap::NIndexedReader { + +void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFillingContext& context) { + auto& batch = batchOriginal.GetFetchedInfo(); + Y_VERIFY(!!batch.GetFilter()); + if (!batch.GetFilteredRecordsCount()) { + context.GetCounters().EmptyFilterCount->Add(1); + context.GetCounters().EmptyFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes()); + context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns())); + batchOriginal.InitBatch(nullptr); + } else { + context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows()); + if (batchOriginal.AskedColumnsAlready(context.GetPostFilterColumns())) { + context.GetCounters().FilterOnlyCount->Add(1); + context.GetCounters().FilterOnlyFetchedBytes->Add(batchOriginal.GetFetchedBytes()); + context.GetCounters().FilterOnlyUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes()); + 
context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns())); + + batchOriginal.InitBatch(batch.GetFilterBatch()); + } else { + context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes()); + context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes()); + + batchOriginal.ResetWithFilter(context.GetPostFilterColumns()); + if (batchOriginal.IsFetchingReady()) { + auto processor = context.GetTasksProcessor(); + if (auto assembleBatchTask = batchOriginal.AssembleTask(processor.GetObject(), context.GetReadMetadata())) { + processor.Add(context, assembleBatchTask); + } + } + + context.GetCounters().TwoPhasesCount->Add(1); + context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes()); + context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batchOriginal.GetUsefulWaitingBytes()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data") + ("filtered_count", batch.GetFilterBatch()->num_rows()) + ("blobs_count", batchOriginal.GetWaitingBlobs().size()) + ("columns_count", batchOriginal.GetCurrentColumnIds()->size()) + ("fetch_size", batchOriginal.GetWaitingBytes()) + ; + } + } +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h new file mode 100644 index 0000000000..527c0bd8ed --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h @@ -0,0 +1,82 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/reader/granule.h> +#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h> + +namespace NKikimr::NOlap::NIndexedReader { + +class TGranulesFillingContext; + +class IOrderPolicy { +public: + enum class EFeatures: ui32 { + CanInterrupt = 1, + NeedNotAppliedEarlyFilter = 1 << 1 + }; + using TFeatures = ui32; +private: + mutable std::optional<TFeatures> Features; +protected: + 
TReadMetadata::TConstPtr ReadMetadata; + virtual void DoFill(TGranulesFillingContext& context) = 0; + virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& /*context*/) { + return true; + } + virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0; + virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) { + OnBatchFilterInitialized(batchInfo, context); + return true; + } + + void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context); + virtual TFeatures DoGetFeatures() const { + return 0; + } + TFeatures GetFeatures() const { + if (!Features) { + Features = DoGetFeatures(); + } + return *Features; + } +public: + using TPtr = std::shared_ptr<IOrderPolicy>; + virtual ~IOrderPolicy() = default; + + virtual std::set<ui32> GetFilterStageColumns() { + return ReadMetadata->GetEarlyFilterColumnIds(); + } + + IOrderPolicy(TReadMetadata::TConstPtr readMetadata) + : ReadMetadata(readMetadata) + { + + } + + bool CanInterrupt() const { + return GetFeatures() & (TFeatures)EFeatures::CanInterrupt; + } + + bool NeedNotAppliedEarlyFilter() const { + return GetFeatures() & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; + } + + bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) { + return DoOnFilterReady(batchInfo, granule, context); + } + + + virtual bool ReadyForAddNotIndexedToEnd() const = 0; + + std::vector<TGranule*> DetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { + return DoDetachReadyGranules(granulesToOut); + } + + void Fill(TGranulesFillingContext& context) { + DoFill(context); + } + + bool Wakeup(const TGranule& granule, TGranulesFillingContext& context) { + return DoWakeup(granule, context); + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp 
b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp new file mode 100644 index 0000000000..b31b499583 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp @@ -0,0 +1,28 @@ +#include "default.h" +#include <ydb/core/tx/columnshard/engines/reader/filling_context.h> + +namespace NKikimr::NOlap::NIndexedReader { + +void TAnySorting::DoFill(TGranulesFillingContext& context) { + auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); + for (ui64 granule : granulesOrder) { + TGranule& g = context.GetGranuleVerified(granule); + GranulesOutOrder.emplace_back(&g); + } +} + +std::vector<TGranule*> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { + std::vector<TGranule*> result; + while (GranulesOutOrder.size()) { + NIndexedReader::TGranule* granule = GranulesOutOrder.front(); + if (!granule->IsReady()) { + break; + } + result.emplace_back(granule); + Y_VERIFY(granulesToOut.erase(granule->GetGranuleId())); + GranulesOutOrder.pop_front(); + } + return result; +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.h b/ydb/core/tx/columnshard/engines/reader/order_control/default.h new file mode 100644 index 0000000000..e56e19f30f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/default.h @@ -0,0 +1,23 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NIndexedReader { + +class TAnySorting: public IOrderPolicy { +private: + using TBase = IOrderPolicy; + std::deque<TGranule*> GranulesOutOrder; +protected: + virtual void DoFill(TGranulesFillingContext& context) override; + virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; +public: + TAnySorting(TReadMetadata::TConstPtr readMetadata) + :TBase(readMetadata) { + + } + virtual bool ReadyForAddNotIndexedToEnd() const override { + return ReadMetadata->IsDescSorted() && 
GranulesOutOrder.empty(); + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp new file mode 100644 index 0000000000..8d6cc168e5 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp @@ -0,0 +1,15 @@ +#include "not_sorted.h" + +namespace NKikimr::NOlap::NIndexedReader { + +std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { + std::vector<TGranule*> result; + result.reserve(granulesToOut.size()); + for (auto&& i : granulesToOut) { + result.emplace_back(i.second); + } + granulesToOut.clear(); + return result; +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h new file mode 100644 index 0000000000..499d929d03 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h @@ -0,0 +1,26 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NIndexedReader { + +class TNonSorting: public IOrderPolicy { +private: + using TBase = IOrderPolicy; +protected: + virtual void DoFill(TGranulesFillingContext& /*context*/) override { + } + + virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; +public: + TNonSorting(TReadMetadata::TConstPtr readMetadata) + :TBase(readMetadata) + { + + } + + virtual bool ReadyForAddNotIndexedToEnd() const override { + return true; + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp new file mode 100644 index 0000000000..ead178faca --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp @@ -0,0 +1,102 @@ +#include "pk_with_limit.h" +#include 
<ydb/core/tx/columnshard/engines/reader/filling_context.h> + +namespace NKikimr::NOlap::NIndexedReader { + +bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingContext& context) { + Y_VERIFY(ReadMetadata->Limit); + if (!CurrentItemsLimit) { + return false; + } + Y_VERIFY(GranulesOutOrderForPortions.size()); + if (GranulesOutOrderForPortions.front().GetGranule()->GetGranuleId() != granule.GetGranuleId()) { + return false; + } + while (GranulesOutOrderForPortions.size() && CurrentItemsLimit) { + auto& g = GranulesOutOrderForPortions.front(); + // granule have to wait NotIndexedBatch initialization, at first (NotIndexedBatchReadyFlag initialization). + // other batches will be delivered in OrderedBatches[granuleId] order (not sortable at first in according to granule.SortBatchesByPK) + if (!g.GetGranule()->IsNotIndexedBatchReady()) { + break; + } + if (g.Start()) { + ++CountProcessedGranules; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size()); + MergeStream.AddPoolSource({}, g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter()); + } + auto& batches = g.GetOrderedBatches(); + while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) { + TBatch* b = batches.front(); + std::optional<ui32> batchPoolId; + if (b->IsSortableInGranule()) { + ++CountSorted; + batchPoolId = 0; + } else { + ++CountNotSorted; + } + MergeStream.AddPoolSource(batchPoolId, b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter()); + OnBatchFilterInitialized(*b, context); + batches.pop_front(); + while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.DrainCurrent()) { + if (!--CurrentItemsLimit) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "stop_on_limit") + ("limit", ReadMetadata->Limit)("sorted_count", 
CountSorted)("unsorted_count", CountNotSorted)("granules_count", CountProcessedGranules); + } + } + } + if (batches.empty()) { + GranulesOutOrderForPortions.pop_front(); + } else { + break; + } + } + while (GranulesOutOrderForPortions.size() && !CurrentItemsLimit) { + auto& g = GranulesOutOrderForPortions.front(); + g.GetGranule()->AddNotIndexedBatch(nullptr); + auto& batches = g.GetOrderedBatches(); + while (batches.size()) { + auto b = batches.front(); + context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns())); + b->InitBatch(nullptr); + batches.pop_front(); + } + GranulesOutOrderForPortions.pop_front(); + } + return true; +} + +bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) { + return Wakeup(granule, context); +} + +void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) { + auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); + for (ui64 granule : granulesOrder) { + TGranule& g = context.GetGranuleVerified(granule); + GranulesOutOrder.emplace_back(&g); + GranulesOutOrderForPortions.emplace_back(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), &g); + } +} + +std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { + std::vector<TGranule*> result; + while (GranulesOutOrder.size()) { + NIndexedReader::TGranule* granule = GranulesOutOrder.front(); + if (!granule->IsReady()) { + break; + } + result.emplace_back(granule); + Y_VERIFY(granulesToOut.erase(granule->GetGranuleId())); + GranulesOutOrder.pop_front(); + } + return result; +} + +TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata) + :TBase(readMetadata) + , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), readMetadata->IsDescSorted()) +{ + CurrentItemsLimit = ReadMetadata->Limit; +} + +} diff --git 
a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h new file mode 100644 index 0000000000..36ec23b9c9 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h @@ -0,0 +1,72 @@ +#pragma once +#include "abstract.h" +#include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h> + +namespace NKikimr::NOlap::NIndexedReader { + +class TGranuleOrdered { +private: + bool StartedFlag = false; + std::deque<TBatch*> OrderedBatches; + TGranule* Granule = nullptr; +public: + bool Start() { + if (!StartedFlag) { + StartedFlag = true; + return true; + } else { + return false; + } + + } + + TGranuleOrdered(std::deque<TBatch*>&& orderedBatches, TGranule* granule) + : OrderedBatches(std::move(orderedBatches)) + , Granule(granule) + { + } + + std::deque<TBatch*>& GetOrderedBatches() noexcept { + return OrderedBatches; + } + + TGranule* GetGranule() const noexcept { + return Granule; + } +}; + +class TPKSortingWithLimit: public IOrderPolicy { +private: + using TBase = IOrderPolicy; + std::deque<TGranule*> GranulesOutOrder; + std::deque<TGranuleOrdered> GranulesOutOrderForPortions; + ui32 CurrentItemsLimit = 0; + ui32 CountProcessedGranules = 0; + ui32 CountNotSorted = 0; + ui32 CountSorted = 0; + TMergePartialStream MergeStream; +protected: + virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override; + virtual void DoFill(TGranulesFillingContext& context) override; + virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; + virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override; + virtual TFeatures DoGetFeatures() const override { + return (TFeatures)EFeatures::CanInterrupt | (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; + } + +public: + virtual std::set<ui32> GetFilterStageColumns() override { + 
std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds(); + for (auto&& i : ReadMetadata->GetPKColumnIds()) { + result.emplace(i); + } + return result; + } + + TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata); + virtual bool ReadyForAddNotIndexedToEnd() const override { + return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty(); + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp deleted file mode 100644 index 1e2beba67e..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp +++ /dev/null @@ -1,159 +0,0 @@ -#include "order_controller.h" -#include "filling_context.h" - -namespace NKikimr::NOlap::NIndexedReader { - -void TAnySorting::DoFill(TGranulesFillingContext& context) { - auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); - for (ui64 granule : granulesOrder) { - TGranule& g = context.GetGranuleVerified(granule); - GranulesOutOrder.emplace_back(&g); - } -} - -std::vector<TGranule*> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { - std::vector<TGranule*> result; - while (GranulesOutOrder.size()) { - NIndexedReader::TGranule* granule = GranulesOutOrder.front(); - if (!granule->IsReady()) { - break; - } - result.emplace_back(granule); - Y_VERIFY(granulesToOut.erase(granule->GetGranuleId())); - GranulesOutOrder.pop_front(); - } - return result; -} - -std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { - std::vector<TGranule*> result; - result.reserve(granulesToOut.size()); - for (auto&& i : granulesToOut) { - result.emplace_back(i.second); - } - granulesToOut.clear(); - return result; -} - -bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingContext& context) { - Y_VERIFY(ReadMetadata->Limit); - if (!CurrentItemsLimit) { - return false; - } - 
Y_VERIFY(GranulesOutOrderForPortions.size()); - if (GranulesOutOrderForPortions.front().GetGranule()->GetGranuleId() != granule.GetGranuleId()) { - return false; - } - while (GranulesOutOrderForPortions.size()) { - auto& g = GranulesOutOrderForPortions.front(); - // granule have to wait NotIndexedBatch initialization, at first (StartableFlag initialization). - // other batches will be delivered in OrderedBatches[granuleId] order - if (!g.GetGranule()->IsNotIndexedBatchReady()) { - break; - } - if (g.Start()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size()); - MergeStream.AddPoolSource({}, g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter()); - } - auto& batches = g.GetBatches(); - while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) { - auto b = batches.front(); - MergeStream.AddPoolSource(b->GetMergePoolId(), b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter()); - OnBatchFilterInitialized(*b, context); - batches.pop_front(); - } - while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.DrainCurrent()) { - --CurrentItemsLimit; - } - if (!CurrentItemsLimit || batches.empty()) { - while (batches.size()) { - auto b = batches.front(); - context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns())); - b->InitBatch(nullptr); - batches.pop_front(); - } - GranulesOutOrderForPortions.pop_front(); - } else { - break; - } - } - return true; -} - -bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) { - return Wakeup(granule, context); -} - -void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) { - auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); - for (ui64 
granule : granulesOrder) { - TGranule& g = context.GetGranuleVerified(granule); - GranulesOutOrder.emplace_back(&g); - GranulesOutOrderForPortions.emplace_back(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), &g); - } -} - -std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { - std::vector<TGranule*> result; - while (GranulesOutOrder.size()) { - NIndexedReader::TGranule* granule = GranulesOutOrder.front(); - if (!granule->IsReady()) { - break; - } - result.emplace_back(granule); - Y_VERIFY(granulesToOut.erase(granule->GetGranuleId())); - GranulesOutOrder.pop_front(); - } - return result; -} - -TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata) - :TBase(readMetadata) - , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), readMetadata->IsDescSorted()) -{ - CurrentItemsLimit = ReadMetadata->Limit; -} - -void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFillingContext& context) { - auto& batch = batchOriginal.GetFetchedInfo(); - Y_VERIFY(!!batch.GetFilter()); - if (!batch.GetFilteredRecordsCount()) { - context.GetCounters().EmptyFilterCount->Add(1); - context.GetCounters().EmptyFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes()); - context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns())); - batchOriginal.InitBatch(nullptr); - } else { - context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows()); - if (batchOriginal.AskedColumnsAlready(context.GetPostFilterColumns())) { - context.GetCounters().FilterOnlyCount->Add(1); - context.GetCounters().FilterOnlyFetchedBytes->Add(batchOriginal.GetFetchedBytes()); - context.GetCounters().FilterOnlyUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes()); - context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns())); - - 
batchOriginal.InitBatch(batch.GetFilterBatch()); - } else { - context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes()); - context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes()); - - batchOriginal.ResetWithFilter(context.GetPostFilterColumns()); - if (batchOriginal.IsFetchingReady()) { - auto processor = context.GetTasksProcessor(); - if (auto assembleBatchTask = batchOriginal.AssembleTask(processor.GetObject(), context.GetReadMetadata())) { - processor.Add(context, assembleBatchTask); - } - } - - context.GetCounters().TwoPhasesCount->Add(1); - context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes()); - context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batchOriginal.GetUsefulWaitingBytes()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data") - ("filtered_count", batch.GetFilterBatch()->num_rows()) - ("blobs_count", batchOriginal.GetWaitingBlobs().size()) - ("columns_count", batchOriginal.GetCurrentColumnIds()->size()) - ("fetch_size", batchOriginal.GetWaitingBytes()) - ; - } - } -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h deleted file mode 100644 index 66f2fba42b..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.h +++ /dev/null @@ -1,186 +0,0 @@ -#pragma once -#include "granule.h" -#include "read_metadata.h" -#include "read_filter_merger.h" - -namespace NKikimr::NOlap::NIndexedReader { - -class TGranulesFillingContext; - -class IOrderPolicy { -public: - enum class EFeatures: ui32 { - CanInterrupt = 1, - NeedNotAppliedEarlyFilter = 1 << 1 - }; - using TFeatures = ui32; -private: - mutable std::optional<TFeatures> Features; -protected: - TReadMetadata::TConstPtr ReadMetadata; - virtual void DoFill(TGranulesFillingContext& context) = 0; - virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& 
/*context*/) { - return true; - } - virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0; - virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) { - OnBatchFilterInitialized(batchInfo, context); - return true; - } - - void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context); - virtual TFeatures DoGetFeatures() const { - return 0; - } - TFeatures GetFeatures() const { - if (!Features) { - Features = DoGetFeatures(); - } - return *Features; - } -public: - using TPtr = std::shared_ptr<IOrderPolicy>; - virtual ~IOrderPolicy() = default; - - virtual std::set<ui32> GetFilterStageColumns() { - return ReadMetadata->GetEarlyFilterColumnIds(); - } - - IOrderPolicy(TReadMetadata::TConstPtr readMetadata) - : ReadMetadata(readMetadata) - { - - } - - bool CanInterrupt() const { - return GetFeatures() & (TFeatures)EFeatures::CanInterrupt; - } - - bool NeedNotAppliedEarlyFilter() const { - return GetFeatures() & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; - } - - bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) { - return DoOnFilterReady(batchInfo, granule, context); - } - - - virtual bool ReadyForAddNotIndexedToEnd() const = 0; - - std::vector<TGranule*> DetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { - return DoDetachReadyGranules(granulesToOut); - } - - void Fill(TGranulesFillingContext& context) { - DoFill(context); - } - - bool Wakeup(const TGranule& granule, TGranulesFillingContext& context) { - return DoWakeup(granule, context); - } -}; - -class TNonSorting: public IOrderPolicy { -private: - using TBase = IOrderPolicy; -protected: - virtual void DoFill(TGranulesFillingContext& /*context*/) override { - } - - virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; -public: - 
TNonSorting(TReadMetadata::TConstPtr readMetadata) - :TBase(readMetadata) - { - - } - - virtual bool ReadyForAddNotIndexedToEnd() const override { - return true; - } -}; - -class TAnySorting: public IOrderPolicy { -private: - using TBase = IOrderPolicy; - std::deque<TGranule*> GranulesOutOrder; -protected: - virtual void DoFill(TGranulesFillingContext& context) override; - virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; -public: - TAnySorting(TReadMetadata::TConstPtr readMetadata) - :TBase(readMetadata) { - - } - virtual bool ReadyForAddNotIndexedToEnd() const override { - return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty(); - } -}; - -class TGranuleOrdered { -private: - bool StartedFlag = false; - std::deque<TBatch*> Batches; - const TGranule* Granule = nullptr; -public: - bool Start() { - if (!StartedFlag) { - StartedFlag = true; - return true; - } else { - return false; - } - - } - - TGranuleOrdered(std::deque<TBatch*>&& batches, TGranule* granule) - : Batches(std::move(batches)) - , Granule(granule) - { - } - - const std::deque<TBatch*>& GetBatches() const noexcept { - return Batches; - } - - std::deque<TBatch*>& GetBatches() noexcept { - return Batches; - } - - const TGranule* GetGranule() const noexcept { - return Granule; - } -}; - -class TPKSortingWithLimit: public IOrderPolicy { -private: - using TBase = IOrderPolicy; - std::deque<TGranule*> GranulesOutOrder; - std::deque<TGranuleOrdered> GranulesOutOrderForPortions; - ui32 CurrentItemsLimit = 0; - TMergePartialStream MergeStream; -protected: - virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override; - virtual void DoFill(TGranulesFillingContext& context) override; - virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; - virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& 
context) override; - virtual TFeatures DoGetFeatures() const override { - return (TFeatures)EFeatures::CanInterrupt & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; - } - -public: - virtual std::set<ui32> GetFilterStageColumns() override { - std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds(); - for (auto&& i : ReadMetadata->GetPKColumnIds()) { - result.emplace(i); - } - return result; - } - - TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata); - virtual bool ReadyForAddNotIndexedToEnd() const override { - return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty(); - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 6f43e2e95d..225bc1058c 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -1,5 +1,6 @@ #include "read_metadata.h" -#include "order_controller.h" +#include "order_control/default.h" +#include "order_control/pk_with_limit.h" #include <ydb/core/tx/columnshard/columnshard__index_scan.h> #include <ydb/core/tx/columnshard/columnshard__stats_scan.h> |