diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-22 17:56:02 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-22 18:15:51 +0300 |
commit | 6a06448fc70344e1420dab5b685f8a6b4ab311c3 (patch) | |
tree | 7f643b8d99c2491bdaeebc6e909aa2f9b1162dab | |
parent | 9d59960b8484ffc24d6f31ed2525ff49c18fe4e9 (diff) | |
download | ydb-6a06448fc70344e1420dab5b685f8a6b4ab311c3.tar.gz |
KIKIMR-19213: don't store batch in sortable position (use data columns)
3 files changed, 54 insertions, 29 deletions
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.cpp b/ydb/core/formats/arrow/reader/read_filter_merger.cpp index dbcc7951041..0fa6b52cb04 100644 --- a/ydb/core/formats/arrow/reader/read_filter_merger.cpp +++ b/ydb/core/formats/arrow/reader/read_filter_merger.cpp @@ -15,20 +15,9 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const { return result; } -std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) { - if (!batch || !batch->num_rows()) { - return {}; - } - - i64 posStart = 0; - i64 posFinish = batch->num_rows() - 1; - if (forFound.IsReverseSort()) { - std::swap(posStart, posFinish); - } - if (includedStartPosition) { - posStart = *includedStartPosition; - } - TSortableBatchPosition position = forFound.BuildSame(batch, posStart); +std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(TSortableBatchPosition& position, const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) { + ui64 posStart = posStartExt; + ui64 posFinish = posFinishExt; { position.InitPosition(posStart); auto cmp = position.Compare(forFound); @@ -60,21 +49,40 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi } Y_ABORT_UNLESS(posFinish != posStart); if (greater) { + Y_ABORT_UNLESS(position.InitPosition(posFinish)); return TFoundPosition::Greater(posFinish); } else { + Y_ABORT_UNLESS(position.InitPosition(posStart)); return TFoundPosition::Less(posStart); } } +std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) { + if (!batch || !batch->num_rows()) { + return {}; + 
} + i64 posStart = 0; + i64 posFinish = batch->num_rows() - 1; + if (forFound.IsReverseSort()) { + std::swap(posStart, posFinish); + } + if (includedStartPosition) { + posStart = *includedStartPosition; + } + + TSortableBatchPosition position = forFound.BuildSame(batch, posStart); + return FindPosition(position, posStart, posFinish, forFound, greater); +} + TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) { - auto pos = FindPosition(Batch, forFound, true, Position); - AFL_VERIFY(pos)("batch", NArrow::DebugJson(Batch, 1, 1))("found", forFound.DebugJson()); + const ui32 posStart = Position; + auto pos = FindPosition(*this, posStart, ReverseSort ? 0 : (RecordsCount - 1), forFound, true); + AFL_VERIFY(pos)("cursor", DebugJson())("found", forFound.DebugJson()); if (ReverseSort) { - AFL_VERIFY(pos->GetPosition() <= Position)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true); + AFL_VERIFY(Position <= posStart)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true); } else { - AFL_VERIFY(Position <= pos->GetPosition())("pos", Position)("pos_skip", pos->GetPosition())("reverse", false); + AFL_VERIFY(posStart <= Position)("pos", Position)("pos_skip", pos->GetPosition())("reverse", false); } - AFL_VERIFY(InitPosition(pos->GetPosition())); return *pos; } diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h index 6fef5323074..b8f97af026f 100644 --- a/ydb/core/formats/arrow/reader/read_filter_merger.h +++ b/ydb/core/formats/arrow/reader/read_filter_merger.h @@ -21,6 +21,15 @@ public: TSortableScanData() = default; TSortableScanData(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns); + std::shared_ptr<arrow::RecordBatch> Slice(const ui64 offset, const ui64 count) const { + std::vector<std::shared_ptr<arrow::Array>> slicedArrays; + for (auto&& i : Columns) { + AFL_VERIFY(offset + count 
<= (ui64)i->length())("offset", offset)("count", count)("length", i->length()); + slicedArrays.emplace_back(i->Slice(offset, count)); + } + return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(Fields), count, slicedArrays); + } + bool IsSameSchema(const std::shared_ptr<arrow::Schema>& schema) const { if (Fields.size() != (size_t)schema->num_fields()) { return false; @@ -62,17 +71,20 @@ public: }; class TSortableBatchPosition { -protected: - +private: YDB_READONLY(i64, Position, 0); i64 RecordsCount = 0; bool ReverseSort = false; std::shared_ptr<TSortableScanData> Sorting; std::shared_ptr<TSortableScanData> Data; - YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch); public: TSortableBatchPosition() = default; + std::shared_ptr<arrow::RecordBatch> Slice(const ui64 offset, const ui64 count) const { + AFL_VERIFY(Data); + return Data->Slice(offset, count); + } + class TFoundPosition { private: YDB_READONLY(ui32, Position, 0); @@ -209,7 +221,8 @@ public: } static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition); - TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound); + static std::optional<TSortableBatchPosition::TFoundPosition> FindPosition(TSortableBatchPosition& position, const ui64 posStart, const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater); + TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition & forFound); const TSortableScanData& GetData() const { return *Data; @@ -234,15 +247,16 @@ public: TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& sortingColumns, const std::vector<std::string>& dataColumns, const bool reverseSort) : Position(position) - , RecordsCount(batch->num_rows()) , ReverseSort(reverseSort) - , 
Sorting(std::make_shared<TSortableScanData>(batch, sortingColumns)) - , Batch(batch) { + Y_ABORT_UNLESS(batch); + Y_ABORT_UNLESS(batch->num_rows()); + RecordsCount = batch->num_rows(); + if (dataColumns.size()) { Data = std::make_shared<TSortableScanData>(batch, dataColumns); } - Y_ABORT_UNLESS(batch->num_rows()); + Sorting = std::make_shared<TSortableScanData>(batch, sortingColumns); Y_DEBUG_ABORT_UNLESS(batch->ValidateFull().ok()); Y_ABORT_UNLESS(Sorting->GetColumns().size()); } diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp index 71c086aed9e..b03f44f285e 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp @@ -105,10 +105,13 @@ std::shared_ptr<arrow::RecordBatch> TMergePartialStream::SingleSourceDrain(const include = true; } if (Reverse) { - result = SortHeap.Current().GetKeyColumns().GetBatch()->Slice(pos.GetPosition() + (include ? 0 : 1), delta + (include ? 1 : 0)); + result = SortHeap.Current().GetKeyColumns().Slice(pos.GetPosition() + (include ? 0 : 1), delta + (include ? 1 : 0)); } else { - result = SortHeap.Current().GetKeyColumns().GetBatch()->Slice(startPos, delta + (include ? 1 : 0)); + result = SortHeap.Current().GetKeyColumns().Slice(startPos, delta + (include ? 1 : 0)); } +#ifndef NDEBUG + NArrow::TStatusValidator::Validate(result->ValidateFull()); +#endif if (Reverse) { auto permutation = NArrow::MakePermutation(result->num_rows(), true); result = NArrow::TStatusValidator::GetValid(arrow::compute::Take(result, permutation)).record_batch(); |