aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-08 15:22:01 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-08 15:22:01 +0300
commit12a3e9b5a765cf68340fa6286661deec892ce3a9 (patch)
tree17ceb214cca7f6114d52da6e7228551f88df7841
parentc70598f69da8a7004b8a2a0e2120b62c2584ce2c (diff)
downloadydb-12a3e9b5a765cf68340fa6286661deec892ce3a9.tar.gz
improve order filter usage for pk sorting
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt23
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp48
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/abstract.h82
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/default.cpp28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/default.h23
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h26
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp102
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h72
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_controller.cpp159
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_controller.h186
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp3
24 files changed, 519 insertions, 360 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index 193ce336fc..a47869de0c 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(order_control)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -24,6 +25,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
ydb-core-protos
core-formats-arrow
columnshard-engines-predicate
+ engines-reader-order_control
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
@@ -34,7 +36,6 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index 77faafc8c1..4e5ec18491 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(order_control)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -25,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
ydb-core-protos
core-formats-arrow
columnshard-engines-predicate
+ engines-reader-order_control
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
@@ -35,7 +37,6 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index 77faafc8c1..4e5ec18491 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(order_control)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -25,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
ydb-core-protos
core-formats-arrow
columnshard-engines-predicate
+ engines-reader-order_control
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
@@ -35,7 +37,6 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index 193ce336fc..a47869de0c 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(order_control)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -24,6 +25,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
ydb-core-protos
core-formats-arrow
columnshard-engines-predicate
+ engines-reader-order_control
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
@@ -34,7 +36,6 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index f6efac5e72..6b8b4988f2 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -163,12 +163,4 @@ ui64 TBatch::GetUsefulBytes(const ui64 bytes) const {
return bytes * FetchedInfo.GetUsefulDataKff();
}
-std::optional<ui32> TBatch::GetMergePoolId() const {
- if (IsSortableInGranule()) {
- return Granule;
- } else {
- return {};
- }
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index 05f2e42ae5..583466d786 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -96,7 +96,6 @@ public:
const TBatchAddress& GetBatchAddress() const {
return BatchAddress;
}
- std::optional<ui32> GetMergePoolId() const;
ui64 GetUsefulWaitingBytes() const {
return GetUsefulBytes(WaitingBytes);
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index eed248d5c9..7d2b7e1548 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -1,5 +1,6 @@
#include "filling_context.h"
#include "filling_context.h"
+#include "order_control/not_sorted.h"
#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
namespace NKikimr::NOlap::NIndexedReader {
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index d70752a41e..d57a6c0b9d 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -1,7 +1,7 @@
#pragma once
#include "conveyor_task.h"
#include "granule.h"
-#include "order_controller.h"
+#include "order_control/abstract.h"
#include <util/generic/hash.h>
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..c1aa18f423
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-reader-order_control)
+target_link_libraries(engines-reader-order_control PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(engines-reader-order_control PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..5f20dc37c5
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-reader-order_control)
+target_link_libraries(engines-reader-order_control PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(engines-reader-order_control PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..5f20dc37c5
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-reader-order_control)
+target_link_libraries(engines-reader-order_control PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(engines-reader-order_control PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..c1aa18f423
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-reader-order_control)
+target_link_libraries(engines-reader-order_control PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(engines-reader-order_control PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
new file mode 100644
index 0000000000..b565d57306
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
@@ -0,0 +1,48 @@
+#include "abstract.h"
+#include <ydb/core/tx/columnshard/engines/reader/filling_context.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFillingContext& context) {
+ auto& batch = batchOriginal.GetFetchedInfo();
+ Y_VERIFY(!!batch.GetFilter());
+ if (!batch.GetFilteredRecordsCount()) {
+ context.GetCounters().EmptyFilterCount->Add(1);
+ context.GetCounters().EmptyFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes());
+ context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns()));
+ batchOriginal.InitBatch(nullptr);
+ } else {
+ context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows());
+ if (batchOriginal.AskedColumnsAlready(context.GetPostFilterColumns())) {
+ context.GetCounters().FilterOnlyCount->Add(1);
+ context.GetCounters().FilterOnlyFetchedBytes->Add(batchOriginal.GetFetchedBytes());
+ context.GetCounters().FilterOnlyUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes());
+ context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns()));
+
+ batchOriginal.InitBatch(batch.GetFilterBatch());
+ } else {
+ context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes());
+ context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes());
+
+ batchOriginal.ResetWithFilter(context.GetPostFilterColumns());
+ if (batchOriginal.IsFetchingReady()) {
+ auto processor = context.GetTasksProcessor();
+ if (auto assembleBatchTask = batchOriginal.AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
+ processor.Add(context, assembleBatchTask);
+ }
+ }
+
+ context.GetCounters().TwoPhasesCount->Add(1);
+ context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes());
+ context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batchOriginal.GetUsefulWaitingBytes());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
+ ("filtered_count", batch.GetFilterBatch()->num_rows())
+ ("blobs_count", batchOriginal.GetWaitingBlobs().size())
+ ("columns_count", batchOriginal.GetCurrentColumnIds()->size())
+ ("fetch_size", batchOriginal.GetWaitingBytes())
+ ;
+ }
+ }
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h
new file mode 100644
index 0000000000..527c0bd8ed
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h
@@ -0,0 +1,82 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/reader/granule.h>
+#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TGranulesFillingContext;
+
+class IOrderPolicy {
+public:
+ enum class EFeatures: ui32 {
+ CanInterrupt = 1,
+ NeedNotAppliedEarlyFilter = 1 << 1
+ };
+ using TFeatures = ui32;
+private:
+ mutable std::optional<TFeatures> Features;
+protected:
+ TReadMetadata::TConstPtr ReadMetadata;
+ virtual void DoFill(TGranulesFillingContext& context) = 0;
+ virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& /*context*/) {
+ return true;
+ }
+ virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0;
+ virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) {
+ OnBatchFilterInitialized(batchInfo, context);
+ return true;
+ }
+
+ void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context);
+ virtual TFeatures DoGetFeatures() const {
+ return 0;
+ }
+ TFeatures GetFeatures() const {
+ if (!Features) {
+ Features = DoGetFeatures();
+ }
+ return *Features;
+ }
+public:
+ using TPtr = std::shared_ptr<IOrderPolicy>;
+ virtual ~IOrderPolicy() = default;
+
+ virtual std::set<ui32> GetFilterStageColumns() {
+ return ReadMetadata->GetEarlyFilterColumnIds();
+ }
+
+ IOrderPolicy(TReadMetadata::TConstPtr readMetadata)
+ : ReadMetadata(readMetadata)
+ {
+
+ }
+
+ bool CanInterrupt() const {
+ return GetFeatures() & (TFeatures)EFeatures::CanInterrupt;
+ }
+
+ bool NeedNotAppliedEarlyFilter() const {
+ return GetFeatures() & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
+ }
+
+ bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) {
+ return DoOnFilterReady(batchInfo, granule, context);
+ }
+
+
+ virtual bool ReadyForAddNotIndexedToEnd() const = 0;
+
+ std::vector<TGranule*> DetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+ return DoDetachReadyGranules(granulesToOut);
+ }
+
+ void Fill(TGranulesFillingContext& context) {
+ DoFill(context);
+ }
+
+ bool Wakeup(const TGranule& granule, TGranulesFillingContext& context) {
+ return DoWakeup(granule, context);
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
new file mode 100644
index 0000000000..b31b499583
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
@@ -0,0 +1,28 @@
+#include "default.h"
+#include <ydb/core/tx/columnshard/engines/reader/filling_context.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+void TAnySorting::DoFill(TGranulesFillingContext& context) {
+ auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
+ for (ui64 granule : granulesOrder) {
+ TGranule& g = context.GetGranuleVerified(granule);
+ GranulesOutOrder.emplace_back(&g);
+ }
+}
+
+std::vector<TGranule*> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+ std::vector<TGranule*> result;
+ while (GranulesOutOrder.size()) {
+ NIndexedReader::TGranule* granule = GranulesOutOrder.front();
+ if (!granule->IsReady()) {
+ break;
+ }
+ result.emplace_back(granule);
+ Y_VERIFY(granulesToOut.erase(granule->GetGranuleId()));
+ GranulesOutOrder.pop_front();
+ }
+ return result;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.h b/ydb/core/tx/columnshard/engines/reader/order_control/default.h
new file mode 100644
index 0000000000..e56e19f30f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/default.h
@@ -0,0 +1,23 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TAnySorting: public IOrderPolicy {
+private:
+ using TBase = IOrderPolicy;
+ std::deque<TGranule*> GranulesOutOrder;
+protected:
+ virtual void DoFill(TGranulesFillingContext& context) override;
+ virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+public:
+ TAnySorting(TReadMetadata::TConstPtr readMetadata)
+ :TBase(readMetadata) {
+
+ }
+ virtual bool ReadyForAddNotIndexedToEnd() const override {
+ return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
new file mode 100644
index 0000000000..8d6cc168e5
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
@@ -0,0 +1,15 @@
+#include "not_sorted.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+ std::vector<TGranule*> result;
+ result.reserve(granulesToOut.size());
+ for (auto&& i : granulesToOut) {
+ result.emplace_back(i.second);
+ }
+ granulesToOut.clear();
+ return result;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h
new file mode 100644
index 0000000000..499d929d03
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h
@@ -0,0 +1,26 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TNonSorting: public IOrderPolicy {
+private:
+ using TBase = IOrderPolicy;
+protected:
+ virtual void DoFill(TGranulesFillingContext& /*context*/) override {
+ }
+
+ virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+public:
+ TNonSorting(TReadMetadata::TConstPtr readMetadata)
+ :TBase(readMetadata)
+ {
+
+ }
+
+ virtual bool ReadyForAddNotIndexedToEnd() const override {
+ return true;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
new file mode 100644
index 0000000000..ead178faca
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
@@ -0,0 +1,102 @@
+#include "pk_with_limit.h"
+#include <ydb/core/tx/columnshard/engines/reader/filling_context.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingContext& context) {
+ Y_VERIFY(ReadMetadata->Limit);
+ if (!CurrentItemsLimit) {
+ return false;
+ }
+ Y_VERIFY(GranulesOutOrderForPortions.size());
+ if (GranulesOutOrderForPortions.front().GetGranule()->GetGranuleId() != granule.GetGranuleId()) {
+ return false;
+ }
+ while (GranulesOutOrderForPortions.size() && CurrentItemsLimit) {
+ auto& g = GranulesOutOrderForPortions.front();
+ // granule have to wait NotIndexedBatch initialization, at first (NotIndexedBatchReadyFlag initialization).
+ // other batches will be delivered in OrderedBatches[granuleId] order (not sortable at first in according to granule.SortBatchesByPK)
+ if (!g.GetGranule()->IsNotIndexedBatchReady()) {
+ break;
+ }
+ if (g.Start()) {
+ ++CountProcessedGranules;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size());
+ MergeStream.AddPoolSource({}, g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter());
+ }
+ auto& batches = g.GetOrderedBatches();
+ while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) {
+ TBatch* b = batches.front();
+ std::optional<ui32> batchPoolId;
+ if (b->IsSortableInGranule()) {
+ ++CountSorted;
+ batchPoolId = 0;
+ } else {
+ ++CountNotSorted;
+ }
+ MergeStream.AddPoolSource(batchPoolId, b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter());
+ OnBatchFilterInitialized(*b, context);
+ batches.pop_front();
+ while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.DrainCurrent()) {
+ if (!--CurrentItemsLimit) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "stop_on_limit")
+ ("limit", ReadMetadata->Limit)("sorted_count", CountSorted)("unsorted_count", CountNotSorted)("granules_count", CountProcessedGranules);
+ }
+ }
+ }
+ if (batches.empty()) {
+ GranulesOutOrderForPortions.pop_front();
+ } else {
+ break;
+ }
+ }
+ while (GranulesOutOrderForPortions.size() && !CurrentItemsLimit) {
+ auto& g = GranulesOutOrderForPortions.front();
+ g.GetGranule()->AddNotIndexedBatch(nullptr);
+ auto& batches = g.GetOrderedBatches();
+ while (batches.size()) {
+ auto b = batches.front();
+ context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns()));
+ b->InitBatch(nullptr);
+ batches.pop_front();
+ }
+ GranulesOutOrderForPortions.pop_front();
+ }
+ return true;
+}
+
+bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) {
+ return Wakeup(granule, context);
+}
+
+void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) {
+ auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
+ for (ui64 granule : granulesOrder) {
+ TGranule& g = context.GetGranuleVerified(granule);
+ GranulesOutOrder.emplace_back(&g);
+ GranulesOutOrderForPortions.emplace_back(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), &g);
+ }
+}
+
+std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+ std::vector<TGranule*> result;
+ while (GranulesOutOrder.size()) {
+ NIndexedReader::TGranule* granule = GranulesOutOrder.front();
+ if (!granule->IsReady()) {
+ break;
+ }
+ result.emplace_back(granule);
+ Y_VERIFY(granulesToOut.erase(granule->GetGranuleId()));
+ GranulesOutOrder.pop_front();
+ }
+ return result;
+}
+
+TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
+ :TBase(readMetadata)
+ , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), readMetadata->IsDescSorted())
+{
+ CurrentItemsLimit = ReadMetadata->Limit;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
new file mode 100644
index 0000000000..36ec23b9c9
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
@@ -0,0 +1,72 @@
+#pragma once
+#include "abstract.h"
+#include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TGranuleOrdered {
+private:
+ bool StartedFlag = false;
+ std::deque<TBatch*> OrderedBatches;
+ TGranule* Granule = nullptr;
+public:
+ bool Start() {
+ if (!StartedFlag) {
+ StartedFlag = true;
+ return true;
+ } else {
+ return false;
+ }
+
+ }
+
+ TGranuleOrdered(std::deque<TBatch*>&& orderedBatches, TGranule* granule)
+ : OrderedBatches(std::move(orderedBatches))
+ , Granule(granule)
+ {
+ }
+
+ std::deque<TBatch*>& GetOrderedBatches() noexcept {
+ return OrderedBatches;
+ }
+
+ TGranule* GetGranule() const noexcept {
+ return Granule;
+ }
+};
+
+class TPKSortingWithLimit: public IOrderPolicy {
+private:
+ using TBase = IOrderPolicy;
+ std::deque<TGranule*> GranulesOutOrder;
+ std::deque<TGranuleOrdered> GranulesOutOrderForPortions;
+ ui32 CurrentItemsLimit = 0;
+ ui32 CountProcessedGranules = 0;
+ ui32 CountNotSorted = 0;
+ ui32 CountSorted = 0;
+ TMergePartialStream MergeStream;
+protected:
+ virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override;
+ virtual void DoFill(TGranulesFillingContext& context) override;
+ virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+ virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;
+ virtual TFeatures DoGetFeatures() const override {
+ return (TFeatures)EFeatures::CanInterrupt & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
+ }
+
+public:
+ virtual std::set<ui32> GetFilterStageColumns() override {
+ std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds();
+ for (auto&& i : ReadMetadata->GetPKColumnIds()) {
+ result.emplace(i);
+ }
+ return result;
+ }
+
+ TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata);
+ virtual bool ReadyForAddNotIndexedToEnd() const override {
+ return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
deleted file mode 100644
index 1e2beba67e..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+++ /dev/null
@@ -1,159 +0,0 @@
-#include "order_controller.h"
-#include "filling_context.h"
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-void TAnySorting::DoFill(TGranulesFillingContext& context) {
- auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
- for (ui64 granule : granulesOrder) {
- TGranule& g = context.GetGranuleVerified(granule);
- GranulesOutOrder.emplace_back(&g);
- }
-}
-
-std::vector<TGranule*> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
- std::vector<TGranule*> result;
- while (GranulesOutOrder.size()) {
- NIndexedReader::TGranule* granule = GranulesOutOrder.front();
- if (!granule->IsReady()) {
- break;
- }
- result.emplace_back(granule);
- Y_VERIFY(granulesToOut.erase(granule->GetGranuleId()));
- GranulesOutOrder.pop_front();
- }
- return result;
-}
-
-std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
- std::vector<TGranule*> result;
- result.reserve(granulesToOut.size());
- for (auto&& i : granulesToOut) {
- result.emplace_back(i.second);
- }
- granulesToOut.clear();
- return result;
-}
-
-bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingContext& context) {
- Y_VERIFY(ReadMetadata->Limit);
- if (!CurrentItemsLimit) {
- return false;
- }
- Y_VERIFY(GranulesOutOrderForPortions.size());
- if (GranulesOutOrderForPortions.front().GetGranule()->GetGranuleId() != granule.GetGranuleId()) {
- return false;
- }
- while (GranulesOutOrderForPortions.size()) {
- auto& g = GranulesOutOrderForPortions.front();
- // granule have to wait NotIndexedBatch initialization, at first (StartableFlag initialization).
- // other batches will be delivered in OrderedBatches[granuleId] order
- if (!g.GetGranule()->IsNotIndexedBatchReady()) {
- break;
- }
- if (g.Start()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size());
- MergeStream.AddPoolSource({}, g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter());
- }
- auto& batches = g.GetBatches();
- while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) {
- auto b = batches.front();
- MergeStream.AddPoolSource(b->GetMergePoolId(), b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter());
- OnBatchFilterInitialized(*b, context);
- batches.pop_front();
- }
- while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.DrainCurrent()) {
- --CurrentItemsLimit;
- }
- if (!CurrentItemsLimit || batches.empty()) {
- while (batches.size()) {
- auto b = batches.front();
- context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns()));
- b->InitBatch(nullptr);
- batches.pop_front();
- }
- GranulesOutOrderForPortions.pop_front();
- } else {
- break;
- }
- }
- return true;
-}
-
-bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) {
- return Wakeup(granule, context);
-}
-
-void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) {
- auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
- for (ui64 granule : granulesOrder) {
- TGranule& g = context.GetGranuleVerified(granule);
- GranulesOutOrder.emplace_back(&g);
- GranulesOutOrderForPortions.emplace_back(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), &g);
- }
-}
-
-std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
- std::vector<TGranule*> result;
- while (GranulesOutOrder.size()) {
- NIndexedReader::TGranule* granule = GranulesOutOrder.front();
- if (!granule->IsReady()) {
- break;
- }
- result.emplace_back(granule);
- Y_VERIFY(granulesToOut.erase(granule->GetGranuleId()));
- GranulesOutOrder.pop_front();
- }
- return result;
-}
-
-TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
- :TBase(readMetadata)
- , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), readMetadata->IsDescSorted())
-{
- CurrentItemsLimit = ReadMetadata->Limit;
-}
-
-void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFillingContext& context) {
- auto& batch = batchOriginal.GetFetchedInfo();
- Y_VERIFY(!!batch.GetFilter());
- if (!batch.GetFilteredRecordsCount()) {
- context.GetCounters().EmptyFilterCount->Add(1);
- context.GetCounters().EmptyFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes());
- context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns()));
- batchOriginal.InitBatch(nullptr);
- } else {
- context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows());
- if (batchOriginal.AskedColumnsAlready(context.GetPostFilterColumns())) {
- context.GetCounters().FilterOnlyCount->Add(1);
- context.GetCounters().FilterOnlyFetchedBytes->Add(batchOriginal.GetFetchedBytes());
- context.GetCounters().FilterOnlyUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes());
- context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns()));
-
- batchOriginal.InitBatch(batch.GetFilterBatch());
- } else {
- context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes());
- context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes());
-
- batchOriginal.ResetWithFilter(context.GetPostFilterColumns());
- if (batchOriginal.IsFetchingReady()) {
- auto processor = context.GetTasksProcessor();
- if (auto assembleBatchTask = batchOriginal.AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
- processor.Add(context, assembleBatchTask);
- }
- }
-
- context.GetCounters().TwoPhasesCount->Add(1);
- context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes());
- context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batchOriginal.GetUsefulWaitingBytes());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
- ("filtered_count", batch.GetFilterBatch()->num_rows())
- ("blobs_count", batchOriginal.GetWaitingBlobs().size())
- ("columns_count", batchOriginal.GetCurrentColumnIds()->size())
- ("fetch_size", batchOriginal.GetWaitingBytes())
- ;
- }
- }
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h
deleted file mode 100644
index 66f2fba42b..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.h
+++ /dev/null
@@ -1,186 +0,0 @@
-#pragma once
-#include "granule.h"
-#include "read_metadata.h"
-#include "read_filter_merger.h"
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TGranulesFillingContext;
-
-class IOrderPolicy {
-public:
- enum class EFeatures: ui32 {
- CanInterrupt = 1,
- NeedNotAppliedEarlyFilter = 1 << 1
- };
- using TFeatures = ui32;
-private:
- mutable std::optional<TFeatures> Features;
-protected:
- TReadMetadata::TConstPtr ReadMetadata;
- virtual void DoFill(TGranulesFillingContext& context) = 0;
- virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& /*context*/) {
- return true;
- }
- virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0;
- virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) {
- OnBatchFilterInitialized(batchInfo, context);
- return true;
- }
-
- void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context);
- virtual TFeatures DoGetFeatures() const {
- return 0;
- }
- TFeatures GetFeatures() const {
- if (!Features) {
- Features = DoGetFeatures();
- }
- return *Features;
- }
-public:
- using TPtr = std::shared_ptr<IOrderPolicy>;
- virtual ~IOrderPolicy() = default;
-
- virtual std::set<ui32> GetFilterStageColumns() {
- return ReadMetadata->GetEarlyFilterColumnIds();
- }
-
- IOrderPolicy(TReadMetadata::TConstPtr readMetadata)
- : ReadMetadata(readMetadata)
- {
-
- }
-
- bool CanInterrupt() const {
- return GetFeatures() & (TFeatures)EFeatures::CanInterrupt;
- }
-
- bool NeedNotAppliedEarlyFilter() const {
- return GetFeatures() & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
- }
-
- bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) {
- return DoOnFilterReady(batchInfo, granule, context);
- }
-
-
- virtual bool ReadyForAddNotIndexedToEnd() const = 0;
-
- std::vector<TGranule*> DetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
- return DoDetachReadyGranules(granulesToOut);
- }
-
- void Fill(TGranulesFillingContext& context) {
- DoFill(context);
- }
-
- bool Wakeup(const TGranule& granule, TGranulesFillingContext& context) {
- return DoWakeup(granule, context);
- }
-};
-
-class TNonSorting: public IOrderPolicy {
-private:
- using TBase = IOrderPolicy;
-protected:
- virtual void DoFill(TGranulesFillingContext& /*context*/) override {
- }
-
- virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
-public:
- TNonSorting(TReadMetadata::TConstPtr readMetadata)
- :TBase(readMetadata)
- {
-
- }
-
- virtual bool ReadyForAddNotIndexedToEnd() const override {
- return true;
- }
-};
-
-class TAnySorting: public IOrderPolicy {
-private:
- using TBase = IOrderPolicy;
- std::deque<TGranule*> GranulesOutOrder;
-protected:
- virtual void DoFill(TGranulesFillingContext& context) override;
- virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
-public:
- TAnySorting(TReadMetadata::TConstPtr readMetadata)
- :TBase(readMetadata) {
-
- }
- virtual bool ReadyForAddNotIndexedToEnd() const override {
- return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
- }
-};
-
-class TGranuleOrdered {
-private:
- bool StartedFlag = false;
- std::deque<TBatch*> Batches;
- const TGranule* Granule = nullptr;
-public:
- bool Start() {
- if (!StartedFlag) {
- StartedFlag = true;
- return true;
- } else {
- return false;
- }
-
- }
-
- TGranuleOrdered(std::deque<TBatch*>&& batches, TGranule* granule)
- : Batches(std::move(batches))
- , Granule(granule)
- {
- }
-
- const std::deque<TBatch*>& GetBatches() const noexcept {
- return Batches;
- }
-
- std::deque<TBatch*>& GetBatches() noexcept {
- return Batches;
- }
-
- const TGranule* GetGranule() const noexcept {
- return Granule;
- }
-};
-
-class TPKSortingWithLimit: public IOrderPolicy {
-private:
- using TBase = IOrderPolicy;
- std::deque<TGranule*> GranulesOutOrder;
- std::deque<TGranuleOrdered> GranulesOutOrderForPortions;
- ui32 CurrentItemsLimit = 0;
- TMergePartialStream MergeStream;
-protected:
- virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override;
- virtual void DoFill(TGranulesFillingContext& context) override;
- virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
- virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;
- virtual TFeatures DoGetFeatures() const override {
- return (TFeatures)EFeatures::CanInterrupt & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
- }
-
-public:
- virtual std::set<ui32> GetFilterStageColumns() override {
- std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds();
- for (auto&& i : ReadMetadata->GetPKColumnIds()) {
- result.emplace(i);
- }
- return result;
- }
-
- TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata);
- virtual bool ReadyForAddNotIndexedToEnd() const override {
- return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
- }
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index 6f43e2e95d..225bc1058c 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -1,5 +1,6 @@
#include "read_metadata.h"
-#include "order_controller.h"
+#include "order_control/default.h"
+#include "order_control/pk_with_limit.h"
#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
#include <ydb/core/tx/columnshard/columnshard__stats_scan.h>