diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-06 19:11:04 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-06 19:37:46 +0300 |
commit | 2efe58fb7a98d40e700c0f6304bd0fa752fb663c (patch) | |
tree | 36c3275421242af471b26d7d17e23d4756607f2a | |
parent | 88ec26760cfd1b45a5e1a3831ca94e2afb5ab344 (diff) | |
download | ydb-2efe58fb7a98d40e700c0f6304bd0fa752fb663c.tar.gz |
KIKIMR-19213: restore optimizations. fixes. remove deprecated reader
99 files changed, 673 insertions, 3230 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp index bb0e839ec8..cb63286d9f 100644 --- a/ydb/core/formats/arrow/arrow_filter.cpp +++ b/ydb/core/formats/arrow/arrow_filter.cpp @@ -130,6 +130,63 @@ void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatc } } } + +} + +bool TColumnFilter::TIterator::Next(const ui32 size) { + Y_VERIFY(size); + if (CurrentRemainVolume > size) { + InternalPosition += size; + CurrentRemainVolume -= size; + return true; + } + if (!FilterPointer && CurrentRemainVolume == size) { + InternalPosition += size; + CurrentRemainVolume -= size; + return false; + } + Y_VERIFY(FilterPointer); + ui32 sizeRemain = size; + while (Position != FinishPosition) { + const ui32 currentVolume = (*FilterPointer)[Position]; + if (currentVolume > sizeRemain + InternalPosition) { + InternalPosition += sizeRemain; + CurrentRemainVolume = currentVolume - InternalPosition; + return true; + } else { + sizeRemain -= currentVolume - InternalPosition; + InternalPosition = 0; + CurrentValue = !CurrentValue; + Position += DeltaPosition; + } + } + Y_VERIFY(Position == FinishPosition); + CurrentRemainVolume = 0; + return false; +} + +TString TColumnFilter::TIterator::DebugString() const { + TStringBuilder sb; + if (FilterPointer) { + sb << "filter_pointer="; + ui32 idx = 0; + ui64 sum = 0; + for (auto&& i : *FilterPointer) { + sb << i << ","; + sum += i; + if (++idx > 100) { + break; + } + } + sb << ";sum=" << sum << ";"; + } + sb << "internal_position=" << InternalPosition << ";"; + sb << "position=" << Position << ";"; + sb << "current=" << CurrentValue << ";"; + sb << "finish=" << FinishPosition << ";"; + sb << "delta=" << DeltaPosition << ";"; + sb << "remain=" << CurrentRemainVolume << ";"; + return sb; } std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter(const ui32 expectedSize) const { @@ -456,4 +513,13 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter } } +TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const { + if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) { + return TIterator(reverse, expectedSize, CurrentValue); + } else { + Y_VERIFY(expectedSize == Size()); + return TIterator(reverse, Filter, GetStartValue(reverse)); + } +} + } diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h index 67423ec640..f8c0158cd7 100644 --- a/ydb/core/formats/arrow/arrow_filter.h +++ b/ydb/core/formats/arrow/arrow_filter.h @@ -55,69 +55,60 @@ public: class TIterator { private: - ui32 InternalPosition = 0; - ui32 CurrentRemainVolume = 0; - const std::vector<ui32>& Filter; + i64 InternalPosition = 0; + i64 CurrentRemainVolume = 0; + const std::vector<ui32>* FilterPointer = nullptr; i32 Position = 0; bool CurrentValue; const i32 FinishPosition; const i32 DeltaPosition; public: + TString DebugString() const; + TIterator(const bool reverse, const std::vector<ui32>& filter, const bool startValue) - : Filter(filter) + : FilterPointer(&filter) , CurrentValue(startValue) - , FinishPosition(reverse ? -1 : Filter.size()) + , FinishPosition(reverse ? -1 : FilterPointer->size()) , DeltaPosition(reverse ? -1 : 1) { - if (!Filter.size()) { + if (!FilterPointer->size()) { Position = FinishPosition; } else { if (reverse) { - Position = Filter.size() - 1; + Position = FilterPointer->size() - 1; } - CurrentRemainVolume = Filter[Position]; + CurrentRemainVolume = (*FilterPointer)[Position]; + } + } + + TIterator(const bool reverse, const ui32 size, const bool startValue) + : CurrentValue(startValue) + , FinishPosition(reverse ? -1 : 1) + , DeltaPosition(reverse ? -1 : 1) { + if (!size) { + Position = FinishPosition; + } else { + if (reverse) { + Position = 0; + } + CurrentRemainVolume = size; } } bool GetCurrentAcceptance() const { - Y_VERIFY_DEBUG(CurrentRemainVolume); + Y_VERIFY(CurrentRemainVolume); return CurrentValue; } bool IsBatchForSkip(const ui32 size) const { - Y_VERIFY_DEBUG(CurrentRemainVolume); + Y_VERIFY(CurrentRemainVolume); return !CurrentValue && CurrentRemainVolume >= size; } - bool Next(const ui32 size) { - Y_VERIFY(size); - if (CurrentRemainVolume > size) { - InternalPosition += size; - CurrentRemainVolume -= size; - return true; - } - ui32 sizeRemain = size; - while (Position != FinishPosition) { - const ui32 currentVolume = Filter[Position]; - if (currentVolume - InternalPosition > sizeRemain) { - InternalPosition = sizeRemain; - CurrentRemainVolume = currentVolume - InternalPosition - sizeRemain; - return true; - } else { - sizeRemain -= currentVolume - InternalPosition; - InternalPosition = 0; - CurrentValue = !CurrentValue; - Position += DeltaPosition; - } - } - CurrentRemainVolume = 0; - return false; - } + bool Next(const ui32 size); }; - TIterator GetIterator(const bool reverse) const { - return TIterator(reverse, Filter, GetStartValue(reverse)); - } + TIterator GetIterator(const bool reverse, const ui32 expectedSize) const; bool empty() const { return Filter.empty(); diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 43af9fb5dc..d6857dafab 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -341,8 +341,8 @@ bool IsNoOp(const arrow::UInt64Array& permutation) { } std::shared_ptr<arrow::RecordBatch> Reorder(const std::shared_ptr<arrow::RecordBatch>& batch, - const std::shared_ptr<arrow::UInt64Array>& permutation) { - Y_VERIFY(permutation->length() == batch->num_rows()); + const std::shared_ptr<arrow::UInt64Array>& permutation, const bool canRemove) { + Y_VERIFY(permutation->length() == batch->num_rows() || canRemove); auto res = IsNoOp(*permutation) ? batch : arrow::compute::Take(batch, permutation); Y_VERIFY(res.ok()); @@ -373,7 +373,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared Y_VERIFY_OK(builder.Finish(&permutation)); } - auto reorderedBatch = Reorder(batch, permutation); + auto reorderedBatch = Reorder(batch, permutation, false); std::vector<std::shared_ptr<arrow::RecordBatch>> out(numShards); @@ -689,9 +689,9 @@ int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr } std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch, - const std::shared_ptr<arrow::Schema>& sortingKey) { - auto sortPermutation = MakeSortPermutation(batch, sortingKey); - return Reorder(batch, sortPermutation); + const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) { + auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique); + return Reorder(batch, sortPermutation, andUnique); } std::shared_ptr<arrow::Array> BoolVecToArray(const std::vector<bool>& vec) { diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index fb012391c4..8314d4d9ab 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -98,7 +98,7 @@ bool ReserveData(arrow::ArrayBuilder& builder, const size_t size); bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder = {}, const bool orderFieldsAreNecessary = true); std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch, - const std::shared_ptr<arrow::Schema>& sortingKey); + const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique); bool IsSorted(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, bool desc = false); diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp index f16f43b9c3..ed7f410b8a 100644 --- a/ydb/core/formats/arrow/permutations.cpp +++ b/ydb/core/formats/arrow/permutations.cpp @@ -39,7 +39,7 @@ std::shared_ptr<arrow::UInt64Array> MakePermutation(const int size, const bool r } std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch, - const std::shared_ptr<arrow::Schema>& sortingKey) { + const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) { auto keyBatch = ExtractColumns(batch, sortingKey); auto keyColumns = std::make_shared<TArrayVec>(keyBatch->columns()); std::vector<TRawReplaceKey> points; @@ -70,8 +70,21 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar arrow::UInt64Builder builder; TStatusValidator::Validate(builder.Reserve(points.size())); + TRawReplaceKey* predKey = nullptr; for (auto& point : points) { + if (andUnique) { + if (predKey) { + if (haveNulls) { + if (*predKey == point) { + continue; + } + } else if (predKey->CompareNotNull(point) == std::partial_ordering::equivalent) { + continue; + } + } + } TStatusValidator::Validate(builder.Append(point.GetPosition())); + predKey = &point; } std::shared_ptr<arrow::UInt64Array> out; diff --git a/ydb/core/formats/arrow/permutations.h b/ydb/core/formats/arrow/permutations.h index c811cfe680..9a1175887b 100644 --- a/ydb/core/formats/arrow/permutations.h +++ b/ydb/core/formats/arrow/permutations.h @@ -8,7 +8,7 @@ namespace NKikimr::NArrow { std::shared_ptr<arrow::UInt64Array> MakePermutation(const int size, const bool reverse = false); std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64>& indexes); std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch, - const std::shared_ptr<arrow::Schema>& sortingKey); + const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique); std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& source, const std::vector<ui64>& indexes); diff --git a/ydb/core/formats/arrow/ut_arrow.cpp b/ydb/core/formats/arrow/ut_arrow.cpp index 8bbef7c388..3716d31b44 100644 --- a/ydb/core/formats/arrow/ut_arrow.cpp +++ b/ydb/core/formats/arrow/ut_arrow.cpp @@ -648,7 +648,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) { UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 1000); UNIT_ASSERT(!CheckSorted1000(batch)); - auto sortPermutation = NArrow::MakeSortPermutation(batch, table->schema()); + auto sortPermutation = NArrow::MakeSortPermutation(batch, table->schema(), false); auto res = arrow::compute::Take(batch, sortPermutation); UNIT_ASSERT(res.ok()); diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 6ff5f6e3b3..5465ed35fa 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -21,7 +21,7 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data IndexedData->AddData(blobRange, data); } -NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() { +std::optional<NOlap::TPartialReadResult> TColumnShardScanIterator::GetBatch() { FillReadyResults(); return ReadyResults.pop_front(); } @@ -34,17 +34,17 @@ void TColumnShardScanIterator::FillReadyResults() { auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch); i64 limitLeft = ReadMetadata->Limit == 0 ? INT64_MAX : ReadMetadata->Limit - ItemsRead; for (size_t i = 0; i < ready.size() && limitLeft; ++i) { - if (ready[i].GetResultBatch()->num_rows() == 0 && !ready[i].GetLastReadKey()) { + if (ready[i].GetResultBatch().num_rows() == 0 && !ready[i].GetLastReadKey()) { Y_VERIFY(i + 1 == ready.size(), "Only last batch can be empty!"); break; } auto& batch = ReadyResults.emplace_back(std::move(ready[i])); - if (batch.GetResultBatch()->num_rows() > limitLeft) { + if (batch.GetResultBatch().num_rows() > limitLeft) { batch.Slice(0, limitLeft); } - limitLeft -= batch.GetResultBatch()->num_rows(); - ItemsRead += batch.GetResultBatch()->num_rows(); + limitLeft -= batch.GetResultBatch().num_rows(); + ItemsRead += batch.GetResultBatch().num_rows(); } if (limitLeft == 0) { diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 6d8a77a2af..09fb17f9db 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -2,7 +2,8 @@ #include "columnshard__scan.h" #include "columnshard_common.h" -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> +#include "engines/reader/read_metadata.h" +#include "engines/reader/read_context.h" namespace NKikimr::NColumnShard { @@ -23,7 +24,6 @@ public: } }; - using NOlap::TUnifiedBlobId; using NOlap::TBlobRange; @@ -40,7 +40,7 @@ public: << "records_count:" << RecordsCount << ";" ; if (Data.size()) { - sb << "schema=" << Data.front().GetResultBatch()->schema()->ToString() << ";"; + sb << "schema=" << Data.front().GetResultBatch().schema()->ToString() << ";"; } return sb; } @@ -50,16 +50,16 @@ public: } NOlap::TPartialReadResult& emplace_back(NOlap::TPartialReadResult&& v) { - RecordsCount += v.GetResultBatch()->num_rows(); + RecordsCount += v.GetResultBatch().num_rows(); Data.emplace_back(std::move(v)); return Data.back(); } - NOlap::TPartialReadResult pop_front() { + std::optional<NOlap::TPartialReadResult> pop_front() { if (Data.empty()) { - return NOlap::TPartialReadResult(); + return {}; } auto result = std::move(Data.front()); - RecordsCount -= result.GetResultBatch()->num_rows(); + RecordsCount -= result.GetResultBatch().num_rows(); Data.pop_front(); return result; } @@ -104,7 +104,7 @@ public: return IndexedData->IsFinished() && ReadyResults.empty(); } - NOlap::TPartialReadResult GetBatch() override; + std::optional<NOlap::TPartialReadResult> GetBatch() override; std::optional<NBlobCache::TBlobRange> GetNextBlobToRead() override; diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index a2182f63e4..b888d503b1 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -5,7 +5,6 @@ #include <ydb/core/tx/columnshard/columnshard_schema.h> #include <ydb/core/tx/columnshard/columnshard__read_base.h> #include <ydb/core/tx/columnshard/columnshard__index_scan.h> -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index f7e67460f8..337b52ce98 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -277,7 +277,12 @@ private: return false; } - auto result = ScanIterator->GetBatch(); + auto resultOpt = ScanIterator->GetBatch(); + if (!resultOpt) { + ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString()); + return false; + } + auto& result = *resultOpt; if (!result.ErrorString.empty()) { ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", result.ErrorString); SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size())); @@ -291,12 +296,7 @@ private: ResultYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetResultYqlSchema(); } - if (!result.GetResultBatch()) { - ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString()); - return false; - } - - auto& batch = result.GetResultBatch(); + auto batch = result.GetResultBatchPtrVerified(); int numRows = batch->num_rows(); int numColumns = batch->num_columns(); if (!numRows) { diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index 464f5b0fcc..ca72f0597c 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -97,24 +97,29 @@ public: } } - const std::shared_ptr<arrow::RecordBatch>& GetResultBatch() const { + const std::shared_ptr<arrow::RecordBatch>& GetResultBatchPtrVerified() const { + Y_VERIFY(ResultBatch); return ResultBatch; } + const arrow::RecordBatch& GetResultBatch() const { + Y_VERIFY(ResultBatch); + return *ResultBatch; + } + const std::shared_ptr<arrow::RecordBatch>& GetLastReadKey() const { return LastReadKey; } std::string ErrorString; - TPartialReadResult() = default; - explicit TPartialReadResult( std::shared_ptr<TScanMemoryLimiter::TGuard> memGuard, std::shared_ptr<arrow::RecordBatch> batch) : MemoryGuardExternal(memGuard) , ResultBatch(batch) { + Y_VERIFY(ResultBatch); } explicit TPartialReadResult( @@ -124,6 +129,7 @@ public: , ResultBatch(batch) , LastReadKey(lastKey) { + Y_VERIFY(ResultBatch); } }; } @@ -143,7 +149,7 @@ public: virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/) {} virtual bool HasWaitingTasks() const = 0; virtual bool Finished() const = 0; - virtual NOlap::TPartialReadResult GetBatch() = 0; + virtual std::optional<NOlap::TPartialReadResult> GetBatch() = 0; virtual std::optional<NBlobCache::TBlobRange> GetNextBlobToRead() { return {}; } virtual TString DebugString() const { return "NO_DATA"; diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp index 3056242b13..021c4a2a49 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp @@ -2,7 +2,7 @@ namespace NKikimr::NColumnShard { -NKikimr::NOlap::TPartialReadResult TStatsIterator::GetBatch() { +std::optional<NOlap::TPartialReadResult> TStatsIterator::GetBatch() { // Take next raw batch auto batch = FillStatsBatch(); diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h index 825846f308..a334dff2a1 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.h +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h @@ -53,7 +53,7 @@ public: return IndexStats.empty(); } - NOlap::TPartialReadResult GetBatch() override; + std::optional<NOlap::TPartialReadResult> GetBatch() override; private: NOlap::TReadStatsMetadata::TConstPtr ReadMetadata; diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt index 94f29e6893..41ff6614c5 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt @@ -46,7 +46,6 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_features.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt index 08ceed7b24..e5129086a4 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt @@ -47,7 +47,6 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_features.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt index 08ceed7b24..e5129086a4 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt @@ -47,7 +47,6 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_features.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt index 94f29e6893..41ff6614c5 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt @@ -46,7 +46,6 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_features.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 7c76aa0f15..c58d1d964d 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -1,6 +1,5 @@ #include "column_engine_logs.h" #include "filter.h" -#include "indexed_read_data.h" #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/formats/arrow/one_batch_input_stream.h> diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp deleted file mode 100644 index cdfb768fd0..0000000000 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ /dev/null @@ -1,425 +0,0 @@ -#include "defs.h" -#include "indexed_read_data.h" -#include "filter.h" -#include "column_engine_logs.h" -#include "changes/mark_granules.h" -#include <ydb/core/tx/columnshard/columnshard__index_scan.h> -#include <ydb/core/tx/columnshard/columnshard__stats_scan.h> -#include <ydb/core/formats/arrow/one_batch_input_stream.h> -#include <ydb/core/formats/arrow/merging_sorted_input_stream.h> -#include <ydb/core/formats/arrow/custom_registry.h> - -namespace NKikimr::NOlap { - -namespace { - -// Slices a batch into smaller batches and appends them to result vector (which might be non-empty already) -std::vector<std::shared_ptr<arrow::RecordBatch>> SliceBatch(const std::shared_ptr<arrow::RecordBatch>& batch, - const int64_t maxRowsInBatch) -{ - std::vector<std::shared_ptr<arrow::RecordBatch>> result; - if (!batch->num_rows()) { - return result; - } - result.reserve(result.size() + batch->num_rows() / maxRowsInBatch + 1); - if (batch->num_rows() <= maxRowsInBatch) { - result.push_back(batch); - return result; - } - - int64_t offset = 0; - while (offset < batch->num_rows()) { - int64_t rows = std::min<int64_t>(maxRowsInBatch, batch->num_rows() - offset); - result.emplace_back(batch->Slice(offset, rows)); - offset += rows; - } - return result; -}; - -} - -void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) { - Y_VERIFY(IndexedBlobSubscriber.emplace(range, batch.GetBatchAddress()).second); - if (batch.GetFetchedInfo().GetFilter()) { - Context.GetCounters().PostFilterBytes->Add(range.Size); - ReadMetadata->ReadStats->DataAdditionalBytes += range.Size; - PriorityBlobsQueue.emplace_back(batch.GetGranule(), range); - } else { - Context.GetCounters().FilterBytes->Add(range.Size); - ReadMetadata->ReadStats->DataFilterBytes += range.Size; - FetchBlobsQueue.emplace_back(batch.GetGranule(), range); - } -} - -void TIndexedReadData::RegisterZeroGranula() { - for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i) { - const auto& cmtBlob = ReadMetadata->CommittedBlobs[i]; - WaitCommitted.emplace(cmtBlob.GetBlobId(), cmtBlob); - } - for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { - AddNotIndexed(blobId, batch); - } - for (const auto& [blobId, _] : WaitCommitted) { - AddBlobToFetchInFront(0, TBlobRange(blobId, 0, blobId.BlobSize())); - } -} - -void TIndexedReadData::InitRead() { - Y_VERIFY(!GranulesContext); - GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode); - - RegisterZeroGranula(); - - auto& indexInfo = ReadMetadata->GetIndexInfo(); - Y_VERIFY(indexInfo.GetSortingKey()); - Y_VERIFY(indexInfo.GetIndexKey() && indexInfo.GetIndexKey()->num_fields()); - - SortReplaceDescription = indexInfo.SortReplaceDescription(); - - ui64 portionsBytes = 0; - std::set<ui64> granulesReady; - ui64 prevGranule = 0; - THashSet<ui32> columnIdsHash; - for (auto&& i : ReadMetadata->GetAllColumns()) { - columnIdsHash.emplace(i); - } - for (auto& portionSorted : ReadMetadata->SelectInfo->GetPortionsOrdered(ReadMetadata->IsDescSorted())) { - auto portionInfo = portionSorted->CopyWithFilteredColumns(columnIdsHash); - portionsBytes += portionInfo.BlobsBytes(); - Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata); - - NIndexedReader::TGranule::TPtr granule = GranulesContext->UpsertGranule(portionInfo.GetGranule()); - if (prevGranule != portionInfo.GetGranule()) { - Y_VERIFY(granulesReady.emplace(portionInfo.GetGranule()).second); - prevGranule = portionInfo.GetGranule(); - } - granule->RegisterBatchForFetching(std::move(portionInfo)); - } - GranulesContext->PrepareForStart(); - - Context.GetCounters().PortionBytes->Add(portionsBytes); - auto& stats = ReadMetadata->ReadStats; - stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size(); - stats->IndexPortions = ReadMetadata->SelectInfo->PortionsOrderedPK.size(); - stats->IndexBatches = ReadMetadata->NumIndexedBlobs(); - stats->CommittedBatches = ReadMetadata->CommittedBlobs.size(); - stats->SchemaColumns = ReadMetadata->GetSchemaColumnsCount(); - stats->FilterColumns = GranulesContext->GetEarlyFilterColumns().size(); - stats->AdditionalColumns = GranulesContext->GetPostFilterColumns().size(); - stats->PortionsBytes = portionsBytes; -} - -void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data) { - Y_VERIFY(GranulesContext); - NIndexedReader::TBatch* portionBatch = nullptr; - { - auto it = IndexedBlobSubscriber.find(blobRange); - Y_VERIFY(it != IndexedBlobSubscriber.end()); - portionBatch = GranulesContext->GetBatchInfo(it->second); - if (!portionBatch) { - ACFL_INFO("event", "batch for finished granule")("address", it->second.ToString()); - return; - } - IndexedBlobSubscriber.erase(it); - } - if (!portionBatch->AddIndexedReady(blobRange, data)) { - return; - } -} - -std::shared_ptr<arrow::RecordBatch> -TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const TSnapshot& snapshot) const { - Y_VERIFY(srcBatch); - - // Extract columns (without check), filter, attach snapshot, extract columns with check - // (do not filter snapshot columns) - auto dataSchema = ReadMetadata->GetLoadSchema(snapshot); - - auto batch = NArrow::ExtractExistedColumns(srcBatch, dataSchema->GetSchema()); - Y_VERIFY(batch); - - auto filter = FilterNotIndexed(batch, *ReadMetadata); - if (filter.IsTotalDenyFilter()) { - return nullptr; - } - auto preparedBatch = batch; - preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, snapshot); - auto resultSchema = ReadMetadata->GetLoadSchema(); - preparedBatch = resultSchema->NormalizeBatch(*dataSchema, preparedBatch); - preparedBatch = NArrow::ExtractColumns(preparedBatch, resultSchema->GetSchema()); - Y_VERIFY(preparedBatch); - - filter.Apply(preparedBatch); - return preparedBatch; -} - -std::vector<TPartialReadResult> TIndexedReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) { - Y_VERIFY(SortReplaceDescription); - auto& indexInfo = ReadMetadata->GetIndexInfo(); - - if (WaitCommitted.size()) { - // Wait till we have all not indexed data so we could replace keys in granules - return {}; - } - - // First time extract OutNotIndexed data - if (NotIndexed.size()) { - auto mergedBatch = MergeNotIndexed(std::move(NotIndexed)); // merged has no dups - if (mergedBatch) { - // Init split by granules structures - Y_VERIFY(ReadMetadata->SelectInfo); - TMarksGranules marksGranules(*ReadMetadata->SelectInfo); - - // committed data before the first granule would be placed in fake preceding granule - // committed data after the last granule would be placed into the last granule - marksGranules.MakePrecedingMark(indexInfo); - Y_VERIFY(!marksGranules.Empty()); - - auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, indexInfo); - GranulesContext->DrainNotIndexedBatches(&outNotIndexed); - if (outNotIndexed.size() == 1) { - auto it = outNotIndexed.begin(); - if (it->first) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("incorrect_granule_id", it->first); - Y_FAIL(); - } - NotIndexedOutscopeBatch = it->second; - } - } else { - GranulesContext->DrainNotIndexedBatches(nullptr); - } - NotIndexed.clear(); - } else { - GranulesContext->DrainNotIndexedBatches(nullptr); - } - - return ReadyToOut(maxRowsInBatch); -} - -/// @return batches that are not blocked by others -std::vector<TPartialReadResult> TIndexedReadData::ReadyToOut(const i64 maxRowsInBatch) { - Y_VERIFY(SortReplaceDescription); - Y_VERIFY(GranulesContext); - - std::vector<NIndexedReader::TGranule::TPtr> ready = GranulesContext->DetachReadyInOrder(); - std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out; - out.reserve(ready.size() + 1); - - // Prepend not indexed data (less then first granule) before granules for ASC sorting - if (ReadMetadata->IsAscSorted() && NotIndexedOutscopeBatch) { - out.push_back({}); - out.back().push_back(NotIndexedOutscopeBatch); - NotIndexedOutscopeBatch = nullptr; - } - - for (auto&& granule : ready) { - std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = granule->GetReadyBatches(); - - if (inGranule.empty()) { - continue; - } - out.emplace_back(std::move(inGranule)); - } - - // Append not indexed data (less then first granule) after granules for DESC sorting - if (GranulesContext->GetSortingPolicy()->ReadyForAddNotIndexedToEnd() && NotIndexedOutscopeBatch) { - out.push_back({}); - out.back().push_back(NotIndexedOutscopeBatch); - NotIndexedOutscopeBatch = nullptr; - } - - return MakeResult(std::move(out), maxRowsInBatch); -} - -std::shared_ptr<arrow::RecordBatch> -TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const { - Y_VERIFY(ReadMetadata->IsSorted()); - auto& indexInfo = ReadMetadata->GetIndexInfo(); - Y_VERIFY(indexInfo.GetSortingKey()); - - { - const auto pred = [](const std::shared_ptr<arrow::RecordBatch>& b) { - return !b || !b->num_rows(); - }; - batches.erase(std::remove_if(batches.begin(), batches.end(), pred), batches.end()); - } - - if (batches.empty()) { - return {}; - } - - // We could merge data here only if backpressure limits committed data size. KIKIMR-12520 - auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription()); - Y_VERIFY(merged); - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey())); - return merged; -} - -void TIndexedReadData::MergeTooSmallBatches(const std::shared_ptr<TMemoryAggregation>& memAggregation, std::vector<TPartialReadResult>& out) const { - if (out.size() < 10) { - return; - } - - i64 sumRows = 0; - for (auto& result : out) { - sumRows += result.GetResultBatch()->num_rows(); - } - if (sumRows / out.size() > 100) { - return; - } - - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - batches.reserve(out.size()); - for (auto& batch : out) { - batches.push_back(batch.GetResultBatch()); - } - - auto res = arrow::Table::FromRecordBatches(batches); - if (!res.ok()) { - Y_VERIFY_DEBUG(false, "Cannot make table from batches"); - return; - } - - res = (*res)->CombineChunks(); - if (!res.ok()) { - Y_VERIFY_DEBUG(false, "Cannot combine chunks"); - return; - } - auto batch = NArrow::ToBatch(*res); - - std::vector<TPartialReadResult> merged; - - auto g = std::make_shared<TScanMemoryLimiter::TGuard>(GetMemoryAccessor(), memAggregation); - g->Take(NArrow::GetBatchMemorySize(batch)); - merged.emplace_back(TPartialReadResult(g, batch, out.back().GetLastReadKey())); - out.swap(merged); -} - -std::vector<TPartialReadResult> TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, - int64_t maxRowsInBatch) const { - Y_VERIFY(ReadMetadata->IsSorted()); - Y_VERIFY(SortReplaceDescription); - auto& indexInfo = ReadMetadata->GetIndexInfo(); - - std::vector<TPartialReadResult> out; - - bool isDesc = ReadMetadata->IsDescSorted(); - - for (auto& batches : granules) { - if (isDesc) { - std::reverse(batches.begin(), batches.end()); - } - for (auto& batch : batches) { - if (isDesc) { - auto permutation = NArrow::MakePermutation(batch->num_rows(), true); - batch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(batch, permutation)).record_batch(); - } - std::shared_ptr<TScanMemoryLimiter::TGuard> memGuard = std::make_shared<TScanMemoryLimiter::TGuard>( - GetMemoryAccessor(), Context.GetCounters().Aggregations.GetResultsReady()); - memGuard->Take(NArrow::GetBatchMemorySize(batch)); - - std::vector<std::shared_ptr<arrow::RecordBatch>> splitted = SliceBatch(batch, maxRowsInBatch); - - for (auto& batch : splitted) { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey(), isDesc)); - // Extract the last row's PK - auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); - auto lastKey = keyBatch->Slice(keyBatch->num_rows() - 1, 1); - - // Leave only requested columns - auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->GetResultSchema()); - out.emplace_back(TPartialReadResult(memGuard, resultBatch, lastKey)); - } - } - } - - if (ReadMetadata->GetProgram().HasProgram()) { - MergeTooSmallBatches(Context.GetCounters().Aggregations.GetResultsReady(), out); - for (auto& result : out) { - result.ApplyProgram(ReadMetadata->GetProgram()); - } - } - return out; -} - -TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const TReadContext& context) - : TBase(context, readMetadata) - , OnePhaseReadMode(context.GetIsInternalRead()) -{ -} - -bool TIndexedReadData::DoIsFinished() const { - Y_VERIFY(GranulesContext); - return NotIndexed.empty() && FetchBlobsQueue.empty() && PriorityBlobsQueue.empty() && GranulesContext->IsFinished(); -} - -void TIndexedReadData::DoAbort() { - Y_VERIFY(GranulesContext); - Context.MutableProcessor().Stop(); - FetchBlobsQueue.Stop(); - PriorityBlobsQueue.Stop(); - GranulesContext->Abort(); - IndexedBlobSubscriber.clear(); - WaitCommitted.clear(); -} - -std::optional<TBlobRange> TIndexedReadData::DoExtractNextBlob(const bool hasReadyResults) { - Y_VERIFY(GranulesContext); - while (auto* f = PriorityBlobsQueue.front()) { - if (!GranulesContext->IsGranuleActualForProcessing(f->GetObjectId())) { - ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetObjectId()); - PriorityBlobsQueue.pop_front(); - continue; - } - - GranulesContext->ForceStartProcessGranule(f->GetObjectId(), f->GetRange()); - Context.GetCounters().OnPriorityFetch(f->GetRange().Size); - return PriorityBlobsQueue.pop_front(); - } - - while (auto* f = FetchBlobsQueue.front()) { - if (!GranulesContext->IsGranuleActualForProcessing(f->GetObjectId())) { - ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetObjectId()); - FetchBlobsQueue.pop_front(); - continue; - } - - if (GranulesContext->TryStartProcessGranule(f->GetObjectId(), f->GetRange(), hasReadyResults)) { - Context.GetCounters().OnGeneralFetch(f->GetRange().Size); - return FetchBlobsQueue.pop_front(); - } else { - Context.GetCounters().OnProcessingOverloaded(); - return {}; - } - } - return {}; -} - -void TIndexedReadData::AddNotIndexed(const TBlobRange& blobRange, const TString& column) { - auto it = WaitCommitted.find(blobRange.BlobId); - Y_VERIFY(it != WaitCommitted.end()); - auto batch = NArrow::DeserializeBatch(column, ReadMetadata->GetBlobSchema(it->second.GetSchemaSnapshot())); - NotIndexed.emplace_back(MakeNotIndexedBatch(batch, it->second.GetSchemaSnapshot())); - WaitCommitted.erase(it); -} - -void TIndexedReadData::AddNotIndexed(const TUnifiedBlobId& blobId, const std::shared_ptr<arrow::RecordBatch>& batch) { - auto it = WaitCommitted.find(blobId); - Y_VERIFY(it != WaitCommitted.end()); - NotIndexed.emplace_back(MakeNotIndexedBatch(batch, it->second.GetSchemaSnapshot())); - WaitCommitted.erase(it); -} - -void TIndexedReadData::DoAddData(const TBlobRange& blobRange, const TString& data) { - if (GranulesContext->IsFinished()) { - ACFL_DEBUG("event", "AddData on GranulesContextFinished"); - return; - } - if (IsIndexedBlob(blobRange)) { - AddIndexed(blobRange, data); - } else { - AddNotIndexed(blobRange, data); - } -} - -} diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h deleted file mode 100644 index 13fa7e27b3..0000000000 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ /dev/null @@ -1,96 +0,0 @@ -#pragma once -#include "defs.h" -#include "column_engine.h" -#include "predicate/predicate.h" -#include "reader/queue.h" -#include "reader/granule.h" -#include "reader/batch.h" -#include "reader/filling_context.h" -#include "reader/read_metadata.h" - -#include <ydb/library/accessor/accessor.h> -#include <ydb/core/tx/columnshard/counters.h> - -namespace NKikimr::NColumnShard { -class TScanIteratorBase; -} - -namespace NKikimr::NOlap { - -class TIndexedReadData: public IDataReader, TNonCopyable { -private: - using TBase = IDataReader; - std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext; - THashMap<TUnifiedBlobId, NOlap::TCommittedBlob> WaitCommitted; - - TFetchBlobsQueue FetchBlobsQueue; - TFetchBlobsQueue PriorityBlobsQueue; - bool OnePhaseReadMode = false; - std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; - - THashMap<TBlobRange, NIndexedReader::TBatchAddress> IndexedBlobSubscriber; - std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch; - std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription; - - void AddNotIndexed(const TBlobRange& blobRange, const TString& column); - void AddNotIndexed(const TUnifiedBlobId& blobId, const std::shared_ptr<arrow::RecordBatch>& batch); - void RegisterZeroGranula(); - - void AddIndexed(const TBlobRange& blobRange, const TString& column); - bool IsIndexedBlob(const TBlobRange& blobRange) const { - return IndexedBlobSubscriber.contains(blobRange); - } -protected: - virtual TString DoDebugString() const override { - return TStringBuilder() - << "wait_committed:" << WaitCommitted.size() << ";" - << "granules_context:(" << (GranulesContext ? GranulesContext->DebugString() : "NO") << ");" - ; - } - - /// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK) - virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override; - - virtual void DoAbort() override; - virtual bool DoIsFinished() const override; - - virtual void DoAddData(const TBlobRange& blobRange, const TString& data) override; - virtual std::optional<TBlobRange> DoExtractNextBlob(const bool hasReadyResults) override; -public: - TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const TReadContext& context); - - NIndexedReader::TGranulesFillingContext& GetGranulesContext() { - Y_VERIFY(GranulesContext); - return *GranulesContext; - } - - void InitRead(); - - NOlap::TReadMetadata::TConstPtr GetReadMetadata() const { - return ReadMetadata; - } - - void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); - void OnBatchReady(const NIndexedReader::TBatch& /*batchInfo*/, std::shared_ptr<arrow::RecordBatch> batch) { - if (batch && batch->num_rows()) { - ReadMetadata->ReadStats->SelectedRows += batch->num_rows(); - } - } - - void AddBlobToFetchInFront(const ui64 granuleId, const TBlobRange& range) { - PriorityBlobsQueue.emplace_back(granuleId, range); - } - -private: - std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch( - const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) const; - - std::shared_ptr<arrow::RecordBatch> MergeNotIndexed( - std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const; - std::vector<TPartialReadResult> ReadyToOut(const i64 maxRowsInBatch); - void MergeTooSmallBatches(const std::shared_ptr<TMemoryAggregation>& memAggregation, std::vector<TPartialReadResult>& out) const; - std::vector<TPartialReadResult> MakeResult( - std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, int64_t maxRowsInBatch) const; -}; - -} diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.cpp b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp index 092e852abc..64fa210150 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/meta.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp @@ -6,7 +6,7 @@ bool TInsertedDataMeta::DeserializeFromProto(const NKikimrTxColumnShard::TLogica if (proto.HasDirtyWriteTimeSeconds()) { DirtyWriteTime = TInstant::Seconds(proto.GetDirtyWriteTimeSeconds()); } - if (proto.HasRawBytes()) { + if (proto.HasSpecialKeysRawData()) { SpecialKeys = NArrow::TFirstLastSpecialKeys(proto.GetSpecialKeysRawData()); } NumRows = proto.GetNumRows(); diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h index 0e17895476..a460b22d8a 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.h +++ b/ydb/core/tx/columnshard/engines/portions/column_record.h @@ -100,10 +100,6 @@ public: return ColumnId == item.ColumnId && Chunk == item.Chunk; } - std::optional<ui32> GetChunkRowsCount() const { - return Meta.GetNumRows(); - } - bool Valid() const { return ColumnId && ValidBlob(); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 21b0180180..1c7cd58e18 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -29,7 +29,7 @@ const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName) { const auto& indexInfo = snapshotSchema.GetIndexInfo(); - + Y_VERIFY(batch->num_rows() == NumRows()); Meta = {}; Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); Meta.FillBatchInfo(batch, indexInfo); @@ -89,7 +89,7 @@ ui64 TPortionInfo::GetRawBytes(const std::vector<ui32>& columnIds) const { } else { for (auto&& r : Records) { if (r.ColumnId == i) { - sum += r.GetMeta().GetRawBytes().value_or(0); + sum += r.GetMeta().GetRawBytesVerified(); } } } @@ -110,6 +110,8 @@ TString TPortionInfo::DebugString() const { sb << "(portion_id:" << Portion << ";" << "granule_id:" << Granule << ";records_count:" << NumRows() << ";" "min_snapshot:(" << MinSnapshot.DebugString() << ");" << +// "from:" << IndexKeyStart().DebugString() << ";" << +// "to:" << IndexKeyEnd().DebugString() << ";" << "size:" << BlobsBytes() << ";" << "meta:(" << Meta.DebugString() << ");"; if (RemoveSnapshot.Valid()) { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index b98c6e9d64..d7f008e55a 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -224,7 +224,7 @@ public: std::optional<ui32> columnIdFirst; for (auto&& i : Records) { if (!columnIdFirst || *columnIdFirst == i.ColumnId) { - result += i.GetMeta().GetNumRows().value_or(0); + result += i.GetMeta().GetNumRowsVerified(); columnIdFirst = i.ColumnId; } } @@ -235,7 +235,7 @@ public: ui32 result = 0; for (auto&& i : Records) { if (columnId == i.ColumnId) { - result += i.GetMeta().GetNumRows().value_or(0); + result += i.GetMeta().GetNumRowsVerified(); } } return result; @@ -246,7 +246,7 @@ public: ui64 RawBytesSum() const { ui64 result = 0; for (auto&& i : Records) { - result += i.GetMeta().GetRawBytes().value_or(0); + result += i.GetMeta().GetRawBytesVerified(); } return result; } diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index 46264ebdea..3f60021c20 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -6,7 +6,6 @@ # original buildsystem will not be accepted. -add_subdirectory(order_control) add_subdirectory(plain_reader) get_built_tool_path( TOOL_enum_parser_bin @@ -29,26 +28,18 @@ target_link_libraries(columnshard-engines-reader PUBLIC columnshard-hooks-abstract tx-columnshard-resources core-tx-program - engines-reader-order_control engines-reader-plain_reader columnshard-engines-scheme tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/processing_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index 8b5d772ca2..2702824112 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -6,7 +6,6 @@ # original buildsystem will not be accepted. -add_subdirectory(order_control) add_subdirectory(plain_reader) get_built_tool_path( TOOL_enum_parser_bin @@ -30,26 +29,18 @@ target_link_libraries(columnshard-engines-reader PUBLIC columnshard-hooks-abstract tx-columnshard-resources core-tx-program - engines-reader-order_control engines-reader-plain_reader columnshard-engines-scheme tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/processing_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index 8b5d772ca2..2702824112 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -6,7 +6,6 @@ # original buildsystem will not be accepted. -add_subdirectory(order_control) add_subdirectory(plain_reader) get_built_tool_path( TOOL_enum_parser_bin @@ -30,26 +29,18 @@ target_link_libraries(columnshard-engines-reader PUBLIC columnshard-hooks-abstract tx-columnshard-resources core-tx-program - engines-reader-order_control engines-reader-plain_reader columnshard-engines-scheme tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/processing_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index 46264ebdea..3f60021c20 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -6,7 +6,6 @@ # original buildsystem will not be accepted. -add_subdirectory(order_control) add_subdirectory(plain_reader) get_built_tool_path( TOOL_enum_parser_bin @@ -29,26 +28,18 @@ target_link_libraries(columnshard-engines-reader PUBLIC columnshard-hooks-abstract tx-columnshard-resources core-tx-program - engines-reader-order_control engines-reader-plain_reader columnshard-engines-scheme tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/description.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/processing_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp deleted file mode 100644 index 5e5823dd1b..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ /dev/null @@ -1,226 +0,0 @@ -#include "batch.h" -#include "granule.h" -#include "filter_assembler.h" -#include "postfilter_assembler.h" -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> -#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> - -namespace NKikimr::NOlap::NIndexedReader { - -TBatch::TBatch(const TBatchAddress& address, TGranule& owner, TPortionInfo&& portionInfoExt, const ui64 predictedBatchSize) - : BatchAddress(address) - , Portion(portionInfoExt.GetPortion()) - , Granule(owner.GetGranuleId()) - , PredictedBatchSize(predictedBatchSize) - , Owner(&owner) - , PortionInfo(std::move(portionInfoExt)) -{ - Y_VERIFY(Granule == PortionInfo.GetGranule()); - Y_VERIFY(PortionInfo.Records.size()); - - if (PortionInfo.CanIntersectOthers()) { - ACFL_TRACE("event", "intersect_portion"); - Owner->SetDuplicationsAvailable(true); - if (PortionInfo.CanHaveDups()) { - ACFL_TRACE("event", "dup_portion"); - DuplicationsAvailableFlag = true; - } - } -} - -NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata) { - Y_VERIFY(WaitIndexed.empty()); - Y_VERIFY(PortionInfo.Produced()); - Y_VERIFY(!FetchedInfo.GetFilteredBatch()); - - auto blobSchema = readMetadata->GetLoadSchema(PortionInfo.GetMinSnapshot()); - auto readSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot()); - ISnapshotSchema::TPtr resultSchema; - if (CurrentColumnIds) { - resultSchema = std::make_shared<TFilteredSnapshotSchema>(readSchema, *CurrentColumnIds); - } else { - resultSchema = readSchema; - } - auto batchConstructor = PortionInfo.PrepareForAssemble(*blobSchema, *resultSchema, Data); - Data.clear(); - if (!FetchedInfo.GetFilter()) { - return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, - *this, Owner->GetEarlyFilterColumns(), processor, Owner->GetOwner().GetSortingPolicy()); - } else { - return std::make_shared<TAssembleBatch>(std::move(batchConstructor), *this, readMetadata->GetColumnsOrder(), processor); - } -} - -bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const { - if (!CurrentColumnIds) { - return true; - } - if (AskedColumnIds.size() < columnIds.size()) { - return false; - } - for (auto&& i : columnIds) { - if (!AskedColumnIds.contains(i)) { - return false; - } - } - return true; -} - -ui64 TBatch::GetFetchBytes(const std::set<ui32>& columnIds) { - ui64 result = 0; - for (const NOlap::TColumnRecord& rec : PortionInfo.Records) { - if (!columnIds.contains(rec.ColumnId)) { - continue; - } - Y_VERIFY(rec.Valid()); - result += rec.BlobRange.Size; - } - return result; -} - -void TBatch::ResetCommon(const std::set<ui32>& columnIds) { - CurrentColumnIds = columnIds; - Y_VERIFY(CurrentColumnIds->size()); - for (auto&& i : *CurrentColumnIds) { - Y_VERIFY(AskedColumnIds.emplace(i).second); - } - - Y_VERIFY(WaitIndexed.empty()); - Y_VERIFY(Data.empty()); - WaitingBytes = 0; - FetchedBytes = 0; -} - -void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) { - Y_VERIFY(!FetchedInfo.GetFilter()); - ResetCommon(columnIds); - for (const NOlap::TColumnRecord& rec : PortionInfo.Records) { - if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) { - continue; - } - Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second); - Owner->AddBlobForFetch(rec.BlobRange, *this); - Y_VERIFY(rec.Valid()); - WaitingBytes += rec.BlobRange.Size; - } -} - -void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) { - Y_VERIFY(FetchedInfo.GetFilter()); - ResetCommon(columnIds); - std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects; - for (const NOlap::TColumnRecord& rec : PortionInfo.Records) { - if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) { - continue; - } - orderedObjects[rec.ColumnId][rec.Chunk] = &rec; - Y_VERIFY(rec.Valid()); - } - - for (auto&& columnInfo : orderedObjects) { - ui32 expected = 0; - auto it = FetchedInfo.GetFilter()->GetIterator(false); - bool undefinedShift = false; - bool itFinished = false; - for (auto&& [chunk, rec] : columnInfo.second) { - Y_VERIFY(!itFinished); - Y_VERIFY(expected++ == chunk); - if (!rec->GetChunkRowsCount()) { - undefinedShift = true; - } - if (!undefinedShift && it.IsBatchForSkip(*rec->GetChunkRowsCount())) { - Data.emplace(rec->BlobRange, TPortionInfo::TAssembleBlobInfo(*rec->GetChunkRowsCount())); - } else { - Y_VERIFY(WaitIndexed.emplace(rec->BlobRange).second); - Owner->AddBlobForFetch(rec->BlobRange, *this); - WaitingBytes += rec->BlobRange.Size; - } - if (!undefinedShift) { - itFinished = !it.Next(*rec->GetChunkRowsCount()); - } - } - } - CheckReadyForAssemble(); -} - -bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, - const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter) -{ - FetchedInfo.InitFilter(filter, filterBatch, originalRecordsCount, notAppliedEarlyFilter); - return Owner->OnFilterReady(*this); -} - -void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) { - FetchedInfo.InitBatch(batch); - Owner->OnBatchReady(*this, batch); -} - -bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) { - if (!WaitIndexed.erase(bRange)) { - Y_ASSERT(false); - return false; - } - WaitingBytes -= bRange.Size; - FetchedBytes += bRange.Size; - Data.emplace(bRange, TPortionInfo::TAssembleBlobInfo(blobData)); - Owner->OnBlobReady(bRange); - CheckReadyForAssemble(); - return true; -} - -ui64 TBatch::GetUsefulBytes(const ui64 bytes) const { - return bytes * FetchedInfo.GetUsefulDataKff(); -} - -std::shared_ptr<TSortableBatchPosition> TBatch::GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const { - if (!FirstPK || !ReverseFirstPK) { - std::shared_ptr<TSortableBatchPosition> from; - std::shared_ptr<TSortableBatchPosition> to; - GetPKBorders(reverse, indexInfo, from, to); - } - if (reverse) { - return *ReverseFirstPK; - } else { - return *FirstPK; - } -} - -void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const { - auto indexKey = indexInfo.GetIndexKey(); - Y_VERIFY(PortionInfo.Valid()); - if (!FirstPK) { - const NArrow::TReplaceKey& minRecord = PortionInfo.IndexKeyStart(); - auto batch = minRecord.ToBatch(indexKey); - Y_VERIFY(batch); - FirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), false); - ReverseLastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), true); - } - if (!LastPK) { - const NArrow::TReplaceKey& maxRecord = PortionInfo.IndexKeyEnd(); - auto batch = maxRecord.ToBatch(indexKey); - Y_VERIFY(batch); - LastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), false); - ReverseFirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), true); - } - if (reverse) { - from = *ReverseFirstPK; - to = *ReverseLastPK; - } else { - from = *FirstPK; - to = *LastPK; - } -} - -bool TBatch::CheckReadyForAssemble() { - if (IsFetchingReady()) { - auto& context = Owner->GetOwner(); - auto processor = context.GetTasksProcessor(); - if (auto assembleBatchTask = AssembleTask(processor.GetObject(), context.GetReadMetadata())) { - processor.Add(context.GetOwner(), assembleBatchTask); - } - return true; - } - return false; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h deleted file mode 100644 index 338a610718..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ /dev/null @@ -1,161 +0,0 @@ -#pragma once -#include "common.h" -#include "conveyor_task.h" -#include "read_filter_merger.h" - -#include <ydb/core/formats/arrow/arrow_filter.h> -#include <ydb/core/formats/arrow/size_calcer.h> -#include <ydb/core/tx/columnshard/blob.h> -#include <ydb/core/tx/columnshard/engines/portion_info.h> - -#include <ydb/library/accessor/accessor.h> - -#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> - -namespace NKikimr::NOlap { -struct TReadMetadata; -} - -namespace NKikimr::NOlap::NIndexedReader { - -class TGranule; -class TBatch; - -class TBatchFetchedInfo { -private: - YDB_READONLY_DEF(std::optional<ui64>, BatchSize); - YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch); - YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch); - YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter); - YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedEarlyFilter); - ui32 OriginalRecordsCount = 0; -public: - void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, - const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter) { - Y_VERIFY(filter); - Y_VERIFY(!Filter); - Y_VERIFY(!FilterBatch); - Filter = filter; - FilterBatch = filterBatch; - OriginalRecordsCount = originalRecordsCount; - NotAppliedEarlyFilter = notAppliedEarlyFilter; - } - - double GetUsefulDataKff() const { - if (!FilterBatch || !FilterBatch->num_rows()) { - return 0; - } - Y_VERIFY_DEBUG(OriginalRecordsCount); - if (!OriginalRecordsCount) { - return 0; - } - return 1.0 * FilterBatch->num_rows() / OriginalRecordsCount; - } - - bool IsFiltered() const { - return !!Filter; - } - - void InitBatch(std::shared_ptr<arrow::RecordBatch> fullBatch) { - Y_VERIFY(!FilteredBatch); - FilteredBatch = fullBatch; - BatchSize = NArrow::GetBatchMemorySize(FilteredBatch); - } - - ui32 GetFilteredRecordsCount() const { - Y_VERIFY(IsFiltered()); - if (!FilterBatch) { - return 0; - } else { - return FilterBatch->num_rows(); - } - } -}; - -class TBatch: TNonCopyable { -private: - const TBatchAddress BatchAddress; - YDB_READONLY(ui64, Portion, 0); - YDB_READONLY(ui64, Granule, 0); - YDB_READONLY(ui64, WaitingBytes, 0); - YDB_READONLY(ui64, FetchedBytes, 0); - const ui64 PredictedBatchSize; - - THashSet<TBlobRange> WaitIndexed; - mutable std::optional<std::shared_ptr<TSortableBatchPosition>> FirstPK; - mutable std::optional<std::shared_ptr<TSortableBatchPosition>> LastPK; - mutable std::optional<std::shared_ptr<TSortableBatchPosition>> ReverseFirstPK; - mutable std::optional<std::shared_ptr<TSortableBatchPosition>> ReverseLastPK; - - YDB_READONLY_FLAG(DuplicationsAvailable, false); - YDB_READONLY_DEF(TBatchFetchedInfo, FetchedInfo); - THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data; - TGranule* Owner; - const TPortionInfo PortionInfo; - - YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds); - std::set<ui32> AskedColumnIds; - void ResetCommon(const std::set<ui32>& columnIds); - ui64 GetUsefulBytes(const ui64 bytes) const; - bool CheckReadyForAssemble(); - bool IsFetchingReady() const { - return WaitIndexed.empty(); - } - -public: - std::shared_ptr<TSortableBatchPosition> GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const; - void GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const; - - ui64 GetPredictedBatchSize() const { - return PredictedBatchSize; - } - - ui64 GetRealBatchSizeVerified() const { - auto result = FetchedInfo.GetBatchSize(); - Y_VERIFY(result); - return *result; - } - - bool AllowEarlyFilter() const { - return PortionInfo.AllowEarlyFilter(); - } - const TBatchAddress& GetBatchAddress() const { - return BatchAddress; - } - - ui64 GetUsefulWaitingBytes() const { - return GetUsefulBytes(WaitingBytes); - } - - ui64 GetUsefulFetchedBytes() const { - return GetUsefulBytes(FetchedBytes); - } - - TBatch(const TBatchAddress& address, TGranule& owner, TPortionInfo&& portionInfo, const ui64 predictedBatchSize); - bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData); - bool AskedColumnsAlready(const std::set<ui32>& columnIds) const; - - void ResetNoFilter(const std::set<ui32>& columnIds); - void ResetWithFilter(const std::set<ui32>& columnIds); - ui64 GetFetchBytes(const std::set<ui32>& columnIds); - - ui32 GetFilteredRecordsCount() const; - bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, - const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter); - void InitBatch(std::shared_ptr<arrow::RecordBatch> batch); - - NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata); - - const THashSet<TBlobRange>& GetWaitingBlobs() const { - return WaitIndexed; - } - - const TGranule& GetOwner() const { - return *Owner; - } - - const TPortionInfo& GetPortionInfo() const { - return PortionInfo; - } -}; -} diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp index 2116c74733..a3c005fd2a 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp @@ -1,5 +1,5 @@ #include "conveyor_task.h" -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> +#include "read_context.h" namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp deleted file mode 100644 index b7bc120a42..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include "filling_context.h" -#include "order_control/not_sorted.h" -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> -#include <ydb/core/tx/columnshard/resources/memory.h> -#include <util/string/join.h> - -namespace NKikimr::NOlap::NIndexedReader { - -TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading) - : ReadMetadata(readMetadata) - , InternalReading(internalReading) - , Processing(owner.GetMemoryAccessor(), owner.GetCounters()) - , Result(owner.GetCounters()) - , GranulesLiveContext(std::make_shared<TGranulesLiveControl>()) - , Owner(owner) - , Counters(owner.GetCounters()) -{ - SortingPolicy = InternalReading ? std::make_shared<TNonSorting>(ReadMetadata) : ReadMetadata->BuildSortingPolicy(); - - UsedColumns = ReadMetadata->GetUsedColumnIds(); - EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(); - FilterStageColumns = SortingPolicy->GetFilterStageColumns(); - PKColumnNames = ReadMetadata->GetReplaceKey()->field_names(); - PostFilterColumns = ReadMetadata->GetUsedColumnIds(); - for (auto&& i : FilterStageColumns) { - PostFilterColumns.erase(i); - } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TGranulesFillingContext")("used", UsedColumns.size())("early", EarlyFilterColumns.size()) - ("filter_stage", FilterStageColumns.size())("PKColumnNames", JoinSeq(",", PKColumnNames))("post_filter", PostFilterColumns.size()); -} - -bool TGranulesFillingContext::PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const { - if (!portionInfo.AllowEarlyFilter()) { - return false; - } - if (EarlyFilterColumns.empty()) { - return false; - } - if (TIndexInfo::IsSpecialColumns(EarlyFilterColumns)) { - return false; - } - return true; -} - -void TGranulesFillingContext::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) { - return Owner.AddBlobForFetch(range, batch); -} - -void TGranulesFillingContext::OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) { - return Owner.OnBatchReady(batchInfo, batch); -} - -NIndexedReader::TBatch* TGranulesFillingContext::GetBatchInfo(const TBatchAddress& address) { - return Processing.GetBatchInfo(address); -} - -NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfoVerified(const TBatchAddress& address) { - return Processing.GetBatchInfoVerified(address); -} - -NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const { - return Owner.GetTasksProcessor(); -} - -void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) { - Processing.DrainNotIndexedBatches(batches); -} - -bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range, const bool hasReadyResults) { - Y_VERIFY_DEBUG(!Result.IsReady(granuleId)); - if (InternalReading || Processing.IsInProgress(granuleId) - || (!GranulesLiveContext->GetCount() && !hasReadyResults) - || GetMemoryAccessor()->GetLimiter().HasBufferOrSubscribe(GetMemoryAccessor()) - ) - { - Processing.StartBlobProcessing(granuleId, range); - return true; - } else { - return false; - } -} - -bool TGranulesFillingContext::ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range) { - Y_VERIFY_DEBUG(!Result.IsReady(granuleId)); - Processing.StartBlobProcessing(granuleId, range); - return true; -} - -void TGranulesFillingContext::OnGranuleReady(const ui64 granuleId) { - Result.AddResult(Processing.ExtractReadyVerified(granuleId)); -} - -std::vector<NKikimr::NOlap::NIndexedReader::TGranule::TPtr> TGranulesFillingContext::DetachReadyInOrder() { - Y_VERIFY(SortingPolicy); - return SortingPolicy->DetachReadyGranules(Result); -} - -const std::shared_ptr<NKikimr::NOlap::TActorBasedMemoryAccesor>& TGranulesFillingContext::GetMemoryAccessor() const { - return Owner.GetMemoryAccessor(); -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h deleted file mode 100644 index 04e6cd01e7..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ /dev/null @@ -1,140 +0,0 @@ -#pragma once -#include "conveyor_task.h" -#include "granule.h" -#include "processing_context.h" -#include "order_control/abstract.h" -#include <ydb/core/tx/columnshard/counters/common/object_counter.h> -#include <util/generic/hash.h> - -namespace NKikimr::NOlap { -class TIndexedReadData; -} - -namespace NKikimr::NOlap::NIndexedReader { - -class TGranulesFillingContext: TNonCopyable, public NColumnShard::TMonitoringObjectsCounter<TGranulesFillingContext, true, false> { -private: - YDB_READONLY_DEF(std::vector<std::string>, PKColumnNames); - TReadMetadata::TConstPtr ReadMetadata; - bool StartedFlag = false; - const bool InternalReading = false; - TProcessingController Processing; - TResultController Result; - std::shared_ptr<TGranulesLiveControl> GranulesLiveContext; - TIndexedReadData& Owner; - std::set<ui32> EarlyFilterColumns; - std::set<ui32> PostFilterColumns; - std::set<ui32> FilterStageColumns; - std::set<ui32> UsedColumns; - IOrderPolicy::TPtr SortingPolicy; - NColumnShard::TConcreteScanCounters Counters; - bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const; - - bool CheckBufferAvailable() const; -public: - TIndexedReadData& GetOwner() { - return Owner; - } - - std::shared_ptr<TGranulesLiveControl> GetGranulesLiveContext() const { - return GranulesLiveContext; - } - bool IsGranuleActualForProcessing(const ui64 granuleId) const { - return Processing.IsGranuleActualForProcessing(granuleId); - } - bool ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range); - bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range, const bool hasReadyResults); - TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading); - - const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const; - - TString DebugString() const { - return TStringBuilder() - << "processing:(" << Processing.DebugString() << ");" - << "result:(" << Result.DebugString() << ");" - << "sorting_policy:(" << SortingPolicy->DebugString() << ");" - ; - } - - void OnBlobReady(const ui64 /*granuleId*/, const TBlobRange& /*range*/) noexcept { - } - - TReadMetadata::TConstPtr GetReadMetadata() const noexcept { - return ReadMetadata; - } - - const std::set<ui32>& GetEarlyFilterColumns() const noexcept { - return EarlyFilterColumns; - } - - const std::set<ui32>& GetPostFilterColumns() const noexcept { - return PostFilterColumns; - } - - IOrderPolicy::TPtr GetSortingPolicy() const noexcept { - return SortingPolicy; - } - - const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept { - return Counters; - } - - NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const; - - void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); - NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address); - NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address); - - void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); - void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); - - TGranule::TPtr GetGranuleVerified(const ui64 granuleId) { - return Processing.GetGranuleVerified(granuleId); - } - - bool IsFinished() const { - return Processing.IsFinished() && !Result.GetCount(); - } - - void OnNewBatch(TBatch& batch) { - Y_VERIFY(!StartedFlag); - if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) { - batch.ResetNoFilter(FilterStageColumns); - } else { - batch.ResetNoFilter(UsedColumns); - } - } - - std::vector<TGranule::TPtr> DetachReadyInOrder(); - - void Abort() { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort"); - Processing.Abort(); - Result.Clear(); - Y_VERIFY(IsFinished()); - } - - TGranule::TPtr UpsertGranule(const ui64 granuleId) { - Y_VERIFY(!StartedFlag); - auto g = Processing.GetGranule(granuleId); - if (!g) { - return Processing.InsertGranule(std::make_shared<TGranule>(granuleId, *this)); - } else { - return g; - } - } - - void OnGranuleReady(const ui64 granuleId); - - void Wakeup(TGranule& granule) { - SortingPolicy->Wakeup(granule, *this); - } - - void PrepareForStart() { - Y_VERIFY(!StartedFlag); - StartedFlag = true; - SortingPolicy->Fill(*this); - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp deleted file mode 100644 index 972046bffa..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ /dev/null @@ -1,69 +0,0 @@ -#include "filter_assembler.h" -#include <ydb/core/tx/columnshard/engines/filter.h> -#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> - -namespace NKikimr::NOlap::NIndexedReader { - -bool TAssembleFilter::DoExecuteImpl() { - /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey. - /// It's not OK to apply predicate before replacing key duplicates otherwise. - /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here - - TPortionInfo::TPreparedBatchData::TAssembleOptions options; - options.IncludedColumnIds = FilterColumnIds; - auto batch = BatchConstructor.Assemble(options); - Y_VERIFY(batch); - Y_VERIFY(batch->num_rows()); - OriginalCount = batch->num_rows(); - Filter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata)); - if (!Filter->Apply(batch)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount)("columns_count", FilterColumnIds.size()); - FilteredBatch = nullptr; - return true; - } - auto earlyFilter = ReadMetadata->GetProgram().BuildEarlyFilter(batch); - if (earlyFilter) { - if (AllowEarlyFilter) { - Filter = std::make_shared<NArrow::TColumnFilter>(Filter->CombineSequentialAnd(*earlyFilter)); - if (!earlyFilter->Apply(batch)) { - NYDBTest::TControllers::GetColumnShardController()->OnAfterFilterAssembling(batch); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount)("columns_count", FilterColumnIds.size());; - FilteredBatch = nullptr; - return true; - } else { - NYDBTest::TControllers::GetColumnShardController()->OnAfterFilterAssembling(batch); - } - } else if (BatchesOrderPolicy->NeedNotAppliedEarlyFilter()) { - EarlyFilter = earlyFilter; - } - } - - if ((size_t)batch->schema()->num_fields() < BatchConstructor.GetColumnsCount()) { - TPortionInfo::TPreparedBatchData::TAssembleOptions options; - options.ExcludedColumnIds = FilterColumnIds; - auto addBatch = BatchConstructor.Assemble(options); - Y_VERIFY(addBatch); - Y_VERIFY(Filter->Apply(addBatch)); - Y_VERIFY(NArrow::MergeBatchColumns({ batch, addBatch }, batch, BatchConstructor.GetSchemaColumnNames(), true)); - } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data") - ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter) - ("filter_columns", FilterColumnIds.size()); - - FilteredBatch = batch; - return true; -} - -bool TAssembleFilter::DoApply(IDataReader& owner) const { - Y_VERIFY(OriginalCount); - auto& reader = owner.GetMeAs<TIndexedReadData>(); - reader.GetCounters().OriginalRowsCount->Add(OriginalCount); - reader.GetCounters().AssembleFilterCount->Add(1); - TBatch* batch = reader.GetGranulesContext().GetBatchInfo(BatchAddress); - if (batch) { - batch->InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter); - } - return true; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h deleted file mode 100644 index 0422a62051..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h +++ /dev/null @@ -1,51 +0,0 @@ -#pragma once -#include "common.h" -#include "conveyor_task.h" - -#include <ydb/core/formats/arrow/arrow_filter.h> -#include <ydb/core/tx/columnshard/engines/portion_info.h> -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> -#include <ydb/core/tx/columnshard/counters/common/object_counter.h> - -#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> - -namespace NKikimr::NOlap::NIndexedReader { - - class TAssembleFilter: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TAssembleFilter, true, true> { - private: - using TBase = NColumnShard::IDataTasksProcessor::ITask; - TPortionInfo::TPreparedBatchData BatchConstructor; - std::shared_ptr<arrow::RecordBatch> FilteredBatch; - NOlap::TReadMetadata::TConstPtr ReadMetadata; - std::shared_ptr<NArrow::TColumnFilter> Filter; - std::shared_ptr<NArrow::TColumnFilter> EarlyFilter; - const TBatchAddress BatchAddress; - ui32 OriginalCount = 0; - bool AllowEarlyFilter = false; - std::set<ui32> FilterColumnIds; - IOrderPolicy::TPtr BatchesOrderPolicy; - protected: - virtual bool DoApply(IDataReader& owner) const override; - virtual bool DoExecuteImpl() override; - public: - - virtual TString GetTaskClassIdentifier() const override { - return "Reading::TAssembleFilter"; - } - - TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata, - TBatch& batch, const std::set<ui32>& filterColumnIds, NColumnShard::IDataTasksProcessor::TPtr processor, - IOrderPolicy::TPtr batchesOrderPolicy) - : TBase(processor) - , BatchConstructor(batchConstructor) - , ReadMetadata(readMetadata) - , BatchAddress(batch.GetBatchAddress()) - , AllowEarlyFilter(batch.AllowEarlyFilter()) - , FilterColumnIds(filterColumnIds) - , BatchesOrderPolicy(batchesOrderPolicy) - { - TBase::SetPriority(TBase::EPriority::Normal); - } - }; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp deleted file mode 100644 index ed8f5c4053..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ /dev/null @@ -1,187 +0,0 @@ -#include "granule.h" -#include "granule_preparation.h" -#include "filling_context.h" -#include <ydb/core/tx/columnshard/engines/portion_info.h> -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> -#include <ydb/core/tx/columnshard/engines/filter.h> -#include <ydb/core/tx/columnshard/engines/index_info.h> - -namespace NKikimr::NOlap::NIndexedReader { - -void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) { - if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) { - return; - } - Y_VERIFY(!ReadyFlag); - Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchAddress().GetBatchGranuleIdx())); - if (InConstruction) { - GranuleDataSize.Take(batchInfo.GetRealBatchSizeVerified()); - GranuleDataSize.Free(batchInfo.GetPredictedBatchSize()); - RawDataSizeReal += batchInfo.GetRealBatchSizeVerified(); - } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId) - ("batch_address", batchInfo.GetBatchAddress().ToString())("count", WaitBatches.size())("in_construction", InConstruction); - if (batch && batch->num_rows()) { - RecordBatches.emplace_back(batch); - - auto& indexInfo = Owner->GetReadMetadata()->GetIndexInfo(); - if (!batchInfo.IsDuplicationsAvailable()) { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey(), false)); - } else { - AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion_on_ready"); - Y_VERIFY_DEBUG(NArrow::IsSorted(batch, indexInfo.GetReplaceKey(), false)); - Y_VERIFY(IsDuplicationsAvailable()); - BatchesToDedup.insert(batch.get()); - } - } - Owner->OnBatchReady(batchInfo, batch); - CheckReady(); -} - -TBatch& TGranule::RegisterBatchForFetching(TPortionInfo&& portionInfo) { - const ui64 batchSize = portionInfo.GetRawBytes(Owner->GetReadMetadata()->GetAllColumns()); - RawDataSize += batchSize; - const ui64 filtersSize = portionInfo.NumRows() * (8 + 8); - RawDataSize += filtersSize; - ACFL_DEBUG("event", "RegisterBatchForFetching") - ("columns_count", Owner->GetReadMetadata()->GetAllColumns().size())("batch_raw_size", batchSize)("granule_size", RawDataSize) - ("filter_size", filtersSize); - - Y_VERIFY(!ReadyFlag); - ui32 batchGranuleIdx = Batches.size(); - WaitBatches.emplace(batchGranuleIdx); - Batches.emplace_back(TBatchAddress(GranuleId, batchGranuleIdx), *this, std::move(portionInfo), batchSize); - Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second); - Owner->OnNewBatch(Batches.back()); - return Batches.back(); -} - -void TGranule::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const { - Owner->AddBlobForFetch(range, batch); -} - -const std::set<ui32>& TGranule::GetEarlyFilterColumns() const { - return Owner->GetEarlyFilterColumns(); -} - -bool TGranule::OnFilterReady(TBatch& batchInfo) { - if (ReadyFlag) { - return false; - } - return Owner->GetSortingPolicy()->OnFilterReady(batchInfo, *this, *Owner); -} - -std::deque<TGranule::TBatchForMerge> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata) { - std::deque<TBatchForMerge> batches; - for (auto&& i : Batches) { - std::shared_ptr<TSortableBatchPosition> from; - std::shared_ptr<TSortableBatchPosition> to; - i.GetPKBorders(reverse, readMetadata->GetIndexInfo(), from, to); - batches.emplace_back(TBatchForMerge(&i, from, to)); - } - std::sort(batches.begin(), batches.end()); - ui32 currentPoolId = 0; - std::map<TSortableBatchPosition, ui32> poolIds; - for (auto&& i : batches) { - if (!i.GetFrom()) { - Y_VERIFY(!currentPoolId); - continue; - } - auto it = poolIds.rbegin(); - for (; it != poolIds.rend(); ++it) { - if (it->first.Compare(*i.GetFrom()) < 0) { - break; - } - } - if (it != poolIds.rend()) { - i.SetPoolId(it->second); - poolIds.erase(it->first); - } else { - i.SetPoolId(++currentPoolId); - } - if (i.GetTo()) { - poolIds.emplace(*i.GetTo(), *i.GetPoolId()); - } - } - return batches; -} - -void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { - if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) { - return; - } - Y_VERIFY(!ReadyFlag); - Y_VERIFY(!NotIndexedBatchReadyFlag || !batch); - if (!NotIndexedBatchReadyFlag) { - ACFL_TRACE("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size()); - } else { - return; - } - NotIndexedBatchReadyFlag = true; - if (batch && batch->num_rows()) { - GranuleDataSize.Take(NArrow::GetBatchDataSize(batch)); - Y_VERIFY(!NotIndexedBatch); - NotIndexedBatch = batch; - if (NotIndexedBatch) { - RecordBatches.emplace_back(NotIndexedBatch); - } - NotIndexedBatchFutureFilter = Owner->GetReadMetadata()->GetProgram().BuildEarlyFilter(batch); - DuplicationsAvailableFlag = true; - } - CheckReady(); - Owner->Wakeup(*this); -} - -void TGranule::OnGranuleDataPrepared(std::vector<std::shared_ptr<arrow::RecordBatch>>&& data) { - ReadyFlag = true; - RecordBatches = data; - Owner->OnGranuleReady(GranuleId); -} - -void TGranule::CheckReady() { - if (WaitBatches.empty() && NotIndexedBatchReadyFlag) { - - if (RecordBatches.empty() || !IsDuplicationsAvailable()) { - ReadyFlag = true; - ACFL_DEBUG("event", "granule_ready")("predicted_size", RawDataSize)("real_size", RawDataSizeReal); - Owner->OnGranuleReady(GranuleId); - } else { - ACFL_DEBUG("event", "granule_preparation")("predicted_size", RawDataSize)("real_size", RawDataSizeReal); - std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = std::move(RecordBatches); - auto processor = Owner->GetTasksProcessor(); - processor.Add(Owner->GetOwner(), std::make_shared<TTaskGranulePreparation>(std::move(inGranule), std::move(BatchesToDedup), GranuleId, Owner->GetReadMetadata(), processor.GetObject())); - } - } -} - -void TGranule::OnBlobReady(const TBlobRange& range) noexcept { - Y_VERIFY(InConstruction); - if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) { - return; - } - Y_VERIFY(!ReadyFlag); - Owner->OnBlobReady(GranuleId, range); -} - -TGranule::~TGranule() { - if (InConstruction) { - LiveController->Dec(); - } -} - -TGranule::TGranule(const ui64 granuleId, TGranulesFillingContext& owner) - : GranuleId(granuleId) - , LiveController(owner.GetGranulesLiveContext()) - , Owner(&owner) - , GranuleDataSize(Owner->GetMemoryAccessor(), Owner->GetCounters().Aggregations.GetGranulesProcessing()) -{ - -} - -void TGranule::StartConstruction() { - InConstruction = true; - LiveController->Inc(); - GranuleDataSize.Take(RawDataSize); -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h deleted file mode 100644 index 41de01f2be..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ /dev/null @@ -1,159 +0,0 @@ -#pragma once -#include "batch.h" -#include "read_metadata.h" - -#include <ydb/library/accessor/accessor.h> -#include <ydb/core/tx/columnshard/engines/portion_info.h> - -#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> - -namespace NKikimr::NOlap::NIndexedReader { - -class TGranulesFillingContext; -class TGranulesLiveControl { -private: - TAtomicCounter GranulesCounter = 0; -public: - i64 GetCount() const { - return GranulesCounter.Val(); - } - - void Inc() { - GranulesCounter.Inc(); - } - void Dec() { - Y_VERIFY(GranulesCounter.Dec() >= 0); - } -}; - -class TGranule { -public: - using TPtr = std::shared_ptr<TGranule>; -private: - ui64 GranuleId = 0; - ui64 RawDataSize = 0; - ui64 RawDataSizeReal = 0; - - bool NotIndexedBatchReadyFlag = false; - bool InConstruction = false; - std::shared_ptr<arrow::RecordBatch> NotIndexedBatch; - std::shared_ptr<NArrow::TColumnFilter> NotIndexedBatchFutureFilter; - - std::vector<std::shared_ptr<arrow::RecordBatch>> RecordBatches; - bool DuplicationsAvailableFlag = false; - bool ReadyFlag = false; - std::deque<TBatch> Batches; - std::set<ui32> WaitBatches; - std::set<ui32> GranuleBatchNumbers; - std::shared_ptr<TGranulesLiveControl> LiveController; - TGranulesFillingContext* Owner = nullptr; - THashSet<const void*> BatchesToDedup; - - TScanMemoryLimiter::TGuard GranuleDataSize; - void CheckReady(); -public: - TGranule(const ui64 granuleId, TGranulesFillingContext& owner); - ~TGranule(); - - ui64 GetGranuleId() const noexcept { - return GranuleId; - } - - void OnGranuleDataPrepared(std::vector<std::shared_ptr<arrow::RecordBatch>>&& data); - - const THashSet<const void*>& GetBatchesToDedup() const noexcept { - return BatchesToDedup; - } - - const std::shared_ptr<arrow::RecordBatch>& GetNotIndexedBatch() const noexcept { - return NotIndexedBatch; - } - - const std::shared_ptr<NArrow::TColumnFilter>& GetNotIndexedBatchFutureFilter() const noexcept { - return NotIndexedBatchFutureFilter; - } - - bool IsNotIndexedBatchReady() const noexcept { - return NotIndexedBatchReadyFlag; - } - - bool IsDuplicationsAvailable() const noexcept { - return DuplicationsAvailableFlag; - } - - void SetDuplicationsAvailable(bool val) noexcept { - DuplicationsAvailableFlag = val; - } - - bool IsReady() const noexcept { - return ReadyFlag; - } - - std::vector<std::shared_ptr<arrow::RecordBatch>> GetReadyBatches() const { - return RecordBatches; - } - - TBatch& GetBatchInfo(const ui32 batchIdx) { - Y_VERIFY(batchIdx < Batches.size()); - return Batches[batchIdx]; - } - - void AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch); - - const TGranulesFillingContext& GetOwner() const { - return *Owner; - } - - TGranulesFillingContext& GetOwner() { - return *Owner; - } - - class TBatchForMerge { - private: - TBatch* Batch = nullptr; - YDB_ACCESSOR_DEF(std::optional<ui32>, PoolId); - YDB_ACCESSOR_DEF(std::shared_ptr<TSortableBatchPosition>, From); - YDB_ACCESSOR_DEF(std::shared_ptr<TSortableBatchPosition>, To); - public: - TBatch* operator->() { - return Batch; - } - - TBatch& operator*() { - return *Batch; - } - - TBatchForMerge(TBatch* batch, std::shared_ptr<TSortableBatchPosition> from, std::shared_ptr<TSortableBatchPosition> to) - : Batch(batch) - , From(from) - , To(to) - { - Y_VERIFY(Batch); - } - - bool operator<(const TBatchForMerge& item) const { - if (!From && !item.From) { - return false; - } else if (From && item.From) { - return From->Compare(*item.From) == std::partial_ordering::less; - } else if (!From) { - return true; - } else { - return false; - } - } - }; - - std::deque<TBatchForMerge> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata); - - const std::set<ui32>& GetEarlyFilterColumns() const; - void StartConstruction(); - void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); - void OnBlobReady(const TBlobRange& range) noexcept; - bool OnFilterReady(TBatch& batchInfo); - TBatch& RegisterBatchForFetching(TPortionInfo&& portionInfo); - void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const; - -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp deleted file mode 100644 index 14ad92c436..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include "granule_preparation.h" -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> - -namespace NKikimr::NOlap::NIndexedReader { - -std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TTaskGranulePreparation::GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo) { - std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo] - rangesSlices.reserve(batches.size()); - { - TMap<NArrow::TReplaceKey, std::vector<std::shared_ptr<arrow::RecordBatch>>> points; - - for (auto& batch : batches) { - auto compositeKey = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); - Y_VERIFY(compositeKey && compositeKey->num_rows() > 0); - auto keyColumns = std::make_shared<NArrow::TArrayVec>(compositeKey->columns()); - - NArrow::TReplaceKey min(keyColumns, 0); - NArrow::TReplaceKey max(keyColumns, compositeKey->num_rows() - 1); - - points[min].push_back(batch); // insert start - points[max].push_back({}); // insert end - } - - int sum = 0; - for (auto& [key, vec] : points) { - if (!sum) { // count(start) == count(end), start new range - rangesSlices.push_back({}); - rangesSlices.back().reserve(batches.size()); - } - - for (auto& batch : vec) { - if (batch) { - ++sum; - rangesSlices.back().push_back(batch); - } else { - --sum; - } - } - } - } - return rangesSlices; -} - -std::vector<std::shared_ptr<arrow::RecordBatch>> TTaskGranulePreparation::SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo, const std::shared_ptr<NArrow::TSortDescription>& description, const THashSet<const void*>& batchesToDedup) { - auto rangesSlices = GroupInKeyRanges(batches, indexInfo); - - // Merge slices in ranges - std::vector<std::shared_ptr<arrow::RecordBatch>> out; - out.reserve(rangesSlices.size()); - for (auto& slices : rangesSlices) { - if (slices.empty()) { - continue; - } - - // Do not merge slice if it's alone in its key range - if (slices.size() == 1) { - auto batch = slices[0]; - if (batchesToDedup.count(batch.get())) { - NArrow::DedupSortedBatch(batch, description->ReplaceKey, out); - } else { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); - out.push_back(batch); - } - continue; - } - auto batch = NArrow::CombineSortedBatches(slices, description); - out.push_back(batch); - } - - return out; -} - -bool TTaskGranulePreparation::DoApply(IDataReader& indexedDataRead) const { - auto& readData = indexedDataRead.GetMeAs<TIndexedReadData>(); - readData.GetGranulesContext().GetGranuleVerified(GranuleId)->OnGranuleDataPrepared(std::move(BatchesInGranule)); - return true; -} - -bool TTaskGranulePreparation::DoExecuteImpl() { - auto& indexInfo = ReadMetadata->GetIndexInfo(); - for (auto& batch : BatchesInGranule) { - Y_VERIFY(batch->num_rows()); - Y_VERIFY_DEBUG(NArrow::IsSorted(batch, indexInfo.SortReplaceDescription()->ReplaceKey)); - } - BatchesInGranule = SpecialMergeSorted(BatchesInGranule, indexInfo, indexInfo.SortReplaceDescription(), BatchesToDedup); - return true; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h deleted file mode 100644 index a7ff39f854..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h +++ /dev/null @@ -1,46 +0,0 @@ -#pragma once -#include "conveyor_task.h" -#include "filling_context.h" -#include "read_metadata.h" -#include <ydb/core/formats/arrow/sort_cursor.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> - -namespace NKikimr::NOlap::NIndexedReader { - -class TTaskGranulePreparation: public NColumnShard::IDataTasksProcessor::ITask { -private: - using TBase = NColumnShard::IDataTasksProcessor::ITask; - mutable std::vector<std::shared_ptr<arrow::RecordBatch>> BatchesInGranule; - THashSet<const void*> BatchesToDedup; - const ui64 GranuleId; - NOlap::TReadMetadata::TConstPtr ReadMetadata; - - static std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> - GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo); - - static std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const TIndexInfo& indexInfo, - const std::shared_ptr<NArrow::TSortDescription>& description, - const THashSet<const void*>& batchesToDedup); - -protected: - virtual bool DoApply(IDataReader& indexedDataRead) const override; - virtual bool DoExecuteImpl() override; -public: - - virtual TString GetTaskClassIdentifier() const override { - return "Reading::GranulePreparation"; - } - - TTaskGranulePreparation(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches, THashSet<const void*>&& batchesToDedup, - const ui64 granuleId, NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::IDataTasksProcessor::TPtr processor) - : TBase(processor) - , BatchesInGranule(std::move(batches)) - , BatchesToDedup(std::move(batchesToDedup)) - , GranuleId(granuleId) - , ReadMetadata(readMetadata) { - - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt deleted file mode 100644 index 0349917f0d..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.darwin-x86_64.txt +++ /dev/null @@ -1,24 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(engines-reader-order_control) -target_link_libraries(engines-reader-order_control PUBLIC - contrib-libs-cxxsupp - yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow -) -target_sources(engines-reader-order_control PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp -) diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt deleted file mode 100644 index a6f577627b..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-aarch64.txt +++ /dev/null @@ -1,25 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(engines-reader-order_control) -target_link_libraries(engines-reader-order_control PUBLIC - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow -) -target_sources(engines-reader-order_control PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp -) diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt deleted file mode 100644 index a6f577627b..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.linux-x86_64.txt +++ /dev/null @@ -1,25 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(engines-reader-order_control) -target_link_libraries(engines-reader-order_control PUBLIC - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow -) -target_sources(engines-reader-order_control PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp -) diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt deleted file mode 100644 index f8b31df0c1..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.txt +++ /dev/null @@ -1,17 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-aarch64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") - include(CMakeLists.darwin-x86_64.txt) -elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) - include(CMakeLists.windows-x86_64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-x86_64.txt) -endif() diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt deleted file mode 100644 index 0349917f0d..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/CMakeLists.windows-x86_64.txt +++ /dev/null @@ -1,24 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(engines-reader-order_control) -target_link_libraries(engines-reader-order_control PUBLIC - contrib-libs-cxxsupp - yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow -) -target_sources(engines-reader-order_control PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp -) diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp deleted file mode 100644 index 63e96f8527..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp +++ /dev/null @@ -1,42 +0,0 @@ -#include "abstract.h" -#include <ydb/core/tx/columnshard/engines/reader/filling_context.h> - -namespace NKikimr::NOlap::NIndexedReader { - -void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFillingContext& context) { - auto& batch = batchOriginal.GetFetchedInfo(); - Y_VERIFY(!!batch.GetFilter()); - if (!batch.GetFilteredRecordsCount()) { - context.GetCounters().EmptyFilterCount->Add(1); - context.GetCounters().EmptyFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes()); - context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns())); - batchOriginal.InitBatch(nullptr); - } else { - context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows()); - if (batchOriginal.AskedColumnsAlready(context.GetPostFilterColumns())) { - context.GetCounters().FilterOnlyCount->Add(1); - context.GetCounters().FilterOnlyFetchedBytes->Add(batchOriginal.GetFetchedBytes()); - context.GetCounters().FilterOnlyUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes()); - context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns())); - - batchOriginal.InitBatch(batch.GetFilterBatch()); - } else { - context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes()); - context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes()); - - batchOriginal.ResetWithFilter(context.GetPostFilterColumns()); - - context.GetCounters().TwoPhasesCount->Add(1); - context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes()); - context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batchOriginal.GetUsefulWaitingBytes()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data") - ("filtered_count", batch.GetFilterBatch()->num_rows()) - ("blobs_count", batchOriginal.GetWaitingBlobs().size()) - ("columns_count", batchOriginal.GetCurrentColumnIds()->size()) - ("fetch_size", batchOriginal.GetWaitingBytes()) - ; - } - } -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h deleted file mode 100644 index 21c0d4ff6b..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.h +++ /dev/null @@ -1,83 +0,0 @@ -#pragma once -#include "result.h" -#include <ydb/core/tx/columnshard/engines/reader/granule.h> -#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h> - -namespace NKikimr::NOlap::NIndexedReader { - -class TGranulesFillingContext; - -class IOrderPolicy { -public: - enum class EFeatures: ui32 { - CanInterrupt = 1, - NeedNotAppliedEarlyFilter = 1 << 1 - }; - using TFeatures = ui32; -protected: - TReadMetadata::TConstPtr ReadMetadata; - virtual TString DoDebugString() const = 0; - virtual void DoFill(TGranulesFillingContext& context) = 0; - virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& /*context*/) { - return true; - } - virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(TResultController& granulesToOut) = 0; - virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) { - OnBatchFilterInitialized(batchInfo, context); - return true; - } - - void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context); - virtual TFeatures DoGetFeatures() const { - return 0; - } - TFeatures GetFeatures() const { - return DoGetFeatures(); - } -public: - using TPtr = std::shared_ptr<IOrderPolicy>; - virtual ~IOrderPolicy() = default; - - virtual std::set<ui32> GetFilterStageColumns() { - return ReadMetadata->GetEarlyFilterColumnIds(); - } - - IOrderPolicy(TReadMetadata::TConstPtr readMetadata) - : ReadMetadata(readMetadata) - { - - } - - bool CanInterrupt() const { - return GetFeatures() & (TFeatures)EFeatures::CanInterrupt; - } - - bool NeedNotAppliedEarlyFilter() const { - return GetFeatures() & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; - } - - bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) { - return DoOnFilterReady(batchInfo, granule, context); - } - - - virtual bool ReadyForAddNotIndexedToEnd() const = 0; - - std::vector<TGranule::TPtr> DetachReadyGranules(TResultController& granulesToOut) { - return DoDetachReadyGranules(granulesToOut); - } - - void Fill(TGranulesFillingContext& context) { - DoFill(context); - } - - bool Wakeup(const TGranule& granule, TGranulesFillingContext& context) { - return DoWakeup(granule, context); - } - - TString DebugString() const { - return DoDebugString(); - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp deleted file mode 100644 index e3fb33176b..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp +++ /dev/null @@ -1,27 +0,0 @@ -#include "default.h" -#include <ydb/core/tx/columnshard/engines/reader/filling_context.h> - -namespace NKikimr::NOlap::NIndexedReader { - -void TAnySorting::DoFill(TGranulesFillingContext& context) { - for (auto&& granule : ReadMetadata->SelectInfo->GetGranulesOrdered(ReadMetadata->IsDescSorted())) { - TGranule::TPtr g = context.GetGranuleVerified(granule.Granule); - GranulesOutOrder.emplace_back(g); - } -} - -std::vector<TGranule::TPtr> TAnySorting::DoDetachReadyGranules(TResultController& granulesToOut) { - std::vector<TGranule::TPtr> result; - while (GranulesOutOrder.size()) { - NIndexedReader::TGranule::TPtr granule = GranulesOutOrder.front(); - if (!granule->IsReady()) { - break; - } - result.emplace_back(granule); - Y_VERIFY(granulesToOut.ExtractResult(granule->GetGranuleId())); - GranulesOutOrder.pop_front(); - } - return result; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.h b/ydb/core/tx/columnshard/engines/reader/order_control/default.h deleted file mode 100644 index 41ca14d498..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/default.h +++ /dev/null @@ -1,27 +0,0 @@ -#pragma once -#include "abstract.h" - -namespace NKikimr::NOlap::NIndexedReader { - -class TAnySorting: public IOrderPolicy { -private: - using TBase = IOrderPolicy; - std::deque<TGranule::TPtr> GranulesOutOrder; -protected: - virtual void DoFill(TGranulesFillingContext& context) override; - virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(TResultController& granulesToOut) override; - virtual TString DoDebugString() const override { - return TStringBuilder() << "type=AnySorting;granules_count=" << GranulesOutOrder.size() << ";"; - } - -public: - TAnySorting(TReadMetadata::TConstPtr readMetadata) - :TBase(readMetadata) { - - } - virtual bool ReadyForAddNotIndexedToEnd() const override { - return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty(); - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp deleted file mode 100644 index ff9b2d28a8..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include "not_sorted.h" - -namespace NKikimr::NOlap::NIndexedReader { - -std::vector<TGranule::TPtr> TNonSorting::DoDetachReadyGranules(TResultController& granulesToOut) { - std::vector<TGranule::TPtr> result; - result.reserve(granulesToOut.GetCount()); - while (granulesToOut.GetCount()) { - result.emplace_back(granulesToOut.ExtractFirst()); - } - return result; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h b/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h deleted file mode 100644 index 91f230191c..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/not_sorted.h +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once -#include "abstract.h" - -namespace NKikimr::NOlap::NIndexedReader { - -class TNonSorting: public IOrderPolicy { -private: - using TBase = IOrderPolicy; -protected: - virtual TString DoDebugString() const override { - return TStringBuilder() << "type=NonSorting;"; - } - - virtual void DoFill(TGranulesFillingContext& /*context*/) override { - } - - virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(TResultController& granulesToOut) override; -public: - TNonSorting(TReadMetadata::TConstPtr readMetadata) - :TBase(readMetadata) - { - - } - - virtual bool ReadyForAddNotIndexedToEnd() const override { - return true; - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp deleted file mode 100644 index 8ae30314fa..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp +++ /dev/null @@ -1,123 +0,0 @@ -#include "pk_with_limit.h" -#include <ydb/core/tx/columnshard/engines/reader/filling_context.h> - -namespace NKikimr::NOlap::NIndexedReader { - -bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingContext& context) { - Y_VERIFY(ReadMetadata->Limit); - if (!CurrentItemsLimit) { - return false; - } - Y_VERIFY(GranulesOutOrderForPortions.size()); - if (GranulesOutOrderForPortions.front().GetGranule()->GetGranuleId() != granule.GetGranuleId()) { - return false; - } - while (GranulesOutOrderForPortions.size() && CurrentItemsLimit) { - auto& g = GranulesOutOrderForPortions.front(); - // granule have to wait NotIndexedBatch initialization, at first (NotIndexedBatchReadyFlag initialization). - // other batches will be delivered in OrderedBatches[granuleId] order (not sortable at first in according to granule.SortBatchesByPK) - if (!g.GetGranule()->IsNotIndexedBatchReady()) { - break; - } - if (g.Start()) { - ++CountProcessedGranules; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size()); - MergeStream.AddPoolSource({}, g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter()); - } - auto& batches = g.GetOrderedBatches(); - while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) { - TGranule::TBatchForMerge& b = batches.front(); - if (!b.GetPoolId()) { - ++CountNotSortedPortions; - } else { - ++CountBatchesByPools[*b.GetPoolId()]; - } - ++CountProcessedBatches; - MergeStream.AddPoolSource(b.GetPoolId(), b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter()); - OnBatchFilterInitialized(*b, context); - const bool currentHasFrom = !!batches.front().GetFrom(); - batches.pop_front(); - if (batches.size()) { - if (!batches.front().GetFrom() || !currentHasFrom) { - continue; - } - Y_VERIFY(b.GetFrom()->Compare(*batches.front().GetFrom()) != std::partial_ordering::greater); - MergeStream.PutControlPoint(batches.front().GetFrom()); - } - while (CurrentItemsLimit && MergeStream.DrainCurrent()) { - --CurrentItemsLimit; - } - if (MergeStream.ControlPointEnriched()) { - MergeStream.RemoveControlPoint(); - } else if (batches.size()) { - Y_VERIFY(!CurrentItemsLimit); - } - } - if (batches.empty()) { - Y_VERIFY(MergeStream.IsEmpty() || !CurrentItemsLimit); - GranulesOutOrderForPortions.pop_front(); - } else { - break; - } - } - while (GranulesOutOrderForPortions.size() && !CurrentItemsLimit) { - auto& g = GranulesOutOrderForPortions.front(); - g.GetGranule()->AddNotIndexedBatch(nullptr); - auto& batches = g.GetOrderedBatches(); - while (batches.size()) { - auto b = batches.front(); - context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns())); - ++CountSkippedBatches; - b->InitBatch(nullptr); - batches.pop_front(); - } - ++CountSkippedGranules; - GranulesOutOrderForPortions.pop_front(); - } - if (GranulesOutOrderForPortions.empty()) { - if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "stop_on_limit") - ("limit", ReadMetadata->Limit)("limit_reached", !CurrentItemsLimit) - ("processed_batches", CountProcessedBatches)("processed_granules", CountProcessedGranules) - ("skipped_batches", CountSkippedBatches)("skipped_granules", CountSkippedGranules) - ("pools_count", CountBatchesByPools.size())("bad_pool_size", CountNotSortedPortions) - ; - } - } - return true; -} - -bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) { - return Wakeup(granule, context); -} - -void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) { - for (auto&& granule : ReadMetadata->SelectInfo->GetGranulesOrdered(ReadMetadata->IsDescSorted())) { - TGranule::TPtr g = context.GetGranuleVerified(granule.Granule); - GranulesOutOrder.emplace_back(g); - GranulesOutOrderForPortions.emplace_back(g->SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), g); - } -} - -std::vector<TGranule::TPtr> TPKSortingWithLimit::DoDetachReadyGranules(TResultController& granulesToOut) { - std::vector<TGranule::TPtr> result; - while (GranulesOutOrder.size()) { - NIndexedReader::TGranule::TPtr granule = GranulesOutOrder.front(); - if (!granule->IsReady()) { - break; - } - result.emplace_back(granule); - Y_VERIFY(granulesToOut.ExtractResult(granule->GetGranuleId())); - GranulesOutOrder.pop_front(); - } - return result; -} - -TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata) - : TBase(readMetadata) - , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), nullptr, readMetadata->IsDescSorted()) -{ - CurrentItemsLimit = ReadMetadata->Limit; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h deleted file mode 100644 index b8cd01b075..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h +++ /dev/null @@ -1,78 +0,0 @@ -#pragma once -#include "abstract.h" -#include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h> - -namespace NKikimr::NOlap::NIndexedReader { - -class TGranuleOrdered { -private: - bool StartedFlag = false; - std::deque<TGranule::TBatchForMerge> OrderedBatches; - TGranule::TPtr Granule; -public: - bool Start() { - if (!StartedFlag) { - StartedFlag = true; - return true; - } else { - return false; - } - - } - - TGranuleOrdered(std::deque<TGranule::TBatchForMerge>&& orderedBatches, TGranule::TPtr granule) - : OrderedBatches(std::move(orderedBatches)) - , Granule(granule) - { - } - - std::deque<TGranule::TBatchForMerge>& GetOrderedBatches() noexcept { - return OrderedBatches; - } - - TGranule::TPtr GetGranule() const noexcept { - return Granule; - } -}; - -class TPKSortingWithLimit: public IOrderPolicy { -private: - using TBase = IOrderPolicy; - std::deque<TGranule::TPtr> GranulesOutOrder; - std::deque<TGranuleOrdered> GranulesOutOrderForPortions; - ui32 CurrentItemsLimit = 0; - THashMap<ui32, ui32> CountBatchesByPools; - ui32 CountProcessedGranules = 0; - ui32 CountSkippedBatches = 0; - ui32 CountProcessedBatches = 0; - ui32 CountNotSortedPortions = 0; - ui32 CountSkippedGranules = 0; - TMergePartialStream MergeStream; -protected: - virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override; - virtual void DoFill(TGranulesFillingContext& context) override; - virtual std::vector<TGranule::TPtr> DoDetachReadyGranules(TResultController& granulesToOut) override; - virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override; - virtual TFeatures DoGetFeatures() const override { - return (TFeatures)EFeatures::CanInterrupt | (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; - } - - virtual TString DoDebugString() const override { - return TStringBuilder() << "type=PKSortingWithLimit;granules_count=" << GranulesOutOrder.size() << ";limit=" << CurrentItemsLimit << ";"; - } -public: - virtual std::set<ui32> GetFilterStageColumns() override { - std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds(); - for (auto&& i : ReadMetadata->GetPKColumnIds()) { - result.emplace(i); - } - return result; - } - - TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata); - virtual bool ReadyForAddNotIndexedToEnd() const override { - return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty(); - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp deleted file mode 100644 index e366ce42a1..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp +++ /dev/null @@ -1,29 +0,0 @@ -#include "result.h" - -namespace NKikimr::NOlap::NIndexedReader { - -TGranule::TPtr TResultController::ExtractFirst() { - TGranule::TPtr result; - if (GranulesToOut.size()) { - result = GranulesToOut.begin()->second; - GranulesToOut.erase(GranulesToOut.begin()); - } - return result; -} - -void TResultController::AddResult(TGranule::TPtr granule) { - Y_VERIFY(GranulesToOut.emplace(granule->GetGranuleId(), granule).second); - Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second); -} - -TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) { - auto it = GranulesToOut.find(granuleId); - if (it == GranulesToOut.end()) { - return nullptr; - } - TGranule::TPtr result = it->second; - GranulesToOut.erase(it); - return result; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.h b/ydb/core/tx/columnshard/engines/reader/order_control/result.h deleted file mode 100644 index 84a4366749..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/result.h +++ /dev/null @@ -1,45 +0,0 @@ -#pragma once -#include <ydb/core/tx/columnshard/engines/reader/granule.h> -#include <ydb/core/tx/columnshard/counters/scan.h> - -namespace NKikimr::NOlap::NIndexedReader { - -class TResultController { -protected: - THashMap<ui64, TGranule::TPtr> GranulesToOut; - std::set<ui64> ReadyGranulesAccumulator; - const NColumnShard::TConcreteScanCounters Counters; -public: - TString DebugString() const { - return TStringBuilder() - << "to_out:" << GranulesToOut.size() << ";" - << "ready:" << ReadyGranulesAccumulator.size() << ";" - ; - } - - TResultController(const NColumnShard::TConcreteScanCounters& counters) - : Counters(counters) - { - - } - - void Clear() { - GranulesToOut.clear(); - } - - bool IsReady(const ui64 granuleId) const { - return ReadyGranulesAccumulator.contains(granuleId); - } - - ui32 GetCount() const { - return GranulesToOut.size(); - } - - TGranule::TPtr ExtractFirst(); - - void AddResult(TGranule::TPtr granule); - - TGranule::TPtr ExtractResult(const ui64 granuleId); -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/ya.make b/ydb/core/tx/columnshard/engines/reader/order_control/ya.make deleted file mode 100644 index a49000742a..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/order_control/ya.make +++ /dev/null @@ -1,17 +0,0 @@ -LIBRARY() - -SRCS( - abstract.cpp - not_sorted.cpp - pk_with_limit.cpp - result.cpp - default.cpp -) - -PEERDIR( - contrib/libs/apache/arrow - ydb/core/protos - ydb/core/formats/arrow -) - -END() diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt index a8df8ca6eb..5d1bd47355 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt @@ -22,4 +22,5 @@ target_sources(engines-reader-plain_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt index 8d181e1b18..ef7f01c766 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt @@ -23,4 +23,5 @@ target_sources(engines-reader-plain_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt index 8d181e1b18..ef7f01c766 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt @@ -23,4 +23,5 @@ target_sources(engines-reader-plain_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt index a8df8ca6eb..5d1bd47355 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt @@ -22,4 +22,5 @@ target_sources(engines-reader-plain_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp index 3d8a7ae3dd..baba224ae9 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp @@ -21,13 +21,8 @@ bool TAssembleBatch::DoExecuteImpl() { return true; } -bool TAssemblePKBatch::DoApply(IDataReader& owner) const { - owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitPK(Result); - return true; -} - bool TAssembleFFBatch::DoApply(IDataReader& owner) const { - owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitFF(Result); + owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitFetchStageData(Result); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h index 0a746a9939..31d0f2b60a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h @@ -25,15 +25,6 @@ public: const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter, const NColumnShard::IDataTasksProcessor::TPtr& processor); }; -class TAssemblePKBatch: public TAssembleBatch { -private: - using TBase = TAssembleBatch; -protected: - virtual bool DoApply(IDataReader& owner) const override; -public: - using TBase::TBase; -}; - class TAssembleFFBatch: public TAssembleBatch { private: using TBase = TAssembleBatch; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp new file mode 100644 index 0000000000..99db988f36 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp @@ -0,0 +1,13 @@ +#include "columns_set.h" +#include <util/string/join.h> + +namespace NKikimr::NOlap::NPlainReader { + +TString TColumnsSet::DebugString() const { + return TStringBuilder() << "(" + << "column_ids=" << JoinSeq(",", ColumnIds) << ";" + << "column_names=" << JoinSeq(",", ColumnNames) << ";" + << ");"; +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h new file mode 100644 index 0000000000..edf4a7e191 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h @@ -0,0 +1,110 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> + +namespace NKikimr::NOlap::NPlainReader { + +class TColumnsSet { +private: + YDB_READONLY_DEF(std::set<ui32>, ColumnIds); + YDB_READONLY_DEF(std::set<TString>, ColumnNames); + mutable std::optional<std::vector<TString>> ColumnNamesVector; + YDB_READONLY_DEF(std::shared_ptr<arrow::Schema>, Schema); +public: + TColumnsSet() = default; + + const std::vector<TString>& GetColumnNamesVector() const { + if (!ColumnNamesVector) { + ColumnNamesVector = std::vector<TString>(ColumnNames.begin(), ColumnNames.end()); + } + return *ColumnNamesVector; + } + + ui32 GetSize() const { + return ColumnIds.size(); + } + + bool ColumnsOnly(const std::vector<std::string>& fieldNames) const { + if (fieldNames.size() != GetSize()) { + return false; + } + std::set<std::string> fieldNamesSet; + for (auto&& i : fieldNames) { + if (!fieldNamesSet.emplace(i).second) { + return false; + } + if (!ColumnNames.contains(TString(i.data(), i.size()))) { + return false; + } + } + return true; + } + + TColumnsSet(const std::set<ui32>& columnIds, const TIndexInfo& indexInfo) { + ColumnIds = columnIds; + Schema = indexInfo.GetColumnsSchema(ColumnIds); + for (auto&& i : ColumnIds) { + ColumnNames.emplace(indexInfo.GetColumnName(i)); + } + } + + TColumnsSet(const std::vector<ui32>& columnIds, const TIndexInfo& indexInfo) { + for (auto&& i : columnIds) { + Y_VERIFY(ColumnIds.emplace(i).second); + ColumnNames.emplace(indexInfo.GetColumnName(i)); + } + Schema = indexInfo.GetColumnsSchema(ColumnIds); + } + + bool Contains(const std::shared_ptr<TColumnsSet>& columnsSet) const { + if (!columnsSet) { + return true; + } + return Contains(*columnsSet); + } + + bool Contains(const TColumnsSet& columnsSet) const { + for (auto&& i : columnsSet.ColumnIds) { + if (!ColumnIds.contains(i)) { + return false; + } + } + return true; + } + + TString DebugString() const; + + TColumnsSet operator+(const TColumnsSet& external) const { + TColumnsSet result = *this; + result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end()); + result.ColumnNames.insert(external.ColumnNames.begin(), external.ColumnNames.end()); + auto fields = result.Schema->fields(); + for (auto&& i : external.Schema->fields()) { + if (!result.Schema->GetFieldByName(i->name())) { + fields.emplace_back(i); + } + } + result.Schema = std::make_shared<arrow::Schema>(fields); + return result; + } + + TColumnsSet operator-(const TColumnsSet& external) const { + TColumnsSet result = *this; + for (auto&& i : external.ColumnIds) { + result.ColumnIds.erase(i); + } + for (auto&& i : external.ColumnNames) { + result.ColumnNames.erase(i); + } + arrow::FieldVector fields; + for (auto&& i : Schema->fields()) { + if (!external.Schema->GetFieldByName(i->name())) { + fields.emplace_back(i); + } + } + result.Schema = std::make_shared<arrow::Schema>(fields); + return result; + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp index 0e16286d48..ddfa7c12be 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp @@ -21,7 +21,7 @@ TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAsse void TEFTaskConstructor::DoOnDataReady(IDataReader& reader) { reader.GetContext().MutableProcessor().Add(reader, std::make_shared<TAssembleFilter>(BuildBatchAssembler(reader), - reader.GetReadMetadata(), SourceIdx, ColumnIds, reader.GetContext().GetProcessor().GetObject(), false)); + reader.GetReadMetadata(), SourceIdx, ColumnIds, reader.GetContext().GetProcessor().GetObject(), UseEarlyFilter)); } void TFFColumnsTaskConstructor::DoOnDataReady(IDataReader& reader) { @@ -29,9 +29,4 @@ void TFFColumnsTaskConstructor::DoOnDataReady(IDataReader& reader) { SourceIdx, AppliedFilter, reader.GetContext().GetProcessor().GetObject())); } -void TPKColumnsTaskConstructor::DoOnDataReady(IDataReader& reader) { - reader.GetContext().MutableProcessor().Add(reader, std::make_shared<TAssemblePKBatch>(BuildBatchAssembler(reader), - SourceIdx, AppliedFilter, reader.GetContext().GetProcessor().GetObject())); -} - } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h index 07ea259a73..71c892777f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h @@ -11,13 +11,15 @@ private: IDataReader& Reader; bool Started = false; protected: - THashMap<TBlobRange, TColumnRecord> RecordsByBlobRange; - THashMap<TBlobRange, TString> Data; + THashSet<TBlobRange> WaitingData; + THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data; virtual void DoOnDataReady(IDataReader& reader) = 0; void OnDataReady(IDataReader& reader) { - Constructed = true; - return DoOnDataReady(reader); + if (WaitingData.empty()) { + Constructed = true; + return DoOnDataReady(reader); + } } public: IFetchTaskConstructor(IDataReader& reader) @@ -28,9 +30,7 @@ public: void StartDataWaiting() { Started = true; - if (Data.size() == RecordsByBlobRange.size()) { - OnDataReady(Reader); - } + OnDataReady(Reader); } void Abort() { @@ -41,17 +41,21 @@ public: Y_VERIFY(Constructed); } - void AddChunk(const TColumnRecord& rec) { + void AddWaitingRecord(const TColumnRecord& rec) { Y_VERIFY(!Started); - Y_VERIFY(RecordsByBlobRange.emplace(rec.BlobRange, rec).second); + Y_VERIFY(WaitingData.emplace(rec.BlobRange).second); } void AddData(const TBlobRange& range, TString&& data) { Y_VERIFY(Started); + Y_VERIFY(WaitingData.erase(range)); Y_VERIFY(Data.emplace(range, std::move(data)).second); - if (Data.size() == RecordsByBlobRange.size()) { - OnDataReady(Reader); - } + OnDataReady(Reader); + } + + void AddNullData(const TBlobRange& range, const ui32 rowsCount) { + Y_VERIFY(!Started); + Y_VERIFY(Data.emplace(range, rowsCount).second); } }; @@ -82,31 +86,21 @@ private: public: TFFColumnsTaskConstructor(const std::set<ui32>& columnIds, const TPortionDataSource& portion, IDataReader& reader) : TBase(columnIds, portion, reader) - , AppliedFilter(portion.GetEFData().GetAppliedFilter()) + , AppliedFilter(portion.GetFilterStageData().GetAppliedFilter()) { } }; -class TPKColumnsTaskConstructor: public TAssembleColumnsTaskConstructor { -private: - using TBase = TAssembleColumnsTaskConstructor; - std::shared_ptr<NArrow::TColumnFilter> AppliedFilter; - virtual void DoOnDataReady(IDataReader& reader) override; -public: - TPKColumnsTaskConstructor(const std::set<ui32>& columnIds, const TPortionDataSource& portion, IDataReader& reader) - : TBase(columnIds, portion, reader) - , AppliedFilter(portion.GetEFData().GetAppliedFilter()) { - } -}; - class TEFTaskConstructor: public TAssembleColumnsTaskConstructor { private: + bool UseEarlyFilter = false; using TBase = TAssembleColumnsTaskConstructor; virtual void DoOnDataReady(IDataReader& reader) override; public: - TEFTaskConstructor(const std::set<ui32>& columnIds, const TPortionDataSource& portion, IDataReader& reader) - : TBase(columnIds, portion, reader) { - Y_VERIFY(!portion.HasEFData()); + TEFTaskConstructor(const std::set<ui32>& columnIds, const TPortionDataSource& portion, IDataReader& reader, const bool useEarlyFilter) + : TBase(columnIds, portion, reader) + , UseEarlyFilter(useEarlyFilter) + { } }; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index ea9f51c6a0..f038d81d47 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -16,13 +16,25 @@ public: } }; -class TEarlyFilterData: public TFetchedData { +class TFilterStageData: public TFetchedData { private: using TBase = TFetchedData; YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, AppliedFilter); YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedEarlyFilter); public: - TEarlyFilterData(std::shared_ptr<NArrow::TColumnFilter> appliedFilter, std::shared_ptr<NArrow::TColumnFilter> earlyFilter, std::shared_ptr<arrow::RecordBatch> batch) + bool IsEmptyFilter() const { + return (AppliedFilter && AppliedFilter->IsTotalDenyFilter()) || (NotAppliedEarlyFilter && NotAppliedEarlyFilter->IsTotalDenyFilter()); + } + + std::shared_ptr<NArrow::TColumnFilter> GetActualFilter() const { + if (NotAppliedEarlyFilter) { + return NotAppliedEarlyFilter; + } else { + return AppliedFilter; + } + } + + TFilterStageData(std::shared_ptr<NArrow::TColumnFilter> appliedFilter, std::shared_ptr<NArrow::TColumnFilter> earlyFilter, std::shared_ptr<arrow::RecordBatch> batch) : TBase(batch) , AppliedFilter(appliedFilter) , NotAppliedEarlyFilter(earlyFilter) @@ -31,14 +43,7 @@ public: } }; -class TPrimaryKeyData: public TFetchedData { -private: - using TBase = TFetchedData; -public: - using TBase::TBase; -}; - -class TFullData: public TFetchedData { +class TFetchStageData: public TFetchedData { private: using TBase = TFetchedData; public: diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp index 29284f2f5c..2a37c87504 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp @@ -51,7 +51,7 @@ bool TAssembleFilter::DoExecuteImpl() { } bool TAssembleFilter::DoApply(IDataReader& owner) const { - owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitEF(AppliedFilter, EarlyFilter, FilteredBatch); + owner.GetMeAs<TPlainReadData>().GetSourceByIdxVerified(SourceIdx).InitFilterStageData(AppliedFilter, EarlyFilter, FilteredBatch); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 6665565e13..9e58f7e5b7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -3,49 +3,65 @@ namespace NKikimr::NOlap::NPlainReader { +bool TFetchingInterval::IsExclusiveSource() const { + return IncludeStart && Sources.size() == 1 && IncludeFinish; +} + void TFetchingInterval::ConstructResult() { - if (Merger && IsSourcesFFReady()) { + if (!Merger || !IsSourcesReady()) { + return; + } + if (!IsExclusiveSource()) { for (auto&& [_, i] : Sources) { if (i->GetStart().Compare(Start) == std::partial_ordering::equivalent && !i->IsMergingStarted()) { auto rb = i->GetBatch(); if (rb) { - Merger->AddPoolSource({}, rb, i->GetEFData().GetNotAppliedEarlyFilter()); + Merger->AddPoolSource({}, rb, i->GetFilterStageData().GetNotAppliedEarlyFilter()); } i->StartMerging(); } } - Merger->DrainCurrent(RBBuilder, Finish, IncludeFinish); + Merger->DrainCurrentTo(*RBBuilder, Finish, IncludeFinish); Scanner.OnIntervalResult(RBBuilder->Finalize(), GetIntervalIdx()); + } else { + Y_VERIFY(Merger->IsEmpty()); + Sources.begin()->second->StartMerging(); + auto batch = Sources.begin()->second->GetBatch(); + if (batch && batch->num_rows()) { + if (Scanner.IsReverse()) { + auto permutation = NArrow::MakePermutation(batch->num_rows(), true); + batch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(batch, permutation)).record_batch(); + } + batch = NArrow::ExtractExistedColumns(batch, RBBuilder->GetFields()); + AFL_VERIFY((ui32)batch->num_columns() == RBBuilder->GetFields().size())("batch", batch->num_columns())("builder", RBBuilder->GetFields().size()) + ("batch_columns", JoinSeq(",", batch->schema()->field_names()))("builder_columns", RBBuilder->GetColumnNames()); + } + Scanner.OnIntervalResult(batch, GetIntervalIdx()); } } -void TFetchingInterval::OnSourceEFReady(const ui32 /*sourceIdx*/) { +void TFetchingInterval::OnSourceFetchStageReady(const ui32 /*sourceIdx*/) { ConstructResult(); } -void TFetchingInterval::OnSourcePKReady(const ui32 /*sourceIdx*/) { - ConstructResult(); -} - -void TFetchingInterval::OnSourceFFReady(const ui32 /*sourceIdx*/) { +void TFetchingInterval::OnSourceFilterStageReady(const ui32 /*sourceIdx*/) { ConstructResult(); } void TFetchingInterval::StartMerge(std::shared_ptr<NIndexedReader::TMergePartialStream> merger) { + Y_VERIFY(!Merger); Merger = merger; -// if (Merger->GetCurrentKeyColumns()) { -// AFL_VERIFY(Merger->GetCurrentKeyColumns()->Compare(Start) == std::partial_ordering::less)("current", Merger->GetCurrentKeyColumns()->DebugJson())("start", Start.DebugJson()); -// } ConstructResult(); } TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, TScanHead& scanner, - std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish) + std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish, const bool includeStart) : Scanner(scanner) , Start(start) , Finish(finish) , IncludeFinish(includeFinish) + , IncludeStart(includeStart) , Sources(sources) , IntervalIdx(intervalIdx) , RBBuilder(builder) @@ -54,11 +70,7 @@ TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPositio for (auto&& [_, i] : Sources) { i->RegisterInterval(this); Scanner.AddSourceByIdx(i); - i->NeedEF(); - if (Scanner.GetContext().GetIsInternalRead()) { - i->NeedPK(); - i->NeedFF(); - } + i->InitFetchingPlan(Scanner.GetColumnsFetchingPlan(IsExclusiveSource())); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h index af60f7bc7c..fc49fc16ba 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h @@ -12,11 +12,13 @@ private: NIndexedReader::TSortableBatchPosition Start; NIndexedReader::TSortableBatchPosition Finish; const bool IncludeFinish = true; + const bool IncludeStart = false; std::map<ui32, std::shared_ptr<IDataSource>> Sources; YDB_READONLY(ui32, IntervalIdx, 0); std::shared_ptr<NIndexedReader::TMergePartialStream> Merger; std::shared_ptr<NIndexedReader::TRecordBatchBuilder> RBBuilder; + bool IsExclusiveSource() const; void ConstructResult(); IDataSource& GetSourceVerified(const ui32 idx) { @@ -25,9 +27,9 @@ private: return *it->second; } - bool IsSourcesFFReady() { + bool IsSourcesReady() { for (auto&& [_, s] : Sources) { - if (!s->HasFFData()) { + if (!s->IsDataReady()) { return false; } } @@ -60,15 +62,14 @@ public: return !!Merger; } - void OnSourceEFReady(const ui32 sourceIdx); - void OnSourcePKReady(const ui32 sourceIdx); - void OnSourceFFReady(const ui32 /*sourceIdx*/); + void OnSourceFetchStageReady(const ui32 sourceIdx); + void OnSourceFilterStageReady(const ui32 sourceIdx); void StartMerge(std::shared_ptr<NIndexedReader::TMergePartialStream> merger); TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, TScanHead& scanner, - std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish); + std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish, const bool includeStart); }; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp index f70f056c91..21f0d065a3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp @@ -4,33 +4,17 @@ namespace NKikimr::NOlap::NPlainReader { TPlainReadData::TPlainReadData(TReadMetadata::TConstPtr readMetadata, const TReadContext& context) : TBase(context, readMetadata) + , EFColumns(std::make_shared<TColumnsSet>(GetReadMetadata()->GetEarlyFilterColumnIds(), GetReadMetadata()->GetIndexInfo())) + , PKColumns(std::make_shared<TColumnsSet>(GetReadMetadata()->GetPKColumnIds(), GetReadMetadata()->GetIndexInfo())) + , FFColumns(std::make_shared<TColumnsSet>(GetReadMetadata()->GetAllColumns(), GetReadMetadata()->GetIndexInfo())) + , TrivialEFFlag(EFColumns->ColumnsOnly(GetReadMetadata()->GetIndexInfo().ArrowSchemaSnapshot()->field_names())) { - EFColumnIds = GetReadMetadata()->GetEarlyFilterColumnIds(); - PKColumnIds = GetReadMetadata()->GetPKColumnIds(); - FFColumnIds = GetReadMetadata()->GetUsedColumnIds(); - for (auto&& i : EFColumnIds) { - PKColumnIds.erase(i); - FFColumnIds.erase(i); - } - for (auto&& i : PKColumnIds) { - FFColumnIds.erase(i); - } - if (context.GetIsInternalRead()) { - EFColumnIds.insert(PKColumnIds.begin(), PKColumnIds.end()); - EFColumnIds.insert(FFColumnIds.begin(), FFColumnIds.end()); - PKColumnIds.clear(); - FFColumnIds.clear(); - } - for (auto&& i : EFColumnIds) { - EFColumnNames.emplace_back(GetReadMetadata()->GetIndexInfo().GetColumnName(i)); - } - for (auto&& i : PKColumnIds) { - PKColumnNames.emplace_back(GetReadMetadata()->GetIndexInfo().GetColumnName(i)); - } - for (auto&& i : FFColumnIds) { - FFColumnNames.emplace_back(GetReadMetadata()->GetIndexInfo().GetColumnName(i)); - } + PKFFColumns = std::make_shared<TColumnsSet>(*PKColumns + *FFColumns); + EFPKColumns = std::make_shared<TColumnsSet>(*EFColumns + *PKColumns); + FFMinusEFColumns = std::make_shared<TColumnsSet>(*FFColumns - *EFColumns); + FFMinusEFPKColumns = std::make_shared<TColumnsSet>(*FFColumns - *EFColumns - *PKColumns); + Y_VERIFY(FFColumns->Contains(EFColumns)); ui32 sourceIdx = 0; std::deque<std::shared_ptr<IDataSource>> sources; const auto& portionsOrdered = GetReadMetadata()->SelectInfo->GetPortionsOrdered(GetReadMetadata()->IsDescSorted()); @@ -124,4 +108,28 @@ void TPlainReadData::OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch) } } +NKikimr::NOlap::NPlainReader::TFetchingPlan TPlainReadData::GetColumnsFetchingPlan(const bool exclusiveSource) const { + if (exclusiveSource) { + if (Context.GetIsInternalRead()) { + return TFetchingPlan(FFColumns, EmptyColumns, true); + } else { + if (TrivialEFFlag) { + return TFetchingPlan(FFColumns, EmptyColumns, true); + } else { + return TFetchingPlan(EFColumns, FFMinusEFColumns, true); + } + } + } else { + if (GetContext().GetIsInternalRead()) { + return TFetchingPlan(PKFFColumns, EmptyColumns, false); + } else { + if (TrivialEFFlag) { + return TFetchingPlan(PKFFColumns, EmptyColumns, false); + } else { + return TFetchingPlan(EFPKColumns, FFMinusEFPKColumns, false); + } + } + } +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h index d41b405575..5a50f4c8fd 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h @@ -1,9 +1,11 @@ #pragma once +#include "columns_set.h" +#include "source.h" +#include "scanner.h" + #include <ydb/core/tx/columnshard/engines/reader/read_context.h> #include <ydb/core/tx/columnshard/engines/reader/read_metadata.h> #include <ydb/core/tx/columnshard/engines/reader/queue.h> -#include "source.h" -#include "scanner.h" namespace NKikimr::NOlap::NPlainReader { @@ -11,12 +13,15 @@ class TPlainReadData: public IDataReader, TNonCopyable { private: using TBase = IDataReader; std::shared_ptr<TScanHead> Scanner; - YDB_READONLY_DEF(std::set<ui32>, EFColumnIds); - YDB_READONLY_DEF(std::set<ui32>, PKColumnIds); - YDB_READONLY_DEF(std::set<ui32>, FFColumnIds); - YDB_READONLY_DEF(std::vector<TString>, EFColumnNames); - YDB_READONLY_DEF(std::vector<TString>, PKColumnNames); - YDB_READONLY_DEF(std::vector<TString>, FFColumnNames); + YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, EFColumns); + YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PKColumns); + YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FFColumns); + std::shared_ptr<TColumnsSet> EmptyColumns = std::make_shared<TColumnsSet>(); + std::shared_ptr<TColumnsSet> PKFFColumns; + std::shared_ptr<TColumnsSet> EFPKColumns; + std::shared_ptr<TColumnsSet> FFMinusEFColumns; + std::shared_ptr<TColumnsSet> FFMinusEFPKColumns; + const bool TrivialEFFlag = false; std::vector<TPartialReadResult> PartialResults; ui32 ReadyResultsCount = 0; TFetchBlobsQueue Queue; @@ -25,9 +30,9 @@ private: protected: virtual TString DoDebugString() const override { return TStringBuilder() << - "ef_columns=" << JoinSeq(",", EFColumnIds) << ";" << - "pk_columns=" << JoinSeq(",", PKColumnIds) << ";" << - "ff_columns=" << JoinSeq(",", FFColumnIds) << ";" + "ef=" << EFColumns->DebugString() << ";" << + "pk=" << PKColumns->DebugString() << ";" << + "ff=" << FFColumns->DebugString() << ";" ; } @@ -46,6 +51,8 @@ protected: virtual void DoAddData(const TBlobRange& blobRange, const TString& data) override; virtual std::optional<TBlobRange> DoExtractNextBlob(const bool hasReadyResults) override; public: + TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const; + IDataSource& GetSourceByIdxVerified(const ui32 sourceIdx) { return *Scanner->GetSourceVerified(sourceIdx); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp index 72b2884ea2..307c5ff86a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp @@ -5,7 +5,7 @@ namespace NKikimr::NOlap::NPlainReader { -void TScanHead::OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch, const ui32 intervalIdx) { +void TScanHead::OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 intervalIdx) { Y_VERIFY(FetchingIntervals.size()); Y_VERIFY(FetchingIntervals.front().GetIntervalIdx() == intervalIdx); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "interval_result")("interval", FetchingIntervals.front().GetIntervalIdx())("count", batch ? batch->num_rows() : 0); @@ -18,7 +18,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, TPlainR , Sources(std::move(sources)) { auto resultSchema = reader.GetReadMetadata()->GetLoadSchema(reader.GetReadMetadata()->GetSnapshot()); - for (auto&& f : reader.GetReadMetadata()->GetUsedColumnIds()) { + for (auto&& f : reader.GetReadMetadata()->GetAllColumns()) { ResultFields.emplace_back(resultSchema->GetFieldByColumnIdVerified(f)); } Merger = std::make_shared<NIndexedReader::TMergePartialStream>(reader.GetReadMetadata()->GetReplaceKey(), std::make_shared<arrow::Schema>(ResultFields), reader.GetReadMetadata()->IsDescSorted()); @@ -30,6 +30,7 @@ bool TScanHead::BuildNextInterval() { Y_VERIFY(FrontEnds.size()); auto position = BorderPoints.begin()->first; auto firstBorderPointInfo = std::move(BorderPoints.begin()->second); + const bool isIncludeStart = CurrentSegments.empty(); for (auto&& i : firstBorderPointInfo.GetStartSources()) { CurrentSegments.emplace(i->GetSourceIdx(), i); @@ -38,7 +39,7 @@ bool TScanHead::BuildNextInterval() { if (firstBorderPointInfo.GetStartSources().size() && firstBorderPointInfo.GetFinishSources().size()) { FetchingIntervals.emplace_back( BorderPoints.begin()->first, BorderPoints.begin()->first, SegmentIdxCounter++, CurrentSegments, - *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), true); + *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), true, true); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "new_interval")("interval", FetchingIntervals.back().DebugJson()); } @@ -64,7 +65,7 @@ bool TScanHead::BuildNextInterval() { const bool includeFinish = BorderPoints.begin()->second.GetFinishSources().size() == CurrentSegments.size() && !BorderPoints.begin()->second.GetStartSources().size(); FetchingIntervals.emplace_back( *CurrentStart, BorderPoints.begin()->first, SegmentIdxCounter++, CurrentSegments, - *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), includeFinish); + *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), includeFinish, isIncludeStart); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "new_interval")("interval", FetchingIntervals.back().DebugJson()); return true; } @@ -100,4 +101,12 @@ NKikimr::NOlap::TReadContext& TScanHead::GetContext() { return Reader.GetContext(); } +bool TScanHead::IsReverse() const { + return Reader.GetReadMetadata()->IsDescSorted(); +} + +NKikimr::NOlap::NPlainReader::TFetchingPlan TScanHead::GetColumnsFetchingPlan(const bool exclusiveSource) const { + return Reader.GetColumnsFetchingPlan(exclusiveSource); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h index a46b613ef1..f20df47faa 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h @@ -37,6 +37,11 @@ private: void DrainSources(); public: + + TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const; + + bool IsReverse() const; + void Abort() { for (auto&& i : FetchingIntervals) { i.Abort(); @@ -62,7 +67,7 @@ public: TReadContext& GetContext(); - void OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch, const ui32 intervalIdx); + void OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 intervalIdx); std::shared_ptr<IDataSource> GetSourceVerified(const ui32 idx) const { auto it = SourceByIdx.find(idx); Y_VERIFY(it != SourceByIdx.end()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index 483cda5070..3b7ba86529 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -7,59 +7,45 @@ namespace NKikimr::NOlap::NPlainReader { -void IDataSource::InitFF(const std::shared_ptr<arrow::RecordBatch>& batch) { - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFF")); - Y_VERIFY(!FFData); - FFData = std::make_shared<TFullData>(batch); - auto intervals = Intervals; - for (auto&& i : intervals) { - i->OnSourceFFReady(GetSourceIdx()); +void IDataSource::InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>& batchExt) { + auto batch = batchExt; + if (!batch && FetchingPlan->GetFetchingStage()->GetSize()) { + const ui32 numRows = GetFilterStageData().GetBatch() ? GetFilterStageData().GetBatch()->num_rows() : 0; + batch = NArrow::MakeEmptyBatch(FetchingPlan->GetFetchingStage()->GetSchema(), numRows); } -} - -void IDataSource::InitEF(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch) { - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitEF")); - Y_VERIFY(!EFData); - EFData = std::make_shared<TEarlyFilterData>(appliedFilter, earlyFilter, batch); - auto intervals = Intervals; - for (auto&& i : intervals) { - i->OnSourceEFReady(GetSourceIdx()); + if (batch) { + Y_VERIFY((ui32)batch->num_columns() == FetchingPlan->GetFetchingStage()->GetSize()); } - NeedPK(); -} - -void IDataSource::InitPK(const std::shared_ptr<arrow::RecordBatch>& batch) { - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitPK")); - Y_VERIFY(!PKData); - PKData = std::make_shared<TPrimaryKeyData>(batch); + NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchStageData")); + Y_VERIFY(!FetchStageData); + FetchStageData = std::make_shared<TFetchStageData>(batch); auto intervals = Intervals; for (auto&& i : intervals) { - i->OnSourcePKReady(GetSourceIdx()); + i->OnSourceFetchStageReady(GetSourceIdx()); } - NeedFF(); } -void IDataSource::NeedEF() { - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "NeedEF")); - if (!NeedEFFlag) { - NeedEFFlag = true; - DoFetchEF(); +void IDataSource::InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch) { + NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFilterStageData")); + Y_VERIFY(!FilterStageData); + FilterStageData = std::make_shared<TFilterStageData>(appliedFilter, earlyFilter, batch); + if (batch) { + Y_VERIFY((ui32)batch->num_columns() == FetchingPlan->GetFilterStage()->GetSize()); } -} - -void IDataSource::NeedPK() { - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "NeedPK")); - if (!NeedPKFlag) { - NeedPKFlag = true; - DoFetchPK(); + auto intervals = Intervals; + for (auto&& i : intervals) { + i->OnSourceFilterStageReady(GetSourceIdx()); } + DoStartFetchStage(); } -void IDataSource::NeedFF() { - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "NeedFF")); - if (!NeedFFFlag) { - NeedFFFlag = true; - DoFetchFF(); +void IDataSource::InitFetchingPlan(const TFetchingPlan& fetchingPlan) { + if (!FilterStageFlag) { + FilterStageFlag = true; + Y_VERIFY(!FetchingPlan); + FetchingPlan = fetchingPlan; + NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); + DoStartFilterStage(); } } @@ -70,42 +56,50 @@ bool IDataSource::OnIntervalFinished(const ui32 intervalIdx) { return Intervals.empty(); } -void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, std::shared_ptr<IFetchTaskConstructor> constructor) { +void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, std::shared_ptr<IFetchTaskConstructor> constructor, const std::shared_ptr<NArrow::TColumnFilter>& filter) { + const NArrow::TColumnFilter& cFilter = filter ? *filter : NArrow::TColumnFilter::BuildAllowFilter(); for (auto&& i : columnIds) { auto columnChunks = Portion->GetColumnChunksPointers(i); + if (columnChunks.empty()) { + continue; + } + auto itFilter = cFilter.GetIterator(false, Portion->NumRows(i)); + bool itFinished = false; for (auto&& c : columnChunks) { - constructor->AddChunk(*c); - Y_VERIFY(BlobsWaiting.emplace(c->BlobRange, constructor).second); - ReadData.AddBlobForFetch(GetSourceIdx(), c->BlobRange); + Y_VERIFY(!itFinished); + if (!itFilter.IsBatchForSkip(c->GetMeta().GetNumRowsVerified())) { + constructor->AddWaitingRecord(*c); + Y_VERIFY(BlobsWaiting.emplace(c->BlobRange, constructor).second); + ReadData.AddBlobForFetch(GetSourceIdx(), c->BlobRange); + } else { + constructor->AddNullData(c->BlobRange, c->GetMeta().GetNumRowsVerified()); + } + itFinished = !itFilter.Next(c->GetMeta().GetNumRowsVerified()); } + AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", Portion->NumRows(i)); } constructor->StartDataWaiting(); } -void TPortionDataSource::DoFetchEF() { +void TPortionDataSource::DoStartFilterStage() { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetchEF"); - if (ReadData.GetEFColumnIds().size()) { - NeedFetchColumns(ReadData.GetEFColumnIds(), std::make_shared<TEFTaskConstructor>(ReadData.GetEFColumnIds(), *this, ReadData)); - } else { - InitEF(nullptr, nullptr, nullptr); - } -} - -void TPortionDataSource::DoFetchPK() { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetchPK"); - if (ReadData.GetPKColumnIds().size()) { - NeedFetchColumns(ReadData.GetPKColumnIds(), std::make_shared<TPKColumnsTaskConstructor>(ReadData.GetPKColumnIds(), *this, ReadData)); - } else { - InitPK(nullptr); - } -} - -void TPortionDataSource::DoFetchFF() { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetchFF"); - if (ReadData.GetFFColumnIds().size()) { - NeedFetchColumns(ReadData.GetFFColumnIds(), std::make_shared<TFFColumnsTaskConstructor>(ReadData.GetFFColumnIds(), *this, ReadData)); + Y_VERIFY(FetchingPlan->GetFilterStage()->GetSize()); + auto& columnIds = FetchingPlan->GetFilterStage()->GetColumnIds(); + NeedFetchColumns(columnIds, std::make_shared<TEFTaskConstructor>(columnIds, *this, ReadData, FetchingPlan->CanUseEarlyFilterImmediately()), nullptr); +} + +void TPortionDataSource::DoStartFetchStage() { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoStartFetchStage"); + Y_VERIFY(!FetchStageData); + Y_VERIFY(FilterStageData); + if (!FetchingPlan->GetFetchingStage()->GetSize()) { + InitFetchStageData(nullptr); + } else if (!FilterStageData->IsEmptyFilter()) { + auto& columnIds = FetchingPlan->GetFetchingStage()->GetColumnIds(); + NeedFetchColumns(columnIds, std::make_shared<TFFColumnsTaskConstructor>(columnIds, *this, ReadData), + GetFilterStageData().GetActualFilter()); } else { - InitFF(nullptr); + InitFetchStageData(nullptr); } } @@ -139,9 +133,8 @@ void TCommittedDataSource::AddData(const TBlobRange& /*range*/, TString&& data) resultBatch = ReadData.GetReadMetadata()->GetIndexInfo().AddSpecialColumns(resultBatch, CommittedBlob.GetSnapshot()); Y_VERIFY(resultBatch); ReadData.GetReadMetadata()->GetPKRangesFilter().BuildFilter(resultBatch).Apply(resultBatch); - InitEF(nullptr, ReadData.GetReadMetadata()->GetProgram().BuildEarlyFilter(resultBatch), NArrow::ExtractColumnsValidate(resultBatch, ReadData.GetEFColumnNames())); - InitPK(NArrow::ExtractColumnsValidate(resultBatch, ReadData.GetPKColumnNames())); - InitFF(NArrow::ExtractColumnsValidate(resultBatch, ReadData.GetFFColumnNames())); + InitFilterStageData(nullptr, ReadData.GetReadMetadata()->GetProgram().BuildEarlyFilter(resultBatch), NArrow::ExtractColumnsValidate(resultBatch, FetchingPlan->GetFilterStage()->GetColumnNamesVector())); + InitFetchStageData(NArrow::ExtractColumnsValidate(resultBatch, FetchingPlan->GetFetchingStage()->GetColumnNamesVector())); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index d46c957677..edc1275e25 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -1,5 +1,6 @@ #pragma once #include "fetched_data.h" +#include "columns_set.h" #include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h> #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> @@ -16,6 +17,25 @@ class TFetchingInterval; class TPlainReadData; class IFetchTaskConstructor; +class TFetchingPlan { +private: + YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FilterStage); + YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FetchingStage); + bool CanUseEarlyFilterImmediatelyFlag = false; +public: + TFetchingPlan(const std::shared_ptr<TColumnsSet>& filterStage, const std::shared_ptr<TColumnsSet>& fetchingStage, const bool canUseEarlyFilterImmediately) + : FilterStage(filterStage) + , FetchingStage(fetchingStage) + , CanUseEarlyFilterImmediatelyFlag(canUseEarlyFilterImmediately) + { + + } + + bool CanUseEarlyFilterImmediately() const { + return CanUseEarlyFilterImmediatelyFlag; + } +}; + class IDataSource { private: YDB_READONLY(ui32, SourceIdx, 0); @@ -27,18 +47,15 @@ protected: TPlainReadData& ReadData; std::deque<TFetchingInterval*> Intervals; - // EF (EarlyFilter)->PK (PrimaryKey)->FF (FullyFetched) - std::shared_ptr<TEarlyFilterData> EFData; - std::shared_ptr<TPrimaryKeyData> PKData; - std::shared_ptr<TFullData> FFData; + std::shared_ptr<TFilterStageData> FilterStageData; + std::shared_ptr<TFetchStageData> FetchStageData; - bool NeedEFFlag = false; - bool NeedPKFlag = false; - bool NeedFFFlag = false; + std::optional<TFetchingPlan> FetchingPlan; - virtual void DoFetchEF() = 0; - virtual void DoFetchPK() = 0; - virtual void DoFetchFF() = 0; + bool FilterStageFlag = false; + + virtual void DoStartFilterStage() = 0; + virtual void DoStartFetchStage() = 0; virtual void DoAbort() = 0; public: @@ -55,10 +72,6 @@ public: DoAbort(); } - bool IsScannersFinished() const { - return Intervals.empty(); - } - NJson::TJsonValue DebugJson() const { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("source_idx", SourceIdx); @@ -71,41 +84,25 @@ public: bool OnIntervalFinished(const ui32 intervalIdx); std::shared_ptr<arrow::RecordBatch> GetBatch() const { - if (!EFData || !PKData || !FFData) { + if (!FilterStageData || !FetchStageData) { return nullptr; } - return NArrow::MergeColumns({EFData->GetBatch(), PKData->GetBatch(), FFData->GetBatch()}); - } - - bool HasFFData() const { - return HasPKData() && !!FFData; + return NArrow::MergeColumns({FilterStageData->GetBatch(), FetchStageData->GetBatch()}); } - bool HasPKData() const { - return HasEFData() && !!PKData; + bool IsDataReady() const { + return !!FilterStageData && !!FetchStageData; } - bool HasEFData() const { - return !!EFData; + const TFilterStageData& GetFilterStageData() const { + Y_VERIFY(FilterStageData); + return *FilterStageData; } - const TEarlyFilterData& GetEFData() const { - Y_VERIFY(EFData); - return *EFData; - } - - const TPrimaryKeyData& GetPKData() const { - Y_VERIFY(PKData); - return *PKData; - } + void InitFetchingPlan(const TFetchingPlan& fetchingPlan); - void NeedEF(); - void NeedPK(); - void NeedFF(); - - void InitEF(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch); - void InitFF(const std::shared_ptr<arrow::RecordBatch>& batch); - void InitPK(const std::shared_ptr<arrow::RecordBatch>& batch); + void InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch); + void InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>& batch); void RegisterInterval(TFetchingInterval* interval) { Intervals.emplace_back(interval); @@ -123,7 +120,9 @@ public: Y_VERIFY(Start.Compare(Finish) != std::partial_ordering::greater); } - virtual ~IDataSource() = default; + virtual ~IDataSource() { + Y_VERIFY(Intervals.empty()); + } virtual void AddData(const TBlobRange& range, TString&& data) = 0; }; @@ -133,11 +132,10 @@ private: std::shared_ptr<TPortionInfo> Portion; THashMap<TBlobRange, std::shared_ptr<IFetchTaskConstructor>> BlobsWaiting; - void NeedFetchColumns(const std::set<ui32>& columnIds, std::shared_ptr<IFetchTaskConstructor> constructor); + void NeedFetchColumns(const std::set<ui32>& columnIds, std::shared_ptr<IFetchTaskConstructor> constructor, const std::shared_ptr<NArrow::TColumnFilter>& filter); - virtual void DoFetchEF() override; - virtual void DoFetchPK() override; - virtual void DoFetchFF() override; + virtual void DoStartFilterStage() override; + virtual void DoStartFetchStage() override; virtual NJson::TJsonValue DoDebugJson() const override { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("type", "portion"); @@ -177,15 +175,11 @@ private: } - virtual void DoFetchEF() override { - DoFetch(); - } - - virtual void DoFetchPK() override { + virtual void DoStartFilterStage() override { DoFetch(); } - virtual void DoFetchFF() override { + virtual void DoStartFetchStage() override { DoFetch(); } virtual NJson::TJsonValue DoDebugJson() const override { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make index c43747503a..c5a23130f4 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make @@ -9,6 +9,7 @@ SRCS( plain_read_data.cpp filter_assembler.cpp column_assembler.cpp + columns_set.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp deleted file mode 100644 index b87bc406ef..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include "postfilter_assembler.h" -#include "batch.h" -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> - -namespace NKikimr::NOlap::NIndexedReader { - -bool TAssembleBatch::DoExecuteImpl() { - /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey. - /// It's not OK to apply predicate before replacing key duplicates otherwise. - /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here - - Y_VERIFY(BatchConstructor.GetColumnsCount()); - - TPortionInfo::TPreparedBatchData::TAssembleOptions options; - auto addBatch = BatchConstructor.Assemble(options); - Y_VERIFY(addBatch); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN) - ("columns_count", addBatch->num_columns())("num_rows", addBatch->num_rows()); - Y_VERIFY(Filter->Apply(addBatch)); - Y_VERIFY(NArrow::MergeBatchColumns({ FilterBatch, addBatch }, FullBatch, FullColumnsOrder, true)); - - return true; -} - -bool TAssembleBatch::DoApply(IDataReader& owner) const { - auto& reader = owner.GetMeAs<TIndexedReadData>(); - TBatch* batch = reader.GetGranulesContext().GetBatchInfo(BatchAddress); - if (batch) { - batch->InitBatch(FullBatch); - } - return true; -} - -TAssembleBatch::TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor, - TBatch& currentBatch, const std::vector<std::string>& fullColumnsOrder, - NColumnShard::IDataTasksProcessor::TPtr processor) - : TBase(processor) - , BatchConstructor(batchConstructor) - , FullColumnsOrder(fullColumnsOrder) - , Filter(currentBatch.GetFetchedInfo().GetFilter()) - , FilterBatch(currentBatch.GetFetchedInfo().GetFilterBatch()) - , BatchAddress(currentBatch.GetBatchAddress()) -{ - TBase::SetPriority(TBase::EPriority::High); - Y_VERIFY(currentBatch.GetFetchedInfo().GetFilter()); - Y_VERIFY(currentBatch.GetFetchedInfo().GetFilterBatch()); - Y_VERIFY(!currentBatch.GetFetchedInfo().GetFilteredBatch()); -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h deleted file mode 100644 index 62e8d84bd4..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h +++ /dev/null @@ -1,35 +0,0 @@ -#pragma once -#include "common.h" -#include "conveyor_task.h" - -#include <ydb/core/tx/columnshard/counters/common/object_counter.h> -#include <ydb/core/tx/columnshard/engines/portion_info.h> -#include <ydb/core/formats/arrow/arrow_filter.h> - -#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> - -namespace NKikimr::NOlap::NIndexedReader { -class TBatch; -class TAssembleBatch: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TAssembleBatch, true, true> { -private: - using TBase = NColumnShard::IDataTasksProcessor::ITask; - TPortionInfo::TPreparedBatchData BatchConstructor; - std::shared_ptr<arrow::RecordBatch> FullBatch; - std::vector<std::string> FullColumnsOrder; - - std::shared_ptr<NArrow::TColumnFilter> Filter; - std::shared_ptr<arrow::RecordBatch> FilterBatch; - - const TBatchAddress BatchAddress; -protected: - virtual bool DoApply(IDataReader& owner) const override; - virtual bool DoExecuteImpl() override; -public: - virtual TString GetTaskClassIdentifier() const override { - return "Reading::TAssembleBatch"; - } - - TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor, - TBatch& currentBatch, const std::vector<std::string>& fullColumnsOrder, NColumnShard::IDataTasksProcessor::TPtr processor); -}; -} diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp deleted file mode 100644 index 2a91e4af26..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp +++ /dev/null @@ -1,103 +0,0 @@ -#include "processing_context.h" - -namespace NKikimr::NOlap::NIndexedReader { - -void TProcessingController::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) { - if (NotIndexedBatchesInitialized) { - Y_VERIFY(!batches); - return; - } - NotIndexedBatchesInitialized = true; - GranulesInProcessing.erase(0); - auto granules = GranulesWaiting; - for (auto&& [_, gPtr] : granules) { - if (!batches) { - gPtr->AddNotIndexedBatch(nullptr); - } else { - auto it = batches->find(gPtr->GetGranuleId()); - if (it == batches->end()) { - gPtr->AddNotIndexedBatch(nullptr); - } else { - gPtr->AddNotIndexedBatch(it->second); - } - batches->erase(it); - } - } - GuardZeroGranuleData.FreeAll(); -} - -NKikimr::NOlap::NIndexedReader::TBatch* TProcessingController::GetBatchInfo(const TBatchAddress& address) { - auto it = GranulesWaiting.find(address.GetGranuleId()); - if (it == GranulesWaiting.end()) { - return nullptr; - } else { - return &it->second->GetBatchInfo(address.GetBatchGranuleIdx()); - } -} - -TGranule::TPtr TProcessingController::ExtractReadyVerified(const ui64 granuleId) { - Y_VERIFY(NotIndexedBatchesInitialized); - auto it = GranulesWaiting.find(granuleId); - Y_VERIFY(it != GranulesWaiting.end()); - TGranule::TPtr result = it->second; - GranulesInProcessing.erase(granuleId); - GranulesWaiting.erase(it); - return result; -} - -TGranule::TPtr TProcessingController::GetGranuleVerified(const ui64 granuleId) const { - auto it = GranulesWaiting.find(granuleId); - Y_VERIFY(it != GranulesWaiting.end()); - return it->second; -} - -TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) const { - auto itGranule = GranulesWaiting.find(granuleId); - if (itGranule == GranulesWaiting.end()) { - return nullptr; - } - return itGranule->second; -} - -TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) { - Y_VERIFY(GranulesWaiting.emplace(g->GetGranuleId(), g).second); - ++OriginalGranulesCount; - return g; -} - -void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) { - if (GranulesInProcessing.emplace(granuleId).second) { - if (granuleId) { - GetGranuleVerified(granuleId)->StartConstruction(); - Y_VERIFY(GranulesWaiting.contains(granuleId)); - } - } - if (!granuleId) { - Y_VERIFY(!NotIndexedBatchesInitialized); - GuardZeroGranuleData.Take(range.Size); - } -} - -void TProcessingController::Abort() { - NotIndexedBatchesInitialized = true; - GranulesWaiting.clear(); - GranulesInProcessing.clear(); -} - -TString TProcessingController::DebugString() const { - return TStringBuilder() - << "waiting:" << GranulesWaiting.size() << ";" - << "in_progress:" << GranulesInProcessing.size() << ";" - << "original_waiting:" << OriginalGranulesCount << ";" - << "common_granules_data:" << CommonGranuleData << ";" - << "common_initialized:" << NotIndexedBatchesInitialized << ";" - ; -} - -NKikimr::NOlap::NIndexedReader::TBatch& TProcessingController::GetBatchInfoVerified(const TBatchAddress& address) { - NIndexedReader::TBatch* bInfo = GetBatchInfo(address); - Y_VERIFY(bInfo); - return *bInfo; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h deleted file mode 100644 index bddd479e08..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.h +++ /dev/null @@ -1,70 +0,0 @@ -#pragma once -#include "granule.h" -#include <ydb/core/tx/columnshard/counters/scan.h> -#include <ydb/core/tx/columnshard/resources/memory.h> - -namespace NKikimr::NOlap::NIndexedReader { - -class TProcessingController { -private: - THashMap<ui64, TGranule::TPtr> GranulesWaiting; - ui32 OriginalGranulesCount = 0; - ui64 CommonGranuleData = 0; - std::set<ui64> GranulesInProcessing; - bool NotIndexedBatchesInitialized = false; - const NColumnShard::TConcreteScanCounters Counters; - TScanMemoryLimiter::TGuard GuardZeroGranuleData; -public: - TString DebugString() const; - bool IsGranuleActualForProcessing(const ui64 granuleId) const { - return GranulesWaiting.contains(granuleId) || (granuleId == 0 && !NotIndexedBatchesInitialized); - } - - TProcessingController(TScanMemoryLimiter::IMemoryAccessor::TPtr memoryAccessor, const NColumnShard::TConcreteScanCounters& counters) - : Counters(counters) - , GuardZeroGranuleData(memoryAccessor, Counters.Aggregations.GetGranulesProcessing()) - { - } - - ~TProcessingController() { - Abort(); - } - - void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); - - NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address); - NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address); - - const std::set<ui64>& GetProcessingGranules() const { - return GranulesInProcessing; - } - - ui32 GetProcessingGranulesCount() const { - return GranulesInProcessing.size(); - } - - bool IsInProgress(const ui64 granuleId) const { - return GranulesInProcessing.contains(granuleId); - } - - void Abort(); - - ui32 GetCount() const { - return GranulesInProcessing.size(); - } - - void StartBlobProcessing(const ui64 granuleId, const TBlobRange& range); - - TGranule::TPtr ExtractReadyVerified(const ui64 granuleId); - - TGranule::TPtr GetGranuleVerified(const ui64 granuleId) const; - - bool IsFinished() const { return GranulesWaiting.empty() && NotIndexedBatchesInitialized; } - - TGranule::TPtr InsertGranule(TGranule::TPtr g); - - TGranule::TPtr GetGranule(const ui64 granuleId) const; - -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp index 55f1d79ace..832d00a0c0 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp @@ -8,8 +8,10 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const { result["reverse"] = ReverseSort; result["records_count"] = RecordsCount; result["position"] = Position; - result["sorting"] = Sorting.DebugJson(Position); - result["data"] = Data.DebugJson(Position); + result["sorting"] = Sorting->DebugJson(Position); + if (Data) { + result["data"] = Data->DebugJson(Position); + } return result; } @@ -95,7 +97,7 @@ void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition } void TMergePartialStream::AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) { - if (!batch || !batch->num_rows() || (filter && filter->IsTotalDenyFilter())) { + if (!batch || !batch->num_rows()) { return; } Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema)); @@ -134,51 +136,75 @@ void TMergePartialStream::RemoveControlPoint() { SortHeap.pop_back(); } -bool TMergePartialStream::DrainCurrent(std::shared_ptr<TRecordBatchBuilder> builder, const std::optional<TSortableBatchPosition>& readTo, const bool includeFinish) { - if (SortHeap.empty()) { - return false; - } - while (SortHeap.size()) { - if (readTo) { - auto position = TBatchIterator::TPosition(SortHeap.front()); - if (includeFinish) { - if (position.GetKeyColumns().Compare(*readTo) == std::partial_ordering::greater) { - return true; - } - } else { - if (position.GetKeyColumns().Compare(*readTo) != std::partial_ordering::less) { - return true; - } - } +void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition) { +#ifndef NDEBUG + if (CurrentKeyColumns) { + const bool linearExecutionCorrectness = CurrentKeyColumns->Compare(nextKeyColumnsPosition) == std::partial_ordering::less; + if (!linearExecutionCorrectness) { + const bool newSegmentScan = nextKeyColumnsPosition.GetPosition() == 0; + AFL_VERIFY(newSegmentScan && nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::less) + ("merge_debug", DebugJson())("current_ext", nextKeyColumnsPosition.DebugJson())("newSegmentScan", newSegmentScan); } + } + CurrentKeyColumns = nextKeyColumnsPosition; +#else + Y_UNUSED(nextKeyColumnsPosition); +#endif +} - auto currentPosition = DrainCurrentPosition(); - if (currentPosition.IsControlPoint()) { - return false; - } - if (currentPosition.IsDeleted()) { - continue; - } - auto& nextKeyColumnsPosition = currentPosition.GetKeyColumns(); - if (CurrentKeyColumns) { - const bool linearExecutionCorrectness = CurrentKeyColumns->Compare(nextKeyColumnsPosition) == std::partial_ordering::less; - if (!linearExecutionCorrectness) { - const bool newSegmentScan = nextKeyColumnsPosition.GetPosition() == 0; - AFL_VERIFY(newSegmentScan && nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::less) - ("merge_debug", DebugJson())("current_ext", nextKeyColumnsPosition.DebugJson())("newSegmentScan", newSegmentScan); +bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish) { + Y_VERIFY((ui32)DataSchema->num_fields() == builder.GetBuildersCount()); + PutControlPoint(std::make_shared<TSortableBatchPosition>(readTo)); + bool cpReachedFlag = false; + while (SortHeap.size() && !cpReachedFlag) { + if (SortHeap.front().IsControlPoint()) { + RemoveControlPoint(); + cpReachedFlag = true; + if (SortHeap.empty() || !includeFinish || SortHeap.front().GetKeyColumns().Compare(readTo) == std::partial_ordering::greater) { + return true; } } - CurrentKeyColumns = currentPosition.GetKeyColumns(); - if (builder) { - builder->AddRecord(*CurrentKeyColumns); + + if (auto currentPosition = DrainCurrentPosition()) { + CheckSequenceInDebug(*currentPosition); + builder.AddRecord(*currentPosition); } - if (!readTo) { - return true; + } + return false; +} + +bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) { + Y_VERIFY((ui32)DataSchema->num_fields() == builder.GetBuildersCount()); + while (SortHeap.size()) { + if (auto currentPosition = DrainCurrentPosition()) { + CheckSequenceInDebug(*currentPosition); + builder.AddRecord(*currentPosition); } } return false; } +std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() { + Y_VERIFY(SortHeap.size()); + Y_VERIFY(!SortHeap.front().IsControlPoint()); + TSortableBatchPosition result = SortHeap.front().GetKeyColumns(); + TSortableBatchPosition resultVersion = SortHeap.front().GetVersionColumns(); + bool isFirst = true; + const bool deletedFlag = SortHeap.front().IsDeleted(); + while (SortHeap.size() && (isFirst || result.Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) { + auto& anotherIterator = SortHeap.front(); + if (!isFirst) { + Y_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater); + } + NextInHeap(true); + isFirst = false; + } + if (deletedFlag) { + return {}; + } + return result; +} + NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const { NJson::TJsonValue result; result["is_cp"] = IsControlPoint(); @@ -202,7 +228,7 @@ TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, } void TRecordBatchBuilder::AddRecord(const TSortableBatchPosition& position) { - Y_VERIFY(position.GetData().GetColumns().size() == Builders.size()); + Y_VERIFY_DEBUG(position.GetData().GetColumns().size() == Builders.size()); Y_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields)); // AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "record_add_on_read")("record", position.DebugJson()); for (ui32 i = 0; i < position.GetData().GetColumns().size(); ++i) { diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index 211beadf74..f4cded7787 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -63,8 +63,8 @@ protected: YDB_READONLY(i64, Position, 0); i64 RecordsCount = 0; bool ReverseSort = false; - TSortableScanData Sorting; - TSortableScanData Data; + std::shared_ptr<TSortableScanData> Sorting; + std::shared_ptr<TSortableScanData> Data; std::shared_ptr<arrow::RecordBatch> Batch; static std::optional<ui64> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater, const bool include); @@ -72,7 +72,7 @@ public: TSortableBatchPosition() = default; const TSortableScanData& GetData() const { - return Data; + return *Data; } bool IsReverseSort() const { @@ -81,11 +81,11 @@ public: NJson::TJsonValue DebugJson() const; TSortableBatchPosition BuildSame(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const { - return TSortableBatchPosition(batch, position, Sorting.GetFieldNames(), Data.GetFieldNames(), ReverseSort); + return TSortableBatchPosition(batch, position, Sorting->GetFieldNames(), Data->GetFieldNames(), ReverseSort); } bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) { - return Sorting.IsSameSchema(schema); + return Sorting->IsSameSchema(schema); } static std::shared_ptr<arrow::RecordBatch> SelectInterval(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& from, const TSortableBatchPosition& to, const bool includeFrom, const bool includeTo) { @@ -105,19 +105,21 @@ public: : Position(position) , RecordsCount(batch->num_rows()) , ReverseSort(reverseSort) - , Sorting(batch, sortingColumns) - , Data(batch, dataColumns) + , Sorting(std::make_shared<TSortableScanData>(batch, sortingColumns)) , Batch(batch) { + if (dataColumns.size()) { + Data = std::make_shared<TSortableScanData>(batch, dataColumns); + } Y_VERIFY(batch->num_rows()); Y_VERIFY_DEBUG(batch->ValidateFull().ok()); - Y_VERIFY(Sorting.GetColumns().size()); + Y_VERIFY(Sorting->GetColumns().size()); } std::partial_ordering Compare(const TSortableBatchPosition& item) const { Y_VERIFY(item.ReverseSort == ReverseSort); - Y_VERIFY(item.Sorting.GetColumns().size() == Sorting.GetColumns().size()); - const auto directResult = NArrow::ColumnsCompare(Sorting.GetColumns(), Position, item.Sorting.GetColumns(), item.Position); + Y_VERIFY(item.Sorting->GetColumns().size() == Sorting->GetColumns().size()); + const auto directResult = NArrow::ColumnsCompare(Sorting->GetColumns(), Position, item.Sorting->GetColumns(), item.Position); if (ReverseSort) { if (directResult == std::partial_ordering::less) { return std::partial_ordering::greater; @@ -161,7 +163,9 @@ public: class TMergePartialStream { private: +#ifndef NDEBUG std::optional<TSortableBatchPosition> CurrentKeyColumns; +#endif class TBatchIterator { private: bool ControlPointFlag; @@ -193,6 +197,10 @@ private: return KeyColumns; } + const TSortableBatchPosition& GetVersionColumns() const { + return VersionColumns; + } + TBatchIterator(const TSortableBatchPosition& keyColumns) : ControlPointFlag(true) , KeyColumns(keyColumns) @@ -213,8 +221,7 @@ private: Y_VERIFY(KeyColumns.InitPosition(GetFirstPosition())); Y_VERIFY(VersionColumns.InitPosition(GetFirstPosition())); if (Filter) { - FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort)); - Y_VERIFY(Filter->Size() == RecordsCount); + FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort, RecordsCount)); } } @@ -222,42 +229,6 @@ private: return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less; } - class TPosition { - private: - TSortableBatchPosition KeyColumns; - TSortableBatchPosition VersionColumns; - bool DeletedFlag; - bool ControlPointFlag; - public: - const TSortableBatchPosition& GetKeyColumns() const { - return KeyColumns; - } - - bool IsControlPoint() const { - return ControlPointFlag; - } - - bool IsDeleted() const { - return DeletedFlag; - } - - void TakeIfMoreActual(const TBatchIterator& anotherIterator) { - Y_VERIFY_DEBUG(KeyColumns.Compare(anotherIterator.KeyColumns) == std::partial_ordering::equivalent); - if (VersionColumns.Compare(anotherIterator.VersionColumns) == std::partial_ordering::less) { - DeletedFlag = anotherIterator.IsDeleted(); - ControlPointFlag = anotherIterator.IsControlPoint(); - } - } - - TPosition(const TBatchIterator& owner) - : KeyColumns(owner.KeyColumns) - , VersionColumns(owner.VersionColumns) - , DeletedFlag(owner.IsDeleted()) - , ControlPointFlag(owner.IsControlPoint()) - { - } - }; - bool IsDeleted() const { if (!FilterIterator) { return false; @@ -307,9 +278,11 @@ private: NJson::TJsonValue DebugJson() const { NJson::TJsonValue result = NJson::JSON_MAP; +#ifndef NDEBUG if (CurrentKeyColumns) { result["current"] = CurrentKeyColumns->DebugJson(); } +#endif for (auto&& i : SortHeap) { result["heap"].AppendValue(i.DebugJson()); } @@ -352,25 +325,10 @@ private: const bool Reverse; ui32 ControlPoints = 0; - TBatchIterator::TPosition DrainCurrentPosition() { - Y_VERIFY(SortHeap.size()); - auto position = TBatchIterator::TPosition(SortHeap.front()); - if (SortHeap.front().IsControlPoint()) { - return position; - } - bool isFirst = true; - while (SortHeap.size() && (isFirst || position.GetKeyColumns().Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) { - if (!isFirst) { - position.TakeIfMoreActual(SortHeap.front()); - } - Y_VERIFY(!SortHeap.front().IsControlPoint()); - NextInHeap(true); - isFirst = false; - } - return position; - } + std::optional<TSortableBatchPosition> DrainCurrentPosition(); void AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap); + void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition); public: TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse) : SortSchema(sortSchema) @@ -393,10 +351,6 @@ public: return it->second.size(); } - const std::optional<TSortableBatchPosition>& GetCurrentKeyColumns() const { - return CurrentKeyColumns; - } - void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point); void RemoveControlPoint(); @@ -411,13 +365,14 @@ public: return SortHeap.empty(); } - bool DrainCurrent(std::shared_ptr<TRecordBatchBuilder> builder = nullptr, const std::optional<TSortableBatchPosition>& readTo = {}, const bool includeFinish = false); + bool DrainAll(TRecordBatchBuilder& builder); + bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish); }; class TRecordBatchBuilder { private: std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders; - std::vector<std::shared_ptr<arrow::Field>> Fields; + YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields); YDB_READONLY(ui32, RecordsCount, 0); bool IsSameFieldsSequence(const std::vector<std::shared_ptr<arrow::Field>>& f1, const std::vector<std::shared_ptr<arrow::Field>>& f2) { @@ -433,6 +388,18 @@ private: } public: + ui32 GetBuildersCount() const { + return Builders.size(); + } + + TString GetColumnNames() const { + TStringBuilder result; + for (auto&& f : Fields) { + result << f->name() << ","; + } + return result; + } + TRecordBatchBuilder(const std::vector<std::shared_ptr<arrow::Field>>& fields) : Fields(fields) { Y_VERIFY(Fields.size()); diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 0f8611e2d1..f57cb658c4 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -1,12 +1,8 @@ #include "read_metadata.h" -#include "order_control/default.h" -#include "order_control/pk_with_limit.h" -#include "order_control/not_sorted.h" #include "plain_reader/plain_read_data.h" #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/tx/columnshard/columnshard__index_scan.h> #include <ydb/core/tx/columnshard/columnshard__stats_scan.h> -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> #include <util/string/join.h> namespace NKikimr::NOlap { @@ -143,23 +139,6 @@ std::set<ui32> TReadMetadata::GetPKColumnIds() const { return result; } -std::set<ui32> TReadMetadata::GetUsedColumnIds() const { - std::set<ui32> result; - auto& indexInfo = ResultIndexSchema->GetIndexInfo(); - if (Snapshot.GetPlanStep()) { - auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); - for (auto&& i : snapSchema->fields()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", i->name()); - result.emplace(indexInfo.GetColumnId(i->name())); - } - } - result.insert(AllColumns.begin(), AllColumns.end()); - for (auto&& i : indexInfo.GetPrimaryKey()) { - Y_VERIFY(result.contains(indexInfo.GetColumnId(i.first))); - } - return result; -} - std::vector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetResultYqlSchema() const { return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, ResultColumnIds); } @@ -192,39 +171,6 @@ void TReadStats::PrintToLog() { ; } -NIndexedReader::IOrderPolicy::TPtr TReadMetadata::DoBuildSortingPolicy() const { - auto& indexInfo = ResultIndexSchema->GetIndexInfo(); - if (Limit && IsSorted() && indexInfo.IsSorted() && - indexInfo.GetReplaceKey()->Equals(indexInfo.GetIndexKey())) { - ui32 idx = 0; - for (auto&& i : indexInfo.GetPrimaryKey()) { - if (idx >= indexInfo.GetSortingKey()->fields().size()) { - break; - } - if (indexInfo.GetSortingKey()->fields()[idx]->name() != i.first) { - return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this()); - } - ++idx; - } - - if (!idx || !GetProgram().HasEarlyFilterOnly()) { - return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this()); - } - return std::make_shared<NIndexedReader::TPKSortingWithLimit>(this->shared_from_this()); - } else if (IsSorted()) { - return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this()); - } else { - return std::make_shared<NIndexedReader::TNonSorting>(this->shared_from_this()); - } -} - -std::shared_ptr<NIndexedReader::IOrderPolicy> TReadMetadata::BuildSortingPolicy() const { - auto result = DoBuildSortingPolicy(); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "sorting_policy_constructed")("info", result->DebugString()); - NYDBTest::TControllers::GetColumnShardController()->OnSortingPolicy(result); - return result; -} - std::shared_ptr<NKikimr::NOlap::IDataReader> TReadMetadata::BuildReader(const NOlap::TReadContext& context, const TConstPtr& self) const { return std::make_shared<NPlainReader::TPlainReadData>(self, context); // auto result = std::make_shared<TIndexedReadData>(self, context); diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 1aa23d0366..9faa66ee63 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -22,10 +22,6 @@ class TScanIteratorBase; namespace NKikimr::NOlap { -namespace NIndexedReader { -class IOrderPolicy; -} - struct TReadStats { TInstant BeginTimestamp; ui32 SelectedIndex{0}; @@ -138,7 +134,6 @@ private: std::shared_ptr<ISnapshotSchema> ResultIndexSchema; std::vector<ui32> AllColumns; std::vector<ui32> ResultColumnsIds; - std::shared_ptr<NIndexedReader::IOrderPolicy> DoBuildSortingPolicy() const; mutable std::map<TSnapshot, ISnapshotSchema::TPtr> SchemasByVersionCache; mutable ISnapshotSchema::TPtr EmptyVersionSchemaCache; public: @@ -159,8 +154,6 @@ public: return Snapshot; } - std::shared_ptr<NIndexedReader::IOrderPolicy> BuildSortingPolicy() const; - TReadMetadata(const TVersionedIndex& info, const TSnapshot& snapshot, const ESorting sorting, const TProgramContainer& ssaProgram) : TBase(sorting, ssaProgram) , IndexVersions(info) @@ -223,7 +216,6 @@ public: } std::set<ui32> GetEarlyFilterColumnIds() const; - std::set<ui32> GetUsedColumnIds() const; std::set<ui32> GetPKColumnIds() const; bool Empty() const { diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make index 58fc56e898..2ca09c00d5 100644 --- a/ydb/core/tx/columnshard/engines/reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/ya.make @@ -1,20 +1,13 @@ LIBRARY() SRCS( - batch.cpp common.cpp conveyor_task.cpp description.cpp - filling_context.cpp - filter_assembler.cpp - granule.cpp - processing_context.cpp - postfilter_assembler.cpp queue.cpp read_filter_merger.cpp read_metadata.cpp read_context.cpp - granule_preparation.cpp ) PEERDIR( @@ -25,7 +18,6 @@ PEERDIR( ydb/core/tx/columnshard/hooks/abstract ydb/core/tx/columnshard/resources ydb/core/tx/program - ydb/core/tx/columnshard/engines/reader/order_control ydb/core/tx/columnshard/engines/reader/plain_reader ydb/core/tx/columnshard/engines/scheme ) diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp index 8d0ebfccf8..a392c75ce5 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp @@ -89,8 +89,8 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TStr AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", status.ToString()); return nullptr; } - batch = NArrow::SortBatch(batch, sortingKey); - Y_VERIFY_DEBUG(NArrow::IsSorted(batch, sortingKey)); + batch = NArrow::SortBatch(batch, sortingKey, true); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, sortingKey)); return batch; } diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 3d6b808e4e..51e1b6b9ae 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -328,17 +328,27 @@ std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId) return features.GetLoader(*this); } -std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const { - std::shared_ptr<arrow::Schema> schema = Schema; - if (IsSpecialColumn(columnId)) { - schema = ArrowSchemaSnapshot(); +std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnsSchema(const std::set<ui32>& columnIds) const { + Y_VERIFY(columnIds.size()); + std::vector<std::shared_ptr<arrow::Field>> fields; + for (auto&& i : columnIds) { + std::shared_ptr<arrow::Schema> schema; + if (IsSpecialColumn(i)) { + schema = ArrowSchemaSnapshot(); + } else { + schema = Schema; + } + auto field = schema->GetFieldByName(GetColumnName(i)); + Y_VERIFY(field); + fields.emplace_back(field); } - auto field = schema->GetFieldByName(GetColumnName(columnId)); - Y_VERIFY(field); - std::vector<std::shared_ptr<arrow::Field>> fields = { field }; return std::make_shared<arrow::Schema>(fields); } +std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const { + return GetColumnsSchema({columnId}); +} + bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { if (schema.GetEngine() != NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES) { AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "incorrect_engine_in_schema"); diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 62b034283f..2dd955dd6f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -99,6 +99,7 @@ public: } std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const; + std::shared_ptr<arrow::Schema> GetColumnsSchema(const std::set<ui32>& columnIds) const; TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const; std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const; diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make index 54de6c172c..4b058ca9bb 100644 --- a/ydb/core/tx/columnshard/engines/ya.make +++ b/ydb/core/tx/columnshard/engines/ya.make @@ -10,7 +10,6 @@ SRCS( column_features.cpp db_wrapper.cpp index_info.cpp - indexed_read_data.cpp filter.cpp portion_info.cpp tier_info.cpp diff --git a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.darwin-x86_64.txt index d07c2a1b8e..006b0b2e92 100644 --- a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.darwin-x86_64.txt @@ -12,7 +12,7 @@ target_link_libraries(columnshard-hooks-testing PUBLIC contrib-libs-cxxsupp yutil columnshard-hooks-abstract - engines-reader-order_control + columnshard-engines-changes ) target_sources(columnshard-hooks-testing PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/hooks/testing/controller.cpp diff --git a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-aarch64.txt index eab89d03d7..c374737b33 100644 --- a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-aarch64.txt @@ -13,7 +13,7 @@ target_link_libraries(columnshard-hooks-testing PUBLIC contrib-libs-cxxsupp yutil columnshard-hooks-abstract - engines-reader-order_control + columnshard-engines-changes ) target_sources(columnshard-hooks-testing PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/hooks/testing/controller.cpp diff --git a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt index eab89d03d7..c374737b33 100644 --- a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt @@ -13,7 +13,7 @@ target_link_libraries(columnshard-hooks-testing PUBLIC contrib-libs-cxxsupp yutil columnshard-hooks-abstract - engines-reader-order_control + columnshard-engines-changes ) target_sources(columnshard-hooks-testing PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/hooks/testing/controller.cpp diff --git a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt index d07c2a1b8e..006b0b2e92 100644 --- a/ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt @@ -12,7 +12,7 @@ target_link_libraries(columnshard-hooks-testing PUBLIC contrib-libs-cxxsupp yutil columnshard-hooks-abstract - engines-reader-order_control + columnshard-engines-changes ) target_sources(columnshard-hooks-testing PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/hooks/testing/controller.cpp diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.cpp b/ydb/core/tx/columnshard/hooks/testing/controller.cpp index bd6c9d93c8..9f1f5cdb4b 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.cpp +++ b/ydb/core/tx/columnshard/hooks/testing/controller.cpp @@ -1,27 +1,10 @@ #include "controller.h" -#include <ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.h> -#include <ydb/core/tx/columnshard/engines/reader/order_control/default.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> #include <ydb/core/tx/columnshard/engines/changes/compaction.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> namespace NKikimr::NYDBTest::NColumnShard { -bool TController::DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> policy) { - if (dynamic_cast<const NOlap::NIndexedReader::TPKSortingWithLimit*>(policy.get())) { - SortingWithLimit.Inc(); - } else if (dynamic_cast<const NOlap::NIndexedReader::TAnySorting*>(policy.get())) { - AnySorting.Inc(); - } else { - Y_VERIFY(false); - } - return true; -} - -bool TController::HasPKSortingOnly() const { - return SortingWithLimit.Val() && !AnySorting.Val(); -} - bool TController::DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) { if (batch) { FilteredRecordsCount.Add(batch->num_rows()); diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 1ab5407430..1158fcba97 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -5,13 +5,10 @@ namespace NKikimr::NYDBTest::NColumnShard { class TController: public ICSController { private: - YDB_READONLY(TAtomicCounter, SortingWithLimit, 0); - YDB_READONLY(TAtomicCounter, AnySorting, 0); YDB_READONLY(TAtomicCounter, FilteredRecordsCount, 0); YDB_READONLY(TAtomicCounter, InternalCompactions, 0); YDB_READONLY(TAtomicCounter, SplitCompactions, 0); protected: - virtual bool DoOnSortingPolicy(std::shared_ptr<NOlap::NIndexedReader::IOrderPolicy> policy) override; virtual bool DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) override; virtual bool DoOnStartCompaction(std::shared_ptr<NOlap::TColumnEngineChanges>& changes) override; diff --git a/ydb/core/tx/columnshard/hooks/testing/ya.make b/ydb/core/tx/columnshard/hooks/testing/ya.make index be421974c1..5a56a25ea2 100644 --- a/ydb/core/tx/columnshard/hooks/testing/ya.make +++ b/ydb/core/tx/columnshard/hooks/testing/ya.make @@ -6,7 +6,7 @@ SRCS( PEERDIR( ydb/core/tx/columnshard/hooks/abstract - ydb/core/tx/columnshard/engines/reader/order_control + ydb/core/tx/columnshard/engines/changes ) END() diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h index 261cdf3bc3..3ef8ebf6f8 100644 --- a/ydb/core/tx/columnshard/inflight_request_tracker.h +++ b/ydb/core/tx/columnshard/inflight_request_tracker.h @@ -1,7 +1,8 @@ #pragma once #include "blob.h" -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> +#include "engines/reader/read_metadata.h" +#include "engines/column_engine.h" namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index d4c88590ff..c708a261ed 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -1,5 +1,4 @@ #include <ydb/core/tx/columnshard/columnshard_impl.h> -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> #include <ydb/core/tx/columnshard/blob_cache.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -19,7 +18,7 @@ private: size_t next = 1; for (auto it = ready.begin(); it != ready.end(); ++it, ++next) { const bool lastOne = IndexedData->IsFinished() && (next == ready.size()); - SendResult(ctx, it->GetResultBatch(), lastOne); + SendResult(ctx, it->GetResultBatchPtrVerified(), lastOne); } } } |