author     ivanmorozov <ivanmorozov@yandex-team.com>    2023-09-06 19:11:04 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>    2023-09-06 19:37:46 +0300
commit     2efe58fb7a98d40e700c0f6304bd0fa752fb663c (patch)
tree       36c3275421242af471b26d7d17e23d4756607f2a
parent     88ec26760cfd1b45a5e1a3831ca94e2afb5ab344 (diff)
download   ydb-2efe58fb7a98d40e700c0f6304bd0fa752fb663c.tar.gz
KIKIMR-19213: restore optimizations, apply fixes, remove the deprecated reader
-rw-r--r--  ydb/core/formats/arrow/arrow_filter.cpp | 66
-rw-r--r--  ydb/core/formats/arrow/arrow_filter.h | 65
-rw-r--r--  ydb/core/formats/arrow/arrow_helpers.cpp | 12
-rw-r--r--  ydb/core/formats/arrow/arrow_helpers.h | 2
-rw-r--r--  ydb/core/formats/arrow/permutations.cpp | 15
-rw-r--r--  ydb/core/formats/arrow/permutations.h | 2
-rw-r--r--  ydb/core/formats/arrow/ut_arrow.cpp | 2
-rw-r--r--  ydb/core/tx/columnshard/columnshard__index_scan.cpp | 10
-rw-r--r--  ydb/core/tx/columnshard/columnshard__index_scan.h | 16
-rw-r--r--  ydb/core/tx/columnshard/columnshard__read.cpp | 1
-rw-r--r--  ydb/core/tx/columnshard/columnshard__scan.cpp | 14
-rw-r--r--  ydb/core/tx/columnshard/columnshard__scan.h | 14
-rw-r--r--  ydb/core/tx/columnshard/columnshard__stats_scan.cpp | 2
-rw-r--r--  ydb/core/tx/columnshard/columnshard__stats_scan.h | 2
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.cpp | 425
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.h | 96
-rw-r--r--  ydb/core/tx/columnshard/engines/insert_table/meta.cpp | 2
-rw-r--r--  ydb/core/tx/columnshard/engines/portions/column_record.h | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/portions/portion_info.cpp | 6
-rw-r--r--  ydb/core/tx/columnshard/engines/portions/portion_info.h | 6
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt | 9
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt | 9
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt | 9
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt | 9
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/batch.cpp | 226
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/batch.h | 161
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp | 2
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filling_context.cpp | 102
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filling_context.h | 140
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp | 69
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filter_assembler.h | 51
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule.cpp | 187
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule.h | 159
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp | 89
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule_preparation.h | 46
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt | 24
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt | 25
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt | 25
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt | 17
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt | 24
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp | 42
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/abstract.h | 83
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/default.cpp | 27
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/default.h | 27
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp | 14
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h | 30
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp | 123
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h | 78
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/result.cpp | 29
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/result.h | 45
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_control/ya.make | 17
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp | 7
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h | 9
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp | 13
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h | 110
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp | 7
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h | 50
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h | 25
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp | 2
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp | 48
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h | 13
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp | 58
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h | 29
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp | 17
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h | 7
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp | 135
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/source.h | 96
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp | 50
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h | 35
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/processing_context.cpp | 103
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/processing_context.h | 70
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp | 104
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_filter_merger.h | 109
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_metadata.cpp | 54
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_metadata.h | 8
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/ya.make | 8
-rw-r--r--  ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/scheme/index_info.cpp | 24
-rw-r--r--  ydb/core/tx/columnshard/engines/scheme/index_info.h | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/ya.make | 1
-rw-r--r--  ydb/core/tx/columnshard/hooks/testing/CMakeLists.darwin-x86_64.txt | 2
-rw-r--r--  ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-aarch64.txt | 2
-rw-r--r--  ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt | 2
-rw-r--r--  ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt | 2
-rw-r--r--  ydb/core/tx/columnshard/hooks/testing/controller.cpp | 17
-rw-r--r--  ydb/core/tx/columnshard/hooks/testing/controller.h | 3
-rw-r--r--  ydb/core/tx/columnshard/hooks/testing/ya.make | 2
-rw-r--r--  ydb/core/tx/columnshard/inflight_request_tracker.h | 3
-rw-r--r--  ydb/core/tx/columnshard/read_actor.cpp | 3
99 files changed, 673 insertions, 3230 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp
index bb0e839ec8..cb63286d9f 100644
--- a/ydb/core/formats/arrow/arrow_filter.cpp
+++ b/ydb/core/formats/arrow/arrow_filter.cpp
@@ -130,6 +130,63 @@ void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatc
}
}
}
+
+}
+
+bool TColumnFilter::TIterator::Next(const ui32 size) {
+ Y_VERIFY(size);
+ if (CurrentRemainVolume > size) {
+ InternalPosition += size;
+ CurrentRemainVolume -= size;
+ return true;
+ }
+ if (!FilterPointer && CurrentRemainVolume == size) {
+ InternalPosition += size;
+ CurrentRemainVolume -= size;
+ return false;
+ }
+ Y_VERIFY(FilterPointer);
+ ui32 sizeRemain = size;
+ while (Position != FinishPosition) {
+ const ui32 currentVolume = (*FilterPointer)[Position];
+ if (currentVolume > sizeRemain + InternalPosition) {
+ InternalPosition += sizeRemain;
+ CurrentRemainVolume = currentVolume - InternalPosition;
+ return true;
+ } else {
+ sizeRemain -= currentVolume - InternalPosition;
+ InternalPosition = 0;
+ CurrentValue = !CurrentValue;
+ Position += DeltaPosition;
+ }
+ }
+ Y_VERIFY(Position == FinishPosition);
+ CurrentRemainVolume = 0;
+ return false;
+}
+
+TString TColumnFilter::TIterator::DebugString() const {
+ TStringBuilder sb;
+ if (FilterPointer) {
+ sb << "filter_pointer=";
+ ui32 idx = 0;
+ ui64 sum = 0;
+ for (auto&& i : *FilterPointer) {
+ sb << i << ",";
+ sum += i;
+ if (++idx > 100) {
+ break;
+ }
+ }
+ sb << ";sum=" << sum << ";";
+ }
+ sb << "internal_position=" << InternalPosition << ";";
+ sb << "position=" << Position << ";";
+ sb << "current=" << CurrentValue << ";";
+ sb << "finish=" << FinishPosition << ";";
+ sb << "delta=" << DeltaPosition << ";";
+ sb << "remain=" << CurrentRemainVolume << ";";
+ return sb;
}
std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter(const ui32 expectedSize) const {
@@ -456,4 +513,13 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
}
}
+TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const {
+ if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) {
+ return TIterator(reverse, expectedSize, CurrentValue);
+ } else {
+ Y_VERIFY(expectedSize == Size());
+ return TIterator(reverse, Filter, GetStartValue(reverse));
+ }
+}
+
}
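
Note on the new Next() above (illustration only, not part of the patch): TColumnFilter stores the filter as run lengths of alternating accept/deny values, and Next(size) consumes `size` rows while tracking which run the cursor is in. A self-contained C++ sketch of that run-length traversal over plain std::vector data, with all names and values assumed:

    // Standalone sketch of walking a run-length filter such as {3, 2, 4}:
    // 3 rows with the start value, then 2 rows inverted, then 4 rows again.
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
        const std::vector<uint32_t> runs = {3, 2, 4};  // assumed filter content
        bool accept = true;                            // assumed start value
        std::size_t run = 0;                           // index of the current run
        uint32_t leftInRun = runs[0];                  // rows remaining in the current run

        // Consume the 9 filtered rows in chunks of 2, reporting the acceptance
        // of the first row of each chunk (mirrors GetCurrentAcceptance + Next).
        for (uint32_t row = 0; row < 9; row += 2) {
            std::cout << "row " << row << " accepted=" << accept << "\n";
            uint32_t toConsume = 2;
            while (toConsume > 0 && run < runs.size()) {
                const uint32_t step = std::min(toConsume, leftInRun);
                leftInRun -= step;
                toConsume -= step;
                if (leftInRun == 0 && ++run < runs.size()) {
                    leftInRun = runs[run];
                    accept = !accept;                  // runs alternate accept/deny
                }
            }
        }
        return 0;
    }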
diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h
index 67423ec640..f8c0158cd7 100644
--- a/ydb/core/formats/arrow/arrow_filter.h
+++ b/ydb/core/formats/arrow/arrow_filter.h
@@ -55,69 +55,60 @@ public:
class TIterator {
private:
- ui32 InternalPosition = 0;
- ui32 CurrentRemainVolume = 0;
- const std::vector<ui32>& Filter;
+ i64 InternalPosition = 0;
+ i64 CurrentRemainVolume = 0;
+ const std::vector<ui32>* FilterPointer = nullptr;
i32 Position = 0;
bool CurrentValue;
const i32 FinishPosition;
const i32 DeltaPosition;
public:
+ TString DebugString() const;
+
TIterator(const bool reverse, const std::vector<ui32>& filter, const bool startValue)
- : Filter(filter)
+ : FilterPointer(&filter)
, CurrentValue(startValue)
- , FinishPosition(reverse ? -1 : Filter.size())
+ , FinishPosition(reverse ? -1 : FilterPointer->size())
, DeltaPosition(reverse ? -1 : 1)
{
- if (!Filter.size()) {
+ if (!FilterPointer->size()) {
Position = FinishPosition;
} else {
if (reverse) {
- Position = Filter.size() - 1;
+ Position = FilterPointer->size() - 1;
}
- CurrentRemainVolume = Filter[Position];
+ CurrentRemainVolume = (*FilterPointer)[Position];
+ }
+ }
+
+ TIterator(const bool reverse, const ui32 size, const bool startValue)
+ : CurrentValue(startValue)
+ , FinishPosition(reverse ? -1 : 1)
+ , DeltaPosition(reverse ? -1 : 1) {
+ if (!size) {
+ Position = FinishPosition;
+ } else {
+ if (reverse) {
+ Position = 0;
+ }
+ CurrentRemainVolume = size;
}
}
bool GetCurrentAcceptance() const {
- Y_VERIFY_DEBUG(CurrentRemainVolume);
+ Y_VERIFY(CurrentRemainVolume);
return CurrentValue;
}
bool IsBatchForSkip(const ui32 size) const {
- Y_VERIFY_DEBUG(CurrentRemainVolume);
+ Y_VERIFY(CurrentRemainVolume);
return !CurrentValue && CurrentRemainVolume >= size;
}
- bool Next(const ui32 size) {
- Y_VERIFY(size);
- if (CurrentRemainVolume > size) {
- InternalPosition += size;
- CurrentRemainVolume -= size;
- return true;
- }
- ui32 sizeRemain = size;
- while (Position != FinishPosition) {
- const ui32 currentVolume = Filter[Position];
- if (currentVolume - InternalPosition > sizeRemain) {
- InternalPosition = sizeRemain;
- CurrentRemainVolume = currentVolume - InternalPosition - sizeRemain;
- return true;
- } else {
- sizeRemain -= currentVolume - InternalPosition;
- InternalPosition = 0;
- CurrentValue = !CurrentValue;
- Position += DeltaPosition;
- }
- }
- CurrentRemainVolume = 0;
- return false;
- }
+ bool Next(const ui32 size);
};
- TIterator GetIterator(const bool reverse) const {
- return TIterator(reverse, Filter, GetStartValue(reverse));
- }
+ TIterator GetIterator(const bool reverse, const ui32 expectedSize) const;
bool empty() const {
return Filter.empty();
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 43af9fb5dc..d6857dafab 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -341,8 +341,8 @@ bool IsNoOp(const arrow::UInt64Array& permutation) {
}
std::shared_ptr<arrow::RecordBatch> Reorder(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::shared_ptr<arrow::UInt64Array>& permutation) {
- Y_VERIFY(permutation->length() == batch->num_rows());
+ const std::shared_ptr<arrow::UInt64Array>& permutation, const bool canRemove) {
+ Y_VERIFY(permutation->length() == batch->num_rows() || canRemove);
auto res = IsNoOp(*permutation) ? batch : arrow::compute::Take(batch, permutation);
Y_VERIFY(res.ok());
@@ -373,7 +373,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared
Y_VERIFY_OK(builder.Finish(&permutation));
}
- auto reorderedBatch = Reorder(batch, permutation);
+ auto reorderedBatch = Reorder(batch, permutation, false);
std::vector<std::shared_ptr<arrow::RecordBatch>> out(numShards);
@@ -689,9 +689,9 @@ int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr
}
std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::shared_ptr<arrow::Schema>& sortingKey) {
- auto sortPermutation = MakeSortPermutation(batch, sortingKey);
- return Reorder(batch, sortPermutation);
+ const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
+ auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique);
+ return Reorder(batch, sortPermutation, andUnique);
}
std::shared_ptr<arrow::Array> BoolVecToArray(const std::vector<bool>& vec) {
diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h
index fb012391c4..8314d4d9ab 100644
--- a/ydb/core/formats/arrow/arrow_helpers.h
+++ b/ydb/core/formats/arrow/arrow_helpers.h
@@ -98,7 +98,7 @@ bool ReserveData(arrow::ArrayBuilder& builder, const size_t size);
bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder = {}, const bool orderFieldsAreNecessary = true);
std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::shared_ptr<arrow::Schema>& sortingKey);
+ const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
bool IsSorted(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey,
bool desc = false);
diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp
index f16f43b9c3..ed7f410b8a 100644
--- a/ydb/core/formats/arrow/permutations.cpp
+++ b/ydb/core/formats/arrow/permutations.cpp
@@ -39,7 +39,7 @@ std::shared_ptr<arrow::UInt64Array> MakePermutation(const int size, const bool r
}
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::shared_ptr<arrow::Schema>& sortingKey) {
+ const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
auto keyBatch = ExtractColumns(batch, sortingKey);
auto keyColumns = std::make_shared<TArrayVec>(keyBatch->columns());
std::vector<TRawReplaceKey> points;
@@ -70,8 +70,21 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
arrow::UInt64Builder builder;
TStatusValidator::Validate(builder.Reserve(points.size()));
+ TRawReplaceKey* predKey = nullptr;
for (auto& point : points) {
+ if (andUnique) {
+ if (predKey) {
+ if (haveNulls) {
+ if (*predKey == point) {
+ continue;
+ }
+ } else if (predKey->CompareNotNull(point) == std::partial_ordering::equivalent) {
+ continue;
+ }
+ }
+ }
TStatusValidator::Validate(builder.Append(point.GetPosition()));
+ predKey = &point;
}
std::shared_ptr<arrow::UInt64Array> out;
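
For the andUnique flag introduced above, a minimal sketch of the intended semantics on plain integers (assumed names, not code from this patch): after a stable sort, any position whose key equals the previously emitted key is skipped, so the resulting permutation yields a sorted, duplicate-free order, matching the predKey loop in MakeSortPermutation.

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    // Build the index permutation that sorts `keys` and, when unique == true,
    // keeps only the first index for each distinct key.
    std::vector<uint64_t> SortPermutation(const std::vector<int>& keys, bool unique) {
        std::vector<uint64_t> order(keys.size());
        for (uint64_t i = 0; i < order.size(); ++i) {
            order[i] = i;
        }
        std::stable_sort(order.begin(), order.end(),
                         [&](uint64_t a, uint64_t b) { return keys[a] < keys[b]; });
        if (!unique) {
            return order;
        }
        std::vector<uint64_t> result;
        const int* prev = nullptr;                 // last emitted key, like predKey above
        for (uint64_t idx : order) {
            if (prev && *prev == keys[idx]) {
                continue;                          // duplicate of the previous key: drop it
            }
            result.push_back(idx);
            prev = &keys[idx];
        }
        return result;
    }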
diff --git a/ydb/core/formats/arrow/permutations.h b/ydb/core/formats/arrow/permutations.h
index c811cfe680..9a1175887b 100644
--- a/ydb/core/formats/arrow/permutations.h
+++ b/ydb/core/formats/arrow/permutations.h
@@ -8,7 +8,7 @@ namespace NKikimr::NArrow {
std::shared_ptr<arrow::UInt64Array> MakePermutation(const int size, const bool reverse = false);
std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64>& indexes);
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::shared_ptr<arrow::Schema>& sortingKey);
+ const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& source, const std::vector<ui64>& indexes);
diff --git a/ydb/core/formats/arrow/ut_arrow.cpp b/ydb/core/formats/arrow/ut_arrow.cpp
index 8bbef7c388..3716d31b44 100644
--- a/ydb/core/formats/arrow/ut_arrow.cpp
+++ b/ydb/core/formats/arrow/ut_arrow.cpp
@@ -648,7 +648,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 1000);
UNIT_ASSERT(!CheckSorted1000(batch));
- auto sortPermutation = NArrow::MakeSortPermutation(batch, table->schema());
+ auto sortPermutation = NArrow::MakeSortPermutation(batch, table->schema(), false);
auto res = arrow::compute::Take(batch, sortPermutation);
UNIT_ASSERT(res.ok());
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 6ff5f6e3b3..5465ed35fa 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -21,7 +21,7 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data
IndexedData->AddData(blobRange, data);
}
-NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() {
+std::optional<NOlap::TPartialReadResult> TColumnShardScanIterator::GetBatch() {
FillReadyResults();
return ReadyResults.pop_front();
}
@@ -34,17 +34,17 @@ void TColumnShardScanIterator::FillReadyResults() {
auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch);
i64 limitLeft = ReadMetadata->Limit == 0 ? INT64_MAX : ReadMetadata->Limit - ItemsRead;
for (size_t i = 0; i < ready.size() && limitLeft; ++i) {
- if (ready[i].GetResultBatch()->num_rows() == 0 && !ready[i].GetLastReadKey()) {
+ if (ready[i].GetResultBatch().num_rows() == 0 && !ready[i].GetLastReadKey()) {
Y_VERIFY(i + 1 == ready.size(), "Only last batch can be empty!");
break;
}
auto& batch = ReadyResults.emplace_back(std::move(ready[i]));
- if (batch.GetResultBatch()->num_rows() > limitLeft) {
+ if (batch.GetResultBatch().num_rows() > limitLeft) {
batch.Slice(0, limitLeft);
}
- limitLeft -= batch.GetResultBatch()->num_rows();
- ItemsRead += batch.GetResultBatch()->num_rows();
+ limitLeft -= batch.GetResultBatch().num_rows();
+ ItemsRead += batch.GetResultBatch().num_rows();
}
if (limitLeft == 0) {
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 6d8a77a2af..09fb17f9db 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -2,7 +2,8 @@
#include "columnshard__scan.h"
#include "columnshard_common.h"
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+#include "engines/reader/read_metadata.h"
+#include "engines/reader/read_context.h"
namespace NKikimr::NColumnShard {
@@ -23,7 +24,6 @@ public:
}
};
-
using NOlap::TUnifiedBlobId;
using NOlap::TBlobRange;
@@ -40,7 +40,7 @@ public:
<< "records_count:" << RecordsCount << ";"
;
if (Data.size()) {
- sb << "schema=" << Data.front().GetResultBatch()->schema()->ToString() << ";";
+ sb << "schema=" << Data.front().GetResultBatch().schema()->ToString() << ";";
}
return sb;
}
@@ -50,16 +50,16 @@ public:
}
NOlap::TPartialReadResult& emplace_back(NOlap::TPartialReadResult&& v) {
- RecordsCount += v.GetResultBatch()->num_rows();
+ RecordsCount += v.GetResultBatch().num_rows();
Data.emplace_back(std::move(v));
return Data.back();
}
- NOlap::TPartialReadResult pop_front() {
+ std::optional<NOlap::TPartialReadResult> pop_front() {
if (Data.empty()) {
- return NOlap::TPartialReadResult();
+ return {};
}
auto result = std::move(Data.front());
- RecordsCount -= result.GetResultBatch()->num_rows();
+ RecordsCount -= result.GetResultBatch().num_rows();
Data.pop_front();
return result;
}
@@ -104,7 +104,7 @@ public:
return IndexedData->IsFinished() && ReadyResults.empty();
}
- NOlap::TPartialReadResult GetBatch() override;
+ std::optional<NOlap::TPartialReadResult> GetBatch() override;
std::optional<NBlobCache::TBlobRange> GetNextBlobToRead() override;
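
The iterator interface change here (GetBatch() and pop_front() returning std::optional<NOlap::TPartialReadResult>) replaces the old default-constructed "empty" result, which the patch forbids by adding Y_VERIFY(ResultBatch) to the constructors. A self-contained sketch of the caller pattern with a stand-in result type (all names assumed):

    #include <deque>
    #include <iostream>
    #include <optional>
    #include <string>

    // Stand-in for TPartialReadResult to keep the sketch self-contained.
    struct TResultStub {
        std::string Payload;
    };

    // Mirrors the new pop_front(): an empty queue yields std::nullopt instead of
    // a default-constructed (and now invalid) result object.
    std::optional<TResultStub> PopFront(std::deque<TResultStub>& queue) {
        if (queue.empty()) {
            return {};
        }
        TResultStub result = std::move(queue.front());
        queue.pop_front();
        return result;
    }

    int main() {
        std::deque<TResultStub> ready{{"batch-1"}, {"batch-2"}};
        // Callers check the optional, as columnshard__scan.cpp now does.
        while (auto result = PopFront(ready)) {
            std::cout << result->Payload << "\n";
        }
        return 0;
    }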
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index a2182f63e4..b888d503b1 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -5,7 +5,6 @@
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/columnshard__read_base.h>
#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index f7e67460f8..337b52ce98 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -277,7 +277,12 @@ private:
return false;
}
- auto result = ScanIterator->GetBatch();
+ auto resultOpt = ScanIterator->GetBatch();
+ if (!resultOpt) {
+ ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
+ return false;
+ }
+ auto& result = *resultOpt;
if (!result.ErrorString.empty()) {
ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", result.ErrorString);
SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size()));
@@ -291,12 +296,7 @@ private:
ResultYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetResultYqlSchema();
}
- if (!result.GetResultBatch()) {
- ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
- return false;
- }
-
- auto& batch = result.GetResultBatch();
+ auto batch = result.GetResultBatchPtrVerified();
int numRows = batch->num_rows();
int numColumns = batch->num_columns();
if (!numRows) {
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index 464f5b0fcc..ca72f0597c 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -97,24 +97,29 @@ public:
}
}
- const std::shared_ptr<arrow::RecordBatch>& GetResultBatch() const {
+ const std::shared_ptr<arrow::RecordBatch>& GetResultBatchPtrVerified() const {
+ Y_VERIFY(ResultBatch);
return ResultBatch;
}
+ const arrow::RecordBatch& GetResultBatch() const {
+ Y_VERIFY(ResultBatch);
+ return *ResultBatch;
+ }
+
const std::shared_ptr<arrow::RecordBatch>& GetLastReadKey() const {
return LastReadKey;
}
std::string ErrorString;
- TPartialReadResult() = default;
-
explicit TPartialReadResult(
std::shared_ptr<TScanMemoryLimiter::TGuard> memGuard,
std::shared_ptr<arrow::RecordBatch> batch)
: MemoryGuardExternal(memGuard)
, ResultBatch(batch)
{
+ Y_VERIFY(ResultBatch);
}
explicit TPartialReadResult(
@@ -124,6 +129,7 @@ public:
, ResultBatch(batch)
, LastReadKey(lastKey)
{
+ Y_VERIFY(ResultBatch);
}
};
}
@@ -143,7 +149,7 @@ public:
virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/) {}
virtual bool HasWaitingTasks() const = 0;
virtual bool Finished() const = 0;
- virtual NOlap::TPartialReadResult GetBatch() = 0;
+ virtual std::optional<NOlap::TPartialReadResult> GetBatch() = 0;
virtual std::optional<NBlobCache::TBlobRange> GetNextBlobToRead() { return {}; }
virtual TString DebugString() const {
return "NO_DATA";
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
index 3056242b13..021c4a2a49 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
@@ -2,7 +2,7 @@
namespace NKikimr::NColumnShard {
-NKikimr::NOlap::TPartialReadResult TStatsIterator::GetBatch() {
+std::optional<NOlap::TPartialReadResult> TStatsIterator::GetBatch() {
// Take next raw batch
auto batch = FillStatsBatch();
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index 825846f308..a334dff2a1 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -53,7 +53,7 @@ public:
return IndexStats.empty();
}
- NOlap::TPartialReadResult GetBatch() override;
+ std::optional<NOlap::TPartialReadResult> GetBatch() override;
private:
NOlap::TReadStatsMetadata::TConstPtr ReadMetadata;
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
index 94f29e6893..41ff6614c5 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
@@ -46,7 +46,6 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_features.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
index 08ceed7b24..e5129086a4 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
@@ -47,7 +47,6 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_features.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
index 08ceed7b24..e5129086a4 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
@@ -47,7 +47,6 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_features.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
index 94f29e6893..41ff6614c5 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
@@ -46,7 +46,6 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_features.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 7c76aa0f15..c58d1d964d 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1,6 +1,5 @@
#include "column_engine_logs.h"
#include "filter.h"
-#include "indexed_read_data.h"
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/formats/arrow/one_batch_input_stream.h>
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
deleted file mode 100644
index cdfb768fd0..0000000000
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ /dev/null
@@ -1,425 +0,0 @@
-#include "defs.h"
-#include "indexed_read_data.h"
-#include "filter.h"
-#include "column_engine_logs.h"
-#include "changes/mark_granules.h"
-#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
-#include <ydb/core/tx/columnshard/columnshard__stats_scan.h>
-#include <ydb/core/formats/arrow/one_batch_input_stream.h>
-#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
-#include <ydb/core/formats/arrow/custom_registry.h>
-
-namespace NKikimr::NOlap {
-
-namespace {
-
-// Slices a batch into smaller batches and appends them to result vector (which might be non-empty already)
-std::vector<std::shared_ptr<arrow::RecordBatch>> SliceBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
- const int64_t maxRowsInBatch)
-{
- std::vector<std::shared_ptr<arrow::RecordBatch>> result;
- if (!batch->num_rows()) {
- return result;
- }
- result.reserve(result.size() + batch->num_rows() / maxRowsInBatch + 1);
- if (batch->num_rows() <= maxRowsInBatch) {
- result.push_back(batch);
- return result;
- }
-
- int64_t offset = 0;
- while (offset < batch->num_rows()) {
- int64_t rows = std::min<int64_t>(maxRowsInBatch, batch->num_rows() - offset);
- result.emplace_back(batch->Slice(offset, rows));
- offset += rows;
- }
- return result;
-};
-
-}
-
-void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) {
- Y_VERIFY(IndexedBlobSubscriber.emplace(range, batch.GetBatchAddress()).second);
- if (batch.GetFetchedInfo().GetFilter()) {
- Context.GetCounters().PostFilterBytes->Add(range.Size);
- ReadMetadata->ReadStats->DataAdditionalBytes += range.Size;
- PriorityBlobsQueue.emplace_back(batch.GetGranule(), range);
- } else {
- Context.GetCounters().FilterBytes->Add(range.Size);
- ReadMetadata->ReadStats->DataFilterBytes += range.Size;
- FetchBlobsQueue.emplace_back(batch.GetGranule(), range);
- }
-}
-
-void TIndexedReadData::RegisterZeroGranula() {
- for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i) {
- const auto& cmtBlob = ReadMetadata->CommittedBlobs[i];
- WaitCommitted.emplace(cmtBlob.GetBlobId(), cmtBlob);
- }
- for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
- AddNotIndexed(blobId, batch);
- }
- for (const auto& [blobId, _] : WaitCommitted) {
- AddBlobToFetchInFront(0, TBlobRange(blobId, 0, blobId.BlobSize()));
- }
-}
-
-void TIndexedReadData::InitRead() {
- Y_VERIFY(!GranulesContext);
- GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode);
-
- RegisterZeroGranula();
-
- auto& indexInfo = ReadMetadata->GetIndexInfo();
- Y_VERIFY(indexInfo.GetSortingKey());
- Y_VERIFY(indexInfo.GetIndexKey() && indexInfo.GetIndexKey()->num_fields());
-
- SortReplaceDescription = indexInfo.SortReplaceDescription();
-
- ui64 portionsBytes = 0;
- std::set<ui64> granulesReady;
- ui64 prevGranule = 0;
- THashSet<ui32> columnIdsHash;
- for (auto&& i : ReadMetadata->GetAllColumns()) {
- columnIdsHash.emplace(i);
- }
- for (auto& portionSorted : ReadMetadata->SelectInfo->GetPortionsOrdered(ReadMetadata->IsDescSorted())) {
- auto portionInfo = portionSorted->CopyWithFilteredColumns(columnIdsHash);
- portionsBytes += portionInfo.BlobsBytes();
- Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata);
-
- NIndexedReader::TGranule::TPtr granule = GranulesContext->UpsertGranule(portionInfo.GetGranule());
- if (prevGranule != portionInfo.GetGranule()) {
- Y_VERIFY(granulesReady.emplace(portionInfo.GetGranule()).second);
- prevGranule = portionInfo.GetGranule();
- }
- granule->RegisterBatchForFetching(std::move(portionInfo));
- }
- GranulesContext->PrepareForStart();
-
- Context.GetCounters().PortionBytes->Add(portionsBytes);
- auto& stats = ReadMetadata->ReadStats;
- stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size();
- stats->IndexPortions = ReadMetadata->SelectInfo->PortionsOrderedPK.size();
- stats->IndexBatches = ReadMetadata->NumIndexedBlobs();
- stats->CommittedBatches = ReadMetadata->CommittedBlobs.size();
- stats->SchemaColumns = ReadMetadata->GetSchemaColumnsCount();
- stats->FilterColumns = GranulesContext->GetEarlyFilterColumns().size();
- stats->AdditionalColumns = GranulesContext->GetPostFilterColumns().size();
- stats->PortionsBytes = portionsBytes;
-}
-
-void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data) {
- Y_VERIFY(GranulesContext);
- NIndexedReader::TBatch* portionBatch = nullptr;
- {
- auto it = IndexedBlobSubscriber.find(blobRange);
- Y_VERIFY(it != IndexedBlobSubscriber.end());
- portionBatch = GranulesContext->GetBatchInfo(it->second);
- if (!portionBatch) {
- ACFL_INFO("event", "batch for finished granule")("address", it->second.ToString());
- return;
- }
- IndexedBlobSubscriber.erase(it);
- }
- if (!portionBatch->AddIndexedReady(blobRange, data)) {
- return;
- }
-}
-
-std::shared_ptr<arrow::RecordBatch>
-TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const TSnapshot& snapshot) const {
- Y_VERIFY(srcBatch);
-
- // Extract columns (without check), filter, attach snapshot, extract columns with check
- // (do not filter snapshot columns)
- auto dataSchema = ReadMetadata->GetLoadSchema(snapshot);
-
- auto batch = NArrow::ExtractExistedColumns(srcBatch, dataSchema->GetSchema());
- Y_VERIFY(batch);
-
- auto filter = FilterNotIndexed(batch, *ReadMetadata);
- if (filter.IsTotalDenyFilter()) {
- return nullptr;
- }
- auto preparedBatch = batch;
- preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, snapshot);
- auto resultSchema = ReadMetadata->GetLoadSchema();
- preparedBatch = resultSchema->NormalizeBatch(*dataSchema, preparedBatch);
- preparedBatch = NArrow::ExtractColumns(preparedBatch, resultSchema->GetSchema());
- Y_VERIFY(preparedBatch);
-
- filter.Apply(preparedBatch);
- return preparedBatch;
-}
-
-std::vector<TPartialReadResult> TIndexedReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) {
- Y_VERIFY(SortReplaceDescription);
- auto& indexInfo = ReadMetadata->GetIndexInfo();
-
- if (WaitCommitted.size()) {
- // Wait till we have all not indexed data so we could replace keys in granules
- return {};
- }
-
- // First time extract OutNotIndexed data
- if (NotIndexed.size()) {
- auto mergedBatch = MergeNotIndexed(std::move(NotIndexed)); // merged has no dups
- if (mergedBatch) {
- // Init split by granules structures
- Y_VERIFY(ReadMetadata->SelectInfo);
- TMarksGranules marksGranules(*ReadMetadata->SelectInfo);
-
- // committed data before the first granule would be placed in fake preceding granule
- // committed data after the last granule would be placed into the last granule
- marksGranules.MakePrecedingMark(indexInfo);
- Y_VERIFY(!marksGranules.Empty());
-
- auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, indexInfo);
- GranulesContext->DrainNotIndexedBatches(&outNotIndexed);
- if (outNotIndexed.size() == 1) {
- auto it = outNotIndexed.begin();
- if (it->first) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("incorrect_granule_id", it->first);
- Y_FAIL();
- }
- NotIndexedOutscopeBatch = it->second;
- }
- } else {
- GranulesContext->DrainNotIndexedBatches(nullptr);
- }
- NotIndexed.clear();
- } else {
- GranulesContext->DrainNotIndexedBatches(nullptr);
- }
-
- return ReadyToOut(maxRowsInBatch);
-}
-
-/// @return batches that are not blocked by others
-std::vector<TPartialReadResult> TIndexedReadData::ReadyToOut(const i64 maxRowsInBatch) {
- Y_VERIFY(SortReplaceDescription);
- Y_VERIFY(GranulesContext);
-
- std::vector<NIndexedReader::TGranule::TPtr> ready = GranulesContext->DetachReadyInOrder();
- std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
- out.reserve(ready.size() + 1);
-
- // Prepend not indexed data (less then first granule) before granules for ASC sorting
- if (ReadMetadata->IsAscSorted() && NotIndexedOutscopeBatch) {
- out.push_back({});
- out.back().push_back(NotIndexedOutscopeBatch);
- NotIndexedOutscopeBatch = nullptr;
- }
-
- for (auto&& granule : ready) {
- std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = granule->GetReadyBatches();
-
- if (inGranule.empty()) {
- continue;
- }
- out.emplace_back(std::move(inGranule));
- }
-
- // Append not indexed data (less then first granule) after granules for DESC sorting
- if (GranulesContext->GetSortingPolicy()->ReadyForAddNotIndexedToEnd() && NotIndexedOutscopeBatch) {
- out.push_back({});
- out.back().push_back(NotIndexedOutscopeBatch);
- NotIndexedOutscopeBatch = nullptr;
- }
-
- return MakeResult(std::move(out), maxRowsInBatch);
-}
-
-std::shared_ptr<arrow::RecordBatch>
-TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const {
- Y_VERIFY(ReadMetadata->IsSorted());
- auto& indexInfo = ReadMetadata->GetIndexInfo();
- Y_VERIFY(indexInfo.GetSortingKey());
-
- {
- const auto pred = [](const std::shared_ptr<arrow::RecordBatch>& b) {
- return !b || !b->num_rows();
- };
- batches.erase(std::remove_if(batches.begin(), batches.end(), pred), batches.end());
- }
-
- if (batches.empty()) {
- return {};
- }
-
- // We could merge data here only if backpressure limits committed data size. KIKIMR-12520
- auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription());
- Y_VERIFY(merged);
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey()));
- return merged;
-}
-
-void TIndexedReadData::MergeTooSmallBatches(const std::shared_ptr<TMemoryAggregation>& memAggregation, std::vector<TPartialReadResult>& out) const {
- if (out.size() < 10) {
- return;
- }
-
- i64 sumRows = 0;
- for (auto& result : out) {
- sumRows += result.GetResultBatch()->num_rows();
- }
- if (sumRows / out.size() > 100) {
- return;
- }
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.reserve(out.size());
- for (auto& batch : out) {
- batches.push_back(batch.GetResultBatch());
- }
-
- auto res = arrow::Table::FromRecordBatches(batches);
- if (!res.ok()) {
- Y_VERIFY_DEBUG(false, "Cannot make table from batches");
- return;
- }
-
- res = (*res)->CombineChunks();
- if (!res.ok()) {
- Y_VERIFY_DEBUG(false, "Cannot combine chunks");
- return;
- }
- auto batch = NArrow::ToBatch(*res);
-
- std::vector<TPartialReadResult> merged;
-
- auto g = std::make_shared<TScanMemoryLimiter::TGuard>(GetMemoryAccessor(), memAggregation);
- g->Take(NArrow::GetBatchMemorySize(batch));
- merged.emplace_back(TPartialReadResult(g, batch, out.back().GetLastReadKey()));
- out.swap(merged);
-}
-
-std::vector<TPartialReadResult> TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules,
- int64_t maxRowsInBatch) const {
- Y_VERIFY(ReadMetadata->IsSorted());
- Y_VERIFY(SortReplaceDescription);
- auto& indexInfo = ReadMetadata->GetIndexInfo();
-
- std::vector<TPartialReadResult> out;
-
- bool isDesc = ReadMetadata->IsDescSorted();
-
- for (auto& batches : granules) {
- if (isDesc) {
- std::reverse(batches.begin(), batches.end());
- }
- for (auto& batch : batches) {
- if (isDesc) {
- auto permutation = NArrow::MakePermutation(batch->num_rows(), true);
- batch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(batch, permutation)).record_batch();
- }
- std::shared_ptr<TScanMemoryLimiter::TGuard> memGuard = std::make_shared<TScanMemoryLimiter::TGuard>(
- GetMemoryAccessor(), Context.GetCounters().Aggregations.GetResultsReady());
- memGuard->Take(NArrow::GetBatchMemorySize(batch));
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> splitted = SliceBatch(batch, maxRowsInBatch);
-
- for (auto& batch : splitted) {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey(), isDesc));
- // Extract the last row's PK
- auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
- auto lastKey = keyBatch->Slice(keyBatch->num_rows() - 1, 1);
-
- // Leave only requested columns
- auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->GetResultSchema());
- out.emplace_back(TPartialReadResult(memGuard, resultBatch, lastKey));
- }
- }
- }
-
- if (ReadMetadata->GetProgram().HasProgram()) {
- MergeTooSmallBatches(Context.GetCounters().Aggregations.GetResultsReady(), out);
- for (auto& result : out) {
- result.ApplyProgram(ReadMetadata->GetProgram());
- }
- }
- return out;
-}
-
-TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const TReadContext& context)
- : TBase(context, readMetadata)
- , OnePhaseReadMode(context.GetIsInternalRead())
-{
-}
-
-bool TIndexedReadData::DoIsFinished() const {
- Y_VERIFY(GranulesContext);
- return NotIndexed.empty() && FetchBlobsQueue.empty() && PriorityBlobsQueue.empty() && GranulesContext->IsFinished();
-}
-
-void TIndexedReadData::DoAbort() {
- Y_VERIFY(GranulesContext);
- Context.MutableProcessor().Stop();
- FetchBlobsQueue.Stop();
- PriorityBlobsQueue.Stop();
- GranulesContext->Abort();
- IndexedBlobSubscriber.clear();
- WaitCommitted.clear();
-}
-
-std::optional<TBlobRange> TIndexedReadData::DoExtractNextBlob(const bool hasReadyResults) {
- Y_VERIFY(GranulesContext);
- while (auto* f = PriorityBlobsQueue.front()) {
- if (!GranulesContext->IsGranuleActualForProcessing(f->GetObjectId())) {
- ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetObjectId());
- PriorityBlobsQueue.pop_front();
- continue;
- }
-
- GranulesContext->ForceStartProcessGranule(f->GetObjectId(), f->GetRange());
- Context.GetCounters().OnPriorityFetch(f->GetRange().Size);
- return PriorityBlobsQueue.pop_front();
- }
-
- while (auto* f = FetchBlobsQueue.front()) {
- if (!GranulesContext->IsGranuleActualForProcessing(f->GetObjectId())) {
- ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetObjectId());
- FetchBlobsQueue.pop_front();
- continue;
- }
-
- if (GranulesContext->TryStartProcessGranule(f->GetObjectId(), f->GetRange(), hasReadyResults)) {
- Context.GetCounters().OnGeneralFetch(f->GetRange().Size);
- return FetchBlobsQueue.pop_front();
- } else {
- Context.GetCounters().OnProcessingOverloaded();
- return {};
- }
- }
- return {};
-}
-
-void TIndexedReadData::AddNotIndexed(const TBlobRange& blobRange, const TString& column) {
- auto it = WaitCommitted.find(blobRange.BlobId);
- Y_VERIFY(it != WaitCommitted.end());
- auto batch = NArrow::DeserializeBatch(column, ReadMetadata->GetBlobSchema(it->second.GetSchemaSnapshot()));
- NotIndexed.emplace_back(MakeNotIndexedBatch(batch, it->second.GetSchemaSnapshot()));
- WaitCommitted.erase(it);
-}
-
-void TIndexedReadData::AddNotIndexed(const TUnifiedBlobId& blobId, const std::shared_ptr<arrow::RecordBatch>& batch) {
- auto it = WaitCommitted.find(blobId);
- Y_VERIFY(it != WaitCommitted.end());
- NotIndexed.emplace_back(MakeNotIndexedBatch(batch, it->second.GetSchemaSnapshot()));
- WaitCommitted.erase(it);
-}
-
-void TIndexedReadData::DoAddData(const TBlobRange& blobRange, const TString& data) {
- if (GranulesContext->IsFinished()) {
- ACFL_DEBUG("event", "AddData on GranulesContextFinished");
- return;
- }
- if (IsIndexedBlob(blobRange)) {
- AddIndexed(blobRange, data);
- } else {
- AddNotIndexed(blobRange, data);
- }
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
deleted file mode 100644
index 13fa7e27b3..0000000000
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ /dev/null
@@ -1,96 +0,0 @@
-#pragma once
-#include "defs.h"
-#include "column_engine.h"
-#include "predicate/predicate.h"
-#include "reader/queue.h"
-#include "reader/granule.h"
-#include "reader/batch.h"
-#include "reader/filling_context.h"
-#include "reader/read_metadata.h"
-
-#include <ydb/library/accessor/accessor.h>
-#include <ydb/core/tx/columnshard/counters.h>
-
-namespace NKikimr::NColumnShard {
-class TScanIteratorBase;
-}
-
-namespace NKikimr::NOlap {
-
-class TIndexedReadData: public IDataReader, TNonCopyable {
-private:
- using TBase = IDataReader;
- std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext;
- THashMap<TUnifiedBlobId, NOlap::TCommittedBlob> WaitCommitted;
-
- TFetchBlobsQueue FetchBlobsQueue;
- TFetchBlobsQueue PriorityBlobsQueue;
- bool OnePhaseReadMode = false;
- std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
-
- THashMap<TBlobRange, NIndexedReader::TBatchAddress> IndexedBlobSubscriber;
- std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch;
- std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
-
- void AddNotIndexed(const TBlobRange& blobRange, const TString& column);
- void AddNotIndexed(const TUnifiedBlobId& blobId, const std::shared_ptr<arrow::RecordBatch>& batch);
- void RegisterZeroGranula();
-
- void AddIndexed(const TBlobRange& blobRange, const TString& column);
- bool IsIndexedBlob(const TBlobRange& blobRange) const {
- return IndexedBlobSubscriber.contains(blobRange);
- }
-protected:
- virtual TString DoDebugString() const override {
- return TStringBuilder()
- << "wait_committed:" << WaitCommitted.size() << ";"
- << "granules_context:(" << (GranulesContext ? GranulesContext->DebugString() : "NO") << ");"
- ;
- }
-
- /// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK)
- virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override;
-
- virtual void DoAbort() override;
- virtual bool DoIsFinished() const override;
-
- virtual void DoAddData(const TBlobRange& blobRange, const TString& data) override;
- virtual std::optional<TBlobRange> DoExtractNextBlob(const bool hasReadyResults) override;
-public:
- TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const TReadContext& context);
-
- NIndexedReader::TGranulesFillingContext& GetGranulesContext() {
- Y_VERIFY(GranulesContext);
- return *GranulesContext;
- }
-
- void InitRead();
-
- NOlap::TReadMetadata::TConstPtr GetReadMetadata() const {
- return ReadMetadata;
- }
-
- void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
- void OnBatchReady(const NIndexedReader::TBatch& /*batchInfo*/, std::shared_ptr<arrow::RecordBatch> batch) {
- if (batch && batch->num_rows()) {
- ReadMetadata->ReadStats->SelectedRows += batch->num_rows();
- }
- }
-
- void AddBlobToFetchInFront(const ui64 granuleId, const TBlobRange& range) {
- PriorityBlobsQueue.emplace_back(granuleId, range);
- }
-
-private:
- std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
- const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) const;
-
- std::shared_ptr<arrow::RecordBatch> MergeNotIndexed(
- std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const;
- std::vector<TPartialReadResult> ReadyToOut(const i64 maxRowsInBatch);
- void MergeTooSmallBatches(const std::shared_ptr<TMemoryAggregation>& memAggregation, std::vector<TPartialReadResult>& out) const;
- std::vector<TPartialReadResult> MakeResult(
- std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, int64_t maxRowsInBatch) const;
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.cpp b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp
index 092e852abc..64fa210150 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp
@@ -6,7 +6,7 @@ bool TInsertedDataMeta::DeserializeFromProto(const NKikimrTxColumnShard::TLogica
if (proto.HasDirtyWriteTimeSeconds()) {
DirtyWriteTime = TInstant::Seconds(proto.GetDirtyWriteTimeSeconds());
}
- if (proto.HasRawBytes()) {
+ if (proto.HasSpecialKeysRawData()) {
SpecialKeys = NArrow::TFirstLastSpecialKeys(proto.GetSpecialKeysRawData());
}
NumRows = proto.GetNumRows();
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h
index 0e17895476..a460b22d8a 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.h
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.h
@@ -100,10 +100,6 @@ public:
return ColumnId == item.ColumnId && Chunk == item.Chunk;
}
- std::optional<ui32> GetChunkRowsCount() const {
- return Meta.GetNumRows();
- }
-
bool Valid() const {
return ColumnId && ValidBlob();
}
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 21b0180180..1c7cd58e18 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -29,7 +29,7 @@ const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record)
void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName) {
const auto& indexInfo = snapshotSchema.GetIndexInfo();
-
+ Y_VERIFY(batch->num_rows() == NumRows());
Meta = {};
Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
Meta.FillBatchInfo(batch, indexInfo);
@@ -89,7 +89,7 @@ ui64 TPortionInfo::GetRawBytes(const std::vector<ui32>& columnIds) const {
} else {
for (auto&& r : Records) {
if (r.ColumnId == i) {
- sum += r.GetMeta().GetRawBytes().value_or(0);
+ sum += r.GetMeta().GetRawBytesVerified();
}
}
}
@@ -110,6 +110,8 @@ TString TPortionInfo::DebugString() const {
sb << "(portion_id:" << Portion << ";" <<
"granule_id:" << Granule << ";records_count:" << NumRows() << ";"
"min_snapshot:(" << MinSnapshot.DebugString() << ");" <<
+// "from:" << IndexKeyStart().DebugString() << ";" <<
+// "to:" << IndexKeyEnd().DebugString() << ";" <<
"size:" << BlobsBytes() << ";" <<
"meta:(" << Meta.DebugString() << ");";
if (RemoveSnapshot.Valid()) {
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index b98c6e9d64..d7f008e55a 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -224,7 +224,7 @@ public:
std::optional<ui32> columnIdFirst;
for (auto&& i : Records) {
if (!columnIdFirst || *columnIdFirst == i.ColumnId) {
- result += i.GetMeta().GetNumRows().value_or(0);
+ result += i.GetMeta().GetNumRowsVerified();
columnIdFirst = i.ColumnId;
}
}
@@ -235,7 +235,7 @@ public:
ui32 result = 0;
for (auto&& i : Records) {
if (columnId == i.ColumnId) {
- result += i.GetMeta().GetNumRows().value_or(0);
+ result += i.GetMeta().GetNumRowsVerified();
}
}
return result;
@@ -246,7 +246,7 @@ public:
ui64 RawBytesSum() const {
ui64 result = 0;
for (auto&& i : Records) {
- result += i.GetMeta().GetRawBytes().value_or(0);
+ result += i.GetMeta().GetRawBytesVerified();
}
return result;
}
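
The switch from GetNumRows().value_or(0) / GetRawBytes().value_or(0) to the Verified getters turns missing chunk metadata from a silent zero into a hard failure. A tiny sketch of that accessor pattern with a stand-in struct (the real metadata lives in column_record.h; Y_VERIFY replaced by assert here):

    #include <cassert>
    #include <cstdint>
    #include <optional>

    // Stand-in for the chunk metadata: the "Verified" accessor fails loudly
    // when the value is absent instead of defaulting to 0.
    struct TChunkMetaStub {
        std::optional<uint32_t> NumRows;

        uint32_t GetNumRowsVerified() const {
            assert(NumRows.has_value());   // the real code uses Y_VERIFY
            return *NumRows;
        }
    };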
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index 46264ebdea..3f60021c20 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -6,7 +6,6 @@
# original buildsystem will not be accepted.
-add_subdirectory(order_control)
add_subdirectory(plain_reader)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -29,26 +28,18 @@ target_link_libraries(columnshard-engines-reader PUBLIC
columnshard-hooks-abstract
tx-columnshard-resources
core-tx-program
- engines-reader-order_control
engines-reader-plain_reader
columnshard-engines-scheme
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index 8b5d772ca2..2702824112 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -6,7 +6,6 @@
# original buildsystem will not be accepted.
-add_subdirectory(order_control)
add_subdirectory(plain_reader)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -30,26 +29,18 @@ target_link_libraries(columnshard-engines-reader PUBLIC
columnshard-hooks-abstract
tx-columnshard-resources
core-tx-program
- engines-reader-order_control
engines-reader-plain_reader
columnshard-engines-scheme
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index 8b5d772ca2..2702824112 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -6,7 +6,6 @@
# original buildsystem will not be accepted.
-add_subdirectory(order_control)
add_subdirectory(plain_reader)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -30,26 +29,18 @@ target_link_libraries(columnshard-engines-reader PUBLIC
columnshard-hooks-abstract
tx-columnshard-resources
core-tx-program
- engines-reader-order_control
engines-reader-plain_reader
columnshard-engines-scheme
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index 46264ebdea..3f60021c20 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -6,7 +6,6 @@
# original buildsystem will not be accepted.
-add_subdirectory(order_control)
add_subdirectory(plain_reader)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -29,26 +28,18 @@ target_link_libraries(columnshard-engines-reader PUBLIC
columnshard-hooks-abstract
tx-columnshard-resources
core-tx-program
- engines-reader-order_control
engines-reader-plain_reader
columnshard-engines-scheme
tools-enum_parser-enum_serialization_runtime
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
deleted file mode 100644
index 5e5823dd1b..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ /dev/null
@@ -1,226 +0,0 @@
-#include "batch.h"
-#include "granule.h"
-#include "filter_assembler.h"
-#include "postfilter_assembler.h"
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
-#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-TBatch::TBatch(const TBatchAddress& address, TGranule& owner, TPortionInfo&& portionInfoExt, const ui64 predictedBatchSize)
- : BatchAddress(address)
- , Portion(portionInfoExt.GetPortion())
- , Granule(owner.GetGranuleId())
- , PredictedBatchSize(predictedBatchSize)
- , Owner(&owner)
- , PortionInfo(std::move(portionInfoExt))
-{
- Y_VERIFY(Granule == PortionInfo.GetGranule());
- Y_VERIFY(PortionInfo.Records.size());
-
- if (PortionInfo.CanIntersectOthers()) {
- ACFL_TRACE("event", "intersect_portion");
- Owner->SetDuplicationsAvailable(true);
- if (PortionInfo.CanHaveDups()) {
- ACFL_TRACE("event", "dup_portion");
- DuplicationsAvailableFlag = true;
- }
- }
-}
-
-NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata) {
- Y_VERIFY(WaitIndexed.empty());
- Y_VERIFY(PortionInfo.Produced());
- Y_VERIFY(!FetchedInfo.GetFilteredBatch());
-
- auto blobSchema = readMetadata->GetLoadSchema(PortionInfo.GetMinSnapshot());
- auto readSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot());
- ISnapshotSchema::TPtr resultSchema;
- if (CurrentColumnIds) {
- resultSchema = std::make_shared<TFilteredSnapshotSchema>(readSchema, *CurrentColumnIds);
- } else {
- resultSchema = readSchema;
- }
- auto batchConstructor = PortionInfo.PrepareForAssemble(*blobSchema, *resultSchema, Data);
- Data.clear();
- if (!FetchedInfo.GetFilter()) {
- return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata,
- *this, Owner->GetEarlyFilterColumns(), processor, Owner->GetOwner().GetSortingPolicy());
- } else {
- return std::make_shared<TAssembleBatch>(std::move(batchConstructor), *this, readMetadata->GetColumnsOrder(), processor);
- }
-}
-
-bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const {
- if (!CurrentColumnIds) {
- return true;
- }
- if (AskedColumnIds.size() < columnIds.size()) {
- return false;
- }
- for (auto&& i : columnIds) {
- if (!AskedColumnIds.contains(i)) {
- return false;
- }
- }
- return true;
-}
-
-ui64 TBatch::GetFetchBytes(const std::set<ui32>& columnIds) {
- ui64 result = 0;
- for (const NOlap::TColumnRecord& rec : PortionInfo.Records) {
- if (!columnIds.contains(rec.ColumnId)) {
- continue;
- }
- Y_VERIFY(rec.Valid());
- result += rec.BlobRange.Size;
- }
- return result;
-}
-
-void TBatch::ResetCommon(const std::set<ui32>& columnIds) {
- CurrentColumnIds = columnIds;
- Y_VERIFY(CurrentColumnIds->size());
- for (auto&& i : *CurrentColumnIds) {
- Y_VERIFY(AskedColumnIds.emplace(i).second);
- }
-
- Y_VERIFY(WaitIndexed.empty());
- Y_VERIFY(Data.empty());
- WaitingBytes = 0;
- FetchedBytes = 0;
-}
-
-void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) {
- Y_VERIFY(!FetchedInfo.GetFilter());
- ResetCommon(columnIds);
- for (const NOlap::TColumnRecord& rec : PortionInfo.Records) {
- if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
- continue;
- }
- Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second);
- Owner->AddBlobForFetch(rec.BlobRange, *this);
- Y_VERIFY(rec.Valid());
- WaitingBytes += rec.BlobRange.Size;
- }
-}
-
-void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) {
- Y_VERIFY(FetchedInfo.GetFilter());
- ResetCommon(columnIds);
- std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects;
- for (const NOlap::TColumnRecord& rec : PortionInfo.Records) {
- if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
- continue;
- }
- orderedObjects[rec.ColumnId][rec.Chunk] = &rec;
- Y_VERIFY(rec.Valid());
- }
-
- for (auto&& columnInfo : orderedObjects) {
- ui32 expected = 0;
- auto it = FetchedInfo.GetFilter()->GetIterator(false);
- bool undefinedShift = false;
- bool itFinished = false;
- for (auto&& [chunk, rec] : columnInfo.second) {
- Y_VERIFY(!itFinished);
- Y_VERIFY(expected++ == chunk);
- if (!rec->GetChunkRowsCount()) {
- undefinedShift = true;
- }
- if (!undefinedShift && it.IsBatchForSkip(*rec->GetChunkRowsCount())) {
- Data.emplace(rec->BlobRange, TPortionInfo::TAssembleBlobInfo(*rec->GetChunkRowsCount()));
- } else {
- Y_VERIFY(WaitIndexed.emplace(rec->BlobRange).second);
- Owner->AddBlobForFetch(rec->BlobRange, *this);
- WaitingBytes += rec->BlobRange.Size;
- }
- if (!undefinedShift) {
- itFinished = !it.Next(*rec->GetChunkRowsCount());
- }
- }
- }
- CheckReadyForAssemble();
-}
-
-bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch,
- const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter)
-{
- FetchedInfo.InitFilter(filter, filterBatch, originalRecordsCount, notAppliedEarlyFilter);
- return Owner->OnFilterReady(*this);
-}
-
-void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) {
- FetchedInfo.InitBatch(batch);
- Owner->OnBatchReady(*this, batch);
-}
-
-bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) {
- if (!WaitIndexed.erase(bRange)) {
- Y_ASSERT(false);
- return false;
- }
- WaitingBytes -= bRange.Size;
- FetchedBytes += bRange.Size;
- Data.emplace(bRange, TPortionInfo::TAssembleBlobInfo(blobData));
- Owner->OnBlobReady(bRange);
- CheckReadyForAssemble();
- return true;
-}
-
-ui64 TBatch::GetUsefulBytes(const ui64 bytes) const {
- return bytes * FetchedInfo.GetUsefulDataKff();
-}
-
-std::shared_ptr<TSortableBatchPosition> TBatch::GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const {
- if (!FirstPK || !ReverseFirstPK) {
- std::shared_ptr<TSortableBatchPosition> from;
- std::shared_ptr<TSortableBatchPosition> to;
- GetPKBorders(reverse, indexInfo, from, to);
- }
- if (reverse) {
- return *ReverseFirstPK;
- } else {
- return *FirstPK;
- }
-}
-
-void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const {
- auto indexKey = indexInfo.GetIndexKey();
- Y_VERIFY(PortionInfo.Valid());
- if (!FirstPK) {
- const NArrow::TReplaceKey& minRecord = PortionInfo.IndexKeyStart();
- auto batch = minRecord.ToBatch(indexKey);
- Y_VERIFY(batch);
- FirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), false);
- ReverseLastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), true);
- }
- if (!LastPK) {
- const NArrow::TReplaceKey& maxRecord = PortionInfo.IndexKeyEnd();
- auto batch = maxRecord.ToBatch(indexKey);
- Y_VERIFY(batch);
- LastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), false);
- ReverseFirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), true);
- }
- if (reverse) {
- from = *ReverseFirstPK;
- to = *ReverseLastPK;
- } else {
- from = *FirstPK;
- to = *LastPK;
- }
-}
-
-bool TBatch::CheckReadyForAssemble() {
- if (IsFetchingReady()) {
- auto& context = Owner->GetOwner();
- auto processor = context.GetTasksProcessor();
- if (auto assembleBatchTask = AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
- processor.Add(context.GetOwner(), assembleBatchTask);
- }
- return true;
- }
- return false;
-}
-
-}
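
The ResetWithFilter() method deleted above walks the already-known row filter chunk by chunk and schedules a blob fetch only when at least one of the chunk's rows survives; fully rejected chunks are replaced by a row-count placeholder. Below is a minimal standalone sketch of that per-chunk decision, assuming a plain bool-vector filter and explicit chunk row counts in place of NArrow::TColumnFilter and TColumnRecord (all names and data here are illustrative, not the real API):

#include <cstddef>
#include <iostream>
#include <vector>

// Per-chunk fetch planning in the spirit of the removed TBatch::ResetWithFilter:
// a chunk whose rows are all filtered out is not fetched; a placeholder carrying
// the chunk's row count keeps row numbering intact for the later assembly step.
struct TChunkPlan {
    size_t Rows;
    bool Fetch; // false -> the chunk is replaced by a "skip Rows rows" placeholder
};

std::vector<TChunkPlan> PlanChunkFetches(const std::vector<bool>& rowFilter,
                                         const std::vector<size_t>& chunkRows) {
    std::vector<TChunkPlan> plan;
    size_t pos = 0;
    for (size_t rows : chunkRows) {
        bool anyAccepted = false;
        for (size_t i = pos; i < pos + rows && i < rowFilter.size(); ++i) {
            anyAccepted = anyAccepted || rowFilter[i];
        }
        plan.push_back({rows, anyAccepted});
        pos += rows;
    }
    return plan;
}

int main() {
    // The filter accepts rows 4 and 5 only; the column is stored as three 3-row chunks.
    std::vector<bool> filter = {false, false, false, false, true, true, false, false, false};
    for (const auto& chunk : PlanChunkFetches(filter, {3, 3, 3})) {
        std::cout << (chunk.Fetch ? "fetch " : "skip ") << chunk.Rows << " rows\n";
    }
    // Only the middle chunk is read from storage; the other two become placeholders.
}
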
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
deleted file mode 100644
index 338a610718..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ /dev/null
@@ -1,161 +0,0 @@
-#pragma once
-#include "common.h"
-#include "conveyor_task.h"
-#include "read_filter_merger.h"
-
-#include <ydb/core/formats/arrow/arrow_filter.h>
-#include <ydb/core/formats/arrow/size_calcer.h>
-#include <ydb/core/tx/columnshard/blob.h>
-#include <ydb/core/tx/columnshard/engines/portion_info.h>
-
-#include <ydb/library/accessor/accessor.h>
-
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-
-namespace NKikimr::NOlap {
-struct TReadMetadata;
-}
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TGranule;
-class TBatch;
-
-class TBatchFetchedInfo {
-private:
- YDB_READONLY_DEF(std::optional<ui64>, BatchSize);
- YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch);
- YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch);
- YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
- YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedEarlyFilter);
- ui32 OriginalRecordsCount = 0;
-public:
- void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch,
- const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter) {
- Y_VERIFY(filter);
- Y_VERIFY(!Filter);
- Y_VERIFY(!FilterBatch);
- Filter = filter;
- FilterBatch = filterBatch;
- OriginalRecordsCount = originalRecordsCount;
- NotAppliedEarlyFilter = notAppliedEarlyFilter;
- }
-
- double GetUsefulDataKff() const {
- if (!FilterBatch || !FilterBatch->num_rows()) {
- return 0;
- }
- Y_VERIFY_DEBUG(OriginalRecordsCount);
- if (!OriginalRecordsCount) {
- return 0;
- }
- return 1.0 * FilterBatch->num_rows() / OriginalRecordsCount;
- }
-
- bool IsFiltered() const {
- return !!Filter;
- }
-
- void InitBatch(std::shared_ptr<arrow::RecordBatch> fullBatch) {
- Y_VERIFY(!FilteredBatch);
- FilteredBatch = fullBatch;
- BatchSize = NArrow::GetBatchMemorySize(FilteredBatch);
- }
-
- ui32 GetFilteredRecordsCount() const {
- Y_VERIFY(IsFiltered());
- if (!FilterBatch) {
- return 0;
- } else {
- return FilterBatch->num_rows();
- }
- }
-};
-
-class TBatch: TNonCopyable {
-private:
- const TBatchAddress BatchAddress;
- YDB_READONLY(ui64, Portion, 0);
- YDB_READONLY(ui64, Granule, 0);
- YDB_READONLY(ui64, WaitingBytes, 0);
- YDB_READONLY(ui64, FetchedBytes, 0);
- const ui64 PredictedBatchSize;
-
- THashSet<TBlobRange> WaitIndexed;
- mutable std::optional<std::shared_ptr<TSortableBatchPosition>> FirstPK;
- mutable std::optional<std::shared_ptr<TSortableBatchPosition>> LastPK;
- mutable std::optional<std::shared_ptr<TSortableBatchPosition>> ReverseFirstPK;
- mutable std::optional<std::shared_ptr<TSortableBatchPosition>> ReverseLastPK;
-
- YDB_READONLY_FLAG(DuplicationsAvailable, false);
- YDB_READONLY_DEF(TBatchFetchedInfo, FetchedInfo);
- THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
- TGranule* Owner;
- const TPortionInfo PortionInfo;
-
- YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds);
- std::set<ui32> AskedColumnIds;
- void ResetCommon(const std::set<ui32>& columnIds);
- ui64 GetUsefulBytes(const ui64 bytes) const;
- bool CheckReadyForAssemble();
- bool IsFetchingReady() const {
- return WaitIndexed.empty();
- }
-
-public:
- std::shared_ptr<TSortableBatchPosition> GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const;
- void GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const;
-
- ui64 GetPredictedBatchSize() const {
- return PredictedBatchSize;
- }
-
- ui64 GetRealBatchSizeVerified() const {
- auto result = FetchedInfo.GetBatchSize();
- Y_VERIFY(result);
- return *result;
- }
-
- bool AllowEarlyFilter() const {
- return PortionInfo.AllowEarlyFilter();
- }
- const TBatchAddress& GetBatchAddress() const {
- return BatchAddress;
- }
-
- ui64 GetUsefulWaitingBytes() const {
- return GetUsefulBytes(WaitingBytes);
- }
-
- ui64 GetUsefulFetchedBytes() const {
- return GetUsefulBytes(FetchedBytes);
- }
-
- TBatch(const TBatchAddress& address, TGranule& owner, TPortionInfo&& portionInfo, const ui64 predictedBatchSize);
- bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData);
- bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
-
- void ResetNoFilter(const std::set<ui32>& columnIds);
- void ResetWithFilter(const std::set<ui32>& columnIds);
- ui64 GetFetchBytes(const std::set<ui32>& columnIds);
-
- ui32 GetFilteredRecordsCount() const;
- bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch,
- const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter);
- void InitBatch(std::shared_ptr<arrow::RecordBatch> batch);
-
- NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata);
-
- const THashSet<TBlobRange>& GetWaitingBlobs() const {
- return WaitIndexed;
- }
-
- const TGranule& GetOwner() const {
- return *Owner;
- }
-
- const TPortionInfo& GetPortionInfo() const {
- return PortionInfo;
- }
-};
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
index 2116c74733..a3c005fd2a 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -1,5 +1,5 @@
#include "conveyor_task.h"
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+#include "read_context.h"
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
deleted file mode 100644
index b7bc120a42..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ /dev/null
@@ -1,102 +0,0 @@
-#include "filling_context.h"
-#include "order_control/not_sorted.h"
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
-#include <ydb/core/tx/columnshard/resources/memory.h>
-#include <util/string/join.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading)
- : ReadMetadata(readMetadata)
- , InternalReading(internalReading)
- , Processing(owner.GetMemoryAccessor(), owner.GetCounters())
- , Result(owner.GetCounters())
- , GranulesLiveContext(std::make_shared<TGranulesLiveControl>())
- , Owner(owner)
- , Counters(owner.GetCounters())
-{
- SortingPolicy = InternalReading ? std::make_shared<TNonSorting>(ReadMetadata) : ReadMetadata->BuildSortingPolicy();
-
- UsedColumns = ReadMetadata->GetUsedColumnIds();
- EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds();
- FilterStageColumns = SortingPolicy->GetFilterStageColumns();
- PKColumnNames = ReadMetadata->GetReplaceKey()->field_names();
- PostFilterColumns = ReadMetadata->GetUsedColumnIds();
- for (auto&& i : FilterStageColumns) {
- PostFilterColumns.erase(i);
- }
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TGranulesFillingContext")("used", UsedColumns.size())("early", EarlyFilterColumns.size())
- ("filter_stage", FilterStageColumns.size())("PKColumnNames", JoinSeq(",", PKColumnNames))("post_filter", PostFilterColumns.size());
-}
-
-bool TGranulesFillingContext::PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const {
- if (!portionInfo.AllowEarlyFilter()) {
- return false;
- }
- if (EarlyFilterColumns.empty()) {
- return false;
- }
- if (TIndexInfo::IsSpecialColumns(EarlyFilterColumns)) {
- return false;
- }
- return true;
-}
-
-void TGranulesFillingContext::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) {
- return Owner.AddBlobForFetch(range, batch);
-}
-
-void TGranulesFillingContext::OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
- return Owner.OnBatchReady(batchInfo, batch);
-}
-
-NIndexedReader::TBatch* TGranulesFillingContext::GetBatchInfo(const TBatchAddress& address) {
- return Processing.GetBatchInfo(address);
-}
-
-NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfoVerified(const TBatchAddress& address) {
- return Processing.GetBatchInfoVerified(address);
-}
-
-NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const {
- return Owner.GetTasksProcessor();
-}
-
-void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) {
- Processing.DrainNotIndexedBatches(batches);
-}
-
-bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range, const bool hasReadyResults) {
- Y_VERIFY_DEBUG(!Result.IsReady(granuleId));
- if (InternalReading || Processing.IsInProgress(granuleId)
- || (!GranulesLiveContext->GetCount() && !hasReadyResults)
- || GetMemoryAccessor()->GetLimiter().HasBufferOrSubscribe(GetMemoryAccessor())
- )
- {
- Processing.StartBlobProcessing(granuleId, range);
- return true;
- } else {
- return false;
- }
-}
-
-bool TGranulesFillingContext::ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range) {
- Y_VERIFY_DEBUG(!Result.IsReady(granuleId));
- Processing.StartBlobProcessing(granuleId, range);
- return true;
-}
-
-void TGranulesFillingContext::OnGranuleReady(const ui64 granuleId) {
- Result.AddResult(Processing.ExtractReadyVerified(granuleId));
-}
-
-std::vector<NKikimr::NOlap::NIndexedReader::TGranule::TPtr> TGranulesFillingContext::DetachReadyInOrder() {
- Y_VERIFY(SortingPolicy);
- return SortingPolicy->DetachReadyGranules(Result);
-}
-
-const std::shared_ptr<NKikimr::NOlap::TActorBasedMemoryAccesor>& TGranulesFillingContext::GetMemoryAccessor() const {
- return Owner.GetMemoryAccessor();
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
deleted file mode 100644
index 04e6cd01e7..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ /dev/null
@@ -1,140 +0,0 @@
-#pragma once
-#include "conveyor_task.h"
-#include "granule.h"
-#include "processing_context.h"
-#include "order_control/abstract.h"
-#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
-#include <util/generic/hash.h>
-
-namespace NKikimr::NOlap {
-class TIndexedReadData;
-}
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TGranulesFillingContext: TNonCopyable, public NColumnShard::TMonitoringObjectsCounter<TGranulesFillingContext, true, false> {
-private:
- YDB_READONLY_DEF(std::vector<std::string>, PKColumnNames);
- TReadMetadata::TConstPtr ReadMetadata;
- bool StartedFlag = false;
- const bool InternalReading = false;
- TProcessingController Processing;
- TResultController Result;
- std::shared_ptr<TGranulesLiveControl> GranulesLiveContext;
- TIndexedReadData& Owner;
- std::set<ui32> EarlyFilterColumns;
- std::set<ui32> PostFilterColumns;
- std::set<ui32> FilterStageColumns;
- std::set<ui32> UsedColumns;
- IOrderPolicy::TPtr SortingPolicy;
- NColumnShard::TConcreteScanCounters Counters;
- bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const;
-
- bool CheckBufferAvailable() const;
-public:
- TIndexedReadData& GetOwner() {
- return Owner;
- }
-
- std::shared_ptr<TGranulesLiveControl> GetGranulesLiveContext() const {
- return GranulesLiveContext;
- }
- bool IsGranuleActualForProcessing(const ui64 granuleId) const {
- return Processing.IsGranuleActualForProcessing(granuleId);
- }
- bool ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range);
- bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range, const bool hasReadyResults);
- TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading);
-
- const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const;
-
- TString DebugString() const {
- return TStringBuilder()
- << "processing:(" << Processing.DebugString() << ");"
- << "result:(" << Result.DebugString() << ");"
- << "sorting_policy:(" << SortingPolicy->DebugString() << ");"
- ;
- }
-
- void OnBlobReady(const ui64 /*granuleId*/, const TBlobRange& /*range*/) noexcept {
- }
-
- TReadMetadata::TConstPtr GetReadMetadata() const noexcept {
- return ReadMetadata;
- }
-
- const std::set<ui32>& GetEarlyFilterColumns() const noexcept {
- return EarlyFilterColumns;
- }
-
- const std::set<ui32>& GetPostFilterColumns() const noexcept {
- return PostFilterColumns;
- }
-
- IOrderPolicy::TPtr GetSortingPolicy() const noexcept {
- return SortingPolicy;
- }
-
- const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept {
- return Counters;
- }
-
- NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const;
-
- void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
- NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address);
- NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address);
-
- void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
- void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
-
- TGranule::TPtr GetGranuleVerified(const ui64 granuleId) {
- return Processing.GetGranuleVerified(granuleId);
- }
-
- bool IsFinished() const {
- return Processing.IsFinished() && !Result.GetCount();
- }
-
- void OnNewBatch(TBatch& batch) {
- Y_VERIFY(!StartedFlag);
- if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) {
- batch.ResetNoFilter(FilterStageColumns);
- } else {
- batch.ResetNoFilter(UsedColumns);
- }
- }
-
- std::vector<TGranule::TPtr> DetachReadyInOrder();
-
- void Abort() {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort");
- Processing.Abort();
- Result.Clear();
- Y_VERIFY(IsFinished());
- }
-
- TGranule::TPtr UpsertGranule(const ui64 granuleId) {
- Y_VERIFY(!StartedFlag);
- auto g = Processing.GetGranule(granuleId);
- if (!g) {
- return Processing.InsertGranule(std::make_shared<TGranule>(granuleId, *this));
- } else {
- return g;
- }
- }
-
- void OnGranuleReady(const ui64 granuleId);
-
- void Wakeup(TGranule& granule) {
- SortingPolicy->Wakeup(granule, *this);
- }
-
- void PrepareForStart() {
- Y_VERIFY(!StartedFlag);
- StartedFlag = true;
- SortingPolicy->Fill(*this);
- }
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
deleted file mode 100644
index 972046bffa..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ /dev/null
@@ -1,69 +0,0 @@
-#include "filter_assembler.h"
-#include <ydb/core/tx/columnshard/engines/filter.h>
-#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-bool TAssembleFilter::DoExecuteImpl() {
- /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
- /// It's not OK to apply predicate before replacing key duplicates otherwise.
- /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
-
- TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- options.IncludedColumnIds = FilterColumnIds;
- auto batch = BatchConstructor.Assemble(options);
- Y_VERIFY(batch);
- Y_VERIFY(batch->num_rows());
- OriginalCount = batch->num_rows();
- Filter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata));
- if (!Filter->Apply(batch)) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount)("columns_count", FilterColumnIds.size());
- FilteredBatch = nullptr;
- return true;
- }
- auto earlyFilter = ReadMetadata->GetProgram().BuildEarlyFilter(batch);
- if (earlyFilter) {
- if (AllowEarlyFilter) {
- Filter = std::make_shared<NArrow::TColumnFilter>(Filter->CombineSequentialAnd(*earlyFilter));
- if (!earlyFilter->Apply(batch)) {
- NYDBTest::TControllers::GetColumnShardController()->OnAfterFilterAssembling(batch);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount)("columns_count", FilterColumnIds.size());;
- FilteredBatch = nullptr;
- return true;
- } else {
- NYDBTest::TControllers::GetColumnShardController()->OnAfterFilterAssembling(batch);
- }
- } else if (BatchesOrderPolicy->NeedNotAppliedEarlyFilter()) {
- EarlyFilter = earlyFilter;
- }
- }
-
- if ((size_t)batch->schema()->num_fields() < BatchConstructor.GetColumnsCount()) {
- TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- options.ExcludedColumnIds = FilterColumnIds;
- auto addBatch = BatchConstructor.Assemble(options);
- Y_VERIFY(addBatch);
- Y_VERIFY(Filter->Apply(addBatch));
- Y_VERIFY(NArrow::MergeBatchColumns({ batch, addBatch }, batch, BatchConstructor.GetSchemaColumnNames(), true));
- }
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")
- ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter)
- ("filter_columns", FilterColumnIds.size());
-
- FilteredBatch = batch;
- return true;
-}
-
-bool TAssembleFilter::DoApply(IDataReader& owner) const {
- Y_VERIFY(OriginalCount);
- auto& reader = owner.GetMeAs<TIndexedReadData>();
- reader.GetCounters().OriginalRowsCount->Add(OriginalCount);
- reader.GetCounters().AssembleFilterCount->Add(1);
- TBatch* batch = reader.GetGranulesContext().GetBatchInfo(BatchAddress);
- if (batch) {
- batch->InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter);
- }
- return true;
-}
-
-}
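
TAssembleFilter above materializes only the filter-stage columns first, applies the predicate (and, when allowed, the early program filter), and assembles the remaining columns only for batches that still have surviving rows. A rough standalone sketch of that two-phase flow, with toy integer columns and a threshold predicate standing in for the real PrepareForAssemble/FilterPortion machinery (every name below is an illustrative assumption):

#include <cstdint>
#include <iostream>
#include <vector>

using TColumn = std::vector<int64_t>;

// Phase one of the removed TAssembleFilter::DoExecuteImpl: build the row mask
// from the filter-stage columns only (here a single column and a threshold).
std::vector<bool> BuildFilter(const TColumn& filterColumn, int64_t threshold) {
    std::vector<bool> mask(filterColumn.size());
    for (size_t i = 0; i < filterColumn.size(); ++i) {
        mask[i] = filterColumn[i] > threshold;
    }
    return mask;
}

// Phase two: the mask is applied to the remaining columns; columns of fully
// rejected batches are never assembled at all.
TColumn ApplyFilter(const TColumn& column, const std::vector<bool>& mask) {
    TColumn out;
    for (size_t i = 0; i < column.size() && i < mask.size(); ++i) {
        if (mask[i]) {
            out.push_back(column[i]);
        }
    }
    return out;
}

int main() {
    TColumn key = {1, 5, 9, 3};         // filter-stage column
    TColumn payload = {10, 50, 90, 30}; // post-filter column
    auto mask = BuildFilter(key, 4);
    if (ApplyFilter(key, mask).empty()) {
        std::cout << "skip_data\n";     // analogue of returning a null FilteredBatch
        return 0;
    }
    for (int64_t v : ApplyFilter(payload, mask)) {
        std::cout << v << ' ';          // prints: 50 90
    }
    std::cout << '\n';
}
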
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
deleted file mode 100644
index 0422a62051..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#pragma once
-#include "common.h"
-#include "conveyor_task.h"
-
-#include <ydb/core/formats/arrow/arrow_filter.h>
-#include <ydb/core/tx/columnshard/engines/portion_info.h>
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
-#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
-
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
- class TAssembleFilter: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TAssembleFilter, true, true> {
- private:
- using TBase = NColumnShard::IDataTasksProcessor::ITask;
- TPortionInfo::TPreparedBatchData BatchConstructor;
- std::shared_ptr<arrow::RecordBatch> FilteredBatch;
- NOlap::TReadMetadata::TConstPtr ReadMetadata;
- std::shared_ptr<NArrow::TColumnFilter> Filter;
- std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
- const TBatchAddress BatchAddress;
- ui32 OriginalCount = 0;
- bool AllowEarlyFilter = false;
- std::set<ui32> FilterColumnIds;
- IOrderPolicy::TPtr BatchesOrderPolicy;
- protected:
- virtual bool DoApply(IDataReader& owner) const override;
- virtual bool DoExecuteImpl() override;
- public:
-
- virtual TString GetTaskClassIdentifier() const override {
- return "Reading::TAssembleFilter";
- }
-
- TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
- TBatch& batch, const std::set<ui32>& filterColumnIds, NColumnShard::IDataTasksProcessor::TPtr processor,
- IOrderPolicy::TPtr batchesOrderPolicy)
- : TBase(processor)
- , BatchConstructor(batchConstructor)
- , ReadMetadata(readMetadata)
- , BatchAddress(batch.GetBatchAddress())
- , AllowEarlyFilter(batch.AllowEarlyFilter())
- , FilterColumnIds(filterColumnIds)
- , BatchesOrderPolicy(batchesOrderPolicy)
- {
- TBase::SetPriority(TBase::EPriority::Normal);
- }
- };
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
deleted file mode 100644
index ed8f5c4053..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ /dev/null
@@ -1,187 +0,0 @@
-#include "granule.h"
-#include "granule_preparation.h"
-#include "filling_context.h"
-#include <ydb/core/tx/columnshard/engines/portion_info.h>
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
-#include <ydb/core/tx/columnshard/engines/filter.h>
-#include <ydb/core/tx/columnshard/engines/index_info.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
- if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) {
- return;
- }
- Y_VERIFY(!ReadyFlag);
- Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchAddress().GetBatchGranuleIdx()));
- if (InConstruction) {
- GranuleDataSize.Take(batchInfo.GetRealBatchSizeVerified());
- GranuleDataSize.Free(batchInfo.GetPredictedBatchSize());
- RawDataSizeReal += batchInfo.GetRealBatchSizeVerified();
- }
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)
- ("batch_address", batchInfo.GetBatchAddress().ToString())("count", WaitBatches.size())("in_construction", InConstruction);
- if (batch && batch->num_rows()) {
- RecordBatches.emplace_back(batch);
-
- auto& indexInfo = Owner->GetReadMetadata()->GetIndexInfo();
- if (!batchInfo.IsDuplicationsAvailable()) {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey(), false));
- } else {
- AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion_on_ready");
- Y_VERIFY_DEBUG(NArrow::IsSorted(batch, indexInfo.GetReplaceKey(), false));
- Y_VERIFY(IsDuplicationsAvailable());
- BatchesToDedup.insert(batch.get());
- }
- }
- Owner->OnBatchReady(batchInfo, batch);
- CheckReady();
-}
-
-TBatch& TGranule::RegisterBatchForFetching(TPortionInfo&& portionInfo) {
- const ui64 batchSize = portionInfo.GetRawBytes(Owner->GetReadMetadata()->GetAllColumns());
- RawDataSize += batchSize;
- const ui64 filtersSize = portionInfo.NumRows() * (8 + 8);
- RawDataSize += filtersSize;
- ACFL_DEBUG("event", "RegisterBatchForFetching")
- ("columns_count", Owner->GetReadMetadata()->GetAllColumns().size())("batch_raw_size", batchSize)("granule_size", RawDataSize)
- ("filter_size", filtersSize);
-
- Y_VERIFY(!ReadyFlag);
- ui32 batchGranuleIdx = Batches.size();
- WaitBatches.emplace(batchGranuleIdx);
- Batches.emplace_back(TBatchAddress(GranuleId, batchGranuleIdx), *this, std::move(portionInfo), batchSize);
- Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second);
- Owner->OnNewBatch(Batches.back());
- return Batches.back();
-}
-
-void TGranule::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const {
- Owner->AddBlobForFetch(range, batch);
-}
-
-const std::set<ui32>& TGranule::GetEarlyFilterColumns() const {
- return Owner->GetEarlyFilterColumns();
-}
-
-bool TGranule::OnFilterReady(TBatch& batchInfo) {
- if (ReadyFlag) {
- return false;
- }
- return Owner->GetSortingPolicy()->OnFilterReady(batchInfo, *this, *Owner);
-}
-
-std::deque<TGranule::TBatchForMerge> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata) {
- std::deque<TBatchForMerge> batches;
- for (auto&& i : Batches) {
- std::shared_ptr<TSortableBatchPosition> from;
- std::shared_ptr<TSortableBatchPosition> to;
- i.GetPKBorders(reverse, readMetadata->GetIndexInfo(), from, to);
- batches.emplace_back(TBatchForMerge(&i, from, to));
- }
- std::sort(batches.begin(), batches.end());
- ui32 currentPoolId = 0;
- std::map<TSortableBatchPosition, ui32> poolIds;
- for (auto&& i : batches) {
- if (!i.GetFrom()) {
- Y_VERIFY(!currentPoolId);
- continue;
- }
- auto it = poolIds.rbegin();
- for (; it != poolIds.rend(); ++it) {
- if (it->first.Compare(*i.GetFrom()) < 0) {
- break;
- }
- }
- if (it != poolIds.rend()) {
- i.SetPoolId(it->second);
- poolIds.erase(it->first);
- } else {
- i.SetPoolId(++currentPoolId);
- }
- if (i.GetTo()) {
- poolIds.emplace(*i.GetTo(), *i.GetPoolId());
- }
- }
- return batches;
-}
-
-void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
- if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) {
- return;
- }
- Y_VERIFY(!ReadyFlag);
- Y_VERIFY(!NotIndexedBatchReadyFlag || !batch);
- if (!NotIndexedBatchReadyFlag) {
- ACFL_TRACE("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size());
- } else {
- return;
- }
- NotIndexedBatchReadyFlag = true;
- if (batch && batch->num_rows()) {
- GranuleDataSize.Take(NArrow::GetBatchDataSize(batch));
- Y_VERIFY(!NotIndexedBatch);
- NotIndexedBatch = batch;
- if (NotIndexedBatch) {
- RecordBatches.emplace_back(NotIndexedBatch);
- }
- NotIndexedBatchFutureFilter = Owner->GetReadMetadata()->GetProgram().BuildEarlyFilter(batch);
- DuplicationsAvailableFlag = true;
- }
- CheckReady();
- Owner->Wakeup(*this);
-}
-
-void TGranule::OnGranuleDataPrepared(std::vector<std::shared_ptr<arrow::RecordBatch>>&& data) {
- ReadyFlag = true;
- RecordBatches = data;
- Owner->OnGranuleReady(GranuleId);
-}
-
-void TGranule::CheckReady() {
- if (WaitBatches.empty() && NotIndexedBatchReadyFlag) {
-
- if (RecordBatches.empty() || !IsDuplicationsAvailable()) {
- ReadyFlag = true;
- ACFL_DEBUG("event", "granule_ready")("predicted_size", RawDataSize)("real_size", RawDataSizeReal);
- Owner->OnGranuleReady(GranuleId);
- } else {
- ACFL_DEBUG("event", "granule_preparation")("predicted_size", RawDataSize)("real_size", RawDataSizeReal);
- std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = std::move(RecordBatches);
- auto processor = Owner->GetTasksProcessor();
- processor.Add(Owner->GetOwner(), std::make_shared<TTaskGranulePreparation>(std::move(inGranule), std::move(BatchesToDedup), GranuleId, Owner->GetReadMetadata(), processor.GetObject()));
- }
- }
-}
-
-void TGranule::OnBlobReady(const TBlobRange& range) noexcept {
- Y_VERIFY(InConstruction);
- if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) {
- return;
- }
- Y_VERIFY(!ReadyFlag);
- Owner->OnBlobReady(GranuleId, range);
-}
-
-TGranule::~TGranule() {
- if (InConstruction) {
- LiveController->Dec();
- }
-}
-
-TGranule::TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
- : GranuleId(granuleId)
- , LiveController(owner.GetGranulesLiveContext())
- , Owner(&owner)
- , GranuleDataSize(Owner->GetMemoryAccessor(), Owner->GetCounters().Aggregations.GetGranulesProcessing())
-{
-
-}
-
-void TGranule::StartConstruction() {
- InConstruction = true;
- LiveController->Inc();
- GranuleDataSize.Take(RawDataSize);
-}
-
-}
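
The SortBatchesByPK() method removed above greedily assigns PK-sorted batches to "pools" so that batches within one pool never overlap by primary key and can be streamed one after another, while overlapping batches land in different pools and go through the merge. A self-contained sketch of that greedy assignment, using plain integer key borders in place of TSortableBatchPosition (the interval type and sample values are made up for illustration):

#include <algorithm>
#include <iostream>
#include <map>
#include <optional>
#include <vector>

struct TInterval {
    int From = 0;
    int To = 0;
    std::optional<int> PoolId; // which merge chain the batch is assigned to
};

// Greedy chain assignment mirroring the removed TGranule::SortBatchesByPK:
// batches are sorted by their first PK; a batch joins an existing pool whose
// last key lies strictly below the batch's first key (so members of one pool
// never overlap), otherwise it opens a new pool.
void AssignPools(std::vector<TInterval>& intervals) {
    std::sort(intervals.begin(), intervals.end(),
              [](const TInterval& a, const TInterval& b) { return a.From < b.From; });
    std::map<int, int> poolByLastKey; // last key of the pool -> pool id
    int nextPoolId = 0;
    for (auto& i : intervals) {
        auto it = poolByLastKey.rbegin();
        for (; it != poolByLastKey.rend(); ++it) {
            if (it->first < i.From) {
                break; // this pool ends before the batch starts: reuse it
            }
        }
        if (it != poolByLastKey.rend()) {
            i.PoolId = it->second;
            poolByLastKey.erase(it->first);
        } else {
            i.PoolId = ++nextPoolId;
        }
        poolByLastKey.emplace(i.To, *i.PoolId);
    }
}

int main() {
    std::vector<TInterval> batches = {{1, 4}, {2, 6}, {5, 9}, {7, 12}};
    AssignPools(batches);
    for (const auto& b : batches) {
        std::cout << "[" << b.From << "," << b.To << "] -> pool " << *b.PoolId << "\n";
    }
    // [1,4] and [5,9] share pool 1; [2,6] and [7,12] share pool 2.
}
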
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
deleted file mode 100644
index 41de01f2be..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ /dev/null
@@ -1,159 +0,0 @@
-#pragma once
-#include "batch.h"
-#include "read_metadata.h"
-
-#include <ydb/library/accessor/accessor.h>
-#include <ydb/core/tx/columnshard/engines/portion_info.h>
-
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TGranulesFillingContext;
-class TGranulesLiveControl {
-private:
- TAtomicCounter GranulesCounter = 0;
-public:
- i64 GetCount() const {
- return GranulesCounter.Val();
- }
-
- void Inc() {
- GranulesCounter.Inc();
- }
- void Dec() {
- Y_VERIFY(GranulesCounter.Dec() >= 0);
- }
-};
-
-class TGranule {
-public:
- using TPtr = std::shared_ptr<TGranule>;
-private:
- ui64 GranuleId = 0;
- ui64 RawDataSize = 0;
- ui64 RawDataSizeReal = 0;
-
- bool NotIndexedBatchReadyFlag = false;
- bool InConstruction = false;
- std::shared_ptr<arrow::RecordBatch> NotIndexedBatch;
- std::shared_ptr<NArrow::TColumnFilter> NotIndexedBatchFutureFilter;
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> RecordBatches;
- bool DuplicationsAvailableFlag = false;
- bool ReadyFlag = false;
- std::deque<TBatch> Batches;
- std::set<ui32> WaitBatches;
- std::set<ui32> GranuleBatchNumbers;
- std::shared_ptr<TGranulesLiveControl> LiveController;
- TGranulesFillingContext* Owner = nullptr;
- THashSet<const void*> BatchesToDedup;
-
- TScanMemoryLimiter::TGuard GranuleDataSize;
- void CheckReady();
-public:
- TGranule(const ui64 granuleId, TGranulesFillingContext& owner);
- ~TGranule();
-
- ui64 GetGranuleId() const noexcept {
- return GranuleId;
- }
-
- void OnGranuleDataPrepared(std::vector<std::shared_ptr<arrow::RecordBatch>>&& data);
-
- const THashSet<const void*>& GetBatchesToDedup() const noexcept {
- return BatchesToDedup;
- }
-
- const std::shared_ptr<arrow::RecordBatch>& GetNotIndexedBatch() const noexcept {
- return NotIndexedBatch;
- }
-
- const std::shared_ptr<NArrow::TColumnFilter>& GetNotIndexedBatchFutureFilter() const noexcept {
- return NotIndexedBatchFutureFilter;
- }
-
- bool IsNotIndexedBatchReady() const noexcept {
- return NotIndexedBatchReadyFlag;
- }
-
- bool IsDuplicationsAvailable() const noexcept {
- return DuplicationsAvailableFlag;
- }
-
- void SetDuplicationsAvailable(bool val) noexcept {
- DuplicationsAvailableFlag = val;
- }
-
- bool IsReady() const noexcept {
- return ReadyFlag;
- }
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> GetReadyBatches() const {
- return RecordBatches;
- }
-
- TBatch& GetBatchInfo(const ui32 batchIdx) {
- Y_VERIFY(batchIdx < Batches.size());
- return Batches[batchIdx];
- }
-
- void AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch);
-
- const TGranulesFillingContext& GetOwner() const {
- return *Owner;
- }
-
- TGranulesFillingContext& GetOwner() {
- return *Owner;
- }
-
- class TBatchForMerge {
- private:
- TBatch* Batch = nullptr;
- YDB_ACCESSOR_DEF(std::optional<ui32>, PoolId);
- YDB_ACCESSOR_DEF(std::shared_ptr<TSortableBatchPosition>, From);
- YDB_ACCESSOR_DEF(std::shared_ptr<TSortableBatchPosition>, To);
- public:
- TBatch* operator->() {
- return Batch;
- }
-
- TBatch& operator*() {
- return *Batch;
- }
-
- TBatchForMerge(TBatch* batch, std::shared_ptr<TSortableBatchPosition> from, std::shared_ptr<TSortableBatchPosition> to)
- : Batch(batch)
- , From(from)
- , To(to)
- {
- Y_VERIFY(Batch);
- }
-
- bool operator<(const TBatchForMerge& item) const {
- if (!From && !item.From) {
- return false;
- } else if (From && item.From) {
- return From->Compare(*item.From) == std::partial_ordering::less;
- } else if (!From) {
- return true;
- } else {
- return false;
- }
- }
- };
-
- std::deque<TBatchForMerge> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata);
-
- const std::set<ui32>& GetEarlyFilterColumns() const;
- void StartConstruction();
- void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
- void OnBlobReady(const TBlobRange& range) noexcept;
- bool OnFilterReady(TBatch& batchInfo);
- TBatch& RegisterBatchForFetching(TPortionInfo&& portionInfo);
- void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const;
-
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
deleted file mode 100644
index 14ad92c436..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
+++ /dev/null
@@ -1,89 +0,0 @@
-#include "granule_preparation.h"
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TTaskGranulePreparation::GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo) {
- std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo]
- rangesSlices.reserve(batches.size());
- {
- TMap<NArrow::TReplaceKey, std::vector<std::shared_ptr<arrow::RecordBatch>>> points;
-
- for (auto& batch : batches) {
- auto compositeKey = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
- Y_VERIFY(compositeKey && compositeKey->num_rows() > 0);
- auto keyColumns = std::make_shared<NArrow::TArrayVec>(compositeKey->columns());
-
- NArrow::TReplaceKey min(keyColumns, 0);
- NArrow::TReplaceKey max(keyColumns, compositeKey->num_rows() - 1);
-
- points[min].push_back(batch); // insert start
- points[max].push_back({}); // insert end
- }
-
- int sum = 0;
- for (auto& [key, vec] : points) {
- if (!sum) { // count(start) == count(end), start new range
- rangesSlices.push_back({});
- rangesSlices.back().reserve(batches.size());
- }
-
- for (auto& batch : vec) {
- if (batch) {
- ++sum;
- rangesSlices.back().push_back(batch);
- } else {
- --sum;
- }
- }
- }
- }
- return rangesSlices;
-}
-
-std::vector<std::shared_ptr<arrow::RecordBatch>> TTaskGranulePreparation::SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo, const std::shared_ptr<NArrow::TSortDescription>& description, const THashSet<const void*>& batchesToDedup) {
- auto rangesSlices = GroupInKeyRanges(batches, indexInfo);
-
- // Merge slices in ranges
- std::vector<std::shared_ptr<arrow::RecordBatch>> out;
- out.reserve(rangesSlices.size());
- for (auto& slices : rangesSlices) {
- if (slices.empty()) {
- continue;
- }
-
- // Do not merge slice if it's alone in its key range
- if (slices.size() == 1) {
- auto batch = slices[0];
- if (batchesToDedup.count(batch.get())) {
- NArrow::DedupSortedBatch(batch, description->ReplaceKey, out);
- } else {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey));
- out.push_back(batch);
- }
- continue;
- }
- auto batch = NArrow::CombineSortedBatches(slices, description);
- out.push_back(batch);
- }
-
- return out;
-}
-
-bool TTaskGranulePreparation::DoApply(IDataReader& indexedDataRead) const {
- auto& readData = indexedDataRead.GetMeAs<TIndexedReadData>();
- readData.GetGranulesContext().GetGranuleVerified(GranuleId)->OnGranuleDataPrepared(std::move(BatchesInGranule));
- return true;
-}
-
-bool TTaskGranulePreparation::DoExecuteImpl() {
- auto& indexInfo = ReadMetadata->GetIndexInfo();
- for (auto& batch : BatchesInGranule) {
- Y_VERIFY(batch->num_rows());
- Y_VERIFY_DEBUG(NArrow::IsSorted(batch, indexInfo.SortReplaceDescription()->ReplaceKey));
- }
- BatchesInGranule = SpecialMergeSorted(BatchesInGranule, indexInfo, indexInfo.SortReplaceDescription(), BatchesToDedup);
- return true;
-}
-
-}
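
GroupInKeyRanges() above is a sweep over the min/max keys of every batch: a start increments a counter, an end decrements it, and a new group opens whenever the counter returns to zero, so only batches whose key ranges actually overlap are merge-sorted together. A standalone sketch of the same sweep, using plain int keys and null end markers in place of NArrow::TReplaceKey and the real batch pointers (types and sample data below are assumptions for illustration):

#include <iostream>
#include <map>
#include <string>
#include <vector>

// A batch of rows with a contiguous primary-key range [MinKey, MaxKey].
struct TBatch {
    std::string Name;
    int MinKey = 0;
    int MaxKey = 0;
};

// Group batches whose key ranges overlap (directly or transitively), mirroring
// the sweep in the removed TTaskGranulePreparation::GroupInKeyRanges.
std::vector<std::vector<const TBatch*>> GroupInKeyRanges(const std::vector<TBatch>& batches) {
    std::map<int, std::vector<const TBatch*>> points;
    for (const auto& batch : batches) {
        points[batch.MinKey].push_back(&batch);  // range start carries the batch
        points[batch.MaxKey].push_back(nullptr); // range end is a null marker
    }

    std::vector<std::vector<const TBatch*>> groups;
    int open = 0;
    for (const auto& [key, vec] : points) {
        if (open == 0) {
            groups.emplace_back(); // count(start) == count(end): a new group begins
        }
        for (const TBatch* batch : vec) {
            if (batch) {
                ++open;
                groups.back().push_back(batch);
            } else {
                --open;
            }
        }
    }
    return groups;
}

int main() {
    // [1,5] and [4,9] overlap and form one group; [20,30] stands alone.
    std::vector<TBatch> batches = {{"a", 1, 5}, {"b", 4, 9}, {"c", 20, 30}};
    for (const auto& group : GroupInKeyRanges(batches)) {
        for (const TBatch* b : group) {
            std::cout << b->Name << ' ';
        }
        std::cout << '\n'; // prints "a b" then "c"
    }
}
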
diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h
deleted file mode 100644
index a7ff39f854..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h
+++ /dev/null
@@ -1,46 +0,0 @@
-#pragma once
-#include "conveyor_task.h"
-#include "filling_context.h"
-#include "read_metadata.h"
-#include <ydb/core/formats/arrow/sort_cursor.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TTaskGranulePreparation: public NColumnShard::IDataTasksProcessor::ITask {
-private:
- using TBase = NColumnShard::IDataTasksProcessor::ITask;
- mutable std::vector<std::shared_ptr<arrow::RecordBatch>> BatchesInGranule;
- THashSet<const void*> BatchesToDedup;
- const ui64 GranuleId;
- NOlap::TReadMetadata::TConstPtr ReadMetadata;
-
- static std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>
- GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo);
-
- static std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const TIndexInfo& indexInfo,
- const std::shared_ptr<NArrow::TSortDescription>& description,
- const THashSet<const void*>& batchesToDedup);
-
-protected:
- virtual bool DoApply(IDataReader& indexedDataRead) const override;
- virtual bool DoExecuteImpl() override;
-public:
-
- virtual TString GetTaskClassIdentifier() const override {
- return "Reading::GranulePreparation";
- }
-
- TTaskGranulePreparation(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches, THashSet<const void*>&& batchesToDedup,
- const ui64 granuleId, NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::IDataTasksProcessor::TPtr processor)
- : TBase(processor)
- , BatchesInGranule(std::move(batches))
- , BatchesToDedup(std::move(batchesToDedup))
- , GranuleId(granuleId)
- , ReadMetadata(readMetadata) {
-
- }
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index 0349917f0d..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,24 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(engines-reader-order_control)
-target_link_libraries(engines-reader-order_control PUBLIC
- contrib-libs-cxxsupp
- yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
-)
-target_sources(engines-reader-order_control PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
-)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index a6f577627b..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,25 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(engines-reader-order_control)
-target_link_libraries(engines-reader-order_control PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
-)
-target_sources(engines-reader-order_control PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
-)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt
deleted file mode 100644
index a6f577627b..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt
+++ /dev/null
@@ -1,25 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(engines-reader-order_control)
-target_link_libraries(engines-reader-order_control PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
-)
-target_sources(engines-reader-order_control PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
-)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt
deleted file mode 100644
index f8b31df0c1..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-endif()
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt
deleted file mode 100644
index 0349917f0d..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt
+++ /dev/null
@@ -1,24 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(engines-reader-order_control)
-target_link_libraries(engines-reader-order_control PUBLIC
- contrib-libs-cxxsupp
- yutil
- libs-apache-arrow
- ydb-core-protos
- core-formats-arrow
-)
-target_sources(engines-reader-order_control PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
-)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
deleted file mode 100644
index 63e96f8527..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
+++ /dev/null
@@ -1,42 +0,0 @@
-#include "abstract.h"
-#include <ydb/core/tx/columnshard/engines/reader/filling_context.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFillingContext& context) {
- auto& batch = batchOriginal.GetFetchedInfo();
- Y_VERIFY(!!batch.GetFilter());
- if (!batch.GetFilteredRecordsCount()) {
- context.GetCounters().EmptyFilterCount->Add(1);
- context.GetCounters().EmptyFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes());
- context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns()));
- batchOriginal.InitBatch(nullptr);
- } else {
- context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows());
- if (batchOriginal.AskedColumnsAlready(context.GetPostFilterColumns())) {
- context.GetCounters().FilterOnlyCount->Add(1);
- context.GetCounters().FilterOnlyFetchedBytes->Add(batchOriginal.GetFetchedBytes());
- context.GetCounters().FilterOnlyUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes());
- context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns()));
-
- batchOriginal.InitBatch(batch.GetFilterBatch());
- } else {
- context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes());
- context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes());
-
- batchOriginal.ResetWithFilter(context.GetPostFilterColumns());
-
- context.GetCounters().TwoPhasesCount->Add(1);
- context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes());
- context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batchOriginal.GetUsefulWaitingBytes());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
- ("filtered_count", batch.GetFilterBatch()->num_rows())
- ("blobs_count", batchOriginal.GetWaitingBlobs().size())
- ("columns_count", batchOriginal.GetCurrentColumnIds()->size())
- ("fetch_size", batchOriginal.GetWaitingBytes())
- ;
- }
- }
-}
-
-}
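
The OnBatchFilterInitialized implementation removed above picks one of three outcomes once a batch's early filter is ready: drop the batch when the filter leaves no rows, keep the already-filtered batch when the first pass fetched every post-filter column, or schedule a second fetch restricted to the surviving rows. Below is a minimal standalone sketch of that branching; TBatchStub and EFilterOutcome are hypothetical stand-ins for the real TBatch/TGranulesFillingContext, and the counter and log updates are omitted:

#include <cstdint>
#include <iostream>

// Hypothetical stand-in for the reader's per-batch state (not the YDB TBatch).
struct TBatchStub {
    uint32_t FilteredRows = 0;             // rows surviving the early filter
    bool PostFilterColumnsFetched = false; // did the first pass already load all post-filter columns?
};

enum class EFilterOutcome { DropBatch, UseFilteredBatch, SecondFetchPass };

// Mirrors the three-way branching of the removed IOrderPolicy::OnBatchFilterInitialized.
EFilterOutcome OnFilterReady(const TBatchStub& batch) {
    if (batch.FilteredRows == 0) {
        return EFilterOutcome::DropBatch;        // empty filter: nothing else to read for this batch
    }
    if (batch.PostFilterColumnsFetched) {
        return EFilterOutcome::UseFilteredBatch; // one-phase read: the filter-stage columns were already enough
    }
    return EFilterOutcome::SecondFetchPass;      // two-phase read: fetch the remaining columns for surviving rows
}

int main() {
    std::cout << int(OnFilterReady({0, false})) << "\n";  // 0: drop
    std::cout << int(OnFilterReady({10, true})) << "\n";  // 1: use the filtered batch as-is
    std::cout << int(OnFilterReady({10, false})) << "\n"; // 2: schedule the second fetch pass
}
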
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h
deleted file mode 100644
index 21c0d4ff6b..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h
+++ /dev/null
@@ -1,83 +0,0 @@
-#pragma once
-#include "result.h"
-#include <ydb/core/tx/columnshard/engines/reader/granule.h>
-#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TGranulesFillingContext;
-
-class IOrderPolicy {
-public:
- enum class EFeatures: ui32 {
- CanInterrupt = 1,
- NeedNotAppliedEarlyFilter = 1 << 1
- };
- using TFeatures = ui32;
-protected:
- TReadMetadata::TConstPtr ReadMetadata;
- virtual TString DoDebugString() const = 0;
- virtual void DoFill(TGranulesFillingContext& context) = 0;
- virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& /*context*/) {
- return true;
- }
- virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(TResultController& granulesToOut) = 0;
- virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) {
- OnBatchFilterInitialized(batchInfo, context);
- return true;
- }
-
- void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context);
- virtual TFeatures DoGetFeatures() const {
- return 0;
- }
- TFeatures GetFeatures() const {
- return DoGetFeatures();
- }
-public:
- using TPtr = std::shared_ptr<IOrderPolicy>;
- virtual ~IOrderPolicy() = default;
-
- virtual std::set<ui32> GetFilterStageColumns() {
- return ReadMetadata->GetEarlyFilterColumnIds();
- }
-
- IOrderPolicy(TReadMetadata::TConstPtr readMetadata)
- : ReadMetadata(readMetadata)
- {
-
- }
-
- bool CanInterrupt() const {
- return GetFeatures() & (TFeatures)EFeatures::CanInterrupt;
- }
-
- bool NeedNotAppliedEarlyFilter() const {
- return GetFeatures() & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
- }
-
- bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) {
- return DoOnFilterReady(batchInfo, granule, context);
- }
-
-
- virtual bool ReadyForAddNotIndexedToEnd() const = 0;
-
- std::vector<TGranule::TPtr> DetachReadyGranules(TResultController& granulesToOut) {
- return DoDetachReadyGranules(granulesToOut);
- }
-
- void Fill(TGranulesFillingContext& context) {
- DoFill(context);
- }
-
- bool Wakeup(const TGranule& granule, TGranulesFillingContext& context) {
- return DoWakeup(granule, context);
- }
-
- TString DebugString() const {
- return DoDebugString();
- }
-};
-
-}
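
The IOrderPolicy interface removed above advertises optional behaviour through the EFeatures bitmask: CanInterrupt() and NeedNotAppliedEarlyFilter() just test the corresponding bit of DoGetFeatures(). A compilable sketch of that pattern with plain standard types; TPolicyStub and TLimitedPolicyStub are hypothetical names, not the original hierarchy:

#include <cstdint>
#include <iostream>

// Same bit layout as the removed IOrderPolicy::EFeatures.
enum class EFeatures : uint32_t {
    CanInterrupt = 1,
    NeedNotAppliedEarlyFilter = 1 << 1
};
using TFeatures = uint32_t;

struct TPolicyStub {
    // A policy advertises its capabilities as a bitmask; 0 means "no special behaviour".
    virtual TFeatures GetFeatures() const { return 0; }
    bool CanInterrupt() const { return GetFeatures() & (TFeatures)EFeatures::CanInterrupt; }
    bool NeedNotAppliedEarlyFilter() const { return GetFeatures() & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; }
    virtual ~TPolicyStub() = default;
};

struct TLimitedPolicyStub : TPolicyStub {
    TFeatures GetFeatures() const override {
        return (TFeatures)EFeatures::CanInterrupt | (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
    }
};

int main() {
    TLimitedPolicyStub p;
    std::cout << p.CanInterrupt() << " " << p.NeedNotAppliedEarlyFilter() << "\n"; // 1 1
}
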
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
deleted file mode 100644
index e3fb33176b..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp
+++ /dev/null
@@ -1,27 +0,0 @@
-#include "default.h"
-#include <ydb/core/tx/columnshard/engines/reader/filling_context.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-void TAnySorting::DoFill(TGranulesFillingContext& context) {
- for (auto&& granule : ReadMetadata->SelectInfo->GetGranulesOrdered(ReadMetadata->IsDescSorted())) {
- TGranule::TPtr g = context.GetGranuleVerified(granule.Granule);
- GranulesOutOrder.emplace_back(g);
- }
-}
-
-std::vector<TGranule::TPtr> TAnySorting::DoDetachReadyGranules(TResultController& granulesToOut) {
- std::vector<TGranule::TPtr> result;
- while (GranulesOutOrder.size()) {
- NIndexedReader::TGranule::TPtr granule = GranulesOutOrder.front();
- if (!granule->IsReady()) {
- break;
- }
- result.emplace_back(granule);
- Y_VERIFY(granulesToOut.ExtractResult(granule->GetGranuleId()));
- GranulesOutOrder.pop_front();
- }
- return result;
-}
-
-}
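
TAnySorting, deleted above, detaches results strictly in granule order: it drains the front of GranulesOutOrder and stops at the first granule that is not ready, so one slow granule holds back everything behind it. A simplified standalone version of that drain loop; TGranuleStub replaces TGranule/TResultController and carries only an id and a readiness flag:

#include <cstdint>
#include <deque>
#include <iostream>
#include <memory>
#include <vector>

// Hypothetical simplified granule: just an id and a readiness flag.
struct TGranuleStub {
    uint64_t Id;
    bool Ready;
};
using TGranulePtr = std::shared_ptr<TGranuleStub>;

// Detach ready granules from the front of the ordered queue; stop at the first not-ready one.
std::vector<TGranulePtr> DetachReadyInOrder(std::deque<TGranulePtr>& ordered) {
    std::vector<TGranulePtr> result;
    while (!ordered.empty() && ordered.front()->Ready) {
        result.push_back(ordered.front());
        ordered.pop_front();
    }
    return result;
}

int main() {
    std::deque<TGranulePtr> q = {
        std::make_shared<TGranuleStub>(TGranuleStub{1, true}),
        std::make_shared<TGranuleStub>(TGranuleStub{2, false}), // not ready yet: blocks granule 3
        std::make_shared<TGranuleStub>(TGranuleStub{3, true}),
    };
    for (auto&& g : DetachReadyInOrder(q)) {
        std::cout << "out: " << g->Id << "\n"; // only granule 1 is emitted on this pass
    }
}
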
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.h b/ydb/core/tx/columnshard/engines/reader/order_control/default.h
deleted file mode 100644
index 41ca14d498..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/default.h
+++ /dev/null
@@ -1,27 +0,0 @@
-#pragma once
-#include "abstract.h"
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TAnySorting: public IOrderPolicy {
-private:
- using TBase = IOrderPolicy;
- std::deque<TGranule::TPtr> GranulesOutOrder;
-protected:
- virtual void DoFill(TGranulesFillingContext& context) override;
- virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(TResultController& granulesToOut) override;
- virtual TString DoDebugString() const override {
- return TStringBuilder() << "type=AnySorting;granules_count=" << GranulesOutOrder.size() << ";";
- }
-
-public:
- TAnySorting(TReadMetadata::TConstPtr readMetadata)
- :TBase(readMetadata) {
-
- }
- virtual bool ReadyForAddNotIndexedToEnd() const override {
- return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
- }
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
deleted file mode 100644
index ff9b2d28a8..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp
+++ /dev/null
@@ -1,14 +0,0 @@
-#include "not_sorted.h"
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-std::vector<TGranule::TPtr> TNonSorting::DoDetachReadyGranules(TResultController& granulesToOut) {
- std::vector<TGranule::TPtr> result;
- result.reserve(granulesToOut.GetCount());
- while (granulesToOut.GetCount()) {
- result.emplace_back(granulesToOut.ExtractFirst());
- }
- return result;
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h
deleted file mode 100644
index 91f230191c..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#pragma once
-#include "abstract.h"
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TNonSorting: public IOrderPolicy {
-private:
- using TBase = IOrderPolicy;
-protected:
- virtual TString DoDebugString() const override {
- return TStringBuilder() << "type=NonSorting;";
- }
-
- virtual void DoFill(TGranulesFillingContext& /*context*/) override {
- }
-
- virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(TResultController& granulesToOut) override;
-public:
- TNonSorting(TReadMetadata::TConstPtr readMetadata)
- :TBase(readMetadata)
- {
-
- }
-
- virtual bool ReadyForAddNotIndexedToEnd() const override {
- return true;
- }
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
deleted file mode 100644
index 8ae30314fa..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
+++ /dev/null
@@ -1,123 +0,0 @@
-#include "pk_with_limit.h"
-#include <ydb/core/tx/columnshard/engines/reader/filling_context.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingContext& context) {
- Y_VERIFY(ReadMetadata->Limit);
- if (!CurrentItemsLimit) {
- return false;
- }
- Y_VERIFY(GranulesOutOrderForPortions.size());
- if (GranulesOutOrderForPortions.front().GetGranule()->GetGranuleId() != granule.GetGranuleId()) {
- return false;
- }
- while (GranulesOutOrderForPortions.size() && CurrentItemsLimit) {
- auto& g = GranulesOutOrderForPortions.front();
- // The granule has to wait for NotIndexedBatch initialization first (NotIndexedBatchReadyFlag initialization).
- // The remaining batches are delivered in OrderedBatches[granuleId] order (sorted earlier by granule.SortBatchesByPK, so no further sorting is needed here).
- if (!g.GetGranule()->IsNotIndexedBatchReady()) {
- break;
- }
- if (g.Start()) {
- ++CountProcessedGranules;
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size());
- MergeStream.AddPoolSource({}, g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter());
- }
- auto& batches = g.GetOrderedBatches();
- while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) {
- TGranule::TBatchForMerge& b = batches.front();
- if (!b.GetPoolId()) {
- ++CountNotSortedPortions;
- } else {
- ++CountBatchesByPools[*b.GetPoolId()];
- }
- ++CountProcessedBatches;
- MergeStream.AddPoolSource(b.GetPoolId(), b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter());
- OnBatchFilterInitialized(*b, context);
- const bool currentHasFrom = !!batches.front().GetFrom();
- batches.pop_front();
- if (batches.size()) {
- if (!batches.front().GetFrom() || !currentHasFrom) {
- continue;
- }
- Y_VERIFY(b.GetFrom()->Compare(*batches.front().GetFrom()) != std::partial_ordering::greater);
- MergeStream.PutControlPoint(batches.front().GetFrom());
- }
- while (CurrentItemsLimit && MergeStream.DrainCurrent()) {
- --CurrentItemsLimit;
- }
- if (MergeStream.ControlPointEnriched()) {
- MergeStream.RemoveControlPoint();
- } else if (batches.size()) {
- Y_VERIFY(!CurrentItemsLimit);
- }
- }
- if (batches.empty()) {
- Y_VERIFY(MergeStream.IsEmpty() || !CurrentItemsLimit);
- GranulesOutOrderForPortions.pop_front();
- } else {
- break;
- }
- }
- while (GranulesOutOrderForPortions.size() && !CurrentItemsLimit) {
- auto& g = GranulesOutOrderForPortions.front();
- g.GetGranule()->AddNotIndexedBatch(nullptr);
- auto& batches = g.GetOrderedBatches();
- while (batches.size()) {
- auto b = batches.front();
- context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns()));
- ++CountSkippedBatches;
- b->InitBatch(nullptr);
- batches.pop_front();
- }
- ++CountSkippedGranules;
- GranulesOutOrderForPortions.pop_front();
- }
- if (GranulesOutOrderForPortions.empty()) {
- if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN)) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "stop_on_limit")
- ("limit", ReadMetadata->Limit)("limit_reached", !CurrentItemsLimit)
- ("processed_batches", CountProcessedBatches)("processed_granules", CountProcessedGranules)
- ("skipped_batches", CountSkippedBatches)("skipped_granules", CountSkippedGranules)
- ("pools_count", CountBatchesByPools.size())("bad_pool_size", CountNotSortedPortions)
- ;
- }
- }
- return true;
-}
-
-bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) {
- return Wakeup(granule, context);
-}
-
-void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) {
- for (auto&& granule : ReadMetadata->SelectInfo->GetGranulesOrdered(ReadMetadata->IsDescSorted())) {
- TGranule::TPtr g = context.GetGranuleVerified(granule.Granule);
- GranulesOutOrder.emplace_back(g);
- GranulesOutOrderForPortions.emplace_back(g->SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), g);
- }
-}
-
-std::vector<TGranule::TPtr> TPKSortingWithLimit::DoDetachReadyGranules(TResultController& granulesToOut) {
- std::vector<TGranule::TPtr> result;
- while (GranulesOutOrder.size()) {
- NIndexedReader::TGranule::TPtr granule = GranulesOutOrder.front();
- if (!granule->IsReady()) {
- break;
- }
- result.emplace_back(granule);
- Y_VERIFY(granulesToOut.ExtractResult(granule->GetGranuleId()));
- GranulesOutOrder.pop_front();
- }
- return result;
-}
-
-TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
- : TBase(readMetadata)
- , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), nullptr, readMetadata->IsDescSorted())
-{
- CurrentItemsLimit = ReadMetadata->Limit;
-}
-
-}
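
TPKSortingWithLimit, deleted above, drains granules in primary-key order while decrementing a row budget (CurrentItemsLimit) as the merge stream yields rows; once the budget is exhausted the remaining batches and granules are skipped without fetching their post-filter columns. The sketch below keeps only the budget accounting, using integer row counts in place of batches and the merge stream, so it illustrates the skipping behaviour rather than the full merge logic:

#include <algorithm>
#include <cstdint>
#include <deque>
#include <iostream>

// Each granule contributes an ordered list of filtered batch row counts (hypothetical simplification).
struct TGranuleRows {
    uint64_t GranuleId;
    std::deque<uint32_t> BatchRowCounts;
};

// Drain batches in order until the row limit is exhausted; count how many batches were skipped.
void DrainWithLimit(std::deque<TGranuleRows> granules, uint32_t limit) {
    uint32_t processed = 0;
    uint32_t skipped = 0;
    while (!granules.empty()) {
        auto& g = granules.front();
        while (!g.BatchRowCounts.empty() && limit) {
            const uint32_t rows = g.BatchRowCounts.front();
            g.BatchRowCounts.pop_front();
            ++processed;
            limit -= std::min(limit, rows);     // consume the row budget, never below zero
        }
        if (!limit) {
            skipped += g.BatchRowCounts.size(); // remaining batches would be dropped (InitBatch(nullptr))
            g.BatchRowCounts.clear();
        }
        granules.pop_front();
    }
    std::cout << "processed=" << processed << " skipped=" << skipped << "\n";
}

int main() {
    DrainWithLimit({{1, {40, 40}}, {2, {40, 40}}}, 100); // processed=3 skipped=1
}
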
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
deleted file mode 100644
index b8cd01b075..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h
+++ /dev/null
@@ -1,78 +0,0 @@
-#pragma once
-#include "abstract.h"
-#include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TGranuleOrdered {
-private:
- bool StartedFlag = false;
- std::deque<TGranule::TBatchForMerge> OrderedBatches;
- TGranule::TPtr Granule;
-public:
- bool Start() {
- if (!StartedFlag) {
- StartedFlag = true;
- return true;
- } else {
- return false;
- }
-
- }
-
- TGranuleOrdered(std::deque<TGranule::TBatchForMerge>&& orderedBatches, TGranule::TPtr granule)
- : OrderedBatches(std::move(orderedBatches))
- , Granule(granule)
- {
- }
-
- std::deque<TGranule::TBatchForMerge>& GetOrderedBatches() noexcept {
- return OrderedBatches;
- }
-
- TGranule::TPtr GetGranule() const noexcept {
- return Granule;
- }
-};
-
-class TPKSortingWithLimit: public IOrderPolicy {
-private:
- using TBase = IOrderPolicy;
- std::deque<TGranule::TPtr> GranulesOutOrder;
- std::deque<TGranuleOrdered> GranulesOutOrderForPortions;
- ui32 CurrentItemsLimit = 0;
- THashMap<ui32, ui32> CountBatchesByPools;
- ui32 CountProcessedGranules = 0;
- ui32 CountSkippedBatches = 0;
- ui32 CountProcessedBatches = 0;
- ui32 CountNotSortedPortions = 0;
- ui32 CountSkippedGranules = 0;
- TMergePartialStream MergeStream;
-protected:
- virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override;
- virtual void DoFill(TGranulesFillingContext& context) override;
- virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(TResultController& granulesToOut) override;
- virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;
- virtual TFeatures DoGetFeatures() const override {
- return (TFeatures)EFeatures::CanInterrupt | (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
- }
-
- virtual TString DoDebugString() const override {
- return TStringBuilder() << "type=PKSortingWithLimit;granules_count=" << GranulesOutOrder.size() << ";limit=" << CurrentItemsLimit << ";";
- }
-public:
- virtual std::set<ui32> GetFilterStageColumns() override {
- std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds();
- for (auto&& i : ReadMetadata->GetPKColumnIds()) {
- result.emplace(i);
- }
- return result;
- }
-
- TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata);
- virtual bool ReadyForAddNotIndexedToEnd() const override {
- return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
- }
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
deleted file mode 100644
index e366ce42a1..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-#include "result.h"
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-TGranule::TPtr TResultController::ExtractFirst() {
- TGranule::TPtr result;
- if (GranulesToOut.size()) {
- result = GranulesToOut.begin()->second;
- GranulesToOut.erase(GranulesToOut.begin());
- }
- return result;
-}
-
-void TResultController::AddResult(TGranule::TPtr granule) {
- Y_VERIFY(GranulesToOut.emplace(granule->GetGranuleId(), granule).second);
- Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second);
-}
-
-TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) {
- auto it = GranulesToOut.find(granuleId);
- if (it == GranulesToOut.end()) {
- return nullptr;
- }
- TGranule::TPtr result = it->second;
- GranulesToOut.erase(it);
- return result;
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.h b/ydb/core/tx/columnshard/engines/reader/order_control/result.h
deleted file mode 100644
index 84a4366749..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/result.h
+++ /dev/null
@@ -1,45 +0,0 @@
-#pragma once
-#include <ydb/core/tx/columnshard/engines/reader/granule.h>
-#include <ydb/core/tx/columnshard/counters/scan.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TResultController {
-protected:
- THashMap<ui64, TGranule::TPtr> GranulesToOut;
- std::set<ui64> ReadyGranulesAccumulator;
- const NColumnShard::TConcreteScanCounters Counters;
-public:
- TString DebugString() const {
- return TStringBuilder()
- << "to_out:" << GranulesToOut.size() << ";"
- << "ready:" << ReadyGranulesAccumulator.size() << ";"
- ;
- }
-
- TResultController(const NColumnShard::TConcreteScanCounters& counters)
- : Counters(counters)
- {
-
- }
-
- void Clear() {
- GranulesToOut.clear();
- }
-
- bool IsReady(const ui64 granuleId) const {
- return ReadyGranulesAccumulator.contains(granuleId);
- }
-
- ui32 GetCount() const {
- return GranulesToOut.size();
- }
-
- TGranule::TPtr ExtractFirst();
-
- void AddResult(TGranule::TPtr granule);
-
- TGranule::TPtr ExtractResult(const ui64 granuleId);
-};
-
-}
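
TResultController, deleted above, keeps finished granules keyed by granule id so a policy can pull them either by id (the sorted policies) or in arrival order via ExtractFirst (TNonSorting). A minimal sketch of that keyed extraction; TResultBuffer and TGranuleResultStub are simplified stand-ins without the counters and the ReadyGranulesAccumulator bookkeeping:

#include <cstdint>
#include <iostream>
#include <map>
#include <memory>

struct TGranuleResultStub { uint64_t Id; };
using TResultPtr = std::shared_ptr<TGranuleResultStub>;

class TResultBuffer {
    std::map<uint64_t, TResultPtr> Ready; // finished granules keyed by granule id
public:
    void Add(TResultPtr g) { Ready.emplace(g->Id, g); }

    // Ordered policies ask for a specific granule id; returns nullptr if it is not finished yet.
    TResultPtr ExtractById(uint64_t id) {
        auto it = Ready.find(id);
        if (it == Ready.end()) {
            return nullptr;
        }
        auto result = it->second;
        Ready.erase(it);
        return result;
    }

    // Unsorted policies simply take whatever finished granule is available first.
    TResultPtr ExtractFirst() {
        if (Ready.empty()) {
            return nullptr;
        }
        return ExtractById(Ready.begin()->first);
    }

    size_t Count() const { return Ready.size(); }
};

int main() {
    TResultBuffer buf;
    buf.Add(std::make_shared<TGranuleResultStub>(TGranuleResultStub{7}));
    std::cout << (buf.ExtractById(5) ? "hit" : "miss") << "\n"; // miss: granule 5 is not ready
    std::cout << buf.ExtractFirst()->Id << "\n";                // 7
}
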
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/ya.make b/ydb/core/tx/columnshard/engines/reader/order_control/ya.make
deleted file mode 100644
index a49000742a..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/order_control/ya.make
+++ /dev/null
@@ -1,17 +0,0 @@
-LIBRARY()
-
-SRCS(
- abstract.cpp
- not_sorted.cpp
- pk_with_limit.cpp
- result.cpp
- default.cpp
-)
-
-PEERDIR(
- contrib/libs/apache/arrow
- ydb/core/protos
- ydb/core/formats/arrow
-)
-
-END()
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt
index a8df8ca6eb..5d1bd47355 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt
@@ -22,4 +22,5 @@ target_sources(engines-reader-plain_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt
index 8d181e1b18..ef7f01c766 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt
@@ -23,4 +23,5 @@ target_sources(engines-reader-plain_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt
index 8d181e1b18..ef7f01c766 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt
@@ -23,4 +23,5 @@ target_sources(engines-reader-plain_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt
index a8df8ca6eb..5d1bd47355 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt
@@ -22,4 +22,5 @@ target_sources(engines-reader-plain_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
index 3d8a7ae3dd..baba224ae9 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
@@ -21,13 +21,8 @@ bool TAssembleBatch::DoExecuteImpl() {
return true;
}
-bool TAssemblePKBatch::DoApply(IDataReader& owner) const {
- owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitPK(Result);
- return true;
-}
-
bool TAssembleFFBatch::DoApply(IDataReader& owner) const {
- owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitFF(Result);
+ owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitFetchStageData(Result);
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h
index 0a746a9939..31d0f2b60a 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h
@@ -25,15 +25,6 @@ public:
const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter, const NColumnShard::IDataTasksProcessor::TPtr& processor);
};
-class TAssemblePKBatch: public TAssembleBatch {
-private:
- using TBase = TAssembleBatch;
-protected:
- virtual bool DoApply(IDataReader& owner) const override;
-public:
- using TBase::TBase;
-};
-
class TAssembleFFBatch: public TAssembleBatch {
private:
using TBase = TAssembleBatch;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
new file mode 100644
index 0000000000..99db988f36
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
@@ -0,0 +1,13 @@
+#include "columns_set.h"
+#include <util/string/join.h>
+
+namespace NKikimr::NOlap::NPlainReader {
+
+TString TColumnsSet::DebugString() const {
+ return TStringBuilder() << "("
+ << "column_ids=" << JoinSeq(",", ColumnIds) << ";"
+ << "column_names=" << JoinSeq(",", ColumnNames) << ";"
+ << ");";
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
new file mode 100644
index 0000000000..edf4a7e191
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
@@ -0,0 +1,110 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+
+namespace NKikimr::NOlap::NPlainReader {
+
+class TColumnsSet {
+private:
+ YDB_READONLY_DEF(std::set<ui32>, ColumnIds);
+ YDB_READONLY_DEF(std::set<TString>, ColumnNames);
+ mutable std::optional<std::vector<TString>> ColumnNamesVector;
+ YDB_READONLY_DEF(std::shared_ptr<arrow::Schema>, Schema);
+public:
+ TColumnsSet() = default;
+
+ const std::vector<TString>& GetColumnNamesVector() const {
+ if (!ColumnNamesVector) {
+ ColumnNamesVector = std::vector<TString>(ColumnNames.begin(), ColumnNames.end());
+ }
+ return *ColumnNamesVector;
+ }
+
+ ui32 GetSize() const {
+ return ColumnIds.size();
+ }
+
+ bool ColumnsOnly(const std::vector<std::string>& fieldNames) const {
+ if (fieldNames.size() != GetSize()) {
+ return false;
+ }
+ std::set<std::string> fieldNamesSet;
+ for (auto&& i : fieldNames) {
+ if (!fieldNamesSet.emplace(i).second) {
+ return false;
+ }
+ if (!ColumnNames.contains(TString(i.data(), i.size()))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ TColumnsSet(const std::set<ui32>& columnIds, const TIndexInfo& indexInfo) {
+ ColumnIds = columnIds;
+ Schema = indexInfo.GetColumnsSchema(ColumnIds);
+ for (auto&& i : ColumnIds) {
+ ColumnNames.emplace(indexInfo.GetColumnName(i));
+ }
+ }
+
+ TColumnsSet(const std::vector<ui32>& columnIds, const TIndexInfo& indexInfo) {
+ for (auto&& i : columnIds) {
+ Y_VERIFY(ColumnIds.emplace(i).second);
+ ColumnNames.emplace(indexInfo.GetColumnName(i));
+ }
+ Schema = indexInfo.GetColumnsSchema(ColumnIds);
+ }
+
+ bool Contains(const std::shared_ptr<TColumnsSet>& columnsSet) const {
+ if (!columnsSet) {
+ return true;
+ }
+ return Contains(*columnsSet);
+ }
+
+ bool Contains(const TColumnsSet& columnsSet) const {
+ for (auto&& i : columnsSet.ColumnIds) {
+ if (!ColumnIds.contains(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ TString DebugString() const;
+
+ TColumnsSet operator+(const TColumnsSet& external) const {
+ TColumnsSet result = *this;
+ result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end());
+ result.ColumnNames.insert(external.ColumnNames.begin(), external.ColumnNames.end());
+ auto fields = result.Schema->fields();
+ for (auto&& i : external.Schema->fields()) {
+ if (!result.Schema->GetFieldByName(i->name())) {
+ fields.emplace_back(i);
+ }
+ }
+ result.Schema = std::make_shared<arrow::Schema>(fields);
+ return result;
+ }
+
+ TColumnsSet operator-(const TColumnsSet& external) const {
+ TColumnsSet result = *this;
+ for (auto&& i : external.ColumnIds) {
+ result.ColumnIds.erase(i);
+ }
+ for (auto&& i : external.ColumnNames) {
+ result.ColumnNames.erase(i);
+ }
+ arrow::FieldVector fields;
+ for (auto&& i : Schema->fields()) {
+ if (!external.Schema->GetFieldByName(i->name())) {
+ fields.emplace_back(i);
+ }
+ }
+ result.Schema = std::make_shared<arrow::Schema>(fields);
+ return result;
+ }
+};
+
+}
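
The new TColumnsSet couples column ids, column names and an arrow schema, and its operator+ / operator- implement union and difference so the reader can derive stage column sets such as FFColumns - EFColumns - PKColumns. The sketch below reproduces the same set algebra over the id component only (no arrow schema or name handling), which is enough to show how the derived sets in TPlainReadData's constructor come about:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <set>

// Hypothetical reduced TColumnsSet: only the id component.
struct TIds {
    std::set<uint32_t> ColumnIds;

    TIds operator+(const TIds& other) const {  // union, as in TColumnsSet::operator+
        TIds r = *this;
        r.ColumnIds.insert(other.ColumnIds.begin(), other.ColumnIds.end());
        return r;
    }
    TIds operator-(const TIds& other) const {  // difference, as in TColumnsSet::operator-
        TIds r;
        std::set_difference(ColumnIds.begin(), ColumnIds.end(),
                            other.ColumnIds.begin(), other.ColumnIds.end(),
                            std::inserter(r.ColumnIds, r.ColumnIds.end()));
        return r;
    }
    bool Contains(const TIds& other) const {   // as in TColumnsSet::Contains
        return std::includes(ColumnIds.begin(), ColumnIds.end(),
                             other.ColumnIds.begin(), other.ColumnIds.end());
    }
};

int main() {
    const TIds ef{{1, 2}};         // early-filter columns
    const TIds pk{{1}};            // primary-key columns
    const TIds ff{{1, 2, 3, 4}};   // full set of result columns
    const TIds secondStage = ff - ef - pk; // {3, 4}: fetched only after the filter is known
    for (auto id : secondStage.ColumnIds) {
        std::cout << id << " ";
    }
    std::cout << "\n";
    std::cout << ff.Contains(ef) << "\n"; // 1: the invariant checked in TPlainReadData's constructor
}
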
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
index 0e16286d48..ddfa7c12be 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
@@ -21,7 +21,7 @@ TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAsse
void TEFTaskConstructor::DoOnDataReady(IDataReader& reader) {
reader.GetContext().MutableProcessor().Add(reader, std::make_shared<TAssembleFilter>(BuildBatchAssembler(reader),
- reader.GetReadMetadata(), SourceIdx, ColumnIds, reader.GetContext().GetProcessor().GetObject(), false));
+ reader.GetReadMetadata(), SourceIdx, ColumnIds, reader.GetContext().GetProcessor().GetObject(), UseEarlyFilter));
}
void TFFColumnsTaskConstructor::DoOnDataReady(IDataReader& reader) {
@@ -29,9 +29,4 @@ void TFFColumnsTaskConstructor::DoOnDataReady(IDataReader& reader) {
SourceIdx, AppliedFilter, reader.GetContext().GetProcessor().GetObject()));
}
-void TPKColumnsTaskConstructor::DoOnDataReady(IDataReader& reader) {
- reader.GetContext().MutableProcessor().Add(reader, std::make_shared<TAssemblePKBatch>(BuildBatchAssembler(reader),
- SourceIdx, AppliedFilter, reader.GetContext().GetProcessor().GetObject()));
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
index 07ea259a73..71c892777f 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
@@ -11,13 +11,15 @@ private:
IDataReader& Reader;
bool Started = false;
protected:
- THashMap<TBlobRange, TColumnRecord> RecordsByBlobRange;
- THashMap<TBlobRange, TString> Data;
+ THashSet<TBlobRange> WaitingData;
+ THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
virtual void DoOnDataReady(IDataReader& reader) = 0;
void OnDataReady(IDataReader& reader) {
- Constructed = true;
- return DoOnDataReady(reader);
+ if (WaitingData.empty()) {
+ Constructed = true;
+ return DoOnDataReady(reader);
+ }
}
public:
IFetchTaskConstructor(IDataReader& reader)
@@ -28,9 +30,7 @@ public:
void StartDataWaiting() {
Started = true;
- if (Data.size() == RecordsByBlobRange.size()) {
- OnDataReady(Reader);
- }
+ OnDataReady(Reader);
}
void Abort() {
@@ -41,17 +41,21 @@ public:
Y_VERIFY(Constructed);
}
- void AddChunk(const TColumnRecord& rec) {
+ void AddWaitingRecord(const TColumnRecord& rec) {
Y_VERIFY(!Started);
- Y_VERIFY(RecordsByBlobRange.emplace(rec.BlobRange, rec).second);
+ Y_VERIFY(WaitingData.emplace(rec.BlobRange).second);
}
void AddData(const TBlobRange& range, TString&& data) {
Y_VERIFY(Started);
+ Y_VERIFY(WaitingData.erase(range));
Y_VERIFY(Data.emplace(range, std::move(data)).second);
- if (Data.size() == RecordsByBlobRange.size()) {
- OnDataReady(Reader);
- }
+ OnDataReady(Reader);
+ }
+
+ void AddNullData(const TBlobRange& range, const ui32 rowsCount) {
+ Y_VERIFY(!Started);
+ Y_VERIFY(Data.emplace(range, rowsCount).second);
}
};
@@ -82,31 +86,21 @@ private:
public:
TFFColumnsTaskConstructor(const std::set<ui32>& columnIds, const TPortionDataSource& portion, IDataReader& reader)
: TBase(columnIds, portion, reader)
- , AppliedFilter(portion.GetEFData().GetAppliedFilter())
+ , AppliedFilter(portion.GetFilterStageData().GetAppliedFilter())
{
}
};
-class TPKColumnsTaskConstructor: public TAssembleColumnsTaskConstructor {
-private:
- using TBase = TAssembleColumnsTaskConstructor;
- std::shared_ptr<NArrow::TColumnFilter> AppliedFilter;
- virtual void DoOnDataReady(IDataReader& reader) override;
-public:
- TPKColumnsTaskConstructor(const std::set<ui32>& columnIds, const TPortionDataSource& portion, IDataReader& reader)
- : TBase(columnIds, portion, reader)
- , AppliedFilter(portion.GetEFData().GetAppliedFilter()) {
- }
-};
-
class TEFTaskConstructor: public TAssembleColumnsTaskConstructor {
private:
+ bool UseEarlyFilter = false;
using TBase = TAssembleColumnsTaskConstructor;
virtual void DoOnDataReady(IDataReader& reader) override;
public:
- TEFTaskConstructor(const std::set<ui32>& columnIds, const TPortionDataSource& portion, IDataReader& reader)
- : TBase(columnIds, portion, reader) {
- Y_VERIFY(!portion.HasEFData());
+ TEFTaskConstructor(const std::set<ui32>& columnIds, const TPortionDataSource& portion, IDataReader& reader, const bool useEarlyFilter)
+ : TBase(columnIds, portion, reader)
+ , UseEarlyFilter(useEarlyFilter)
+ {
}
};
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
index ea9f51c6a0..f038d81d47 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
@@ -16,13 +16,25 @@ public:
}
};
-class TEarlyFilterData: public TFetchedData {
+class TFilterStageData: public TFetchedData {
private:
using TBase = TFetchedData;
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, AppliedFilter);
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedEarlyFilter);
public:
- TEarlyFilterData(std::shared_ptr<NArrow::TColumnFilter> appliedFilter, std::shared_ptr<NArrow::TColumnFilter> earlyFilter, std::shared_ptr<arrow::RecordBatch> batch)
+ bool IsEmptyFilter() const {
+ return (AppliedFilter && AppliedFilter->IsTotalDenyFilter()) || (NotAppliedEarlyFilter && NotAppliedEarlyFilter->IsTotalDenyFilter());
+ }
+
+ std::shared_ptr<NArrow::TColumnFilter> GetActualFilter() const {
+ if (NotAppliedEarlyFilter) {
+ return NotAppliedEarlyFilter;
+ } else {
+ return AppliedFilter;
+ }
+ }
+
+ TFilterStageData(std::shared_ptr<NArrow::TColumnFilter> appliedFilter, std::shared_ptr<NArrow::TColumnFilter> earlyFilter, std::shared_ptr<arrow::RecordBatch> batch)
: TBase(batch)
, AppliedFilter(appliedFilter)
, NotAppliedEarlyFilter(earlyFilter)
@@ -31,14 +43,7 @@ public:
}
};
-class TPrimaryKeyData: public TFetchedData {
-private:
- using TBase = TFetchedData;
-public:
- using TBase::TBase;
-};
-
-class TFullData: public TFetchedData {
+class TFetchStageData: public TFetchedData {
private:
using TBase = TFetchedData;
public:
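
The renamed TFilterStageData distinguishes a filter that has already been applied to the batch from an early filter that was computed but not yet applied; GetActualFilter prefers the not-applied one because it still has to be honoured downstream, and IsEmptyFilter lets the fetch stage short-circuit when either filter rejects every row. A small sketch of that selection logic, with TFilterStub standing in for NArrow::TColumnFilter and exposing only the property used here:

#include <iostream>
#include <memory>

// Hypothetical minimal filter: only the property the reader checks in this code path.
struct TFilterStub {
    bool TotalDeny = false;
    bool IsTotalDenyFilter() const { return TotalDeny; }
};
using TFilterPtr = std::shared_ptr<TFilterStub>;

struct TFilterStage {
    TFilterPtr Applied;          // already applied to the filtered batch
    TFilterPtr NotAppliedEarly;  // computed, but must still be applied during the merge

    bool IsEmptyFilter() const {
        return (Applied && Applied->IsTotalDenyFilter()) ||
               (NotAppliedEarly && NotAppliedEarly->IsTotalDenyFilter());
    }
    // Prefer the filter that still has to be honoured downstream.
    TFilterPtr GetActualFilter() const {
        return NotAppliedEarly ? NotAppliedEarly : Applied;
    }
};

int main() {
    TFilterStage s{std::make_shared<TFilterStub>(), std::make_shared<TFilterStub>(TFilterStub{true})};
    std::cout << s.IsEmptyFilter() << "\n";                          // 1: the early filter denies everything
    std::cout << (s.GetActualFilter() == s.NotAppliedEarly) << "\n"; // 1: not-applied filter wins
}
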
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
index 29284f2f5c..2a37c87504 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
@@ -51,7 +51,7 @@ bool TAssembleFilter::DoExecuteImpl() {
}
bool TAssembleFilter::DoApply(IDataReader& owner) const {
- owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitEF(AppliedFilter, EarlyFilter, FilteredBatch);
+ owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitFilterStageData(AppliedFilter, EarlyFilter, FilteredBatch);
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
index 6665565e13..9e58f7e5b7 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
@@ -3,49 +3,65 @@
namespace NKikimr::NOlap::NPlainReader {
+bool TFetchingInterval::IsExclusiveSource() const {
+ return IncludeStart && Sources.size() == 1 && IncludeFinish;
+}
+
void TFetchingInterval::ConstructResult() {
- if (Merger && IsSourcesFFReady()) {
+ if (!Merger || !IsSourcesReady()) {
+ return;
+ }
+ if (!IsExclusiveSource()) {
for (auto&& [_, i] : Sources) {
if (i->GetStart().Compare(Start) == std::partial_ordering::equivalent && !i->IsMergingStarted()) {
auto rb = i->GetBatch();
if (rb) {
- Merger->AddPoolSource({}, rb, i->GetEFData().GetNotAppliedEarlyFilter());
+ Merger->AddPoolSource({}, rb, i->GetFilterStageData().GetNotAppliedEarlyFilter());
}
i->StartMerging();
}
}
- Merger->DrainCurrent(RBBuilder, Finish, IncludeFinish);
+ Merger->DrainCurrentTo(*RBBuilder, Finish, IncludeFinish);
Scanner.OnIntervalResult(RBBuilder->Finalize(), GetIntervalIdx());
+ } else {
+ Y_VERIFY(Merger->IsEmpty());
+ Sources.begin()->second->StartMerging();
+ auto batch = Sources.begin()->second->GetBatch();
+ if (batch && batch->num_rows()) {
+ if (Scanner.IsReverse()) {
+ auto permutation = NArrow::MakePermutation(batch->num_rows(), true);
+ batch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(batch, permutation)).record_batch();
+ }
+ batch = NArrow::ExtractExistedColumns(batch, RBBuilder->GetFields());
+ AFL_VERIFY((ui32)batch->num_columns() == RBBuilder->GetFields().size())("batch", batch->num_columns())("builder", RBBuilder->GetFields().size())
+ ("batch_columns", JoinSeq(",", batch->schema()->field_names()))("builder_columns", RBBuilder->GetColumnNames());
+ }
+ Scanner.OnIntervalResult(batch, GetIntervalIdx());
}
}
-void TFetchingInterval::OnSourceEFReady(const ui32 /*sourceIdx*/) {
+void TFetchingInterval::OnSourceFetchStageReady(const ui32 /*sourceIdx*/) {
ConstructResult();
}
-void TFetchingInterval::OnSourcePKReady(const ui32 /*sourceIdx*/) {
- ConstructResult();
-}
-
-void TFetchingInterval::OnSourceFFReady(const ui32 /*sourceIdx*/) {
+void TFetchingInterval::OnSourceFilterStageReady(const ui32 /*sourceIdx*/) {
ConstructResult();
}
void TFetchingInterval::StartMerge(std::shared_ptr<NIndexedReader::TMergePartialStream> merger) {
+ Y_VERIFY(!Merger);
Merger = merger;
-// if (Merger->GetCurrentKeyColumns()) {
-// AFL_VERIFY(Merger->GetCurrentKeyColumns()->Compare(Start) == std::partial_ordering::less)("current", Merger->GetCurrentKeyColumns()->DebugJson())("start", Start.DebugJson());
-// }
ConstructResult();
}
TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, TScanHead& scanner,
- std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish)
+ std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish, const bool includeStart)
: Scanner(scanner)
, Start(start)
, Finish(finish)
, IncludeFinish(includeFinish)
+ , IncludeStart(includeStart)
, Sources(sources)
, IntervalIdx(intervalIdx)
, RBBuilder(builder)
@@ -54,11 +70,7 @@ TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPositio
for (auto&& [_, i] : Sources) {
i->RegisterInterval(this);
Scanner.AddSourceByIdx(i);
- i->NeedEF();
- if (Scanner.GetContext().GetIsInternalRead()) {
- i->NeedPK();
- i->NeedFF();
- }
+ i->InitFetchingPlan(Scanner.GetColumnsFetchingPlan(IsExclusiveSource()));
}
}
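
The reworked ConstructResult adds a fast path: when one source owns the whole interval and both borders are included (IsExclusiveSource), the merger is bypassed and the source batch is handed through directly, reversed for descending scans and projected onto the result columns. The sketch below only models the branch selection; TIntervalStub and the returned strings are illustrative stand-ins for the real interval, merger and record batch:

#include <cstddef>
#include <iostream>
#include <string>

// Hypothetical reduced interval: just the pieces that decide the fast path.
struct TIntervalStub {
    bool IncludeStart = false;
    bool IncludeFinish = false;
    size_t SourcesCount = 0;

    // Mirrors TFetchingInterval::IsExclusiveSource: a single source fully owns the interval.
    bool IsExclusiveSource() const {
        return IncludeStart && IncludeFinish && SourcesCount == 1;
    }
};

std::string ConstructResult(const TIntervalStub& interval) {
    if (interval.IsExclusiveSource()) {
        // Fast path: no merge needed, hand the source batch through (reverse/project as required).
        return "pass-through";
    }
    // General path: feed every overlapping source into the heap merger and drain up to the border.
    return "merge";
}

int main() {
    std::cout << ConstructResult({true, true, 1}) << "\n";  // pass-through
    std::cout << ConstructResult({true, true, 2}) << "\n";  // merge: two overlapping sources
    std::cout << ConstructResult({false, true, 1}) << "\n"; // merge: start border not owned exclusively
}
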
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
index af60f7bc7c..fc49fc16ba 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
@@ -12,11 +12,13 @@ private:
NIndexedReader::TSortableBatchPosition Start;
NIndexedReader::TSortableBatchPosition Finish;
const bool IncludeFinish = true;
+ const bool IncludeStart = false;
std::map<ui32, std::shared_ptr<IDataSource>> Sources;
YDB_READONLY(ui32, IntervalIdx, 0);
std::shared_ptr<NIndexedReader::TMergePartialStream> Merger;
std::shared_ptr<NIndexedReader::TRecordBatchBuilder> RBBuilder;
+ bool IsExclusiveSource() const;
void ConstructResult();
IDataSource& GetSourceVerified(const ui32 idx) {
@@ -25,9 +27,9 @@ private:
return *it->second;
}
- bool IsSourcesFFReady() {
+ bool IsSourcesReady() {
for (auto&& [_, s] : Sources) {
- if (!s->HasFFData()) {
+ if (!s->IsDataReady()) {
return false;
}
}
@@ -60,15 +62,14 @@ public:
return !!Merger;
}
- void OnSourceEFReady(const ui32 sourceIdx);
- void OnSourcePKReady(const ui32 sourceIdx);
- void OnSourceFFReady(const ui32 /*sourceIdx*/);
+ void OnSourceFetchStageReady(const ui32 sourceIdx);
+ void OnSourceFilterStageReady(const ui32 sourceIdx);
void StartMerge(std::shared_ptr<NIndexedReader::TMergePartialStream> merger);
TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, TScanHead& scanner,
- std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish);
+ std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish, const bool includeStart);
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
index f70f056c91..21f0d065a3 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
@@ -4,33 +4,17 @@ namespace NKikimr::NOlap::NPlainReader {
TPlainReadData::TPlainReadData(TReadMetadata::TConstPtr readMetadata, const TReadContext& context)
: TBase(context, readMetadata)
+ , EFColumns(std::make_shared<TColumnsSet>(GetReadMetadata()->GetEarlyFilterColumnIds(), GetReadMetadata()->GetIndexInfo()))
+ , PKColumns(std::make_shared<TColumnsSet>(GetReadMetadata()->GetPKColumnIds(), GetReadMetadata()->GetIndexInfo()))
+ , FFColumns(std::make_shared<TColumnsSet>(GetReadMetadata()->GetAllColumns(), GetReadMetadata()->GetIndexInfo()))
+ , TrivialEFFlag(EFColumns->ColumnsOnly(GetReadMetadata()->GetIndexInfo().ArrowSchemaSnapshot()->field_names()))
{
- EFColumnIds = GetReadMetadata()->GetEarlyFilterColumnIds();
- PKColumnIds = GetReadMetadata()->GetPKColumnIds();
- FFColumnIds = GetReadMetadata()->GetUsedColumnIds();
- for (auto&& i : EFColumnIds) {
- PKColumnIds.erase(i);
- FFColumnIds.erase(i);
- }
- for (auto&& i : PKColumnIds) {
- FFColumnIds.erase(i);
- }
- if (context.GetIsInternalRead()) {
- EFColumnIds.insert(PKColumnIds.begin(), PKColumnIds.end());
- EFColumnIds.insert(FFColumnIds.begin(), FFColumnIds.end());
- PKColumnIds.clear();
- FFColumnIds.clear();
- }
- for (auto&& i : EFColumnIds) {
- EFColumnNames.emplace_back(GetReadMetadata()->GetIndexInfo().GetColumnName(i));
- }
- for (auto&& i : PKColumnIds) {
- PKColumnNames.emplace_back(GetReadMetadata()->GetIndexInfo().GetColumnName(i));
- }
- for (auto&& i : FFColumnIds) {
- FFColumnNames.emplace_back(GetReadMetadata()->GetIndexInfo().GetColumnName(i));
- }
+ PKFFColumns = std::make_shared<TColumnsSet>(*PKColumns + *FFColumns);
+ EFPKColumns = std::make_shared<TColumnsSet>(*EFColumns + *PKColumns);
+ FFMinusEFColumns = std::make_shared<TColumnsSet>(*FFColumns - *EFColumns);
+ FFMinusEFPKColumns = std::make_shared<TColumnsSet>(*FFColumns - *EFColumns - *PKColumns);
+ Y_VERIFY(FFColumns->Contains(EFColumns));
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
const auto& portionsOrdered = GetReadMetadata()->SelectInfo->GetPortionsOrdered(GetReadMetadata()->IsDescSorted());
@@ -124,4 +108,28 @@ void TPlainReadData::OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch)
}
}
+NKikimr::NOlap::NPlainReader::TFetchingPlan TPlainReadData::GetColumnsFetchingPlan(const bool exclusiveSource) const {
+ if (exclusiveSource) {
+ if (Context.GetIsInternalRead()) {
+ return TFetchingPlan(FFColumns, EmptyColumns, true);
+ } else {
+ if (TrivialEFFlag) {
+ return TFetchingPlan(FFColumns, EmptyColumns, true);
+ } else {
+ return TFetchingPlan(EFColumns, FFMinusEFColumns, true);
+ }
+ }
+ } else {
+ if (GetContext().GetIsInternalRead()) {
+ return TFetchingPlan(PKFFColumns, EmptyColumns, false);
+ } else {
+ if (TrivialEFFlag) {
+ return TFetchingPlan(PKFFColumns, EmptyColumns, false);
+ } else {
+ return TFetchingPlan(EFPKColumns, FFMinusEFPKColumns, false);
+ }
+ }
+ }
+}
+
}
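
GetColumnsFetchingPlan chooses the two stage column sets from three inputs: whether the source owns its interval exclusively, whether the read is internal, and whether the early filter already needs every column (TrivialEFFlag). The sketch below walks the same decision tree with strings naming the column sets; the real method returns TFetchingPlan objects built from the precomputed TColumnsSet members:

#include <iostream>
#include <string>
#include <utility>

// first = filter-stage columns, second = fetch-stage columns (names only, hypothetical).
using TPlanStub = std::pair<std::string, std::string>;

TPlanStub GetColumnsFetchingPlan(bool exclusiveSource, bool internalRead, bool trivialEF) {
    if (exclusiveSource) {
        if (internalRead || trivialEF) {
            return {"FF", "-"};       // fetch everything in the filter stage, no second stage
        }
        return {"EF", "FF-EF"};       // filter first, then only the remaining columns
    }
    if (internalRead || trivialEF) {
        return {"PK+FF", "-"};        // overlapping intervals need the PK columns for merging
    }
    return {"EF+PK", "FF-EF-PK"};
}

int main() {
    auto plan = GetColumnsFetchingPlan(/*exclusive=*/false, /*internal=*/false, /*trivialEF=*/false);
    std::cout << "filter stage: " << plan.first << ", fetch stage: " << plan.second << "\n";
}
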
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h
index d41b405575..5a50f4c8fd 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h
@@ -1,9 +1,11 @@
#pragma once
+#include "columns_set.h"
+#include "source.h"
+#include "scanner.h"
+
#include <ydb/core/tx/columnshard/engines/reader/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/queue.h>
-#include "source.h"
-#include "scanner.h"
namespace NKikimr::NOlap::NPlainReader {
@@ -11,12 +13,15 @@ class TPlainReadData: public IDataReader, TNonCopyable {
private:
using TBase = IDataReader;
std::shared_ptr<TScanHead> Scanner;
- YDB_READONLY_DEF(std::set<ui32>, EFColumnIds);
- YDB_READONLY_DEF(std::set<ui32>, PKColumnIds);
- YDB_READONLY_DEF(std::set<ui32>, FFColumnIds);
- YDB_READONLY_DEF(std::vector<TString>, EFColumnNames);
- YDB_READONLY_DEF(std::vector<TString>, PKColumnNames);
- YDB_READONLY_DEF(std::vector<TString>, FFColumnNames);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, EFColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PKColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FFColumns);
+ std::shared_ptr<TColumnsSet> EmptyColumns = std::make_shared<TColumnsSet>();
+ std::shared_ptr<TColumnsSet> PKFFColumns;
+ std::shared_ptr<TColumnsSet> EFPKColumns;
+ std::shared_ptr<TColumnsSet> FFMinusEFColumns;
+ std::shared_ptr<TColumnsSet> FFMinusEFPKColumns;
+ const bool TrivialEFFlag = false;
std::vector<TPartialReadResult> PartialResults;
ui32 ReadyResultsCount = 0;
TFetchBlobsQueue Queue;
@@ -25,9 +30,9 @@ private:
protected:
virtual TString DoDebugString() const override {
return TStringBuilder() <<
- "ef_columns=" << JoinSeq(",", EFColumnIds) << ";" <<
- "pk_columns=" << JoinSeq(",", PKColumnIds) << ";" <<
- "ff_columns=" << JoinSeq(",", FFColumnIds) << ";"
+ "ef=" << EFColumns->DebugString() << ";" <<
+ "pk=" << PKColumns->DebugString() << ";" <<
+ "ff=" << FFColumns->DebugString() << ";"
;
}
@@ -46,6 +51,8 @@ protected:
virtual void DoAddData(const TBlobRange& blobRange, const TString& data) override;
virtual std::optional<TBlobRange> DoExtractNextBlob(const bool hasReadyResults) override;
public:
+ TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const;
+
IDataSource& GetSourceByIdxVerified(const ui32 sourceIdx) {
return *Scanner->GetSourceVerified(sourceIdx);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
index 72b2884ea2..307c5ff86a 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
@@ -5,7 +5,7 @@
namespace NKikimr::NOlap::NPlainReader {
-void TScanHead::OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch, const ui32 intervalIdx) {
+void TScanHead::OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 intervalIdx) {
Y_VERIFY(FetchingIntervals.size());
Y_VERIFY(FetchingIntervals.front().GetIntervalIdx() == intervalIdx);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "interval_result")("interval", FetchingIntervals.front().GetIntervalIdx())("count", batch ? batch->num_rows() : 0);
@@ -18,7 +18,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, TPlainR
, Sources(std::move(sources))
{
auto resultSchema = reader.GetReadMetadata()->GetLoadSchema(reader.GetReadMetadata()->GetSnapshot());
- for (auto&& f : reader.GetReadMetadata()->GetUsedColumnIds()) {
+ for (auto&& f : reader.GetReadMetadata()->GetAllColumns()) {
ResultFields.emplace_back(resultSchema->GetFieldByColumnIdVerified(f));
}
Merger = std::make_shared<NIndexedReader::TMergePartialStream>(reader.GetReadMetadata()->GetReplaceKey(), std::make_shared<arrow::Schema>(ResultFields), reader.GetReadMetadata()->IsDescSorted());
@@ -30,6 +30,7 @@ bool TScanHead::BuildNextInterval() {
Y_VERIFY(FrontEnds.size());
auto position = BorderPoints.begin()->first;
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
+ const bool isIncludeStart = CurrentSegments.empty();
for (auto&& i : firstBorderPointInfo.GetStartSources()) {
CurrentSegments.emplace(i->GetSourceIdx(), i);
@@ -38,7 +39,7 @@ bool TScanHead::BuildNextInterval() {
if (firstBorderPointInfo.GetStartSources().size() && firstBorderPointInfo.GetFinishSources().size()) {
FetchingIntervals.emplace_back(
BorderPoints.begin()->first, BorderPoints.begin()->first, SegmentIdxCounter++, CurrentSegments,
- *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), true);
+ *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), true, true);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "new_interval")("interval", FetchingIntervals.back().DebugJson());
}
@@ -64,7 +65,7 @@ bool TScanHead::BuildNextInterval() {
const bool includeFinish = BorderPoints.begin()->second.GetFinishSources().size() == CurrentSegments.size() && !BorderPoints.begin()->second.GetStartSources().size();
FetchingIntervals.emplace_back(
*CurrentStart, BorderPoints.begin()->first, SegmentIdxCounter++, CurrentSegments,
- *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), includeFinish);
+ *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), includeFinish, isIncludeStart);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "new_interval")("interval", FetchingIntervals.back().DebugJson());
return true;
}
@@ -100,4 +101,12 @@ NKikimr::NOlap::TReadContext& TScanHead::GetContext() {
return Reader.GetContext();
}
+bool TScanHead::IsReverse() const {
+ return Reader.GetReadMetadata()->IsDescSorted();
+}
+
+NKikimr::NOlap::NPlainReader::TFetchingPlan TScanHead::GetColumnsFetchingPlan(const bool exclusiveSource) const {
+ return Reader.GetColumnsFetchingPlan(exclusiveSource);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
index a46b613ef1..f20df47faa 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
@@ -37,6 +37,11 @@ private:
void DrainSources();
public:
+
+ TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const;
+
+ bool IsReverse() const;
+
void Abort() {
for (auto&& i : FetchingIntervals) {
i.Abort();
@@ -62,7 +67,7 @@ public:
TReadContext& GetContext();
- void OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch, const ui32 intervalIdx);
+ void OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 intervalIdx);
std::shared_ptr<IDataSource> GetSourceVerified(const ui32 idx) const {
auto it = SourceByIdx.find(idx);
Y_VERIFY(it != SourceByIdx.end());
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index 483cda5070..3b7ba86529 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -7,59 +7,45 @@
namespace NKikimr::NOlap::NPlainReader {
-void IDataSource::InitFF(const std::shared_ptr<arrow::RecordBatch>& batch) {
- NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFF"));
- Y_VERIFY(!FFData);
- FFData = std::make_shared<TFullData>(batch);
- auto intervals = Intervals;
- for (auto&& i : intervals) {
- i->OnSourceFFReady(GetSourceIdx());
+void IDataSource::InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>& batchExt) {
+ auto batch = batchExt;
+ if (!batch && FetchingPlan->GetFetchingStage()->GetSize()) {
+ const ui32 numRows = GetFilterStageData().GetBatch() ? GetFilterStageData().GetBatch()->num_rows() : 0;
+ batch = NArrow::MakeEmptyBatch(FetchingPlan->GetFetchingStage()->GetSchema(), numRows);
}
-}
-
-void IDataSource::InitEF(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch) {
- NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitEF"));
- Y_VERIFY(!EFData);
- EFData = std::make_shared<TEarlyFilterData>(appliedFilter, earlyFilter, batch);
- auto intervals = Intervals;
- for (auto&& i : intervals) {
- i->OnSourceEFReady(GetSourceIdx());
+ if (batch) {
+ Y_VERIFY((ui32)batch->num_columns() == FetchingPlan->GetFetchingStage()->GetSize());
}
- NeedPK();
-}
-
-void IDataSource::InitPK(const std::shared_ptr<arrow::RecordBatch>& batch) {
- NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitPK"));
- Y_VERIFY(!PKData);
- PKData = std::make_shared<TPrimaryKeyData>(batch);
+ NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchStageData"));
+ Y_VERIFY(!FetchStageData);
+ FetchStageData = std::make_shared<TFetchStageData>(batch);
auto intervals = Intervals;
for (auto&& i : intervals) {
- i->OnSourcePKReady(GetSourceIdx());
+ i->OnSourceFetchStageReady(GetSourceIdx());
}
- NeedFF();
}
-void IDataSource::NeedEF() {
- NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "NeedEF"));
- if (!NeedEFFlag) {
- NeedEFFlag = true;
- DoFetchEF();
+void IDataSource::InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch) {
+ NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFilterStageData"));
+ Y_VERIFY(!FilterStageData);
+ FilterStageData = std::make_shared<TFilterStageData>(appliedFilter, earlyFilter, batch);
+ if (batch) {
+ Y_VERIFY((ui32)batch->num_columns() == FetchingPlan->GetFilterStage()->GetSize());
}
-}
-
-void IDataSource::NeedPK() {
- NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "NeedPK"));
- if (!NeedPKFlag) {
- NeedPKFlag = true;
- DoFetchPK();
+ auto intervals = Intervals;
+ for (auto&& i : intervals) {
+ i->OnSourceFilterStageReady(GetSourceIdx());
}
+ DoStartFetchStage();
}
-void IDataSource::NeedFF() {
- NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "NeedFF"));
- if (!NeedFFFlag) {
- NeedFFFlag = true;
- DoFetchFF();
+void IDataSource::InitFetchingPlan(const TFetchingPlan& fetchingPlan) {
+ if (!FilterStageFlag) {
+ FilterStageFlag = true;
+ Y_VERIFY(!FetchingPlan);
+ FetchingPlan = fetchingPlan;
+ NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
+ DoStartFilterStage();
}
}
@@ -70,42 +56,50 @@ bool IDataSource::OnIntervalFinished(const ui32 intervalIdx) {
return Intervals.empty();
}
-void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, std::shared_ptr<IFetchTaskConstructor> constructor) {
+void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, std::shared_ptr<IFetchTaskConstructor> constructor, const std::shared_ptr<NArrow::TColumnFilter>& filter) {
+ const NArrow::TColumnFilter& cFilter = filter ? *filter : NArrow::TColumnFilter::BuildAllowFilter();
for (auto&& i : columnIds) {
auto columnChunks = Portion->GetColumnChunksPointers(i);
+ if (columnChunks.empty()) {
+ continue;
+ }
+ auto itFilter = cFilter.GetIterator(false, Portion->NumRows(i));
+ bool itFinished = false;
for (auto&& c : columnChunks) {
- constructor->AddChunk(*c);
- Y_VERIFY(BlobsWaiting.emplace(c->BlobRange, constructor).second);
- ReadData.AddBlobForFetch(GetSourceIdx(), c->BlobRange);
+ Y_VERIFY(!itFinished);
+ if (!itFilter.IsBatchForSkip(c->GetMeta().GetNumRowsVerified())) {
+ constructor->AddWaitingRecord(*c);
+ Y_VERIFY(BlobsWaiting.emplace(c->BlobRange, constructor).second);
+ ReadData.AddBlobForFetch(GetSourceIdx(), c->BlobRange);
+ } else {
+ constructor->AddNullData(c->BlobRange, c->GetMeta().GetNumRowsVerified());
+ }
+ itFinished = !itFilter.Next(c->GetMeta().GetNumRowsVerified());
}
+ AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", Portion->NumRows(i));
}
constructor->StartDataWaiting();
}
-void TPortionDataSource::DoFetchEF() {
+void TPortionDataSource::DoStartFilterStage() {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetchEF");
- if (ReadData.GetEFColumnIds().size()) {
- NeedFetchColumns(ReadData.GetEFColumnIds(), std::make_shared<TEFTaskConstructor>(ReadData.GetEFColumnIds(), *this, ReadData));
- } else {
- InitEF(nullptr, nullptr, nullptr);
- }
-}
-
-void TPortionDataSource::DoFetchPK() {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetchPK");
- if (ReadData.GetPKColumnIds().size()) {
- NeedFetchColumns(ReadData.GetPKColumnIds(), std::make_shared<TPKColumnsTaskConstructor>(ReadData.GetPKColumnIds(), *this, ReadData));
- } else {
- InitPK(nullptr);
- }
-}
-
-void TPortionDataSource::DoFetchFF() {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetchFF");
- if (ReadData.GetFFColumnIds().size()) {
- NeedFetchColumns(ReadData.GetFFColumnIds(), std::make_shared<TFFColumnsTaskConstructor>(ReadData.GetFFColumnIds(), *this, ReadData));
+ Y_VERIFY(FetchingPlan->GetFilterStage()->GetSize());
+ auto& columnIds = FetchingPlan->GetFilterStage()->GetColumnIds();
+ NeedFetchColumns(columnIds, std::make_shared<TEFTaskConstructor>(columnIds, *this, ReadData, FetchingPlan->CanUseEarlyFilterImmediately()), nullptr);
+}
+
+void TPortionDataSource::DoStartFetchStage() {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoStartFetchStage");
+ Y_VERIFY(!FetchStageData);
+ Y_VERIFY(FilterStageData);
+ if (!FetchingPlan->GetFetchingStage()->GetSize()) {
+ InitFetchStageData(nullptr);
+ } else if (!FilterStageData->IsEmptyFilter()) {
+ auto& columnIds = FetchingPlan->GetFetchingStage()->GetColumnIds();
+ NeedFetchColumns(columnIds, std::make_shared<TFFColumnsTaskConstructor>(columnIds, *this, ReadData),
+ GetFilterStageData().GetActualFilter());
} else {
- InitFF(nullptr);
+ InitFetchStageData(nullptr);
}
}
@@ -139,9 +133,8 @@ void TCommittedDataSource::AddData(const TBlobRange& /*range*/, TString&& data)
resultBatch = ReadData.GetReadMetadata()->GetIndexInfo().AddSpecialColumns(resultBatch, CommittedBlob.GetSnapshot());
Y_VERIFY(resultBatch);
ReadData.GetReadMetadata()->GetPKRangesFilter().BuildFilter(resultBatch).Apply(resultBatch);
- InitEF(nullptr, ReadData.GetReadMetadata()->GetProgram().BuildEarlyFilter(resultBatch), NArrow::ExtractColumnsValidate(resultBatch, ReadData.GetEFColumnNames()));
- InitPK(NArrow::ExtractColumnsValidate(resultBatch, ReadData.GetPKColumnNames()));
- InitFF(NArrow::ExtractColumnsValidate(resultBatch, ReadData.GetFFColumnNames()));
+ InitFilterStageData(nullptr, ReadData.GetReadMetadata()->GetProgram().BuildEarlyFilter(resultBatch), NArrow::ExtractColumnsValidate(resultBatch, FetchingPlan->GetFilterStage()->GetColumnNamesVector()));
+ InitFetchStageData(NArrow::ExtractColumnsValidate(resultBatch, FetchingPlan->GetFetchingStage()->GetColumnNamesVector()));
}
}
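
The new NeedFetchColumns above walks a column filter across the per-chunk row counts and only fetches blobs for chunks that still contain accepted rows; fully rejected chunks are replaced with null data. Below is a minimal, self-contained C++ sketch of that chunk-skipping idea; all types and names are illustrative stand-ins, not the YDB classes.

    // Sketch: decide per chunk whether its blob must be fetched or can be null-filled,
    // by advancing a row-level accept/deny filter over the chunk row counts.
    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct FilterIterator {
        std::vector<std::size_t> runs;   // alternating run lengths: accept, deny, accept, ...
        bool firstAccepted = true;       // whether runs[0] is a run of accepted rows
        std::size_t runIdx = 0;
        std::size_t offsetInRun = 0;

        // True if none of the next `rows` rows are accepted, so the whole chunk can be skipped.
        bool IsBatchForSkip(std::size_t rows) const {
            if (runIdx >= runs.size()) {
                return true;
            }
            const bool accepted = firstAccepted ? (runIdx % 2 == 0) : (runIdx % 2 == 1);
            return !accepted && runs[runIdx] - offsetInRun >= rows;
        }

        // Advance by `rows`; returns false once the filter is exhausted.
        bool Next(std::size_t rows) {
            while (rows && runIdx < runs.size()) {
                const std::size_t step = std::min(rows, runs[runIdx] - offsetInRun);
                rows -= step;
                offsetInRun += step;
                if (offsetInRun == runs[runIdx]) {
                    ++runIdx;
                    offsetInRun = 0;
                }
            }
            return runIdx < runs.size();
        }
    };

    int main() {
        // Three chunks of 4 rows each; the filter accepts rows 0..3 and 8..11 and denies 4..7.
        const std::vector<std::size_t> chunkRows = {4, 4, 4};
        FilterIterator it;
        it.runs = {4, 4, 4};
        for (std::size_t rows : chunkRows) {
            std::cout << (it.IsBatchForSkip(rows) ? "null-fill chunk" : "fetch chunk blob") << "\n";
            it.Next(rows);
        }
        return 0;
    }
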
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
index d46c957677..edc1275e25 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
@@ -1,5 +1,6 @@
#pragma once
#include "fetched_data.h"
+#include "columns_set.h"
#include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
@@ -16,6 +17,25 @@ class TFetchingInterval;
class TPlainReadData;
class IFetchTaskConstructor;
+class TFetchingPlan {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FilterStage);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FetchingStage);
+ bool CanUseEarlyFilterImmediatelyFlag = false;
+public:
+ TFetchingPlan(const std::shared_ptr<TColumnsSet>& filterStage, const std::shared_ptr<TColumnsSet>& fetchingStage, const bool canUseEarlyFilterImmediately)
+ : FilterStage(filterStage)
+ , FetchingStage(fetchingStage)
+ , CanUseEarlyFilterImmediatelyFlag(canUseEarlyFilterImmediately)
+ {
+ }
+
+ bool CanUseEarlyFilterImmediately() const {
+ return CanUseEarlyFilterImmediatelyFlag;
+ }
+};
+
class IDataSource {
private:
YDB_READONLY(ui32, SourceIdx, 0);
@@ -27,18 +47,15 @@ protected:
TPlainReadData& ReadData;
std::deque<TFetchingInterval*> Intervals;
- // EF (EarlyFilter)->PK (PrimaryKey)->FF (FullyFetched)
- std::shared_ptr<TEarlyFilterData> EFData;
- std::shared_ptr<TPrimaryKeyData> PKData;
- std::shared_ptr<TFullData> FFData;
+ std::shared_ptr<TFilterStageData> FilterStageData;
+ std::shared_ptr<TFetchStageData> FetchStageData;
- bool NeedEFFlag = false;
- bool NeedPKFlag = false;
- bool NeedFFFlag = false;
+ std::optional<TFetchingPlan> FetchingPlan;
- virtual void DoFetchEF() = 0;
- virtual void DoFetchPK() = 0;
- virtual void DoFetchFF() = 0;
+ bool FilterStageFlag = false;
+
+ virtual void DoStartFilterStage() = 0;
+ virtual void DoStartFetchStage() = 0;
virtual void DoAbort() = 0;
public:
@@ -55,10 +72,6 @@ public:
DoAbort();
}
- bool IsScannersFinished() const {
- return Intervals.empty();
- }
-
NJson::TJsonValue DebugJson() const {
NJson::TJsonValue result = NJson::JSON_MAP;
result.InsertValue("source_idx", SourceIdx);
@@ -71,41 +84,25 @@ public:
bool OnIntervalFinished(const ui32 intervalIdx);
std::shared_ptr<arrow::RecordBatch> GetBatch() const {
- if (!EFData || !PKData || !FFData) {
+ if (!FilterStageData || !FetchStageData) {
return nullptr;
}
- return NArrow::MergeColumns({EFData->GetBatch(), PKData->GetBatch(), FFData->GetBatch()});
- }
-
- bool HasFFData() const {
- return HasPKData() && !!FFData;
+ return NArrow::MergeColumns({FilterStageData->GetBatch(), FetchStageData->GetBatch()});
}
- bool HasPKData() const {
- return HasEFData() && !!PKData;
+ bool IsDataReady() const {
+ return !!FilterStageData && !!FetchStageData;
}
- bool HasEFData() const {
- return !!EFData;
+ const TFilterStageData& GetFilterStageData() const {
+ Y_VERIFY(FilterStageData);
+ return *FilterStageData;
}
- const TEarlyFilterData& GetEFData() const {
- Y_VERIFY(EFData);
- return *EFData;
- }
-
- const TPrimaryKeyData& GetPKData() const {
- Y_VERIFY(PKData);
- return *PKData;
- }
+ void InitFetchingPlan(const TFetchingPlan& fetchingPlan);
- void NeedEF();
- void NeedPK();
- void NeedFF();
-
- void InitEF(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch);
- void InitFF(const std::shared_ptr<arrow::RecordBatch>& batch);
- void InitPK(const std::shared_ptr<arrow::RecordBatch>& batch);
+ void InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch);
+ void InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>& batch);
void RegisterInterval(TFetchingInterval* interval) {
Intervals.emplace_back(interval);
@@ -123,7 +120,9 @@ public:
Y_VERIFY(Start.Compare(Finish) != std::partial_ordering::greater);
}
- virtual ~IDataSource() = default;
+ virtual ~IDataSource() {
+ Y_VERIFY(Intervals.empty());
+ }
virtual void AddData(const TBlobRange& range, TString&& data) = 0;
};
@@ -133,11 +132,10 @@ private:
std::shared_ptr<TPortionInfo> Portion;
THashMap<TBlobRange, std::shared_ptr<IFetchTaskConstructor>> BlobsWaiting;
- void NeedFetchColumns(const std::set<ui32>& columnIds, std::shared_ptr<IFetchTaskConstructor> constructor);
+ void NeedFetchColumns(const std::set<ui32>& columnIds, std::shared_ptr<IFetchTaskConstructor> constructor, const std::shared_ptr<NArrow::TColumnFilter>& filter);
- virtual void DoFetchEF() override;
- virtual void DoFetchPK() override;
- virtual void DoFetchFF() override;
+ virtual void DoStartFilterStage() override;
+ virtual void DoStartFetchStage() override;
virtual NJson::TJsonValue DoDebugJson() const override {
NJson::TJsonValue result = NJson::JSON_MAP;
result.InsertValue("type", "portion");
@@ -177,15 +175,11 @@ private:
}
- virtual void DoFetchEF() override {
- DoFetch();
- }
-
- virtual void DoFetchPK() override {
+ virtual void DoStartFilterStage() override {
DoFetch();
}
- virtual void DoFetchFF() override {
+ virtual void DoStartFetchStage() override {
DoFetch();
}
virtual NJson::TJsonValue DoDebugJson() const override {
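
For reference, a hedged sketch of how a two-stage plan like TFetchingPlan above is consumed by a data source: filter-stage columns are fetched first to build an early filter, and fetch-stage columns are fetched afterwards only when that filter is not empty. The types below are simplified stand-ins under those assumptions, not the actual reader classes.

    #include <cassert>
    #include <cstddef>
    #include <memory>
    #include <optional>
    #include <set>

    struct ColumnsSet {
        std::set<unsigned> ColumnIds;
        std::size_t GetSize() const { return ColumnIds.size(); }
    };

    struct FetchingPlan {
        std::shared_ptr<ColumnsSet> FilterStage;
        std::shared_ptr<ColumnsSet> FetchingStage;
        bool CanUseEarlyFilterImmediately = false;
    };

    class DataSourceSketch {
        std::optional<FetchingPlan> Plan;
        bool FilterStageStarted = false;

        void StartFilterStage() {
            // the real code would schedule blob fetches for Plan->FilterStage columns here
        }

    public:
        // Mirrors the shape of IDataSource::InitFetchingPlan: the plan is assigned
        // exactly once and immediately triggers the filter stage.
        void InitFetchingPlan(const FetchingPlan& plan) {
            if (!FilterStageStarted) {
                FilterStageStarted = true;
                assert(!Plan);
                Plan = plan;
                StartFilterStage();
            }
        }
    };

    int main() {
        FetchingPlan plan;
        plan.FilterStage = std::make_shared<ColumnsSet>();
        plan.FilterStage->ColumnIds = {1, 2};         // e.g. predicate columns
        plan.FetchingStage = std::make_shared<ColumnsSet>();
        plan.FetchingStage->ColumnIds = {3, 4, 5};    // remaining result columns
        DataSourceSketch source;
        source.InitFetchingPlan(plan);
        source.InitFetchingPlan(plan);  // repeated calls are ignored, as above
        return 0;
    }
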
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
index c43747503a..c5a23130f4 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
@@ -9,6 +9,7 @@ SRCS(
plain_read_data.cpp
filter_assembler.cpp
column_assembler.cpp
+ columns_set.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
deleted file mode 100644
index b87bc406ef..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-#include "postfilter_assembler.h"
-#include "batch.h"
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-bool TAssembleBatch::DoExecuteImpl() {
- /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
- /// It's not OK to apply predicate before replacing key duplicates otherwise.
- /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
-
- Y_VERIFY(BatchConstructor.GetColumnsCount());
-
- TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- auto addBatch = BatchConstructor.Assemble(options);
- Y_VERIFY(addBatch);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
- ("columns_count", addBatch->num_columns())("num_rows", addBatch->num_rows());
- Y_VERIFY(Filter->Apply(addBatch));
- Y_VERIFY(NArrow::MergeBatchColumns({ FilterBatch, addBatch }, FullBatch, FullColumnsOrder, true));
-
- return true;
-}
-
-bool TAssembleBatch::DoApply(IDataReader& owner) const {
- auto& reader = owner.GetMeAs<TIndexedReadData>();
- TBatch* batch = reader.GetGranulesContext().GetBatchInfo(BatchAddress);
- if (batch) {
- batch->InitBatch(FullBatch);
- }
- return true;
-}
-
-TAssembleBatch::TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor,
- TBatch& currentBatch, const std::vector<std::string>& fullColumnsOrder,
- NColumnShard::IDataTasksProcessor::TPtr processor)
- : TBase(processor)
- , BatchConstructor(batchConstructor)
- , FullColumnsOrder(fullColumnsOrder)
- , Filter(currentBatch.GetFetchedInfo().GetFilter())
- , FilterBatch(currentBatch.GetFetchedInfo().GetFilterBatch())
- , BatchAddress(currentBatch.GetBatchAddress())
-{
- TBase::SetPriority(TBase::EPriority::High);
- Y_VERIFY(currentBatch.GetFetchedInfo().GetFilter());
- Y_VERIFY(currentBatch.GetFetchedInfo().GetFilterBatch());
- Y_VERIFY(!currentBatch.GetFetchedInfo().GetFilteredBatch());
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
deleted file mode 100644
index 62e8d84bd4..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#pragma once
-#include "common.h"
-#include "conveyor_task.h"
-
-#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
-#include <ydb/core/tx/columnshard/engines/portion_info.h>
-#include <ydb/core/formats/arrow/arrow_filter.h>
-
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-class TBatch;
-class TAssembleBatch: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TAssembleBatch, true, true> {
-private:
- using TBase = NColumnShard::IDataTasksProcessor::ITask;
- TPortionInfo::TPreparedBatchData BatchConstructor;
- std::shared_ptr<arrow::RecordBatch> FullBatch;
- std::vector<std::string> FullColumnsOrder;
-
- std::shared_ptr<NArrow::TColumnFilter> Filter;
- std::shared_ptr<arrow::RecordBatch> FilterBatch;
-
- const TBatchAddress BatchAddress;
-protected:
- virtual bool DoApply(IDataReader& owner) const override;
- virtual bool DoExecuteImpl() override;
-public:
- virtual TString GetTaskClassIdentifier() const override {
- return "Reading::TAssembleBatch";
- }
-
- TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor,
- TBatch& currentBatch, const std::vector<std::string>& fullColumnsOrder, NColumnShard::IDataTasksProcessor::TPtr processor);
-};
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
deleted file mode 100644
index 2a91e4af26..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-#include "processing_context.h"
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-void TProcessingController::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) {
- if (NotIndexedBatchesInitialized) {
- Y_VERIFY(!batches);
- return;
- }
- NotIndexedBatchesInitialized = true;
- GranulesInProcessing.erase(0);
- auto granules = GranulesWaiting;
- for (auto&& [_, gPtr] : granules) {
- if (!batches) {
- gPtr->AddNotIndexedBatch(nullptr);
- } else {
- auto it = batches->find(gPtr->GetGranuleId());
- if (it == batches->end()) {
- gPtr->AddNotIndexedBatch(nullptr);
- } else {
- gPtr->AddNotIndexedBatch(it->second);
- }
- batches->erase(it);
- }
- }
- GuardZeroGranuleData.FreeAll();
-}
-
-NKikimr::NOlap::NIndexedReader::TBatch* TProcessingController::GetBatchInfo(const TBatchAddress& address) {
- auto it = GranulesWaiting.find(address.GetGranuleId());
- if (it == GranulesWaiting.end()) {
- return nullptr;
- } else {
- return &it->second->GetBatchInfo(address.GetBatchGranuleIdx());
- }
-}
-
-TGranule::TPtr TProcessingController::ExtractReadyVerified(const ui64 granuleId) {
- Y_VERIFY(NotIndexedBatchesInitialized);
- auto it = GranulesWaiting.find(granuleId);
- Y_VERIFY(it != GranulesWaiting.end());
- TGranule::TPtr result = it->second;
- GranulesInProcessing.erase(granuleId);
- GranulesWaiting.erase(it);
- return result;
-}
-
-TGranule::TPtr TProcessingController::GetGranuleVerified(const ui64 granuleId) const {
- auto it = GranulesWaiting.find(granuleId);
- Y_VERIFY(it != GranulesWaiting.end());
- return it->second;
-}
-
-TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) const {
- auto itGranule = GranulesWaiting.find(granuleId);
- if (itGranule == GranulesWaiting.end()) {
- return nullptr;
- }
- return itGranule->second;
-}
-
-TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) {
- Y_VERIFY(GranulesWaiting.emplace(g->GetGranuleId(), g).second);
- ++OriginalGranulesCount;
- return g;
-}
-
-void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) {
- if (GranulesInProcessing.emplace(granuleId).second) {
- if (granuleId) {
- GetGranuleVerified(granuleId)->StartConstruction();
- Y_VERIFY(GranulesWaiting.contains(granuleId));
- }
- }
- if (!granuleId) {
- Y_VERIFY(!NotIndexedBatchesInitialized);
- GuardZeroGranuleData.Take(range.Size);
- }
-}
-
-void TProcessingController::Abort() {
- NotIndexedBatchesInitialized = true;
- GranulesWaiting.clear();
- GranulesInProcessing.clear();
-}
-
-TString TProcessingController::DebugString() const {
- return TStringBuilder()
- << "waiting:" << GranulesWaiting.size() << ";"
- << "in_progress:" << GranulesInProcessing.size() << ";"
- << "original_waiting:" << OriginalGranulesCount << ";"
- << "common_granules_data:" << CommonGranuleData << ";"
- << "common_initialized:" << NotIndexedBatchesInitialized << ";"
- ;
-}
-
-NKikimr::NOlap::NIndexedReader::TBatch& TProcessingController::GetBatchInfoVerified(const TBatchAddress& address) {
- NIndexedReader::TBatch* bInfo = GetBatchInfo(address);
- Y_VERIFY(bInfo);
- return *bInfo;
-}
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h
deleted file mode 100644
index bddd479e08..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#pragma once
-#include "granule.h"
-#include <ydb/core/tx/columnshard/counters/scan.h>
-#include <ydb/core/tx/columnshard/resources/memory.h>
-
-namespace NKikimr::NOlap::NIndexedReader {
-
-class TProcessingController {
-private:
- THashMap<ui64, TGranule::TPtr> GranulesWaiting;
- ui32 OriginalGranulesCount = 0;
- ui64 CommonGranuleData = 0;
- std::set<ui64> GranulesInProcessing;
- bool NotIndexedBatchesInitialized = false;
- const NColumnShard::TConcreteScanCounters Counters;
- TScanMemoryLimiter::TGuard GuardZeroGranuleData;
-public:
- TString DebugString() const;
- bool IsGranuleActualForProcessing(const ui64 granuleId) const {
- return GranulesWaiting.contains(granuleId) || (granuleId == 0 && !NotIndexedBatchesInitialized);
- }
-
- TProcessingController(TScanMemoryLimiter::IMemoryAccessor::TPtr memoryAccessor, const NColumnShard::TConcreteScanCounters& counters)
- : Counters(counters)
- , GuardZeroGranuleData(memoryAccessor, Counters.Aggregations.GetGranulesProcessing())
- {
- }
-
- ~TProcessingController() {
- Abort();
- }
-
- void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
-
- NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address);
- NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address);
-
- const std::set<ui64>& GetProcessingGranules() const {
- return GranulesInProcessing;
- }
-
- ui32 GetProcessingGranulesCount() const {
- return GranulesInProcessing.size();
- }
-
- bool IsInProgress(const ui64 granuleId) const {
- return GranulesInProcessing.contains(granuleId);
- }
-
- void Abort();
-
- ui32 GetCount() const {
- return GranulesInProcessing.size();
- }
-
- void StartBlobProcessing(const ui64 granuleId, const TBlobRange& range);
-
- TGranule::TPtr ExtractReadyVerified(const ui64 granuleId);
-
- TGranule::TPtr GetGranuleVerified(const ui64 granuleId) const;
-
- bool IsFinished() const { return GranulesWaiting.empty() && NotIndexedBatchesInitialized; }
-
- TGranule::TPtr InsertGranule(TGranule::TPtr g);
-
- TGranule::TPtr GetGranule(const ui64 granuleId) const;
-
-};
-
-}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
index 55f1d79ace..832d00a0c0 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -8,8 +8,10 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const {
result["reverse"] = ReverseSort;
result["records_count"] = RecordsCount;
result["position"] = Position;
- result["sorting"] = Sorting.DebugJson(Position);
- result["data"] = Data.DebugJson(Position);
+ result["sorting"] = Sorting->DebugJson(Position);
+ if (Data) {
+ result["data"] = Data->DebugJson(Position);
+ }
return result;
}
@@ -95,7 +97,7 @@ void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition
}
void TMergePartialStream::AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
- if (!batch || !batch->num_rows() || (filter && filter->IsTotalDenyFilter())) {
+ if (!batch || !batch->num_rows()) {
return;
}
Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema));
@@ -134,51 +136,75 @@ void TMergePartialStream::RemoveControlPoint() {
SortHeap.pop_back();
}
-bool TMergePartialStream::DrainCurrent(std::shared_ptr<TRecordBatchBuilder> builder, const std::optional<TSortableBatchPosition>& readTo, const bool includeFinish) {
- if (SortHeap.empty()) {
- return false;
- }
- while (SortHeap.size()) {
- if (readTo) {
- auto position = TBatchIterator::TPosition(SortHeap.front());
- if (includeFinish) {
- if (position.GetKeyColumns().Compare(*readTo) == std::partial_ordering::greater) {
- return true;
- }
- } else {
- if (position.GetKeyColumns().Compare(*readTo) != std::partial_ordering::less) {
- return true;
- }
- }
+void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition) {
+#ifndef NDEBUG
+ if (CurrentKeyColumns) {
+ const bool linearExecutionCorrectness = CurrentKeyColumns->Compare(nextKeyColumnsPosition) == std::partial_ordering::less;
+ if (!linearExecutionCorrectness) {
+ const bool newSegmentScan = nextKeyColumnsPosition.GetPosition() == 0;
+ AFL_VERIFY(newSegmentScan && nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::less)
+ ("merge_debug", DebugJson())("current_ext", nextKeyColumnsPosition.DebugJson())("newSegmentScan", newSegmentScan);
}
+ }
+ CurrentKeyColumns = nextKeyColumnsPosition;
+#else
+ Y_UNUSED(nextKeyColumnsPosition);
+#endif
+}
- auto currentPosition = DrainCurrentPosition();
- if (currentPosition.IsControlPoint()) {
- return false;
- }
- if (currentPosition.IsDeleted()) {
- continue;
- }
- auto& nextKeyColumnsPosition = currentPosition.GetKeyColumns();
- if (CurrentKeyColumns) {
- const bool linearExecutionCorrectness = CurrentKeyColumns->Compare(nextKeyColumnsPosition) == std::partial_ordering::less;
- if (!linearExecutionCorrectness) {
- const bool newSegmentScan = nextKeyColumnsPosition.GetPosition() == 0;
- AFL_VERIFY(newSegmentScan && nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::less)
- ("merge_debug", DebugJson())("current_ext", nextKeyColumnsPosition.DebugJson())("newSegmentScan", newSegmentScan);
+bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish) {
+ Y_VERIFY((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
+ PutControlPoint(std::make_shared<TSortableBatchPosition>(readTo));
+ bool cpReachedFlag = false;
+ while (SortHeap.size() && !cpReachedFlag) {
+ if (SortHeap.front().IsControlPoint()) {
+ RemoveControlPoint();
+ cpReachedFlag = true;
+ if (SortHeap.empty() || !includeFinish || SortHeap.front().GetKeyColumns().Compare(readTo) == std::partial_ordering::greater) {
+ return true;
}
}
- CurrentKeyColumns = currentPosition.GetKeyColumns();
- if (builder) {
- builder->AddRecord(*CurrentKeyColumns);
+
+ if (auto currentPosition = DrainCurrentPosition()) {
+ CheckSequenceInDebug(*currentPosition);
+ builder.AddRecord(*currentPosition);
}
- if (!readTo) {
- return true;
+ }
+ return false;
+}
+
+bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
+ Y_VERIFY((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
+ while (SortHeap.size()) {
+ if (auto currentPosition = DrainCurrentPosition()) {
+ CheckSequenceInDebug(*currentPosition);
+ builder.AddRecord(*currentPosition);
}
}
return false;
}
+std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() {
+ Y_VERIFY(SortHeap.size());
+ Y_VERIFY(!SortHeap.front().IsControlPoint());
+ TSortableBatchPosition result = SortHeap.front().GetKeyColumns();
+ TSortableBatchPosition resultVersion = SortHeap.front().GetVersionColumns();
+ bool isFirst = true;
+ const bool deletedFlag = SortHeap.front().IsDeleted();
+ while (SortHeap.size() && (isFirst || result.Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) {
+ auto& anotherIterator = SortHeap.front();
+ if (!isFirst) {
+ Y_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater);
+ }
+ NextInHeap(true);
+ isFirst = false;
+ }
+ if (deletedFlag) {
+ return {};
+ }
+ return result;
+}
+
NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const {
NJson::TJsonValue result;
result["is_cp"] = IsControlPoint();
@@ -202,7 +228,7 @@ TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch,
}
void TRecordBatchBuilder::AddRecord(const TSortableBatchPosition& position) {
- Y_VERIFY(position.GetData().GetColumns().size() == Builders.size());
+ Y_VERIFY_DEBUG(position.GetData().GetColumns().size() == Builders.size());
Y_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields));
// AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "record_add_on_read")("record", position.DebugJson());
for (ui32 i = 0; i < position.GetData().GetColumns().size(); ++i) {
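
DrainCurrentTo above relies on a control point: a sentinel position is pushed into the merge heap and records are drained until the sentinel surfaces at the top. A minimal sketch of that pattern with plain integers and std::priority_queue follows; it illustrates the idea only, not the YDB merge stream.

    #include <functional>
    #include <iostream>
    #include <queue>
    #include <vector>

    struct Item {
        int key = 0;
        bool controlPoint = false;

        bool operator>(const Item& other) const {
            if (key != other.key) {
                return key > other.key;
            }
            // A control point sorts after data rows with the same key, so a boundary
            // with includeFinish semantics drains equal keys before stopping.
            return controlPoint && !other.controlPoint;
        }
    };

    int main() {
        std::priority_queue<Item, std::vector<Item>, std::greater<Item>> heap;  // min-heap
        for (int k : {1, 3, 5, 7, 9, 2, 4, 6}) {
            heap.push(Item{k, false});
        }
        const int readTo = 5;
        heap.push(Item{readTo, true});  // the control point / boundary position
        while (!heap.empty()) {
            const Item top = heap.top();
            heap.pop();
            if (top.controlPoint) {
                break;  // boundary reached: everything <= readTo has been drained
            }
            std::cout << top.key << ' ';
        }
        std::cout << '\n';  // prints: 1 2 3 4 5
        return 0;
    }
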
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index 211beadf74..f4cded7787 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -63,8 +63,8 @@ protected:
YDB_READONLY(i64, Position, 0);
i64 RecordsCount = 0;
bool ReverseSort = false;
- TSortableScanData Sorting;
- TSortableScanData Data;
+ std::shared_ptr<TSortableScanData> Sorting;
+ std::shared_ptr<TSortableScanData> Data;
std::shared_ptr<arrow::RecordBatch> Batch;
static std::optional<ui64> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater, const bool include);
@@ -72,7 +72,7 @@ public:
TSortableBatchPosition() = default;
const TSortableScanData& GetData() const {
- return Data;
+ return *Data;
}
bool IsReverseSort() const {
@@ -81,11 +81,11 @@ public:
NJson::TJsonValue DebugJson() const;
TSortableBatchPosition BuildSame(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const {
- return TSortableBatchPosition(batch, position, Sorting.GetFieldNames(), Data.GetFieldNames(), ReverseSort);
+ return TSortableBatchPosition(batch, position, Sorting->GetFieldNames(), Data->GetFieldNames(), ReverseSort);
}
bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) {
- return Sorting.IsSameSchema(schema);
+ return Sorting->IsSameSchema(schema);
}
static std::shared_ptr<arrow::RecordBatch> SelectInterval(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& from, const TSortableBatchPosition& to, const bool includeFrom, const bool includeTo) {
@@ -105,19 +105,21 @@ public:
: Position(position)
, RecordsCount(batch->num_rows())
, ReverseSort(reverseSort)
- , Sorting(batch, sortingColumns)
- , Data(batch, dataColumns)
+ , Sorting(std::make_shared<TSortableScanData>(batch, sortingColumns))
, Batch(batch)
{
+ if (dataColumns.size()) {
+ Data = std::make_shared<TSortableScanData>(batch, dataColumns);
+ }
Y_VERIFY(batch->num_rows());
Y_VERIFY_DEBUG(batch->ValidateFull().ok());
- Y_VERIFY(Sorting.GetColumns().size());
+ Y_VERIFY(Sorting->GetColumns().size());
}
std::partial_ordering Compare(const TSortableBatchPosition& item) const {
Y_VERIFY(item.ReverseSort == ReverseSort);
- Y_VERIFY(item.Sorting.GetColumns().size() == Sorting.GetColumns().size());
- const auto directResult = NArrow::ColumnsCompare(Sorting.GetColumns(), Position, item.Sorting.GetColumns(), item.Position);
+ Y_VERIFY(item.Sorting->GetColumns().size() == Sorting->GetColumns().size());
+ const auto directResult = NArrow::ColumnsCompare(Sorting->GetColumns(), Position, item.Sorting->GetColumns(), item.Position);
if (ReverseSort) {
if (directResult == std::partial_ordering::less) {
return std::partial_ordering::greater;
@@ -161,7 +163,9 @@ public:
class TMergePartialStream {
private:
+#ifndef NDEBUG
std::optional<TSortableBatchPosition> CurrentKeyColumns;
+#endif
class TBatchIterator {
private:
bool ControlPointFlag;
@@ -193,6 +197,10 @@ private:
return KeyColumns;
}
+ const TSortableBatchPosition& GetVersionColumns() const {
+ return VersionColumns;
+ }
+
TBatchIterator(const TSortableBatchPosition& keyColumns)
: ControlPointFlag(true)
, KeyColumns(keyColumns)
@@ -213,8 +221,7 @@ private:
Y_VERIFY(KeyColumns.InitPosition(GetFirstPosition()));
Y_VERIFY(VersionColumns.InitPosition(GetFirstPosition()));
if (Filter) {
- FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort));
- Y_VERIFY(Filter->Size() == RecordsCount);
+ FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort, RecordsCount));
}
}
@@ -222,42 +229,6 @@ private:
return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less;
}
- class TPosition {
- private:
- TSortableBatchPosition KeyColumns;
- TSortableBatchPosition VersionColumns;
- bool DeletedFlag;
- bool ControlPointFlag;
- public:
- const TSortableBatchPosition& GetKeyColumns() const {
- return KeyColumns;
- }
-
- bool IsControlPoint() const {
- return ControlPointFlag;
- }
-
- bool IsDeleted() const {
- return DeletedFlag;
- }
-
- void TakeIfMoreActual(const TBatchIterator& anotherIterator) {
- Y_VERIFY_DEBUG(KeyColumns.Compare(anotherIterator.KeyColumns) == std::partial_ordering::equivalent);
- if (VersionColumns.Compare(anotherIterator.VersionColumns) == std::partial_ordering::less) {
- DeletedFlag = anotherIterator.IsDeleted();
- ControlPointFlag = anotherIterator.IsControlPoint();
- }
- }
-
- TPosition(const TBatchIterator& owner)
- : KeyColumns(owner.KeyColumns)
- , VersionColumns(owner.VersionColumns)
- , DeletedFlag(owner.IsDeleted())
- , ControlPointFlag(owner.IsControlPoint())
- {
- }
- };
-
bool IsDeleted() const {
if (!FilterIterator) {
return false;
@@ -307,9 +278,11 @@ private:
NJson::TJsonValue DebugJson() const {
NJson::TJsonValue result = NJson::JSON_MAP;
+#ifndef NDEBUG
if (CurrentKeyColumns) {
result["current"] = CurrentKeyColumns->DebugJson();
}
+#endif
for (auto&& i : SortHeap) {
result["heap"].AppendValue(i.DebugJson());
}
@@ -352,25 +325,10 @@ private:
const bool Reverse;
ui32 ControlPoints = 0;
- TBatchIterator::TPosition DrainCurrentPosition() {
- Y_VERIFY(SortHeap.size());
- auto position = TBatchIterator::TPosition(SortHeap.front());
- if (SortHeap.front().IsControlPoint()) {
- return position;
- }
- bool isFirst = true;
- while (SortHeap.size() && (isFirst || position.GetKeyColumns().Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) {
- if (!isFirst) {
- position.TakeIfMoreActual(SortHeap.front());
- }
- Y_VERIFY(!SortHeap.front().IsControlPoint());
- NextInHeap(true);
- isFirst = false;
- }
- return position;
- }
+ std::optional<TSortableBatchPosition> DrainCurrentPosition();
void AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap);
+ void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition);
public:
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse)
: SortSchema(sortSchema)
@@ -393,10 +351,6 @@ public:
return it->second.size();
}
- const std::optional<TSortableBatchPosition>& GetCurrentKeyColumns() const {
- return CurrentKeyColumns;
- }
-
void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point);
void RemoveControlPoint();
@@ -411,13 +365,14 @@ public:
return SortHeap.empty();
}
- bool DrainCurrent(std::shared_ptr<TRecordBatchBuilder> builder = nullptr, const std::optional<TSortableBatchPosition>& readTo = {}, const bool includeFinish = false);
+ bool DrainAll(TRecordBatchBuilder& builder);
+ bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish);
};
class TRecordBatchBuilder {
private:
std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders;
- std::vector<std::shared_ptr<arrow::Field>> Fields;
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields);
YDB_READONLY(ui32, RecordsCount, 0);
bool IsSameFieldsSequence(const std::vector<std::shared_ptr<arrow::Field>>& f1, const std::vector<std::shared_ptr<arrow::Field>>& f2) {
@@ -433,6 +388,18 @@ private:
}
public:
+ ui32 GetBuildersCount() const {
+ return Builders.size();
+ }
+
+ TString GetColumnNames() const {
+ TStringBuilder result;
+ for (auto&& f : Fields) {
+ result << f->name() << ",";
+ }
+ return result;
+ }
+
TRecordBatchBuilder(const std::vector<std::shared_ptr<arrow::Field>>& fields)
: Fields(fields) {
Y_VERIFY(Fields.size());
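
The new DrainCurrentPosition deduplicates rows that share the same key, keeping only the newest version and dropping the key entirely when that newest version is a delete. The sketch below mirrors those semantics over a plain sorted vector instead of the heap; the row layout is an assumption made for the example.

    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct Row {
        int key = 0;
        int version = 0;    // larger means newer
        bool deleted = false;
    };

    int main() {
        // Input sorted by key ascending and, within a key, by version descending,
        // so the newest version of each key comes first.
        std::vector<Row> rows = {
            {1, 7, false}, {1, 3, false},
            {2, 5, true},  {2, 1, false},   // key 2: the newest version is a delete
            {3, 2, false},
        };
        std::vector<Row> result;
        for (std::size_t i = 0; i < rows.size();) {
            const Row newest = rows[i];
            std::size_t j = i;
            while (j < rows.size() && rows[j].key == newest.key) {
                ++j;   // skip the older versions of the same key
            }
            if (!newest.deleted) {
                result.push_back(newest);
            }
            i = j;
        }
        for (const Row& r : result) {
            std::cout << r.key << ' ';
        }
        std::cout << '\n';  // prints: 1 3
        return 0;
    }
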
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index 0f8611e2d1..f57cb658c4 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -1,12 +1,8 @@
#include "read_metadata.h"
-#include "order_control/default.h"
-#include "order_control/pk_with_limit.h"
-#include "order_control/not_sorted.h"
#include "plain_reader/plain_read_data.h"
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
#include <ydb/core/tx/columnshard/columnshard__stats_scan.h>
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
#include <util/string/join.h>
namespace NKikimr::NOlap {
@@ -143,23 +139,6 @@ std::set<ui32> TReadMetadata::GetPKColumnIds() const {
return result;
}
-std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
- std::set<ui32> result;
- auto& indexInfo = ResultIndexSchema->GetIndexInfo();
- if (Snapshot.GetPlanStep()) {
- auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
- for (auto&& i : snapSchema->fields()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", i->name());
- result.emplace(indexInfo.GetColumnId(i->name()));
- }
- }
- result.insert(AllColumns.begin(), AllColumns.end());
- for (auto&& i : indexInfo.GetPrimaryKey()) {
- Y_VERIFY(result.contains(indexInfo.GetColumnId(i.first)));
- }
- return result;
-}
-
std::vector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetResultYqlSchema() const {
return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, ResultColumnIds);
}
@@ -192,39 +171,6 @@ void TReadStats::PrintToLog() {
;
}
-NIndexedReader::IOrderPolicy::TPtr TReadMetadata::DoBuildSortingPolicy() const {
- auto& indexInfo = ResultIndexSchema->GetIndexInfo();
- if (Limit && IsSorted() && indexInfo.IsSorted() &&
- indexInfo.GetReplaceKey()->Equals(indexInfo.GetIndexKey())) {
- ui32 idx = 0;
- for (auto&& i : indexInfo.GetPrimaryKey()) {
- if (idx >= indexInfo.GetSortingKey()->fields().size()) {
- break;
- }
- if (indexInfo.GetSortingKey()->fields()[idx]->name() != i.first) {
- return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
- }
- ++idx;
- }
-
- if (!idx || !GetProgram().HasEarlyFilterOnly()) {
- return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
- }
- return std::make_shared<NIndexedReader::TPKSortingWithLimit>(this->shared_from_this());
- } else if (IsSorted()) {
- return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
- } else {
- return std::make_shared<NIndexedReader::TNonSorting>(this->shared_from_this());
- }
-}
-
-std::shared_ptr<NIndexedReader::IOrderPolicy> TReadMetadata::BuildSortingPolicy() const {
- auto result = DoBuildSortingPolicy();
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "sorting_policy_constructed")("info", result->DebugString());
- NYDBTest::TControllers::GetColumnShardController()->OnSortingPolicy(result);
- return result;
-}
-
std::shared_ptr<NKikimr::NOlap::IDataReader> TReadMetadata::BuildReader(const NOlap::TReadContext& context, const TConstPtr& self) const {
return std::make_shared<NPlainReader::TPlainReadData>(self, context);
// auto result = std::make_shared<TIndexedReadData>(self, context);
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 1aa23d0366..9faa66ee63 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -22,10 +22,6 @@ class TScanIteratorBase;
namespace NKikimr::NOlap {
-namespace NIndexedReader {
-class IOrderPolicy;
-}
-
struct TReadStats {
TInstant BeginTimestamp;
ui32 SelectedIndex{0};
@@ -138,7 +134,6 @@ private:
std::shared_ptr<ISnapshotSchema> ResultIndexSchema;
std::vector<ui32> AllColumns;
std::vector<ui32> ResultColumnsIds;
- std::shared_ptr<NIndexedReader::IOrderPolicy> DoBuildSortingPolicy() const;
mutable std::map<TSnapshot, ISnapshotSchema::TPtr> SchemasByVersionCache;
mutable ISnapshotSchema::TPtr EmptyVersionSchemaCache;
public:
@@ -159,8 +154,6 @@ public:
return Snapshot;
}
- std::shared_ptr<NIndexedReader::IOrderPolicy> BuildSortingPolicy() const;
-
TReadMetadata(const TVersionedIndex& info, const TSnapshot& snapshot, const ESorting sorting, const TProgramContainer& ssaProgram)
: TBase(sorting, ssaProgram)
, IndexVersions(info)
@@ -223,7 +216,6 @@ public:
}
std::set<ui32> GetEarlyFilterColumnIds() const;
- std::set<ui32> GetUsedColumnIds() const;
std::set<ui32> GetPKColumnIds() const;
bool Empty() const {
diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make
index 58fc56e898..2ca09c00d5 100644
--- a/ydb/core/tx/columnshard/engines/reader/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/ya.make
@@ -1,20 +1,13 @@
LIBRARY()
SRCS(
- batch.cpp
common.cpp
conveyor_task.cpp
description.cpp
- filling_context.cpp
- filter_assembler.cpp
- granule.cpp
- processing_context.cpp
- postfilter_assembler.cpp
queue.cpp
read_filter_merger.cpp
read_metadata.cpp
read_context.cpp
- granule_preparation.cpp
)
PEERDIR(
@@ -25,7 +18,6 @@ PEERDIR(
ydb/core/tx/columnshard/hooks/abstract
ydb/core/tx/columnshard/resources
ydb/core/tx/program
- ydb/core/tx/columnshard/engines/reader/order_control
ydb/core/tx/columnshard/engines/reader/plain_reader
ydb/core/tx/columnshard/engines/scheme
)
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
index 8d0ebfccf8..a392c75ce5 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
@@ -89,8 +89,8 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TStr
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", status.ToString());
return nullptr;
}
- batch = NArrow::SortBatch(batch, sortingKey);
- Y_VERIFY_DEBUG(NArrow::IsSorted(batch, sortingKey));
+ batch = NArrow::SortBatch(batch, sortingKey, true);
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, sortingKey));
return batch;
}
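
PrepareForInsert now sorts the batch and verifies it with IsSortedAndUnique, i.e. adjacent replace keys must be strictly increasing. A tiny sketch of that invariant over plain integer keys (standing in for composite Arrow keys) follows.

    #include <algorithm>
    #include <cassert>
    #include <vector>

    bool IsSortedAndUnique(const std::vector<int>& keys) {
        // A violation is any adjacent pair that is out of order or equal.
        return std::adjacent_find(keys.begin(), keys.end(),
                                  [](int a, int b) { return a >= b; }) == keys.end();
    }

    int main() {
        assert(IsSortedAndUnique({1, 2, 5, 9}));
        assert(!IsSortedAndUnique({1, 2, 2, 9}));   // duplicate key
        assert(!IsSortedAndUnique({1, 3, 2}));      // out of order
        return 0;
    }
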
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index 3d6b808e4e..51e1b6b9ae 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -328,17 +328,27 @@ std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId)
return features.GetLoader(*this);
}
-std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const {
- std::shared_ptr<arrow::Schema> schema = Schema;
- if (IsSpecialColumn(columnId)) {
- schema = ArrowSchemaSnapshot();
+std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnsSchema(const std::set<ui32>& columnIds) const {
+ Y_VERIFY(columnIds.size());
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ for (auto&& i : columnIds) {
+ std::shared_ptr<arrow::Schema> schema;
+ if (IsSpecialColumn(i)) {
+ schema = ArrowSchemaSnapshot();
+ } else {
+ schema = Schema;
+ }
+ auto field = schema->GetFieldByName(GetColumnName(i));
+ Y_VERIFY(field);
+ fields.emplace_back(field);
}
- auto field = schema->GetFieldByName(GetColumnName(columnId));
- Y_VERIFY(field);
- std::vector<std::shared_ptr<arrow::Field>> fields = { field };
return std::make_shared<arrow::Schema>(fields);
}
+std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const {
+ return GetColumnsSchema({columnId});
+}
+
bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
if (schema.GetEngine() != NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "incorrect_engine_in_schema");
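
GetColumnsSchema above projects a subset of columns into a new Arrow schema. A small sketch of the same projection using only the public Arrow C++ API follows; the column names and types are invented for the example.

    #include <arrow/api.h>

    #include <cassert>
    #include <memory>
    #include <string>
    #include <vector>

    std::shared_ptr<arrow::Schema> ProjectSchema(
            const std::shared_ptr<arrow::Schema>& full, const std::vector<std::string>& names) {
        std::vector<std::shared_ptr<arrow::Field>> fields;
        for (const std::string& name : names) {
            std::shared_ptr<arrow::Field> field = full->GetFieldByName(name);
            assert(field);   // the caller must request only existing columns
            fields.push_back(field);
        }
        return arrow::schema(fields);
    }

    int main() {
        auto full = arrow::schema({
            arrow::field("key", arrow::uint64()),
            arrow::field("value", arrow::utf8()),
            arrow::field("ts", arrow::timestamp(arrow::TimeUnit::MICRO)),
        });
        auto projected = ProjectSchema(full, {"key", "ts"});
        assert(projected->num_fields() == 2);
        return 0;
    }
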
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index 62b034283f..2dd955dd6f 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -99,6 +99,7 @@ public:
}
std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const;
+ std::shared_ptr<arrow::Schema> GetColumnsSchema(const std::set<ui32>& columnIds) const;
TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const;
std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const;
diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make
index 54de6c172c..4b058ca9bb 100644
--- a/ydb/core/tx/columnshard/engines/ya.make
+++ b/ydb/core/tx/columnshard/engines/ya.make
@@ -10,7 +10,6 @@ SRCS(
column_features.cpp
db_wrapper.cpp
index_info.cpp
- indexed_read_data.cpp
filter.cpp
portion_info.cpp
tier_info.cpp
diff --git a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.darwin-x86_64.txt
index d07c2a1b8e..006b0b2e92 100644
--- a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.darwin-x86_64.txt
@@ -12,7 +12,7 @@ target_link_libraries(columnshard-hooks-testing PUBLIC
contrib-libs-cxxsupp
yutil
columnshard-hooks-abstract
- engines-reader-order_control
+ columnshard-engines-changes
)
target_sources(columnshard-hooks-testing PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/hooks/testing/controller.cpp
diff --git a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-aarch64.txt
index eab89d03d7..c374737b33 100644
--- a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-aarch64.txt
@@ -13,7 +13,7 @@ target_link_libraries(columnshard-hooks-testing PUBLIC
contrib-libs-cxxsupp
yutil
columnshard-hooks-abstract
- engines-reader-order_control
+ columnshard-engines-changes
)
target_sources(columnshard-hooks-testing PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/hooks/testing/controller.cpp
diff --git a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt
index eab89d03d7..c374737b33 100644
--- a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt
@@ -13,7 +13,7 @@ target_link_libraries(columnshard-hooks-testing PUBLIC
contrib-libs-cxxsupp
yutil
columnshard-hooks-abstract
- engines-reader-order_control
+ columnshard-engines-changes
)
target_sources(columnshard-hooks-testing PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/hooks/testing/controller.cpp
diff --git a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt
index d07c2a1b8e..006b0b2e92 100644
--- a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt
@@ -12,7 +12,7 @@ target_link_libraries(columnshard-hooks-testing PUBLIC
contrib-libs-cxxsupp
yutil
columnshard-hooks-abstract
- engines-reader-order_control
+ columnshard-engines-changes
)
target_sources(columnshard-hooks-testing PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/hooks/testing/controller.cpp
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.cpp b/ydb/core/tx/columnshard/hooks/testing/controller.cpp
index bd6c9d93c8..9f1f5cdb4b 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.cpp
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.cpp
@@ -1,27 +1,10 @@
#include "controller.h"
-#include <ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h>
-#include <ydb/core/tx/columnshard/engines/reader/order_control/default.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
namespace NKikimr::NYDBTest::NColumnShard {
-bool TController::DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> policy) {
- if (dynamic_cast<const NOlap::NIndexedReader::TPKSortingWithLimit*>(policy.get())) {
- SortingWithLimit.Inc();
- } else if (dynamic_cast<const NOlap::NIndexedReader::TAnySorting*>(policy.get())) {
- AnySorting.Inc();
- } else {
- Y_VERIFY(false);
- }
- return true;
-}
-
-bool TController::HasPKSortingOnly() const {
- return SortingWithLimit.Val() && !AnySorting.Val();
-}
-
bool TController::DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) {
if (batch) {
FilteredRecordsCount.Add(batch->num_rows());
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h
index 1ab5407430..1158fcba97 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.h
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.h
@@ -5,13 +5,10 @@ namespace NKikimr::NYDBTest::NColumnShard {
class TController: public ICSController {
private:
- YDB_READONLY(TAtomicCounter, SortingWithLimit, 0);
- YDB_READONLY(TAtomicCounter, AnySorting, 0);
YDB_READONLY(TAtomicCounter, FilteredRecordsCount, 0);
YDB_READONLY(TAtomicCounter, InternalCompactions, 0);
YDB_READONLY(TAtomicCounter, SplitCompactions, 0);
protected:
- virtual bool DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> policy) override;
virtual bool DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) override;
virtual bool DoOnStartCompaction(std::shared_ptr<NOlap::TColumnEngineChanges>& changes) override;
diff --git a/ydb/core/tx/columnshard/hooks/testing/ya.make b/ydb/core/tx/columnshard/hooks/testing/ya.make
index be421974c1..5a56a25ea2 100644
--- a/ydb/core/tx/columnshard/hooks/testing/ya.make
+++ b/ydb/core/tx/columnshard/hooks/testing/ya.make
@@ -6,7 +6,7 @@ SRCS(
PEERDIR(
ydb/core/tx/columnshard/hooks/abstract
- ydb/core/tx/columnshard/engines/reader/order_control
+ ydb/core/tx/columnshard/engines/changes
)
END()
diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h
index 261cdf3bc3..3ef8ebf6f8 100644
--- a/ydb/core/tx/columnshard/inflight_request_tracker.h
+++ b/ydb/core/tx/columnshard/inflight_request_tracker.h
@@ -1,7 +1,8 @@
#pragma once
#include "blob.h"
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+#include "engines/reader/read_metadata.h"
+#include "engines/column_engine.h"
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index d4c88590ff..c708a261ed 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -1,5 +1,4 @@
#include <ydb/core/tx/columnshard/columnshard_impl.h>
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
#include <ydb/core/tx/columnshard/blob_cache.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -19,7 +18,7 @@ private:
size_t next = 1;
for (auto it = ready.begin(); it != ready.end(); ++it, ++next) {
const bool lastOne = IndexedData->IsFinished() && (next == ready.size());
- SendResult(ctx, it->GetResultBatch(), lastOne);
+ SendResult(ctx, it->GetResultBatchPtrVerified(), lastOne);
}
}
}