aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-22 17:56:02 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-22 18:15:51 +0300
commit6a06448fc70344e1420dab5b685f8a6b4ab311c3 (patch)
tree7f643b8d99c2491bdaeebc6e909aa2f9b1162dab
parent9d59960b8484ffc24d6f31ed2525ff49c18fe4e9 (diff)
downloadydb-6a06448fc70344e1420dab5b685f8a6b4ab311c3.tar.gz
KIKIMR-19213: dont store batch into sortable position (use data columns)
-rw-r--r--ydb/core/formats/arrow/reader/read_filter_merger.cpp46
-rw-r--r--ydb/core/formats/arrow/reader/read_filter_merger.h30
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp7
3 files changed, 54 insertions, 29 deletions
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.cpp b/ydb/core/formats/arrow/reader/read_filter_merger.cpp
index dbcc7951041..0fa6b52cb04 100644
--- a/ydb/core/formats/arrow/reader/read_filter_merger.cpp
+++ b/ydb/core/formats/arrow/reader/read_filter_merger.cpp
@@ -15,20 +15,9 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const {
return result;
}
-std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) {
- if (!batch || !batch->num_rows()) {
- return {};
- }
-
- i64 posStart = 0;
- i64 posFinish = batch->num_rows() - 1;
- if (forFound.IsReverseSort()) {
- std::swap(posStart, posFinish);
- }
- if (includedStartPosition) {
- posStart = *includedStartPosition;
- }
- TSortableBatchPosition position = forFound.BuildSame(batch, posStart);
+std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(TSortableBatchPosition& position, const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) {
+ ui64 posStart = posStartExt;
+ ui64 posFinish = posFinishExt;
{
position.InitPosition(posStart);
auto cmp = position.Compare(forFound);
@@ -60,21 +49,40 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
}
Y_ABORT_UNLESS(posFinish != posStart);
if (greater) {
+ Y_ABORT_UNLESS(position.InitPosition(posFinish));
return TFoundPosition::Greater(posFinish);
} else {
+ Y_ABORT_UNLESS(position.InitPosition(posStart));
return TFoundPosition::Less(posStart);
}
}
+std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) {
+ if (!batch || !batch->num_rows()) {
+ return {};
+ }
+ i64 posStart = 0;
+ i64 posFinish = batch->num_rows() - 1;
+ if (forFound.IsReverseSort()) {
+ std::swap(posStart, posFinish);
+ }
+ if (includedStartPosition) {
+ posStart = *includedStartPosition;
+ }
+
+ TSortableBatchPosition position = forFound.BuildSame(batch, posStart);
+ return FindPosition(position, posStart, posFinish, forFound, greater);
+}
+
TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) {
- auto pos = FindPosition(Batch, forFound, true, Position);
- AFL_VERIFY(pos)("batch", NArrow::DebugJson(Batch, 1, 1))("found", forFound.DebugJson());
+ const ui32 posStart = Position;
+ auto pos = FindPosition(*this, posStart, ReverseSort ? 0 : (RecordsCount - 1), forFound, true);
+ AFL_VERIFY(pos)("cursor", DebugJson())("found", forFound.DebugJson());
if (ReverseSort) {
- AFL_VERIFY(pos->GetPosition() <= Position)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true);
+ AFL_VERIFY(Position <= posStart)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true);
} else {
- AFL_VERIFY(Position <= pos->GetPosition())("pos", Position)("pos_skip", pos->GetPosition())("reverse", false);
+ AFL_VERIFY(posStart <= Position)("pos", Position)("pos_skip", pos->GetPosition())("reverse", false);
}
- AFL_VERIFY(InitPosition(pos->GetPosition()));
return *pos;
}
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h
index 6fef5323074..b8f97af026f 100644
--- a/ydb/core/formats/arrow/reader/read_filter_merger.h
+++ b/ydb/core/formats/arrow/reader/read_filter_merger.h
@@ -21,6 +21,15 @@ public:
TSortableScanData() = default;
TSortableScanData(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns);
+ std::shared_ptr<arrow::RecordBatch> Slice(const ui64 offset, const ui64 count) const {
+ std::vector<std::shared_ptr<arrow::Array>> slicedArrays;
+ for (auto&& i : Columns) {
+ AFL_VERIFY(offset + count <= (ui64)i->length())("offset", offset)("count", count)("length", i->length());
+ slicedArrays.emplace_back(i->Slice(offset, count));
+ }
+ return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(Fields), count, slicedArrays);
+ }
+
bool IsSameSchema(const std::shared_ptr<arrow::Schema>& schema) const {
if (Fields.size() != (size_t)schema->num_fields()) {
return false;
@@ -62,17 +71,20 @@ public:
};
class TSortableBatchPosition {
-protected:
-
+private:
YDB_READONLY(i64, Position, 0);
i64 RecordsCount = 0;
bool ReverseSort = false;
std::shared_ptr<TSortableScanData> Sorting;
std::shared_ptr<TSortableScanData> Data;
- YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
public:
TSortableBatchPosition() = default;
+ std::shared_ptr<arrow::RecordBatch> Slice(const ui64 offset, const ui64 count) const {
+ AFL_VERIFY(Data);
+ return Data->Slice(offset, count);
+ }
+
class TFoundPosition {
private:
YDB_READONLY(ui32, Position, 0);
@@ -209,7 +221,8 @@ public:
}
static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition);
- TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound);
+ static std::optional<TSortableBatchPosition::TFoundPosition> FindPosition(TSortableBatchPosition& position, const ui64 posStart, const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater);
+ TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition & forFound);
const TSortableScanData& GetData() const {
return *Data;
@@ -234,15 +247,16 @@ public:
TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& sortingColumns, const std::vector<std::string>& dataColumns, const bool reverseSort)
: Position(position)
- , RecordsCount(batch->num_rows())
, ReverseSort(reverseSort)
- , Sorting(std::make_shared<TSortableScanData>(batch, sortingColumns))
- , Batch(batch)
{
+ Y_ABORT_UNLESS(batch);
+ Y_ABORT_UNLESS(batch->num_rows());
+ RecordsCount = batch->num_rows();
+
if (dataColumns.size()) {
Data = std::make_shared<TSortableScanData>(batch, dataColumns);
}
- Y_ABORT_UNLESS(batch->num_rows());
+ Sorting = std::make_shared<TSortableScanData>(batch, sortingColumns);
Y_DEBUG_ABORT_UNLESS(batch->ValidateFull().ok());
Y_ABORT_UNLESS(Sorting->GetColumns().size());
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
index 71c086aed9e..b03f44f285e 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -105,10 +105,13 @@ std::shared_ptr<arrow::RecordBatch> TMergePartialStream::SingleSourceDrain(const
include = true;
}
if (Reverse) {
- result = SortHeap.Current().GetKeyColumns().GetBatch()->Slice(pos.GetPosition() + (include ? 0 : 1), delta + (include ? 1 : 0));
+ result = SortHeap.Current().GetKeyColumns().Slice(pos.GetPosition() + (include ? 0 : 1), delta + (include ? 1 : 0));
} else {
- result = SortHeap.Current().GetKeyColumns().GetBatch()->Slice(startPos, delta + (include ? 1 : 0));
+ result = SortHeap.Current().GetKeyColumns().Slice(startPos, delta + (include ? 1 : 0));
}
+#ifndef NDEBUG
+ NArrow::TStatusValidator::Validate(result->ValidateFull());
+#endif
if (Reverse) {
auto permutation = NArrow::MakePermutation(result->num_rows(), true);
result = NArrow::TStatusValidator::GetValid(arrow::compute::Take(result, permutation)).record_batch();