diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-04-03 21:50:17 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-03 21:50:17 +0300 |
commit | d42a420408052ce6f86e52b849855343b309850e (patch) | |
tree | 6eb006cdf88cf7f0e9d6e1858be25658a66c90d6 | |
parent | ab81c605a9e051777b60961be2e95705b713f6a5 (diff) | |
download | ydb-d42a420408052ce6f86e52b849855343b309850e.tar.gz |
clean legacy code (#3450)
-rw-r--r-- | ydb/core/formats/arrow/arrow_helpers.cpp | 64 | ||||
-rw-r--r-- | ydb/core/formats/arrow/arrow_helpers.h | 8 | ||||
-rw-r--r-- | ydb/core/formats/arrow/merging_sorted_input_stream.cpp | 303 | ||||
-rw-r--r-- | ydb/core/formats/arrow/merging_sorted_input_stream.h | 54 | ||||
-rw-r--r-- | ydb/core/formats/arrow/one_batch_input_stream.h | 36 | ||||
-rw-r--r-- | ydb/core/formats/arrow/reader/merger.cpp | 3 | ||||
-rw-r--r-- | ydb/core/formats/arrow/reader/merger.h | 2 | ||||
-rw-r--r-- | ydb/core/formats/arrow/sort_cursor.h | 262 | ||||
-rw-r--r-- | ydb/core/formats/arrow/ut/ut_arrow.cpp (renamed from ydb/core/formats/arrow/ut_arrow.cpp) | 193 | ||||
-rw-r--r-- | ydb/core/formats/arrow/ut/ut_program_step.cpp (renamed from ydb/core/formats/arrow/ut_program_step.cpp) | 12 | ||||
-rw-r--r-- | ydb/core/formats/arrow/ya.make | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/scheme/index_info.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/scheme/index_info.h | 7 |
14 files changed, 62 insertions, 916 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index e05610423e..aaf8cbb01a 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -1,8 +1,6 @@ #include "arrow_helpers.h" #include "switch_type.h" -#include "one_batch_input_stream.h" #include "common/validation.h" -#include "merging_sorted_input_stream.h" #include "permutations.h" #include "common/adapter.h" #include "serializer/native.h" @@ -339,68 +337,6 @@ std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& return arrow::RecordBatch::Make(table->schema(), table->num_rows(), columns); } -std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const std::shared_ptr<TSortDescription>& description) { - std::vector<NArrow::IInputStream::TPtr> streams; - for (auto& batch : batches) { - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); - } - - auto mergeStream = std::make_shared<NArrow::TMergingSortedInputStream>(streams, description, Max<ui64>()); - std::shared_ptr<arrow::RecordBatch> batch = mergeStream->Read(); - Y_ABORT_UNLESS(!mergeStream->Read()); - return batch; -} - -std::vector<std::shared_ptr<arrow::RecordBatch>> MergeSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const std::shared_ptr<TSortDescription>& description, - size_t maxBatchRows) { - Y_ABORT_UNLESS(maxBatchRows); - ui64 numRows = 0; - std::vector<NArrow::IInputStream::TPtr> streams; - streams.reserve(batches.size()); - for (auto& batch : batches) { - if (batch->num_rows()) { - numRows += batch->num_rows(); - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); - } - } - - std::vector<std::shared_ptr<arrow::RecordBatch>> out; - out.reserve(numRows / maxBatchRows + 1); - - auto mergeStream = std::make_shared<NArrow::TMergingSortedInputStream>(streams, description, maxBatchRows); - while (std::shared_ptr<arrow::RecordBatch> batch = mergeStream->Read()) { - Y_ABORT_UNLESS(batch->num_rows()); - out.push_back(batch); - } - return out; -} - -std::vector<std::shared_ptr<arrow::RecordBatch>> SliceSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const std::shared_ptr<TSortDescription>& description, - size_t maxBatchRows) { - Y_ABORT_UNLESS(!description->Reverse); - - std::vector<NArrow::IInputStream::TPtr> streams; - streams.reserve(batches.size()); - for (auto& batch : batches) { - if (batch->num_rows()) { - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); - } - } - - std::vector<std::shared_ptr<arrow::RecordBatch>> out; - out.reserve(streams.size()); - - auto dedupStream = std::make_shared<NArrow::TMergingSortedInputStream>(streams, description, maxBatchRows, true); - while (std::shared_ptr<arrow::RecordBatch> batch = dedupStream->Read()) { - Y_ABORT_UNLESS(batch->num_rows()); - out.push_back(batch); - } - return out; -} - // Check if the permutation doesn't reorder anything bool IsTrivial(const arrow::UInt64Array& permutation, const ui64 originalLength) { if ((ui64)permutation.length() != originalLength) { diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index 1628c57415..05d0e85263 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -82,15 +82,7 @@ inline std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shar std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches); std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable, const bool combine = false); std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches); -std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const std::shared_ptr<TSortDescription>& description); std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& rb); -std::vector<std::shared_ptr<arrow::RecordBatch>> MergeSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const std::shared_ptr<TSortDescription>& description, - size_t maxBatchRows); -std::vector<std::shared_ptr<arrow::RecordBatch>> SliceSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const std::shared_ptr<TSortDescription>& description, - size_t maxBatchRows = 0); std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<ui32>& sharding, ui32 numShards); diff --git a/ydb/core/formats/arrow/merging_sorted_input_stream.cpp b/ydb/core/formats/arrow/merging_sorted_input_stream.cpp deleted file mode 100644 index f25665901f..0000000000 --- a/ydb/core/formats/arrow/merging_sorted_input_stream.cpp +++ /dev/null @@ -1,303 +0,0 @@ -// The code in this file is based on original ClickHouse source code -// which is licensed under Apache license v2.0 -// See: https://github.com/ClickHouse/ClickHouse/ - -#include <queue> -#include "merging_sorted_input_stream.h" -#include "switch_type.h" -#include "size_calcer.h" - -namespace NKikimr::NArrow { - -class TRowsBuffer : public IRowsBuffer { -public: - using TBuilders = std::vector<std::unique_ptr<arrow::ArrayBuilder>>; - - static constexpr const size_t BUFFER_SIZE = 256; - - TRowsBuffer(TBuilders& columns, size_t maxRows) - : Columns(columns) - , MaxRows(maxRows) - { - Rows.reserve(BUFFER_SIZE); - } - - bool AddRow(const TSortCursor& cursor) override { - Rows.emplace_back(cursor->all_columns, cursor->getRow()); - if (Rows.size() >= BUFFER_SIZE) { - Flush(); - } - ++AddedRows; - return true; - } - - void Flush() override { - if (Rows.empty()) { - return; - } - for (size_t i = 0; i < Columns.size(); ++i) { - arrow::ArrayBuilder& builder = *Columns[i]; - for (auto& [srcColumn, rowPosition] : Rows) { - Y_ABORT_UNLESS(Append(builder, *srcColumn->at(i), rowPosition)); - } - } - Rows.clear(); - } - - bool Limit() const override { - return MaxRows && (AddedRows >= MaxRows); - } - - bool HasLimit() const override { - return MaxRows; - } - -private: - TBuilders& Columns; - std::vector<std::pair<const TArrayVec*, size_t>> Rows; - size_t MaxRows = 0; - size_t AddedRows = 0; -}; - -class TSlicedRowsBuffer : public IRowsBuffer { -public: - TSlicedRowsBuffer(size_t maxRows) - : MaxRows(maxRows) - {} - - bool AddRow(const TSortCursor& cursor) override { - if (!Batch) { - Batch = cursor->current_batch; - Offset = cursor->getRow(); - } - if (Batch.get() != cursor->current_batch.get()) { - // append from another batch - return false; - } else if (cursor->getRow() != (Offset + AddedRows)) { - // append from the same batch with data hole - return false; - } - ++AddedRows; - return true; - } - - void Flush() override { - } - - bool Limit() const override { - return MaxRows && (AddedRows >= MaxRows); - } - - bool HasLimit() const override { - return MaxRows; - } - - std::shared_ptr<arrow::RecordBatch> GetBatch() { - if (Batch) { - return Batch->Slice(Offset, AddedRows); - } - return {}; - } - -private: - std::shared_ptr<arrow::RecordBatch> Batch; - size_t Offset = 0; - size_t MaxRows = 0; - size_t AddedRows = 0; -}; - -TMergingSortedInputStream::TMergingSortedInputStream(const std::vector<IInputStream::TPtr>& inputs, - std::shared_ptr<TSortDescription> description, - size_t maxBatchRows, bool slice) - : Description(description) - , MaxBatchSize(maxBatchRows) - , SliceSources(slice) - , SourceBatches(inputs.size()) - , Cursors(inputs.size()) -{ - Children.insert(Children.end(), inputs.begin(), inputs.end()); - Header = Children.at(0)->Schema(); -} - -/// Read the first blocks, initialize the queue. -void TMergingSortedInputStream::Init() { - Y_ABORT_UNLESS(First); - First = false; - size_t totalRows = 0; - for (size_t i = 0; i < SourceBatches.size(); ++i) { - auto& batch = SourceBatches[i]; - if (batch) { - continue; - } - - batch = Children[i]->Read(); - if (!batch || batch->num_rows() == 0) { - continue; - } - - for (i32 i = 0; i < batch->num_columns(); ++i) { - ColumnSize[batch->column_name(i)] += NArrow::GetArrayDataSize(batch->column(i)); - } - - totalRows += batch->num_rows(); - Cursors[i] = TSortCursorImpl(batch, Description, i); - } - - ExpectedBatchSize = MaxBatchSize ? std::min(totalRows, MaxBatchSize) : totalRows; - if (MaxBatchSize && MaxBatchSize < totalRows) { - ColumnSize.clear(); - } - - Queue = TSortingHeap(Cursors, Description->NotNull); - - /// Let's check that all source blocks have the same structure. - for (const auto& batch : SourceBatches) { - if (batch) { - Y_DEBUG_ABORT_UNLESS(batch->schema()->Equals(*Header)); - } - } -} - -std::shared_ptr<arrow::RecordBatch> TMergingSortedInputStream::ReadImpl() { - if (Finished) { - return {}; - } - - if (Children.size() == 1 && !Description->Replace()) { - return Children[0]->Read(); - } - - if (First) { - Init(); - } - - if (SliceSources) { - Y_DEBUG_ABORT_UNLESS(!Description->Reverse); - TSlicedRowsBuffer rowsBuffer(MaxBatchSize); - Merge(rowsBuffer, Queue); - auto batch = rowsBuffer.GetBatch(); - Y_ABORT_UNLESS(batch); - if (!batch->num_rows()) { - Y_ABORT_UNLESS(Finished); - return {}; - } - return batch; - } else { - auto builders = NArrow::MakeBuilders(Header, ExpectedBatchSize, ColumnSize); - if (builders.empty()) { - return {}; - } - - Y_ABORT_UNLESS(builders.size() == (size_t)Header->num_fields()); - TRowsBuffer rowsBuffer(builders, MaxBatchSize); - Merge(rowsBuffer, Queue); - - auto arrays = NArrow::Finish(std::move(builders)); - Y_ABORT_UNLESS(arrays.size()); - if (!arrays[0]->length()) { - Y_ABORT_UNLESS(Finished); - return {}; - } - return arrow::RecordBatch::Make(Header, arrays[0]->length(), arrays); - } -} - -/// Get the next block from the corresponding source, if there is one. -void TMergingSortedInputStream::FetchNextBatch(const TSortCursor& current, TSortingHeap& queue) { - size_t order = current->order; - Y_ABORT_UNLESS(order < Cursors.size() && &Cursors[order] == current.Impl); - - while (true) { - SourceBatches[order] = Children[order]->Read(); - auto& batch = SourceBatches[order]; - - if (!batch) { - queue.RemoveTop(); - break; - } - - if (batch->num_rows()) { - Y_DEBUG_ABORT_UNLESS(batch->schema()->Equals(*Header)); - - Cursors[order].Reset(batch); - queue.ReplaceTop(TSortCursor(&Cursors[order], Description->NotNull)); - break; - } - } -} - -/// Take rows in required order and put them into `rowBuffer`, -/// while the number of rows are no more than `max_block_size` -template <bool replace, bool limit> -void TMergingSortedInputStream::MergeImpl(IRowsBuffer& rowsBuffer, TSortingHeap& queue) { - if constexpr (replace) { - if (!PrevKey && queue.IsValid()) { - auto current = queue.Current(); - PrevKey = std::make_shared<TReplaceKey>(current->replace_columns, current->getRow()); - if (!rowsBuffer.AddRow(current)) { - return; - } - // Do not get Next() for simplicity. Lead to a dup - } - } - - while (queue.IsValid()) { - if constexpr (limit) { - if (rowsBuffer.Limit()) { - return; - } - } - - auto current = queue.Current(); - - if constexpr (replace) { - TReplaceKey key(current->replace_columns, current->getRow()); - - if (key == *PrevKey) { - // do nothing - } else if (rowsBuffer.AddRow(current)) { - *PrevKey = key; - } else { - return; - } - } else { - if (!rowsBuffer.AddRow(current)) { - return; - } - } - - if (!current->isLast()) { - queue.Next(); - } else { - rowsBuffer.Flush(); - FetchNextBatch(current, queue); - } - } - - /// We have read all data. Ask children to cancel providing more data. - Cancel(); - Finished = true; -} - -void TMergingSortedInputStream::Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue) { - const bool replace = Description->Replace(); - const bool limit = rowsBuffer.HasLimit(); - - if (replace) { - if (limit) { - MergeImpl<true, true>(rowsBuffer, queue); - } else { - MergeImpl<true, false>(rowsBuffer, queue); - } - } else { - if (limit) { - MergeImpl<false, true>(rowsBuffer, queue); - } else { - MergeImpl<false, false>(rowsBuffer, queue); - } - } - - rowsBuffer.Flush(); -} - -} diff --git a/ydb/core/formats/arrow/merging_sorted_input_stream.h b/ydb/core/formats/arrow/merging_sorted_input_stream.h deleted file mode 100644 index 2a3a2f7227..0000000000 --- a/ydb/core/formats/arrow/merging_sorted_input_stream.h +++ /dev/null @@ -1,54 +0,0 @@ -// The code in this file is based on original ClickHouse source code -// which is licensed under Apache license v2.0 -// See: https://github.com/ClickHouse/ClickHouse/ - -#pragma once -#include "input_stream.h" -#include "sort_cursor.h" - -namespace NKikimr::NArrow { - -struct IRowsBuffer { - virtual bool AddRow(const TSortCursor& cursor) = 0; - virtual void Flush() = 0; - virtual bool Limit() const = 0; - virtual bool HasLimit() const = 0; -}; - -/// Merges several sorted streams into one sorted stream. -class TMergingSortedInputStream : public IInputStream { -public: - TMergingSortedInputStream(const std::vector<IInputStream::TPtr>& inputs, - std::shared_ptr<TSortDescription> description, - size_t maxBatchRows, bool slice = false); - - std::shared_ptr<arrow::Schema> Schema() const override { return Header; } - -protected: - std::shared_ptr<arrow::RecordBatch> ReadImpl() override; - -private: - std::shared_ptr<arrow::Schema> Header; - std::shared_ptr<TSortDescription> Description; - const ui64 MaxBatchSize; - const bool SliceSources; - bool First = true; - bool Finished = false; - ui64 ExpectedBatchSize = 0; /// May be smaller or equal to max_block_size. To do 'reserve' for columns. - std::map<std::string, ui64> ColumnSize; - - std::vector<std::shared_ptr<arrow::RecordBatch>> SourceBatches; - std::shared_ptr<TReplaceKey> PrevKey; - - std::vector<TSortCursorImpl> Cursors; - TSortingHeap Queue; - - void Init(); - void FetchNextBatch(const TSortCursor& current, TSortingHeap& queue); - void Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue); - - template <bool replace, bool limit> - void MergeImpl(IRowsBuffer& rowsBuffer, TSortingHeap& queue); -}; - -} diff --git a/ydb/core/formats/arrow/one_batch_input_stream.h b/ydb/core/formats/arrow/one_batch_input_stream.h deleted file mode 100644 index 647e70a3f6..0000000000 --- a/ydb/core/formats/arrow/one_batch_input_stream.h +++ /dev/null @@ -1,36 +0,0 @@ -// The code in this file is based on original ClickHouse source code -// which is licensed under Apache license v2.0 -// See: https://github.com/ClickHouse/ClickHouse/ - -#pragma once -#include "input_stream.h" - -namespace NKikimr::NArrow { - -class TOneBatchInputStream : public IInputStream { -public: - explicit TOneBatchInputStream(std::shared_ptr<arrow::RecordBatch> batch) - : Batch(batch) - , Header(Batch->schema()) - {} - - std::shared_ptr<arrow::Schema> Schema() const override { - return Header; - } - -protected: - std::shared_ptr<arrow::RecordBatch> ReadImpl() override { - if (Batch) { - auto out = Batch; - Batch.reset(); - return out; - } - return {}; - } - -private: - std::shared_ptr<arrow::RecordBatch> Batch; - std::shared_ptr<arrow::Schema> Header; -}; - -} diff --git a/ydb/core/formats/arrow/reader/merger.cpp b/ydb/core/formats/arrow/reader/merger.cpp index 4613e45bd0..710c07c5a5 100644 --- a/ydb/core/formats/arrow/reader/merger.cpp +++ b/ydb/core/formats/arrow/reader/merger.cpp @@ -194,7 +194,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort return result; } -bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) { +void TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) { Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount()); while (SortHeap.Size()) { if (auto currentPosition = DrainCurrentPosition()) { @@ -202,7 +202,6 @@ bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) { builder.AddRecord(*currentPosition); } } - return false; } std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() { diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h index 795f083d1e..1849e535a8 100644 --- a/ydb/core/formats/arrow/reader/merger.h +++ b/ydb/core/formats/arrow/reader/merger.h @@ -241,7 +241,7 @@ public: return !SortHeap.Size(); } - bool DrainAll(TRecordBatchBuilder& builder); + void DrainAll(TRecordBatchBuilder& builder); std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr); bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr); bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr); diff --git a/ydb/core/formats/arrow/sort_cursor.h b/ydb/core/formats/arrow/sort_cursor.h deleted file mode 100644 index f51f4145ae..0000000000 --- a/ydb/core/formats/arrow/sort_cursor.h +++ /dev/null @@ -1,262 +0,0 @@ -// The code in this file is based on original ClickHouse source code -// which is licensed under Apache license v2.0 -// See: https://github.com/ClickHouse/ClickHouse/ - -#pragma once -#include "replace_key.h" -#include "arrow_helpers.h" -#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> -#include <algorithm> - -namespace NKikimr::NArrow { - -/// Description of the sorting rule for several columns. -struct TSortDescription { - /// @note In case you have PK and snapshot column you should sort with {ASC PK, DESC snap} key and replase with PK - std::shared_ptr<arrow::Schema> SortingKey; - std::shared_ptr<arrow::Schema> ReplaceKey; /// Keep first visited (SortingKey ordered) of dups - std::vector<int> Directions; /// 1 - ascending, -1 - descending. - bool NotNull{false}; - bool Reverse{false}; // Read sources from bottom to top. With inversed Directions leads to DESC dst for ASC src - - TSortDescription() = default; - - TSortDescription(const std::shared_ptr<arrow::Schema>& sortingKey, - const std::shared_ptr<arrow::Schema>& replaceKey = {}) - : SortingKey(sortingKey) - , ReplaceKey(replaceKey) - , Directions(sortingKey->num_fields(), 1) - {} - - size_t Size() const { return SortingKey->num_fields(); } - int Direction(size_t pos) const { return Directions[pos]; } - bool Replace() const { return ReplaceKey.get(); } - - void Inverse() { - Reverse = !Reverse; - for (int& dir : Directions) { - dir *= -1; - } - } -}; - - -/// Cursor allows to compare rows in different batches. -/// Cursor moves inside single block. It is used in priority queue. -struct TSortCursorImpl { - std::shared_ptr<TArrayVec> sort_columns; - std::shared_ptr<TSortDescription> desc; - ui32 order = 0; // Number of cursor. It determines an order if comparing columns are equal. - // - std::shared_ptr<arrow::RecordBatch> current_batch; - const TArrayVec* all_columns; - std::shared_ptr<TArrayVec> replace_columns; - - TSortCursorImpl() = default; - - TSortCursorImpl(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<TSortDescription> desc_, ui32 order_ = 0) - : desc(desc_) - , order(order_) - { - Reset(batch); - } - - bool Empty() const { return Rows() == 0; } - size_t Rows() const { return (!all_columns || all_columns->empty()) ? 0 : all_columns->front()->length(); } - size_t LastRow() const { return Rows() - 1; } - - void Reset(std::shared_ptr<arrow::RecordBatch> batch) { - current_batch = batch; - auto rbSorting = ExtractColumns(batch, desc->SortingKey); - Y_ABORT_UNLESS(rbSorting); - sort_columns = std::make_shared<TArrayVec>(rbSorting->columns()); - all_columns = &batch->columns(); - if (desc->ReplaceKey) { - auto rbReplace = ExtractColumns(batch, desc->ReplaceKey); - Y_ABORT_UNLESS(rbReplace); - replace_columns = std::make_shared<TArrayVec>(rbReplace->columns()); - } - pos = 0; - } - - size_t getRow() const { - return desc->Reverse ? (Rows() - pos - 1) : pos; - } - - bool isFirst() const { return pos == 0; } - bool isLast() const { return pos + 1 >= Rows(); } - bool isValid() const { return pos < Rows(); } - void next() { ++pos; } - -private: - size_t pos{0}; -}; - - -struct TSortCursor { - TSortCursorImpl* Impl; - bool NotNull; - - TSortCursor(TSortCursorImpl* impl, bool notNull) - : Impl(impl) - , NotNull(notNull) - {} - - TSortCursorImpl* operator-> () { return Impl; } - const TSortCursorImpl* operator-> () const { return Impl; } - - bool Greater(const TSortCursor& rhs) const { - return GreaterAt(rhs, Impl->getRow(), rhs.Impl->getRow()); - } - - /// Inverted so that the priority queue elements are removed in ascending order. - bool operator < (const TSortCursor& rhs) const { - return Greater(rhs); - } - -private: - /// The specified row of this cursor is greater than the specified row of another cursor. - bool GreaterAt(const TSortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const { - TRawReplaceKey left(Impl->sort_columns.get(), lhs_pos); - TRawReplaceKey right(rhs.Impl->sort_columns.get(), rhs_pos); - - if (NotNull) { - for (size_t i = 0; i < Impl->desc->Size(); ++i) { - auto cmp = left.CompareColumnValueNotNull(i, right, i); - int res = Impl->desc->Direction(i) * (std::is_eq(cmp) ? 0 : (std::is_lt(cmp) ? -1 : 1)); - if (res > 0) - return true; - if (res < 0) - return false; - } - } else { - for (size_t i = 0; i < Impl->desc->Size(); ++i) { - auto cmp = left.CompareColumnValue(i, right, i); - int res = Impl->desc->Direction(i) * (std::is_eq(cmp) ? 0 : (std::is_lt(cmp) ? -1 : 1)); - if (res > 0) - return true; - if (res < 0) - return false; - } - } - return Impl->order > rhs.Impl->order; - } -}; - - -/// Allows to fetch data from multiple sort cursors in sorted order (merging sorted data stream). -/// TODO: Replace with "Loser Tree", see https://en.wikipedia.org/wiki/K-way_merge_algorithm -class TSortingHeap { -public: - TSortingHeap() = default; - - template <typename TCursors> - TSortingHeap(TCursors& cursors, bool notNull) { - Queue.reserve(cursors.size()); - for (auto& cur : cursors) { - if (!cur.Empty()) { - Queue.emplace_back(TSortCursor(&cur, notNull)); - } - } - std::make_heap(Queue.begin(), Queue.end()); - } - - bool IsValid() const { return !Queue.empty(); } - TSortCursor& Current() { return Queue.front(); } - size_t Size() { return Queue.size(); } - TSortCursor& NextChild() { return Queue[NextChildIndex()]; } - - void Next() { - Y_ABORT_UNLESS(IsValid()); - - if (!Current()->isLast()) { - Current()->next(); - UpdateTop(); - } - else { - RemoveTop(); - } - } - - void ReplaceTop(TSortCursor&& new_top) { - Current() = new_top; - UpdateTop(); - } - - void RemoveTop() { - std::pop_heap(Queue.begin(), Queue.end()); - Queue.pop_back(); - NextIdx = 0; - } - - void Push(TSortCursor&& cursor) { - Queue.emplace_back(cursor); - std::push_heap(Queue.begin(), Queue.end()); - NextIdx = 0; - } - -private: - std::vector<TSortCursor> Queue; - /// Cache comparison between first and second child if the order in queue has not been changed. - size_t NextIdx = 0; - - size_t NextChildIndex() { - if (NextIdx == 0) { - NextIdx = 1; - if (Queue.size() > 2 && Queue[1] < Queue[2]) { - ++NextIdx; - } - } - - return NextIdx; - } - - /// This is adapted version of the function __sift_down from libc++. - /// Why cannot simply use std::priority_queue? - /// - because it doesn't support updating the top element and requires pop and push instead. - /// Also look at "Boost.Heap" library. - void UpdateTop() { - size_t size = Queue.size(); - if (size < 2) - return; - - auto begin = Queue.begin(); - - size_t child_idx = NextChildIndex(); - auto child_it = begin + child_idx; - - /// Check if we are in order. - if (*child_it < *begin) - return; - - NextIdx = 0; - - auto curr_it = begin; - auto top(std::move(*begin)); - do { - /// We are not in heap-order, swap the parent with it's largest child. - *curr_it = std::move(*child_it); - curr_it = child_it; - - // recompute the child based off of the updated parent - child_idx = 2 * child_idx + 1; - - if (child_idx >= size) - break; - - child_it = begin + child_idx; - - if ((child_idx + 1) < size && *child_it < *(child_it + 1)) - { - /// Right child exists and is greater than left child. - ++child_it; - ++child_idx; - } - - /// Check if we are in order. - } while (!(*child_it < top)); - *curr_it = std::move(top); - } -}; - -} diff --git a/ydb/core/formats/arrow/ut_arrow.cpp b/ydb/core/formats/arrow/ut/ut_arrow.cpp index 2ab400bf49..da620d70fa 100644 --- a/ydb/core/formats/arrow/ut_arrow.cpp +++ b/ydb/core/formats/arrow/ut/ut_arrow.cpp @@ -1,9 +1,10 @@ -#include "arrow_batch_builder.h" -#include "arrow_helpers.h" -#include "converter.h" -#include "one_batch_input_stream.h" -#include "merging_sorted_input_stream.h" -#include "arrow_filter.h" +#include <ydb/core/formats/arrow/arrow_batch_builder.h> +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/converter.h> +#include <ydb/core/formats/arrow/arrow_filter.h> +#include <ydb/core/formats/arrow/permutations.h> +#include <ydb/core/formats/arrow/reader/merger.h> +#include <ydb/core/formats/arrow/reader/result_builder.h> #include <ydb/library/binary_json/write.h> #include <library/cpp/testing/unittest/registar.h> @@ -480,13 +481,6 @@ ui32 RestoreValue(ui32 a, ui32 b, ui32 c) { return ui32(a) * 100 + b * 10 + c; } -ui32 RestoreOne(const std::shared_ptr<arrow::RecordBatch>& batch, int pos) { - auto arrA = std::static_pointer_cast<arrow::Int8Array>(batch->GetColumnByName("i8")); - auto arrB = std::static_pointer_cast<arrow::Int16Array>(batch->GetColumnByName("i16")); - auto arrC = std::static_pointer_cast<arrow::Int32Array>(batch->GetColumnByName("i32")); - return RestoreValue(arrA->Value(pos), arrB->Value(pos), arrC->Value(pos)); -} - bool CheckSorted1000(const std::shared_ptr<arrow::RecordBatch>& batch, bool desc = false) { auto arrA = std::static_pointer_cast<arrow::Int8Array>(batch->GetColumnByName("i8")); auto arrB = std::static_pointer_cast<arrow::Int16Array>(batch->GetColumnByName("i16")); @@ -663,38 +657,26 @@ Y_UNIT_TEST_SUITE(ArrowTest) { UNIT_ASSERT(CheckSorted1000(batch)); std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - batches.push_back(batch->Slice(0, 100)); // 0..100 - batches.push_back(batch->Slice(100, 200)); // 100..300 - batches.push_back(batch->Slice(200, 400)); // 200..600 - batches.push_back(batch->Slice(500, 50)); // 500..550 - batches.push_back(batch->Slice(600, 1)); // 600..601 - - auto descr = std::make_shared<NArrow::TSortDescription>(batch->schema()); - descr->NotNull = true; - - std::vector<std::shared_ptr<arrow::RecordBatch>> sorted; - { // maxBatchSize = 500, no limit - std::vector<NArrow::IInputStream::TPtr> streams; - for (auto& batch : batches) { - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); - } + batches.push_back(batch->Slice(0, 100)); // 0..100 +100 + batches.push_back(batch->Slice(100, 200)); // 100..300 +200 + batches.push_back(batch->Slice(200, 400)); // 200..600 +300 + batches.push_back(batch->Slice(500, 50)); // 500..550 +50 + batches.push_back(batch->Slice(600, 1)); // 600..601 +1 - NArrow::IInputStream::TPtr mergeStream = - std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 500); - - while (auto batch = mergeStream->Read()) { - sorted.emplace_back(batch); + std::shared_ptr<arrow::RecordBatch> sorted; + { + NArrow::NMerger::TRecordBatchBuilder builder(batch->schema()->fields()); + const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()}; + auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), false, vColumns); + for (auto&& i : batches) { + merger->AddSource(i, nullptr); } + merger->DrainAll(builder); + sorted = builder.Finalize(); } - - UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(sorted[0]->num_rows(), 500); - UNIT_ASSERT_VALUES_EQUAL(sorted[1]->num_rows(), 251); - UNIT_ASSERT(CheckSorted(sorted[0])); - UNIT_ASSERT(CheckSorted(sorted[1])); - UNIT_ASSERT(NArrow::IsSorted(sorted[0], descr->SortingKey)); - UNIT_ASSERT(NArrow::IsSorted(sorted[1], descr->SortingKey)); - UNIT_ASSERT(RestoreOne(sorted[0], 499) <= RestoreOne(sorted[1], 0)); + UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 601); + UNIT_ASSERT(NArrow::IsSorted(sorted, batch->schema())); + UNIT_ASSERT(CheckSorted(sorted)); } Y_UNIT_TEST(MergingSortedInputStreamReversed) { @@ -702,86 +684,29 @@ Y_UNIT_TEST_SUITE(ArrowTest) { UNIT_ASSERT(CheckSorted1000(batch)); std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - batches.push_back(batch->Slice(0, 100)); // 0..100 - batches.push_back(batch->Slice(100, 200)); // 100..300 - batches.push_back(batch->Slice(200, 400)); // 200..600 - batches.push_back(batch->Slice(500, 50)); // 500..550 - batches.push_back(batch->Slice(600, 1)); // 600..601 - - auto descr = std::make_shared<NArrow::TSortDescription>(batch->schema()); - descr->NotNull = true; - descr->Inverse(); - - std::vector<std::shared_ptr<arrow::RecordBatch>> sorted; - { // maxBatchSize = 500, no limit - std::vector<NArrow::IInputStream::TPtr> streams; - for (auto& batch : batches) { - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); - } - - NArrow::IInputStream::TPtr mergeStream = - std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 500); + batches.push_back(batch->Slice(0, 100)); // 0..100 +100 + batches.push_back(batch->Slice(100, 200)); // 100..300 +200 + batches.push_back(batch->Slice(200, 400)); // 200..600 +300 + batches.push_back(batch->Slice(500, 50)); // 500..550 +50 + batches.push_back(batch->Slice(600, 1)); // 600..601 +1 - while (auto batch = mergeStream->Read()) { - sorted.emplace_back(batch); - } - } - - UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(sorted[0]->num_rows(), 500); - UNIT_ASSERT_VALUES_EQUAL(sorted[1]->num_rows(), 251); - UNIT_ASSERT(CheckSorted(sorted[0], true)); - UNIT_ASSERT(CheckSorted(sorted[1], true)); - UNIT_ASSERT(NArrow::IsSorted(sorted[0], descr->SortingKey, true)); - UNIT_ASSERT(NArrow::IsSorted(sorted[1], descr->SortingKey, true)); - UNIT_ASSERT(RestoreOne(sorted[0], 499) >= RestoreOne(sorted[1], 0)); - } - - Y_UNIT_TEST(MergingSortedInputStreamReplace) { - std::shared_ptr<arrow::RecordBatch> batch = ExtractBatch(MakeTable1000()); - UNIT_ASSERT(CheckSorted1000(batch)); - - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - batches.push_back(AddSnapColumn(batch->Slice(0, 400), 0)); - batches.push_back(AddSnapColumn(batch->Slice(200, 400), 1)); - batches.push_back(AddSnapColumn(batch->Slice(400, 400), 2)); - batches.push_back(AddSnapColumn(batch->Slice(600, 400), 3)); - - auto sortingKey = batches[0]->schema(); - auto replaceKey = batch->schema(); - - auto descr = std::make_shared<NArrow::TSortDescription>(sortingKey, replaceKey); - descr->Directions.back() = -1; // greater snapshot first - descr->NotNull = true; - - std::vector<std::shared_ptr<arrow::RecordBatch>> sorted; + std::shared_ptr<arrow::RecordBatch> sorted; { - std::vector<NArrow::IInputStream::TPtr> streams; - for (auto& batch : batches) { - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); - } - - NArrow::IInputStream::TPtr mergeStream = - std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 5000); - - while (auto batch = mergeStream->Read()) { - sorted.emplace_back(batch); + NArrow::NMerger::TRecordBatchBuilder builder(batch->schema()->fields()); + const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()}; + auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), true, vColumns); + for (auto&& i : batches) { + merger->AddSource(i, nullptr); } + merger->DrainAll(builder); + sorted = builder.Finalize(); } - - UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(sorted[0]->num_rows(), 1000); - UNIT_ASSERT(CheckSorted1000(sorted[0])); - UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted[0], descr->SortingKey)); - - auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted[0]->GetColumnByName("snap"))); - UNIT_ASSERT_VALUES_EQUAL(counts[0], 200); - UNIT_ASSERT_VALUES_EQUAL(counts[1], 200); - UNIT_ASSERT_VALUES_EQUAL(counts[2], 200); - UNIT_ASSERT_VALUES_EQUAL(counts[3], 400); + UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 601); + UNIT_ASSERT(NArrow::IsSorted(sorted, batch->schema(), true)); + UNIT_ASSERT(CheckSorted(sorted, true)); } - Y_UNIT_TEST(MergingSortedInputStreamReplaceReversed) { + Y_UNIT_TEST(MergingSortedInputStreamReplace) { std::shared_ptr<arrow::RecordBatch> batch = ExtractBatch(MakeTable1000()); UNIT_ASSERT(CheckSorted1000(batch)); @@ -791,35 +716,23 @@ Y_UNIT_TEST_SUITE(ArrowTest) { batches.push_back(AddSnapColumn(batch->Slice(400, 400), 2)); batches.push_back(AddSnapColumn(batch->Slice(600, 400), 3)); - auto sortingKey = batches[0]->schema(); - auto replaceKey = batch->schema(); - - auto descr = std::make_shared<NArrow::TSortDescription>(sortingKey, replaceKey); - descr->Directions.back() = 1; // greater snapshot last - descr->NotNull = true; - descr->Inverse(); - - std::vector<std::shared_ptr<arrow::RecordBatch>> sorted; + std::shared_ptr<arrow::RecordBatch> sorted; { - std::vector<NArrow::IInputStream::TPtr> streams; - for (auto& batch : batches) { - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); - } - - NArrow::IInputStream::TPtr mergeStream = - std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 5000); - - while (auto batch = mergeStream->Read()) { - sorted.emplace_back(batch); + NArrow::NMerger::TRecordBatchBuilder builder(batches[0]->schema()->fields()); + const std::vector<std::string> vColumns = {"snap"}; + auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns); + for (auto&& i : batches) { + merger->AddSource(i, nullptr); } + merger->DrainAll(builder); + sorted = builder.Finalize(); } - UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(sorted[0]->num_rows(), 1000); - UNIT_ASSERT(CheckSorted1000(sorted[0], true)); - UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted[0], descr->SortingKey, true)); + UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 1000); + UNIT_ASSERT(CheckSorted1000(sorted)); + UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted, batch->schema())); - auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted[0]->GetColumnByName("snap"))); + auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted->GetColumnByName("snap"))); UNIT_ASSERT_VALUES_EQUAL(counts[0], 200); UNIT_ASSERT_VALUES_EQUAL(counts[1], 200); UNIT_ASSERT_VALUES_EQUAL(counts[2], 200); diff --git a/ydb/core/formats/arrow/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp index 2ab52ed29b..d7f447a1b2 100644 --- a/ydb/core/formats/arrow/ut_program_step.cpp +++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp @@ -2,14 +2,16 @@ #include <memory> #include <vector> +#include <ydb/core/formats/arrow/custom_registry.h> +#include <ydb/core/formats/arrow/program.h> +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/library/arrow_kernels/ut_common.h> + +#include <library/cpp/testing/unittest/registar.h> + #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/exec.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/type_fwd.h> -#include <library/cpp/testing/unittest/registar.h> -#include <ydb/library/arrow_kernels/ut_common.h> -#include "custom_registry.h" -#include "program.h" -#include "arrow_helpers.h" using namespace NKikimr::NArrow; using namespace NKikimr::NSsa; diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make index 146c5bae1e..d55ccb5799 100644 --- a/ydb/core/formats/arrow/ya.make +++ b/ydb/core/formats/arrow/ya.make @@ -45,14 +45,10 @@ SRCS( converter.h custom_registry.cpp input_stream.h - merging_sorted_input_stream.cpp - merging_sorted_input_stream.h - one_batch_input_stream.h permutations.cpp program.cpp replace_key.cpp size_calcer.cpp - sort_cursor.h ssa_program_optimizer.cpp special_keys.cpp simple_arrays_cache.cpp diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 4e36b4722f..f5192eb7d8 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -15,9 +15,6 @@ #include <ydb/core/tx/columnshard/data_locks/manager/manager.h> #include <ydb/core/tx/tiering/manager.h> -#include <ydb/core/formats/arrow/one_batch_input_stream.h> -#include <ydb/core/formats/arrow/merging_sorted_input_stream.h> - #include <ydb/library/conclusion/status.h> #include <library/cpp/time_provider/time_provider.h> diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 5e14af96cd..c91f2d0d13 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -5,7 +5,6 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/formats/arrow/arrow_batch_builder.h> -#include <ydb/core/formats/arrow/sort_cursor.h> #include <ydb/core/formats/arrow/serializer/native.h> #include <ydb/core/formats/arrow/transformer/dictionary.h> #include <ydb/core/sys_view/common/schema.h> @@ -264,32 +263,6 @@ void TIndexInfo::SetAllKeys(const std::shared_ptr<IStoragesManager>& operators) } } -std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortDescription() const { - if (GetPrimaryKey()) { - auto key = ExtendedKey; // Sort with extended key, greater snapshot first - Y_ABORT_UNLESS(key && key->num_fields() > 2); - auto description = std::make_shared<NArrow::TSortDescription>(key); - description->Directions[key->num_fields() - 1] = -1; - description->Directions[key->num_fields() - 2] = -1; - description->NotNull = true; // TODO - return description; - } - return {}; -} - -std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortReplaceDescription() const { - if (GetPrimaryKey()) { - auto key = ExtendedKey; // Sort with extended key, greater snapshot first - Y_ABORT_UNLESS(key && key->num_fields() > 2); - auto description = std::make_shared<NArrow::TSortDescription>(key, GetPrimaryKey()); - description->Directions[key->num_fields() - 1] = -1; - description->Directions[key->num_fields() - 2] = -1; - description->NotNull = true; // TODO - return description; - } - return {}; -} - TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId) const { auto it = ColumnFeatures.find(columnId); AFL_VERIFY(it != ColumnFeatures.end()); diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 39ce02b372..a5c6381425 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -24,10 +24,6 @@ namespace arrow { class Schema; } -namespace NKikimr::NArrow { - struct TSortDescription; -} - namespace NKikimr::NOlap { class TPortionInfoWithBlobs; @@ -304,9 +300,6 @@ public: bool IsSorted() const { return true; } bool IsSortedColumn(const ui32 columnId) const { return GetPKFirstColumnId() == columnId; } - std::shared_ptr<NArrow::TSortDescription> SortDescription() const; - std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const; - static const std::set<ui32>& GetSpecialColumnIdsSet() { static const std::set<ui32> result(GetSpecialColumnIds().begin(), GetSpecialColumnIds().end()); return result; |