diff options
author | Vladilen <vladilenmuz@ydb.tech> | 2025-07-18 12:17:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-07-18 12:17:49 +0200 |
commit | 1fc8c64a3df10ec7d95b1b7df3152d606af255fa (patch) | |
tree | 5ab2f7af50cf3f6f3cf070212980e8af59bfffc3 | |
parent | 60fb6e70d7fa6bbe2c66d8ed1493200526be2bca (diff) | |
download | ydb-1fc8c64a3df10ec7d95b1b7df3152d606af255fa.tar.gz |
Fix reversed sorting data reading (#21235)
-rw-r--r-- | ydb/core/formats/arrow/reader/position.cpp | 30 | ||||
-rw-r--r-- | ydb/core/formats/arrow/ut/ut_reader.cpp | 298 | ||||
-rw-r--r-- | ydb/tests/olap/delete/test_delete_all_after_inserts.py | 107 |
3 files changed, 417 insertions, 18 deletions
diff --git a/ydb/core/formats/arrow/reader/position.cpp b/ydb/core/formats/arrow/reader/position.cpp index ae8ecdc8317..665d3f7581a 100644 --- a/ydb/core/formats/arrow/reader/position.cpp +++ b/ydb/core/formats/arrow/reader/position.cpp @@ -45,8 +45,24 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi return std::nullopt; } } - while (posFinish != posStart + 1) { - AFL_VERIFY(posFinish > posStart + 1)("finish", posFinish)("start", posStart); + + const auto checkBoundIsFound = position.ReverseSort ? + [](const ui64 start, const ui64 finish) -> bool { + if (finish + 1 == start) { + return true; + } + AFL_VERIFY(finish + 1 < start)("finish", finish)("start", start); + return false; + } : + [](const ui64 start, const ui64 finish) -> bool { + if (start + 1 == finish) { + return true; + } + AFL_VERIFY(start + 1 < finish)("start", start)("finish", finish); + return false; + }; + + while (!checkBoundIsFound(posStart, posFinish)) { AFL_VERIFY(guard.InitSortingPosition(0.5 * (posStart + posFinish))); const auto comparision = position.Compare(forFound); if (cond(comparision)) { @@ -55,7 +71,7 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi posStart = position.Position; } } - AFL_VERIFY(posFinish == posStart + 1)("finish", posFinish)("start", posStart); + AFL_VERIFY(checkBoundIsFound(posStart, posFinish))("finish", posFinish)("start", posStart)("reverse", position.ReverseSort); AFL_VERIFY(guard.InitSortingPosition(posFinish)); const auto comparision = position.Compare(forFound); AFL_VERIFY(cond(comparision)); @@ -100,11 +116,11 @@ TSortableBatchPosition::TFoundPosition TRWSortableBatchPosition::SkipToLower(con std::optional<TSortableBatchPosition::TFoundPosition> pos; std::optional<ui64> overrideFound; if (ReverseSort) { - pos = FindBound(*this, 0, posStart, forFound, true); - if (!pos) { + pos = FindBound(*this, posStart, 0, forFound, false); + if (pos) { + overrideFound = pos->GetPosition(); + } else { overrideFound = posStart; - } else if (pos->GetPosition()) { - overrideFound = pos->GetPosition() - 1; } } else { pos = FindBound(*this, posStart, RecordsCount - 1, forFound, false); diff --git a/ydb/core/formats/arrow/ut/ut_reader.cpp b/ydb/core/formats/arrow/ut/ut_reader.cpp index 68b1cd2090b..b86e5083316 100644 --- a/ydb/core/formats/arrow/ut/ut_reader.cpp +++ b/ydb/core/formats/arrow/ut/ut_reader.cpp @@ -1,8 +1,164 @@ +#include <ydb/core/formats/arrow/arrow_batch_builder.h> +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/reader/merger.h> #include <ydb/core/formats/arrow/reader/position.h> +#include <ydb/core/formats/arrow/reader/result_builder.h> #include <library/cpp/testing/unittest/registar.h> namespace NKikimr::NArrow { +namespace { + +namespace NTypeIds = NScheme::NTypeIds; +using TTypeInfo = NScheme::TTypeInfo; + +std::shared_ptr<arrow::RecordBatch> ExtractBatch(std::shared_ptr<arrow::Table> table) { + std::shared_ptr<arrow::RecordBatch> batch; + + arrow::TableBatchReader reader(*table); + auto result = reader.Next(); + Y_ABORT_UNLESS(result.ok()); + batch = *result; + result = reader.Next(); + Y_ABORT_UNLESS(result.ok() && !(*result)); + return batch; +} + +struct TDataRow { + static const TTypeInfo* MakeTypeInfos() { + static const TTypeInfo types[3] = { + TTypeInfo(NTypeIds::Int32), + TTypeInfo(NTypeIds::Int32), + TTypeInfo(NTypeIds::Int32)}; + return types; + } + + i32 id1; + i32 value; + i32 version; + + bool operator==(const TDataRow& r) const { + return (id1 == r.id1) && (value == r.value) && (version == r.version); + } + + static std::shared_ptr<arrow::Schema> MakeFullSchema() { + std::vector<std::shared_ptr<arrow::Field>> fields = { + arrow::field("id1", arrow::int32(), false), + arrow::field("value", arrow::int32(), false), + arrow::field("version", arrow::int32(), false)}; + + return std::make_shared<arrow::Schema>(std::move(fields)); + } + + static std::shared_ptr<arrow::Schema> MakeDataSchema() { + std::vector<std::shared_ptr<arrow::Field>> fields = { + arrow::field("id1", arrow::int32(), false), + arrow::field("value", arrow::int32(), false)}; + + return std::make_shared<arrow::Schema>(std::move(fields)); + } + + static std::shared_ptr<arrow::Schema> MakeSortingSchema() { + std::vector<std::shared_ptr<arrow::Field>> fields = { + arrow::field("id1", arrow::int32(), false)}; + + return std::make_shared<arrow::Schema>(std::move(fields)); + } + + static std::shared_ptr<arrow::Schema> MakeVersionSchema() { + std::vector<std::shared_ptr<arrow::Field>> fields = { + arrow::field("version", arrow::int32(), false)}; + + return std::make_shared<arrow::Schema>(std::move(fields)); + } + + static std::vector<std::pair<TString, TTypeInfo>> MakeYdbSchema() { + std::vector<std::pair<TString, TTypeInfo>> columns = { + {"id1", TTypeInfo(NTypeIds::Int32)}, + {"value", TTypeInfo(NTypeIds::Int32)}, + {"version", TTypeInfo(NTypeIds::Int32)}, + }; + return columns; + } + + NKikimr::TDbTupleRef ToDbTupleRef() const { + static TCell Cells[3]; + Cells[0] = TCell::Make<i32>(id1); + Cells[1] = TCell::Make<i32>(value); + Cells[2] = TCell::Make<i32>(version); + + return NKikimr::TDbTupleRef(MakeTypeInfos(), Cells, 3); + } + + TOwnedCellVec SerializedCells() const { + NKikimr::TDbTupleRef value = ToDbTupleRef(); + std::vector<TCell> cells(value.Cells().data(), value.Cells().data() + value.Cells().size()); + + return TOwnedCellVec(cells); + } + + static std::vector<std::string> GetVersionColumns() { + return {"version"}; + } + + static std::vector<std::string> GetSortingColumns() { + return {"id1"}; + } +}; + +class TDataRowTableBuilder { +public: + void AddRow(const TDataRow& row) { + UNIT_ASSERT(Bid1.Append(row.id1).ok()); + UNIT_ASSERT(Bvalue.Append(row.value).ok()); + UNIT_ASSERT(Bversion.Append(row.version).ok()); + } + + std::shared_ptr<arrow::Table> Finish() { + std::shared_ptr<arrow::Int32Array> arid1; + std::shared_ptr<arrow::Int32Array> arvalue; + std::shared_ptr<arrow::Int32Array> arversion; + + UNIT_ASSERT(Bid1.Finish(&arid1).ok()); + UNIT_ASSERT(Bvalue.Finish(&arvalue).ok()); + UNIT_ASSERT(Bversion.Finish(&arversion).ok()); + + std::shared_ptr<arrow::Schema> schema = TDataRow::MakeFullSchema(); + return arrow::Table::Make(schema, {arid1, arvalue, arversion}); + } + + static std::shared_ptr<arrow::Table> Build(const std::vector<struct TDataRow>& rows) { + TDataRowTableBuilder builder; + for (const TDataRow& row : rows) { + builder.AddRow(row); + } + return builder.Finish(); + } + + static std::shared_ptr<arrow::RecordBatch> buildBatch(const std::vector<std::pair<int, int>>& rows, int version) { + TDataRowTableBuilder builder; + for (auto [i, j] : rows) { + builder.AddRow(TDataRow{i, j, version}); + } + + auto table = builder.Finish(); + auto schema = table->schema(); + auto tres = table->SelectColumns(std::vector<int>{ + schema->GetFieldIndex("id1"), + schema->GetFieldIndex("value"), + schema->GetFieldIndex("version")}); + UNIT_ASSERT(tres.ok()); + + return ExtractBatch(*tres); + }; + +private: + arrow::Int32Builder Bid1; + arrow::Int32Builder Bvalue; + arrow::Int32Builder Bversion; +}; + +} // namespace Y_UNIT_TEST_SUITE(SortableBatchPosition) { Y_UNIT_TEST(FindPosition) { @@ -52,6 +208,148 @@ Y_UNIT_TEST_SUITE(SortableBatchPosition) { UNIT_ASSERT(!!findPosition); UNIT_ASSERT_VALUES_EQUAL(findPosition->GetPosition(), 4); } + + NMerger::TSortableBatchPosition searchPositionReverse(search, 0, true); + { + auto findPosition = NMerger::TSortableBatchPosition::FindBound(data, searchPositionReverse, false, std::nullopt); + UNIT_ASSERT(!!findPosition); + UNIT_ASSERT_VALUES_EQUAL(findPosition->GetPosition(), 3); + } + { + auto findPosition = NMerger::TSortableBatchPosition::FindBound(data, searchPositionReverse, true, std::nullopt); + UNIT_ASSERT(!!findPosition); + UNIT_ASSERT_VALUES_EQUAL(findPosition->GetPosition(), 1); + } + } + + Y_UNIT_TEST(MergingSortedInputStreamReversedWithOneSearchPoint) { + const bool isReverse = true; + const bool deepCopy = false; + const bool includeFinish = true; + const bool includeStart = true; + + const int p1 = 1111; + const int p2 = 2222; + const int oldValue = 7777; + const int newValue = 8888; + const int oldVersion = 0; + const int newVersion = 1; + + std::shared_ptr<arrow::RecordBatch> batch1 = TDataRowTableBuilder::buildBatch({{p1, oldValue}}, oldVersion); + std::shared_ptr<arrow::RecordBatch> batch2 = TDataRowTableBuilder::buildBatch({{p2, oldValue}}, oldVersion); + std::shared_ptr<arrow::RecordBatch> batch3 = TDataRowTableBuilder::buildBatch({{p1, newValue}, {p2, newValue}}, newVersion); + + auto vColumns = TDataRow::GetVersionColumns(); + auto sColumns = TDataRow::GetSortingColumns(); + + auto merger = + std::make_shared<NArrow::NMerger::TMergePartialStream>(TDataRow::MakeSortingSchema(), + TDataRow::MakeDataSchema(), isReverse, vColumns, std::nullopt); + + merger->AddSource(batch1, nullptr, NArrow::NMerger::TIterationOrder(isReverse, 0)); + merger->AddSource(batch2, nullptr, NArrow::NMerger::TIterationOrder(isReverse, 0)); + merger->AddSource(batch3, nullptr, NArrow::NMerger::TIterationOrder(isReverse, 0)); + // Range to include in result batch + NArrow::NMerger::TSortableBatchPosition startingPoint(batch1, 0, sColumns, {}, isReverse); + NArrow::NMerger::TSortableBatchPosition finishPoint(batch1, 0, sColumns, {}, isReverse); + + merger->PutControlPoint(finishPoint, deepCopy); + merger->SkipToBound(startingPoint, includeStart); + + NArrow::NMerger::TRecordBatchBuilder builder(TDataRow::MakeDataSchema()->fields()); + std::optional<NArrow::NMerger::TCursor> lastResultPosition; + + merger->DrainToControlPoint(builder, includeFinish, &lastResultPosition); + + UNIT_ASSERT(lastResultPosition); + auto lrpVal = std::static_pointer_cast<arrow::Int32Array>(lastResultPosition->ExtractSortingPosition(TDataRow::MakeSortingSchema()->fields())->column(0))->Value(0); + UNIT_ASSERT_EQUAL(p1, lrpVal); + + auto resultBatch = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({builder.Finalize()})); + UNIT_ASSERT(resultBatch); + + UNIT_ASSERT_EQUAL(1, resultBatch->num_rows()); + + auto id1Col = resultBatch->GetColumnByName("id1"); + auto valueCol = resultBatch->GetColumnByName("value"); + + UNIT_ASSERT_EQUAL(1, id1Col->num_chunks()); + UNIT_ASSERT_EQUAL(1, valueCol->num_chunks()); + + auto id1Val = std::static_pointer_cast<arrow::Int32Array>(id1Col->chunk(0))->Value(0); + auto valueVal = std::static_pointer_cast<arrow::Int32Array>(valueCol->chunk(0))->Value(0); + + UNIT_ASSERT_EQUAL(p1, id1Val); + UNIT_ASSERT_EQUAL(newValue, valueVal); + } + + Y_UNIT_TEST(MergingSortedInputStreamReversedWithRangeSearch) { + const bool isReverse = true; + const bool deepCopy = false; + const bool includeFinish = true; + const bool includeStart = true; + + const int p1 = 1111; + const int p2 = 2222; + const int p3 = 3333; + const int p4 = 4444; + const int oldValue = 7777; + const int newValue = 8888; + const int v0 = 0; + const int v1 = 1; + const int v2 = 2; + + std::shared_ptr<arrow::RecordBatch> batch1 = TDataRowTableBuilder::buildBatch({{p1, oldValue}, {p4, oldValue}}, v0); + std::shared_ptr<arrow::RecordBatch> batch2 = TDataRowTableBuilder::buildBatch({{p2, oldValue}, {p3, oldValue}}, v1); + std::shared_ptr<arrow::RecordBatch> batch3 = TDataRowTableBuilder::buildBatch({{p1, newValue}, {p2, newValue}, {p3, newValue}, {p4, newValue}}, v2); + + auto vColumns = TDataRow::GetVersionColumns(); + auto sColumns = TDataRow::GetSortingColumns(); + + auto merger = + std::make_shared<NArrow::NMerger::TMergePartialStream>(TDataRow::MakeSortingSchema(), + TDataRow::MakeDataSchema(), isReverse, vColumns, std::nullopt); + + merger->AddSource(batch1, nullptr, NArrow::NMerger::TIterationOrder(isReverse, 0)); + merger->AddSource(batch2, nullptr, NArrow::NMerger::TIterationOrder(isReverse, 0)); + merger->AddSource(batch3, nullptr, NArrow::NMerger::TIterationOrder(isReverse, 0)); + // Range to include in result batch, only points p2 p3 matters here + std::shared_ptr<arrow::RecordBatch> p2sp = TDataRowTableBuilder::buildBatch({{p2, oldValue}}, v0); + std::shared_ptr<arrow::RecordBatch> p3sp = TDataRowTableBuilder::buildBatch({{p3, oldValue}}, v0); + NArrow::NMerger::TSortableBatchPosition startingPoint(isReverse ? p3sp : p2sp, 0, sColumns, {}, isReverse); + NArrow::NMerger::TSortableBatchPosition finishPoint(isReverse ? p2sp : p3sp, 0, sColumns, {}, isReverse); + + merger->PutControlPoint(finishPoint, deepCopy); + merger->SkipToBound(startingPoint, includeStart); + + NArrow::NMerger::TRecordBatchBuilder builder(TDataRow::MakeDataSchema()->fields()); + std::optional<NArrow::NMerger::TCursor> lastResultPosition; + + merger->DrainToControlPoint(builder, includeFinish, &lastResultPosition); + + UNIT_ASSERT(lastResultPosition); + auto lrpVal = std::static_pointer_cast<arrow::Int32Array>(lastResultPosition->ExtractSortingPosition(TDataRow::MakeSortingSchema()->fields())->column(0))->Value(0); + UNIT_ASSERT_EQUAL(p2, lrpVal); + + auto resultBatch = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({builder.Finalize()})); + UNIT_ASSERT(resultBatch); + UNIT_ASSERT_EQUAL(2, resultBatch->num_rows()); + + auto id1Col = resultBatch->GetColumnByName("id1"); + auto valueCol = resultBatch->GetColumnByName("value"); + + UNIT_ASSERT_EQUAL(1, id1Col->num_chunks()); + UNIT_ASSERT_EQUAL(1, valueCol->num_chunks()); + + auto firstId1 = std::static_pointer_cast<arrow::Int32Array>(id1Col->chunk(0))->Value(0); + auto secondId1 = std::static_pointer_cast<arrow::Int32Array>(id1Col->chunk(0))->Value(1); + auto firstVal = std::static_pointer_cast<arrow::Int32Array>(valueCol->chunk(0))->Value(0); + auto secondVal = std::static_pointer_cast<arrow::Int32Array>(valueCol->chunk(0))->Value(1); + + UNIT_ASSERT_EQUAL((isReverse ? p3 : p2), firstId1); + UNIT_ASSERT_EQUAL((isReverse ? p2 : p3), secondId1); + UNIT_ASSERT_EQUAL(newValue, firstVal); + UNIT_ASSERT_EQUAL(newValue, secondVal); } } diff --git a/ydb/tests/olap/delete/test_delete_all_after_inserts.py b/ydb/tests/olap/delete/test_delete_all_after_inserts.py index c01b211e420..701d6a2f429 100644 --- a/ydb/tests/olap/delete/test_delete_all_after_inserts.py +++ b/ydb/tests/olap/delete/test_delete_all_after_inserts.py @@ -1,5 +1,4 @@ from .base import DeleteTestBase -import pytest import random @@ -17,7 +16,7 @@ class TestDeleteAllAfterInserts(DeleteTestBase): def _get_row_count(self, table_path): return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}`")[0].rows[0]["Rows"] - def _test_two_columns_pk(self, rows_to_insert, insert_iterations): + def _test_delete_all(self, rows_to_insert, insert_iterations): table_path = f"{self._get_test_dir()}/testTableAllDeleted" self.ydb_client.query(f"DROP TABLE IF EXISTS `{table_path}`;") self.ydb_client.query( @@ -37,7 +36,7 @@ class TestDeleteAllAfterInserts(DeleteTestBase): all_rows_ids = random.sample(range(self.MAX_ID), rows_to_insert * insert_iterations) rows_in_table = 0 for it in range(0, len(all_rows_ids), rows_to_insert): - rows_ids = all_rows_ids[it : it + rows_to_insert] + rows_ids = all_rows_ids[it:it + rows_to_insert] insert_query = f"INSERT INTO `{table_path}` (id1, id2, value) VALUES " for i in rows_ids: @@ -52,42 +51,128 @@ class TestDeleteAllAfterInserts(DeleteTestBase): assert self._get_row_count(table_path) == 0 - # passes assert len(self.ydb_client.query( f""" - SELECT id1 as id1, id2 as id2 FROM `{table_path}` WHERE id1 != 10000000 ORDER by id1, id2 LIMIT 100; + SELECT id1 as id1, id2 as id2 FROM `{table_path}` WHERE id1 != 10000000 ORDER by id1 ASC, id2 ASC LIMIT 100; + """ + )[0].rows) == 0 + + assert len(self.ydb_client.query( + f""" + SELECT id1 as id1, id2 as id2 FROM `{table_path}` WHERE id1 != 10000000 ORDER by id1 ASC, id2 ASC; """ )[0].rows) == 0 - # passes assert len(self.ydb_client.query( f""" SELECT id1 as id1, id2 as id2 FROM `{table_path}` WHERE id1 != 10000000 ORDER by id1 DESC, id2 DESC; """ )[0].rows) == 0 - # passes assert len(self.ydb_client.query( f""" SELECT id1 as id1, id2 as id2 FROM `{table_path}` WHERE id1 != 10000000 LIMIT 100; """ )[0].rows) == 0 - # passes assert len(self.ydb_client.query( f""" SELECT id1 as id1, id2 as id2 FROM `{table_path}` WHERE id1 != 10000000; """ )[0].rows) == 0 - # fails assert len(self.ydb_client.query( f""" SELECT id1 as id1, id2 as id2 FROM `{table_path}` WHERE id1 != 10000000 ORDER by id1 DESC, id2 DESC LIMIT 100; """ )[0].rows) == 0 - @pytest.mark.skip(reason="https://github.com/ydb-platform/ydb/issues/20098") + def _test_select_query(self, query, reversed=False): + res = self.ydb_client.query(query)[0].rows + prev_row = None + for i in range(len(res)): + row = res[i] + assert row["value"] == 'NEW', row["id1"] + if prev_row is not None: + if reversed: + assert row["id1"] < prev_row["id1"] + else: + assert row["id1"] > prev_row["id1"] + prev_row = row + + def _test_update_all(self, rows_to_insert, insert_iterations, all_rows_ids=None): + table_path = f"{self._get_test_dir()}/testTableAllUpdated" + self.ydb_client.query(f"DROP TABLE IF EXISTS `{table_path}`;") + self.ydb_client.query( + f""" + CREATE TABLE `{table_path}` ( + id1 Int32 NOT NULL, + value Utf8 NOT NULL, + PRIMARY KEY(id1), + ) + WITH ( + STORE = COLUMN + ) + """ + ) + if all_rows_ids is None: + all_rows_ids = random.sample(range(self.MAX_ID), rows_to_insert * insert_iterations) + rows_in_table = 0 + for it in range(0, len(all_rows_ids), rows_to_insert): + rows_ids = all_rows_ids[it:it + rows_to_insert] + + insert_query = f"INSERT INTO `{table_path}` (id1, value) VALUES " + for i in rows_ids: + insert_query += f"({i}, 'OLD'), " + insert_query = insert_query[:-2] + ";" + self.ydb_client.query(insert_query) + + rows_in_table += rows_to_insert + assert self._get_row_count(table_path) == rows_in_table + + self.ydb_client.query(f"UPDATE `{table_path}` SET value = 'NEW';") + + assert self._get_row_count(table_path) == rows_in_table + + self._test_select_query( + f""" + SELECT id1 as id1, value as value FROM `{table_path}` WHERE id1 != 10000001 ORDER by id1 DESC; + """, + reversed=True + ) + + self._test_select_query( + f""" + SELECT id1 as id1, value as value FROM `{table_path}` WHERE id1 != 10000001 ORDER by id1 ASC; + """, + reversed=False + ) + + self._test_select_query( + f""" + SELECT id1 as id1, value as value FROM `{table_path}` WHERE id1 != 10000001 ORDER by id1 ASC LIMIT 100; + """, + reversed=False + ) + + self._test_select_query( + f""" + SELECT id1 as id1, value as value FROM `{table_path}` WHERE id1 != 10000001 ORDER by id1 DESC LIMIT 100; + """, + reversed=True + ) + def test_delete_all_rows_after_several_inserts(self): # IMPORTANT note: tests passes with 1 insert_iterations - self._test_two_columns_pk(rows_to_insert=1000, insert_iterations=2) + self._test_delete_all(rows_to_insert=1000, insert_iterations=2) + + def test_update_all_rows_after_several_inserts(self): + # IMPORTANT note: tests passes with 1 insert_iterations + self._test_update_all(rows_to_insert=1000, insert_iterations=2) + # Known failing scenarios: + self._test_update_all(rows_to_insert=2, insert_iterations=2, all_rows_ids=[2125791724, 269299117, 1929453799, 383764224]) + self._test_update_all(rows_to_insert=2, insert_iterations=2, all_rows_ids=[945294066, 1142679124, 175083513, 1218666029]) + self._test_update_all(rows_to_insert=10, insert_iterations=2, + all_rows_ids=[1979586701, 127598743, 231258081, 416119681, 1889436625, 303117720, 155720914, 673430720, + 1732479562, 17223092, 2089986107, 1847206032, 1598948818, 1418309310, 586602536, 630824296, + 434780620, 1254100566, 525597905, 860984260]) |