aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-04-03 21:50:17 +0300
committerGitHub <noreply@github.com>2024-04-03 21:50:17 +0300
commitd42a420408052ce6f86e52b849855343b309850e (patch)
tree6eb006cdf88cf7f0e9d6e1858be25658a66c90d6
parentab81c605a9e051777b60961be2e95705b713f6a5 (diff)
downloadydb-d42a420408052ce6f86e52b849855343b309850e.tar.gz
clean legacy code (#3450)
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp64
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.h8
-rw-r--r--ydb/core/formats/arrow/merging_sorted_input_stream.cpp303
-rw-r--r--ydb/core/formats/arrow/merging_sorted_input_stream.h54
-rw-r--r--ydb/core/formats/arrow/one_batch_input_stream.h36
-rw-r--r--ydb/core/formats/arrow/reader/merger.cpp3
-rw-r--r--ydb/core/formats/arrow/reader/merger.h2
-rw-r--r--ydb/core/formats/arrow/sort_cursor.h262
-rw-r--r--ydb/core/formats/arrow/ut/ut_arrow.cpp (renamed from ydb/core/formats/arrow/ut_arrow.cpp)193
-rw-r--r--ydb/core/formats/arrow/ut/ut_program_step.cpp (renamed from ydb/core/formats/arrow/ut_program_step.cpp)12
-rw-r--r--ydb/core/formats/arrow/ya.make4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp27
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h7
14 files changed, 62 insertions, 916 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index e05610423e..aaf8cbb01a 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -1,8 +1,6 @@
#include "arrow_helpers.h"
#include "switch_type.h"
-#include "one_batch_input_stream.h"
#include "common/validation.h"
-#include "merging_sorted_input_stream.h"
#include "permutations.h"
#include "common/adapter.h"
#include "serializer/native.h"
@@ -339,68 +337,6 @@ std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>&
return arrow::RecordBatch::Make(table->schema(), table->num_rows(), columns);
}
-std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const std::shared_ptr<TSortDescription>& description) {
- std::vector<NArrow::IInputStream::TPtr> streams;
- for (auto& batch : batches) {
- streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch));
- }
-
- auto mergeStream = std::make_shared<NArrow::TMergingSortedInputStream>(streams, description, Max<ui64>());
- std::shared_ptr<arrow::RecordBatch> batch = mergeStream->Read();
- Y_ABORT_UNLESS(!mergeStream->Read());
- return batch;
-}
-
-std::vector<std::shared_ptr<arrow::RecordBatch>> MergeSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const std::shared_ptr<TSortDescription>& description,
- size_t maxBatchRows) {
- Y_ABORT_UNLESS(maxBatchRows);
- ui64 numRows = 0;
- std::vector<NArrow::IInputStream::TPtr> streams;
- streams.reserve(batches.size());
- for (auto& batch : batches) {
- if (batch->num_rows()) {
- numRows += batch->num_rows();
- streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch));
- }
- }
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> out;
- out.reserve(numRows / maxBatchRows + 1);
-
- auto mergeStream = std::make_shared<NArrow::TMergingSortedInputStream>(streams, description, maxBatchRows);
- while (std::shared_ptr<arrow::RecordBatch> batch = mergeStream->Read()) {
- Y_ABORT_UNLESS(batch->num_rows());
- out.push_back(batch);
- }
- return out;
-}
-
-std::vector<std::shared_ptr<arrow::RecordBatch>> SliceSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const std::shared_ptr<TSortDescription>& description,
- size_t maxBatchRows) {
- Y_ABORT_UNLESS(!description->Reverse);
-
- std::vector<NArrow::IInputStream::TPtr> streams;
- streams.reserve(batches.size());
- for (auto& batch : batches) {
- if (batch->num_rows()) {
- streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch));
- }
- }
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> out;
- out.reserve(streams.size());
-
- auto dedupStream = std::make_shared<NArrow::TMergingSortedInputStream>(streams, description, maxBatchRows, true);
- while (std::shared_ptr<arrow::RecordBatch> batch = dedupStream->Read()) {
- Y_ABORT_UNLESS(batch->num_rows());
- out.push_back(batch);
- }
- return out;
-}
-
// Check if the permutation doesn't reorder anything
bool IsTrivial(const arrow::UInt64Array& permutation, const ui64 originalLength) {
if ((ui64)permutation.length() != originalLength) {
diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h
index 1628c57415..05d0e85263 100644
--- a/ydb/core/formats/arrow/arrow_helpers.h
+++ b/ydb/core/formats/arrow/arrow_helpers.h
@@ -82,15 +82,7 @@ inline std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shar
std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable, const bool combine = false);
std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
-std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const std::shared_ptr<TSortDescription>& description);
std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& rb);
-std::vector<std::shared_ptr<arrow::RecordBatch>> MergeSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const std::shared_ptr<TSortDescription>& description,
- size_t maxBatchRows);
-std::vector<std::shared_ptr<arrow::RecordBatch>> SliceSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const std::shared_ptr<TSortDescription>& description,
- size_t maxBatchRows = 0);
std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::vector<ui32>& sharding,
ui32 numShards);
diff --git a/ydb/core/formats/arrow/merging_sorted_input_stream.cpp b/ydb/core/formats/arrow/merging_sorted_input_stream.cpp
deleted file mode 100644
index f25665901f..0000000000
--- a/ydb/core/formats/arrow/merging_sorted_input_stream.cpp
+++ /dev/null
@@ -1,303 +0,0 @@
-// The code in this file is based on original ClickHouse source code
-// which is licensed under Apache license v2.0
-// See: https://github.com/ClickHouse/ClickHouse/
-
-#include <queue>
-#include "merging_sorted_input_stream.h"
-#include "switch_type.h"
-#include "size_calcer.h"
-
-namespace NKikimr::NArrow {
-
-class TRowsBuffer : public IRowsBuffer {
-public:
- using TBuilders = std::vector<std::unique_ptr<arrow::ArrayBuilder>>;
-
- static constexpr const size_t BUFFER_SIZE = 256;
-
- TRowsBuffer(TBuilders& columns, size_t maxRows)
- : Columns(columns)
- , MaxRows(maxRows)
- {
- Rows.reserve(BUFFER_SIZE);
- }
-
- bool AddRow(const TSortCursor& cursor) override {
- Rows.emplace_back(cursor->all_columns, cursor->getRow());
- if (Rows.size() >= BUFFER_SIZE) {
- Flush();
- }
- ++AddedRows;
- return true;
- }
-
- void Flush() override {
- if (Rows.empty()) {
- return;
- }
- for (size_t i = 0; i < Columns.size(); ++i) {
- arrow::ArrayBuilder& builder = *Columns[i];
- for (auto& [srcColumn, rowPosition] : Rows) {
- Y_ABORT_UNLESS(Append(builder, *srcColumn->at(i), rowPosition));
- }
- }
- Rows.clear();
- }
-
- bool Limit() const override {
- return MaxRows && (AddedRows >= MaxRows);
- }
-
- bool HasLimit() const override {
- return MaxRows;
- }
-
-private:
- TBuilders& Columns;
- std::vector<std::pair<const TArrayVec*, size_t>> Rows;
- size_t MaxRows = 0;
- size_t AddedRows = 0;
-};
-
-class TSlicedRowsBuffer : public IRowsBuffer {
-public:
- TSlicedRowsBuffer(size_t maxRows)
- : MaxRows(maxRows)
- {}
-
- bool AddRow(const TSortCursor& cursor) override {
- if (!Batch) {
- Batch = cursor->current_batch;
- Offset = cursor->getRow();
- }
- if (Batch.get() != cursor->current_batch.get()) {
- // append from another batch
- return false;
- } else if (cursor->getRow() != (Offset + AddedRows)) {
- // append from the same batch with data hole
- return false;
- }
- ++AddedRows;
- return true;
- }
-
- void Flush() override {
- }
-
- bool Limit() const override {
- return MaxRows && (AddedRows >= MaxRows);
- }
-
- bool HasLimit() const override {
- return MaxRows;
- }
-
- std::shared_ptr<arrow::RecordBatch> GetBatch() {
- if (Batch) {
- return Batch->Slice(Offset, AddedRows);
- }
- return {};
- }
-
-private:
- std::shared_ptr<arrow::RecordBatch> Batch;
- size_t Offset = 0;
- size_t MaxRows = 0;
- size_t AddedRows = 0;
-};
-
-TMergingSortedInputStream::TMergingSortedInputStream(const std::vector<IInputStream::TPtr>& inputs,
- std::shared_ptr<TSortDescription> description,
- size_t maxBatchRows, bool slice)
- : Description(description)
- , MaxBatchSize(maxBatchRows)
- , SliceSources(slice)
- , SourceBatches(inputs.size())
- , Cursors(inputs.size())
-{
- Children.insert(Children.end(), inputs.begin(), inputs.end());
- Header = Children.at(0)->Schema();
-}
-
-/// Read the first blocks, initialize the queue.
-void TMergingSortedInputStream::Init() {
- Y_ABORT_UNLESS(First);
- First = false;
- size_t totalRows = 0;
- for (size_t i = 0; i < SourceBatches.size(); ++i) {
- auto& batch = SourceBatches[i];
- if (batch) {
- continue;
- }
-
- batch = Children[i]->Read();
- if (!batch || batch->num_rows() == 0) {
- continue;
- }
-
- for (i32 i = 0; i < batch->num_columns(); ++i) {
- ColumnSize[batch->column_name(i)] += NArrow::GetArrayDataSize(batch->column(i));
- }
-
- totalRows += batch->num_rows();
- Cursors[i] = TSortCursorImpl(batch, Description, i);
- }
-
- ExpectedBatchSize = MaxBatchSize ? std::min(totalRows, MaxBatchSize) : totalRows;
- if (MaxBatchSize && MaxBatchSize < totalRows) {
- ColumnSize.clear();
- }
-
- Queue = TSortingHeap(Cursors, Description->NotNull);
-
- /// Let's check that all source blocks have the same structure.
- for (const auto& batch : SourceBatches) {
- if (batch) {
- Y_DEBUG_ABORT_UNLESS(batch->schema()->Equals(*Header));
- }
- }
-}
-
-std::shared_ptr<arrow::RecordBatch> TMergingSortedInputStream::ReadImpl() {
- if (Finished) {
- return {};
- }
-
- if (Children.size() == 1 && !Description->Replace()) {
- return Children[0]->Read();
- }
-
- if (First) {
- Init();
- }
-
- if (SliceSources) {
- Y_DEBUG_ABORT_UNLESS(!Description->Reverse);
- TSlicedRowsBuffer rowsBuffer(MaxBatchSize);
- Merge(rowsBuffer, Queue);
- auto batch = rowsBuffer.GetBatch();
- Y_ABORT_UNLESS(batch);
- if (!batch->num_rows()) {
- Y_ABORT_UNLESS(Finished);
- return {};
- }
- return batch;
- } else {
- auto builders = NArrow::MakeBuilders(Header, ExpectedBatchSize, ColumnSize);
- if (builders.empty()) {
- return {};
- }
-
- Y_ABORT_UNLESS(builders.size() == (size_t)Header->num_fields());
- TRowsBuffer rowsBuffer(builders, MaxBatchSize);
- Merge(rowsBuffer, Queue);
-
- auto arrays = NArrow::Finish(std::move(builders));
- Y_ABORT_UNLESS(arrays.size());
- if (!arrays[0]->length()) {
- Y_ABORT_UNLESS(Finished);
- return {};
- }
- return arrow::RecordBatch::Make(Header, arrays[0]->length(), arrays);
- }
-}
-
-/// Get the next block from the corresponding source, if there is one.
-void TMergingSortedInputStream::FetchNextBatch(const TSortCursor& current, TSortingHeap& queue) {
- size_t order = current->order;
- Y_ABORT_UNLESS(order < Cursors.size() && &Cursors[order] == current.Impl);
-
- while (true) {
- SourceBatches[order] = Children[order]->Read();
- auto& batch = SourceBatches[order];
-
- if (!batch) {
- queue.RemoveTop();
- break;
- }
-
- if (batch->num_rows()) {
- Y_DEBUG_ABORT_UNLESS(batch->schema()->Equals(*Header));
-
- Cursors[order].Reset(batch);
- queue.ReplaceTop(TSortCursor(&Cursors[order], Description->NotNull));
- break;
- }
- }
-}
-
-/// Take rows in required order and put them into `rowBuffer`,
-/// while the number of rows are no more than `max_block_size`
-template <bool replace, bool limit>
-void TMergingSortedInputStream::MergeImpl(IRowsBuffer& rowsBuffer, TSortingHeap& queue) {
- if constexpr (replace) {
- if (!PrevKey && queue.IsValid()) {
- auto current = queue.Current();
- PrevKey = std::make_shared<TReplaceKey>(current->replace_columns, current->getRow());
- if (!rowsBuffer.AddRow(current)) {
- return;
- }
- // Do not get Next() for simplicity. Lead to a dup
- }
- }
-
- while (queue.IsValid()) {
- if constexpr (limit) {
- if (rowsBuffer.Limit()) {
- return;
- }
- }
-
- auto current = queue.Current();
-
- if constexpr (replace) {
- TReplaceKey key(current->replace_columns, current->getRow());
-
- if (key == *PrevKey) {
- // do nothing
- } else if (rowsBuffer.AddRow(current)) {
- *PrevKey = key;
- } else {
- return;
- }
- } else {
- if (!rowsBuffer.AddRow(current)) {
- return;
- }
- }
-
- if (!current->isLast()) {
- queue.Next();
- } else {
- rowsBuffer.Flush();
- FetchNextBatch(current, queue);
- }
- }
-
- /// We have read all data. Ask children to cancel providing more data.
- Cancel();
- Finished = true;
-}
-
-void TMergingSortedInputStream::Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue) {
- const bool replace = Description->Replace();
- const bool limit = rowsBuffer.HasLimit();
-
- if (replace) {
- if (limit) {
- MergeImpl<true, true>(rowsBuffer, queue);
- } else {
- MergeImpl<true, false>(rowsBuffer, queue);
- }
- } else {
- if (limit) {
- MergeImpl<false, true>(rowsBuffer, queue);
- } else {
- MergeImpl<false, false>(rowsBuffer, queue);
- }
- }
-
- rowsBuffer.Flush();
-}
-
-}
diff --git a/ydb/core/formats/arrow/merging_sorted_input_stream.h b/ydb/core/formats/arrow/merging_sorted_input_stream.h
deleted file mode 100644
index 2a3a2f7227..0000000000
--- a/ydb/core/formats/arrow/merging_sorted_input_stream.h
+++ /dev/null
@@ -1,54 +0,0 @@
-// The code in this file is based on original ClickHouse source code
-// which is licensed under Apache license v2.0
-// See: https://github.com/ClickHouse/ClickHouse/
-
-#pragma once
-#include "input_stream.h"
-#include "sort_cursor.h"
-
-namespace NKikimr::NArrow {
-
-struct IRowsBuffer {
- virtual bool AddRow(const TSortCursor& cursor) = 0;
- virtual void Flush() = 0;
- virtual bool Limit() const = 0;
- virtual bool HasLimit() const = 0;
-};
-
-/// Merges several sorted streams into one sorted stream.
-class TMergingSortedInputStream : public IInputStream {
-public:
- TMergingSortedInputStream(const std::vector<IInputStream::TPtr>& inputs,
- std::shared_ptr<TSortDescription> description,
- size_t maxBatchRows, bool slice = false);
-
- std::shared_ptr<arrow::Schema> Schema() const override { return Header; }
-
-protected:
- std::shared_ptr<arrow::RecordBatch> ReadImpl() override;
-
-private:
- std::shared_ptr<arrow::Schema> Header;
- std::shared_ptr<TSortDescription> Description;
- const ui64 MaxBatchSize;
- const bool SliceSources;
- bool First = true;
- bool Finished = false;
- ui64 ExpectedBatchSize = 0; /// May be smaller or equal to max_block_size. To do 'reserve' for columns.
- std::map<std::string, ui64> ColumnSize;
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> SourceBatches;
- std::shared_ptr<TReplaceKey> PrevKey;
-
- std::vector<TSortCursorImpl> Cursors;
- TSortingHeap Queue;
-
- void Init();
- void FetchNextBatch(const TSortCursor& current, TSortingHeap& queue);
- void Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue);
-
- template <bool replace, bool limit>
- void MergeImpl(IRowsBuffer& rowsBuffer, TSortingHeap& queue);
-};
-
-}
diff --git a/ydb/core/formats/arrow/one_batch_input_stream.h b/ydb/core/formats/arrow/one_batch_input_stream.h
deleted file mode 100644
index 647e70a3f6..0000000000
--- a/ydb/core/formats/arrow/one_batch_input_stream.h
+++ /dev/null
@@ -1,36 +0,0 @@
-// The code in this file is based on original ClickHouse source code
-// which is licensed under Apache license v2.0
-// See: https://github.com/ClickHouse/ClickHouse/
-
-#pragma once
-#include "input_stream.h"
-
-namespace NKikimr::NArrow {
-
-class TOneBatchInputStream : public IInputStream {
-public:
- explicit TOneBatchInputStream(std::shared_ptr<arrow::RecordBatch> batch)
- : Batch(batch)
- , Header(Batch->schema())
- {}
-
- std::shared_ptr<arrow::Schema> Schema() const override {
- return Header;
- }
-
-protected:
- std::shared_ptr<arrow::RecordBatch> ReadImpl() override {
- if (Batch) {
- auto out = Batch;
- Batch.reset();
- return out;
- }
- return {};
- }
-
-private:
- std::shared_ptr<arrow::RecordBatch> Batch;
- std::shared_ptr<arrow::Schema> Header;
-};
-
-}
diff --git a/ydb/core/formats/arrow/reader/merger.cpp b/ydb/core/formats/arrow/reader/merger.cpp
index 4613e45bd0..710c07c5a5 100644
--- a/ydb/core/formats/arrow/reader/merger.cpp
+++ b/ydb/core/formats/arrow/reader/merger.cpp
@@ -194,7 +194,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
return result;
}
-bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
+void TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
while (SortHeap.Size()) {
if (auto currentPosition = DrainCurrentPosition()) {
@@ -202,7 +202,6 @@ bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
builder.AddRecord(*currentPosition);
}
}
- return false;
}
std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() {
diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h
index 795f083d1e..1849e535a8 100644
--- a/ydb/core/formats/arrow/reader/merger.h
+++ b/ydb/core/formats/arrow/reader/merger.h
@@ -241,7 +241,7 @@ public:
return !SortHeap.Size();
}
- bool DrainAll(TRecordBatchBuilder& builder);
+ void DrainAll(TRecordBatchBuilder& builder);
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
diff --git a/ydb/core/formats/arrow/sort_cursor.h b/ydb/core/formats/arrow/sort_cursor.h
deleted file mode 100644
index f51f4145ae..0000000000
--- a/ydb/core/formats/arrow/sort_cursor.h
+++ /dev/null
@@ -1,262 +0,0 @@
-// The code in this file is based on original ClickHouse source code
-// which is licensed under Apache license v2.0
-// See: https://github.com/ClickHouse/ClickHouse/
-
-#pragma once
-#include "replace_key.h"
-#include "arrow_helpers.h"
-#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
-#include <algorithm>
-
-namespace NKikimr::NArrow {
-
-/// Description of the sorting rule for several columns.
-struct TSortDescription {
- /// @note In case you have PK and snapshot column you should sort with {ASC PK, DESC snap} key and replase with PK
- std::shared_ptr<arrow::Schema> SortingKey;
- std::shared_ptr<arrow::Schema> ReplaceKey; /// Keep first visited (SortingKey ordered) of dups
- std::vector<int> Directions; /// 1 - ascending, -1 - descending.
- bool NotNull{false};
- bool Reverse{false}; // Read sources from bottom to top. With inversed Directions leads to DESC dst for ASC src
-
- TSortDescription() = default;
-
- TSortDescription(const std::shared_ptr<arrow::Schema>& sortingKey,
- const std::shared_ptr<arrow::Schema>& replaceKey = {})
- : SortingKey(sortingKey)
- , ReplaceKey(replaceKey)
- , Directions(sortingKey->num_fields(), 1)
- {}
-
- size_t Size() const { return SortingKey->num_fields(); }
- int Direction(size_t pos) const { return Directions[pos]; }
- bool Replace() const { return ReplaceKey.get(); }
-
- void Inverse() {
- Reverse = !Reverse;
- for (int& dir : Directions) {
- dir *= -1;
- }
- }
-};
-
-
-/// Cursor allows to compare rows in different batches.
-/// Cursor moves inside single block. It is used in priority queue.
-struct TSortCursorImpl {
- std::shared_ptr<TArrayVec> sort_columns;
- std::shared_ptr<TSortDescription> desc;
- ui32 order = 0; // Number of cursor. It determines an order if comparing columns are equal.
- //
- std::shared_ptr<arrow::RecordBatch> current_batch;
- const TArrayVec* all_columns;
- std::shared_ptr<TArrayVec> replace_columns;
-
- TSortCursorImpl() = default;
-
- TSortCursorImpl(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<TSortDescription> desc_, ui32 order_ = 0)
- : desc(desc_)
- , order(order_)
- {
- Reset(batch);
- }
-
- bool Empty() const { return Rows() == 0; }
- size_t Rows() const { return (!all_columns || all_columns->empty()) ? 0 : all_columns->front()->length(); }
- size_t LastRow() const { return Rows() - 1; }
-
- void Reset(std::shared_ptr<arrow::RecordBatch> batch) {
- current_batch = batch;
- auto rbSorting = ExtractColumns(batch, desc->SortingKey);
- Y_ABORT_UNLESS(rbSorting);
- sort_columns = std::make_shared<TArrayVec>(rbSorting->columns());
- all_columns = &batch->columns();
- if (desc->ReplaceKey) {
- auto rbReplace = ExtractColumns(batch, desc->ReplaceKey);
- Y_ABORT_UNLESS(rbReplace);
- replace_columns = std::make_shared<TArrayVec>(rbReplace->columns());
- }
- pos = 0;
- }
-
- size_t getRow() const {
- return desc->Reverse ? (Rows() - pos - 1) : pos;
- }
-
- bool isFirst() const { return pos == 0; }
- bool isLast() const { return pos + 1 >= Rows(); }
- bool isValid() const { return pos < Rows(); }
- void next() { ++pos; }
-
-private:
- size_t pos{0};
-};
-
-
-struct TSortCursor {
- TSortCursorImpl* Impl;
- bool NotNull;
-
- TSortCursor(TSortCursorImpl* impl, bool notNull)
- : Impl(impl)
- , NotNull(notNull)
- {}
-
- TSortCursorImpl* operator-> () { return Impl; }
- const TSortCursorImpl* operator-> () const { return Impl; }
-
- bool Greater(const TSortCursor& rhs) const {
- return GreaterAt(rhs, Impl->getRow(), rhs.Impl->getRow());
- }
-
- /// Inverted so that the priority queue elements are removed in ascending order.
- bool operator < (const TSortCursor& rhs) const {
- return Greater(rhs);
- }
-
-private:
- /// The specified row of this cursor is greater than the specified row of another cursor.
- bool GreaterAt(const TSortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const {
- TRawReplaceKey left(Impl->sort_columns.get(), lhs_pos);
- TRawReplaceKey right(rhs.Impl->sort_columns.get(), rhs_pos);
-
- if (NotNull) {
- for (size_t i = 0; i < Impl->desc->Size(); ++i) {
- auto cmp = left.CompareColumnValueNotNull(i, right, i);
- int res = Impl->desc->Direction(i) * (std::is_eq(cmp) ? 0 : (std::is_lt(cmp) ? -1 : 1));
- if (res > 0)
- return true;
- if (res < 0)
- return false;
- }
- } else {
- for (size_t i = 0; i < Impl->desc->Size(); ++i) {
- auto cmp = left.CompareColumnValue(i, right, i);
- int res = Impl->desc->Direction(i) * (std::is_eq(cmp) ? 0 : (std::is_lt(cmp) ? -1 : 1));
- if (res > 0)
- return true;
- if (res < 0)
- return false;
- }
- }
- return Impl->order > rhs.Impl->order;
- }
-};
-
-
-/// Allows to fetch data from multiple sort cursors in sorted order (merging sorted data stream).
-/// TODO: Replace with "Loser Tree", see https://en.wikipedia.org/wiki/K-way_merge_algorithm
-class TSortingHeap {
-public:
- TSortingHeap() = default;
-
- template <typename TCursors>
- TSortingHeap(TCursors& cursors, bool notNull) {
- Queue.reserve(cursors.size());
- for (auto& cur : cursors) {
- if (!cur.Empty()) {
- Queue.emplace_back(TSortCursor(&cur, notNull));
- }
- }
- std::make_heap(Queue.begin(), Queue.end());
- }
-
- bool IsValid() const { return !Queue.empty(); }
- TSortCursor& Current() { return Queue.front(); }
- size_t Size() { return Queue.size(); }
- TSortCursor& NextChild() { return Queue[NextChildIndex()]; }
-
- void Next() {
- Y_ABORT_UNLESS(IsValid());
-
- if (!Current()->isLast()) {
- Current()->next();
- UpdateTop();
- }
- else {
- RemoveTop();
- }
- }
-
- void ReplaceTop(TSortCursor&& new_top) {
- Current() = new_top;
- UpdateTop();
- }
-
- void RemoveTop() {
- std::pop_heap(Queue.begin(), Queue.end());
- Queue.pop_back();
- NextIdx = 0;
- }
-
- void Push(TSortCursor&& cursor) {
- Queue.emplace_back(cursor);
- std::push_heap(Queue.begin(), Queue.end());
- NextIdx = 0;
- }
-
-private:
- std::vector<TSortCursor> Queue;
- /// Cache comparison between first and second child if the order in queue has not been changed.
- size_t NextIdx = 0;
-
- size_t NextChildIndex() {
- if (NextIdx == 0) {
- NextIdx = 1;
- if (Queue.size() > 2 && Queue[1] < Queue[2]) {
- ++NextIdx;
- }
- }
-
- return NextIdx;
- }
-
- /// This is adapted version of the function __sift_down from libc++.
- /// Why cannot simply use std::priority_queue?
- /// - because it doesn't support updating the top element and requires pop and push instead.
- /// Also look at "Boost.Heap" library.
- void UpdateTop() {
- size_t size = Queue.size();
- if (size < 2)
- return;
-
- auto begin = Queue.begin();
-
- size_t child_idx = NextChildIndex();
- auto child_it = begin + child_idx;
-
- /// Check if we are in order.
- if (*child_it < *begin)
- return;
-
- NextIdx = 0;
-
- auto curr_it = begin;
- auto top(std::move(*begin));
- do {
- /// We are not in heap-order, swap the parent with it's largest child.
- *curr_it = std::move(*child_it);
- curr_it = child_it;
-
- // recompute the child based off of the updated parent
- child_idx = 2 * child_idx + 1;
-
- if (child_idx >= size)
- break;
-
- child_it = begin + child_idx;
-
- if ((child_idx + 1) < size && *child_it < *(child_it + 1))
- {
- /// Right child exists and is greater than left child.
- ++child_it;
- ++child_idx;
- }
-
- /// Check if we are in order.
- } while (!(*child_it < top));
- *curr_it = std::move(top);
- }
-};
-
-}
diff --git a/ydb/core/formats/arrow/ut_arrow.cpp b/ydb/core/formats/arrow/ut/ut_arrow.cpp
index 2ab400bf49..da620d70fa 100644
--- a/ydb/core/formats/arrow/ut_arrow.cpp
+++ b/ydb/core/formats/arrow/ut/ut_arrow.cpp
@@ -1,9 +1,10 @@
-#include "arrow_batch_builder.h"
-#include "arrow_helpers.h"
-#include "converter.h"
-#include "one_batch_input_stream.h"
-#include "merging_sorted_input_stream.h"
-#include "arrow_filter.h"
+#include <ydb/core/formats/arrow/arrow_batch_builder.h>
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/converter.h>
+#include <ydb/core/formats/arrow/arrow_filter.h>
+#include <ydb/core/formats/arrow/permutations.h>
+#include <ydb/core/formats/arrow/reader/merger.h>
+#include <ydb/core/formats/arrow/reader/result_builder.h>
#include <ydb/library/binary_json/write.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -480,13 +481,6 @@ ui32 RestoreValue(ui32 a, ui32 b, ui32 c) {
return ui32(a) * 100 + b * 10 + c;
}
-ui32 RestoreOne(const std::shared_ptr<arrow::RecordBatch>& batch, int pos) {
- auto arrA = std::static_pointer_cast<arrow::Int8Array>(batch->GetColumnByName("i8"));
- auto arrB = std::static_pointer_cast<arrow::Int16Array>(batch->GetColumnByName("i16"));
- auto arrC = std::static_pointer_cast<arrow::Int32Array>(batch->GetColumnByName("i32"));
- return RestoreValue(arrA->Value(pos), arrB->Value(pos), arrC->Value(pos));
-}
-
bool CheckSorted1000(const std::shared_ptr<arrow::RecordBatch>& batch, bool desc = false) {
auto arrA = std::static_pointer_cast<arrow::Int8Array>(batch->GetColumnByName("i8"));
auto arrB = std::static_pointer_cast<arrow::Int16Array>(batch->GetColumnByName("i16"));
@@ -663,38 +657,26 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
UNIT_ASSERT(CheckSorted1000(batch));
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.push_back(batch->Slice(0, 100)); // 0..100
- batches.push_back(batch->Slice(100, 200)); // 100..300
- batches.push_back(batch->Slice(200, 400)); // 200..600
- batches.push_back(batch->Slice(500, 50)); // 500..550
- batches.push_back(batch->Slice(600, 1)); // 600..601
-
- auto descr = std::make_shared<NArrow::TSortDescription>(batch->schema());
- descr->NotNull = true;
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> sorted;
- { // maxBatchSize = 500, no limit
- std::vector<NArrow::IInputStream::TPtr> streams;
- for (auto& batch : batches) {
- streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch));
- }
+ batches.push_back(batch->Slice(0, 100)); // 0..100 +100
+ batches.push_back(batch->Slice(100, 200)); // 100..300 +200
+ batches.push_back(batch->Slice(200, 400)); // 200..600 +300
+ batches.push_back(batch->Slice(500, 50)); // 500..550 +50
+ batches.push_back(batch->Slice(600, 1)); // 600..601 +1
- NArrow::IInputStream::TPtr mergeStream =
- std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 500);
-
- while (auto batch = mergeStream->Read()) {
- sorted.emplace_back(batch);
+ std::shared_ptr<arrow::RecordBatch> sorted;
+ {
+ NArrow::NMerger::TRecordBatchBuilder builder(batch->schema()->fields());
+ const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()};
+ auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), false, vColumns);
+ for (auto&& i : batches) {
+ merger->AddSource(i, nullptr);
}
+ merger->DrainAll(builder);
+ sorted = builder.Finalize();
}
-
- UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(sorted[0]->num_rows(), 500);
- UNIT_ASSERT_VALUES_EQUAL(sorted[1]->num_rows(), 251);
- UNIT_ASSERT(CheckSorted(sorted[0]));
- UNIT_ASSERT(CheckSorted(sorted[1]));
- UNIT_ASSERT(NArrow::IsSorted(sorted[0], descr->SortingKey));
- UNIT_ASSERT(NArrow::IsSorted(sorted[1], descr->SortingKey));
- UNIT_ASSERT(RestoreOne(sorted[0], 499) <= RestoreOne(sorted[1], 0));
+ UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 601);
+ UNIT_ASSERT(NArrow::IsSorted(sorted, batch->schema()));
+ UNIT_ASSERT(CheckSorted(sorted));
}
Y_UNIT_TEST(MergingSortedInputStreamReversed) {
@@ -702,86 +684,29 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
UNIT_ASSERT(CheckSorted1000(batch));
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.push_back(batch->Slice(0, 100)); // 0..100
- batches.push_back(batch->Slice(100, 200)); // 100..300
- batches.push_back(batch->Slice(200, 400)); // 200..600
- batches.push_back(batch->Slice(500, 50)); // 500..550
- batches.push_back(batch->Slice(600, 1)); // 600..601
-
- auto descr = std::make_shared<NArrow::TSortDescription>(batch->schema());
- descr->NotNull = true;
- descr->Inverse();
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> sorted;
- { // maxBatchSize = 500, no limit
- std::vector<NArrow::IInputStream::TPtr> streams;
- for (auto& batch : batches) {
- streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch));
- }
-
- NArrow::IInputStream::TPtr mergeStream =
- std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 500);
+ batches.push_back(batch->Slice(0, 100)); // 0..100 +100
+ batches.push_back(batch->Slice(100, 200)); // 100..300 +200
+ batches.push_back(batch->Slice(200, 400)); // 200..600 +300
+ batches.push_back(batch->Slice(500, 50)); // 500..550 +50
+ batches.push_back(batch->Slice(600, 1)); // 600..601 +1
- while (auto batch = mergeStream->Read()) {
- sorted.emplace_back(batch);
- }
- }
-
- UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(sorted[0]->num_rows(), 500);
- UNIT_ASSERT_VALUES_EQUAL(sorted[1]->num_rows(), 251);
- UNIT_ASSERT(CheckSorted(sorted[0], true));
- UNIT_ASSERT(CheckSorted(sorted[1], true));
- UNIT_ASSERT(NArrow::IsSorted(sorted[0], descr->SortingKey, true));
- UNIT_ASSERT(NArrow::IsSorted(sorted[1], descr->SortingKey, true));
- UNIT_ASSERT(RestoreOne(sorted[0], 499) >= RestoreOne(sorted[1], 0));
- }
-
- Y_UNIT_TEST(MergingSortedInputStreamReplace) {
- std::shared_ptr<arrow::RecordBatch> batch = ExtractBatch(MakeTable1000());
- UNIT_ASSERT(CheckSorted1000(batch));
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.push_back(AddSnapColumn(batch->Slice(0, 400), 0));
- batches.push_back(AddSnapColumn(batch->Slice(200, 400), 1));
- batches.push_back(AddSnapColumn(batch->Slice(400, 400), 2));
- batches.push_back(AddSnapColumn(batch->Slice(600, 400), 3));
-
- auto sortingKey = batches[0]->schema();
- auto replaceKey = batch->schema();
-
- auto descr = std::make_shared<NArrow::TSortDescription>(sortingKey, replaceKey);
- descr->Directions.back() = -1; // greater snapshot first
- descr->NotNull = true;
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> sorted;
+ std::shared_ptr<arrow::RecordBatch> sorted;
{
- std::vector<NArrow::IInputStream::TPtr> streams;
- for (auto& batch : batches) {
- streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch));
- }
-
- NArrow::IInputStream::TPtr mergeStream =
- std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 5000);
-
- while (auto batch = mergeStream->Read()) {
- sorted.emplace_back(batch);
+ NArrow::NMerger::TRecordBatchBuilder builder(batch->schema()->fields());
+ const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()};
+ auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), true, vColumns);
+ for (auto&& i : batches) {
+ merger->AddSource(i, nullptr);
}
+ merger->DrainAll(builder);
+ sorted = builder.Finalize();
}
-
- UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(sorted[0]->num_rows(), 1000);
- UNIT_ASSERT(CheckSorted1000(sorted[0]));
- UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted[0], descr->SortingKey));
-
- auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted[0]->GetColumnByName("snap")));
- UNIT_ASSERT_VALUES_EQUAL(counts[0], 200);
- UNIT_ASSERT_VALUES_EQUAL(counts[1], 200);
- UNIT_ASSERT_VALUES_EQUAL(counts[2], 200);
- UNIT_ASSERT_VALUES_EQUAL(counts[3], 400);
+ UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 601);
+ UNIT_ASSERT(NArrow::IsSorted(sorted, batch->schema(), true));
+ UNIT_ASSERT(CheckSorted(sorted, true));
}
- Y_UNIT_TEST(MergingSortedInputStreamReplaceReversed) {
+ Y_UNIT_TEST(MergingSortedInputStreamReplace) {
std::shared_ptr<arrow::RecordBatch> batch = ExtractBatch(MakeTable1000());
UNIT_ASSERT(CheckSorted1000(batch));
@@ -791,35 +716,23 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
batches.push_back(AddSnapColumn(batch->Slice(400, 400), 2));
batches.push_back(AddSnapColumn(batch->Slice(600, 400), 3));
- auto sortingKey = batches[0]->schema();
- auto replaceKey = batch->schema();
-
- auto descr = std::make_shared<NArrow::TSortDescription>(sortingKey, replaceKey);
- descr->Directions.back() = 1; // greater snapshot last
- descr->NotNull = true;
- descr->Inverse();
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> sorted;
+ std::shared_ptr<arrow::RecordBatch> sorted;
{
- std::vector<NArrow::IInputStream::TPtr> streams;
- for (auto& batch : batches) {
- streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch));
- }
-
- NArrow::IInputStream::TPtr mergeStream =
- std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 5000);
-
- while (auto batch = mergeStream->Read()) {
- sorted.emplace_back(batch);
+ NArrow::NMerger::TRecordBatchBuilder builder(batches[0]->schema()->fields());
+ const std::vector<std::string> vColumns = {"snap"};
+ auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns);
+ for (auto&& i : batches) {
+ merger->AddSource(i, nullptr);
}
+ merger->DrainAll(builder);
+ sorted = builder.Finalize();
}
- UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(sorted[0]->num_rows(), 1000);
- UNIT_ASSERT(CheckSorted1000(sorted[0], true));
- UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted[0], descr->SortingKey, true));
+ UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 1000);
+ UNIT_ASSERT(CheckSorted1000(sorted));
+ UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted, batch->schema()));
- auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted[0]->GetColumnByName("snap")));
+ auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted->GetColumnByName("snap")));
UNIT_ASSERT_VALUES_EQUAL(counts[0], 200);
UNIT_ASSERT_VALUES_EQUAL(counts[1], 200);
UNIT_ASSERT_VALUES_EQUAL(counts[2], 200);
diff --git a/ydb/core/formats/arrow/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp
index 2ab52ed29b..d7f447a1b2 100644
--- a/ydb/core/formats/arrow/ut_program_step.cpp
+++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp
@@ -2,14 +2,16 @@
#include <memory>
#include <vector>
+#include <ydb/core/formats/arrow/custom_registry.h>
+#include <ydb/core/formats/arrow/program.h>
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/library/arrow_kernels/ut_common.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/exec.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/type_fwd.h>
-#include <library/cpp/testing/unittest/registar.h>
-#include <ydb/library/arrow_kernels/ut_common.h>
-#include "custom_registry.h"
-#include "program.h"
-#include "arrow_helpers.h"
using namespace NKikimr::NArrow;
using namespace NKikimr::NSsa;
diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make
index 146c5bae1e..d55ccb5799 100644
--- a/ydb/core/formats/arrow/ya.make
+++ b/ydb/core/formats/arrow/ya.make
@@ -45,14 +45,10 @@ SRCS(
converter.h
custom_registry.cpp
input_stream.h
- merging_sorted_input_stream.cpp
- merging_sorted_input_stream.h
- one_batch_input_stream.h
permutations.cpp
program.cpp
replace_key.cpp
size_calcer.cpp
- sort_cursor.h
ssa_program_optimizer.cpp
special_keys.cpp
simple_arrays_cache.cpp
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 4e36b4722f..f5192eb7d8 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -15,9 +15,6 @@
#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
#include <ydb/core/tx/tiering/manager.h>
-#include <ydb/core/formats/arrow/one_batch_input_stream.h>
-#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
-
#include <ydb/library/conclusion/status.h>
#include <library/cpp/time_provider/time_provider.h>
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index 5e14af96cd..c91f2d0d13 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -5,7 +5,6 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
-#include <ydb/core/formats/arrow/sort_cursor.h>
#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/formats/arrow/transformer/dictionary.h>
#include <ydb/core/sys_view/common/schema.h>
@@ -264,32 +263,6 @@ void TIndexInfo::SetAllKeys(const std::shared_ptr<IStoragesManager>& operators)
}
}
-std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortDescription() const {
- if (GetPrimaryKey()) {
- auto key = ExtendedKey; // Sort with extended key, greater snapshot first
- Y_ABORT_UNLESS(key && key->num_fields() > 2);
- auto description = std::make_shared<NArrow::TSortDescription>(key);
- description->Directions[key->num_fields() - 1] = -1;
- description->Directions[key->num_fields() - 2] = -1;
- description->NotNull = true; // TODO
- return description;
- }
- return {};
-}
-
-std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortReplaceDescription() const {
- if (GetPrimaryKey()) {
- auto key = ExtendedKey; // Sort with extended key, greater snapshot first
- Y_ABORT_UNLESS(key && key->num_fields() > 2);
- auto description = std::make_shared<NArrow::TSortDescription>(key, GetPrimaryKey());
- description->Directions[key->num_fields() - 1] = -1;
- description->Directions[key->num_fields() - 2] = -1;
- description->NotNull = true; // TODO
- return description;
- }
- return {};
-}
-
TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId) const {
auto it = ColumnFeatures.find(columnId);
AFL_VERIFY(it != ColumnFeatures.end());
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index 39ce02b372..a5c6381425 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -24,10 +24,6 @@ namespace arrow {
class Schema;
}
-namespace NKikimr::NArrow {
- struct TSortDescription;
-}
-
namespace NKikimr::NOlap {
class TPortionInfoWithBlobs;
@@ -304,9 +300,6 @@ public:
bool IsSorted() const { return true; }
bool IsSortedColumn(const ui32 columnId) const { return GetPKFirstColumnId() == columnId; }
- std::shared_ptr<NArrow::TSortDescription> SortDescription() const;
- std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const;
-
static const std::set<ui32>& GetSpecialColumnIdsSet() {
static const std::set<ui32> result(GetSpecialColumnIds().begin(), GetSpecialColumnIds().end());
return result;