diff options
author | Semyon <yentsovsemyon@ydb.tech> | 2025-04-07 16:09:48 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-07 13:09:48 +0000 |
commit | 09125181749c9bb85e6d28f0b4bd7860fd150147 (patch) | |
tree | 014fff8b94627640b60c36c268cf37b89f9dc470 | |
parent | 8f3a9de2105706f8728f66f9cbf9ce67138acdf3 (diff) | |
download | ydb-09125181749c9bb85e6d28f0b4bd7860fd150147.tar.gz |
find bounds in CS containers with duplicates (#16723)
-rw-r--r-- | ydb/core/formats/arrow/reader/merger.cpp | 6 | ||||
-rw-r--r-- | ydb/core/formats/arrow/reader/merger.h | 2 | ||||
-rw-r--r-- | ydb/core/formats/arrow/reader/position.cpp | 66 | ||||
-rw-r--r-- | ydb/core/formats/arrow/reader/position.h | 13 | ||||
-rw-r--r-- | ydb/core/formats/arrow/ut/ut_reader.cpp | 58 | ||||
-rw-r--r-- | ydb/core/formats/arrow/ut/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/helpers/get_value.cpp | 17 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp | 2 |
9 files changed, 127 insertions, 40 deletions
diff --git a/ydb/core/formats/arrow/reader/merger.cpp b/ydb/core/formats/arrow/reader/merger.cpp index 06b5d2be4b..bdf212696b 100644 --- a/ydb/core/formats/arrow/reader/merger.cpp +++ b/ydb/core/formats/arrow/reader/merger.cpp @@ -214,7 +214,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa return result; } -void TMergePartialStream::SkipToLowerBound(const TSortableBatchPosition& pos, const bool include) { +void TMergePartialStream::SkipToBound(const TSortableBatchPosition& pos, const bool lower) { if (SortHeap.Empty()) { return; } @@ -224,13 +224,13 @@ void TMergePartialStream::SkipToLowerBound(const TSortableBatchPosition& pos, co if (cmpResult == std::partial_ordering::greater) { break; } - if (cmpResult == std::partial_ordering::equivalent && include) { + if (cmpResult == std::partial_ordering::equivalent && lower) { break; } const TSortableBatchPosition::TFoundPosition skipPos = SortHeap.MutableCurrent().SkipToLower(pos); AFL_DEBUG(NKikimrServices::ARROW_HELPER)("pos", pos.DebugJson().GetStringRobust())("heap", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust()); if (skipPos.IsEqual()) { - if (!include && !SortHeap.MutableCurrent().Next()) { + if (!lower && !SortHeap.MutableCurrent().Next()) { SortHeap.RemoveTop(); } else { SortHeap.UpdateTop(); diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h index c30aba0f38..4950e49bee 100644 --- a/ydb/core/formats/arrow/reader/merger.h +++ b/ydb/core/formats/arrow/reader/merger.h @@ -53,7 +53,7 @@ public: } void PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy); - void SkipToLowerBound(const TSortableBatchPosition& pos, const bool include); + void SkipToBound(const TSortableBatchPosition& pos, const bool lower); void SetPossibleSameVersion(const bool value) { PossibleSameVersionFlag = value; diff --git a/ydb/core/formats/arrow/reader/position.cpp b/ydb/core/formats/arrow/reader/position.cpp index e4f4d799a5..81e576d9fa 100644 --- a/ydb/core/formats/arrow/reader/position.cpp +++ b/ydb/core/formats/arrow/reader/position.cpp @@ -18,51 +18,51 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const { return result; } -std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(TRWSortableBatchPosition& position, - const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) { +std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindBound(TRWSortableBatchPosition& position, + const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool upper) { ui64 posStart = posStartExt; ui64 posFinish = posFinishExt; auto guard = position.CreateAsymmetricAccessGuard(); + const auto cond = upper ? + [](const std::partial_ordering cmp) { + return cmp == std::partial_ordering::greater; + } : + [](const std::partial_ordering cmp) { + return cmp == std::partial_ordering::greater || cmp == std::partial_ordering::equivalent; + }; + { AFL_VERIFY(guard.InitSortingPosition(posStart)); auto cmp = position.Compare(forFound); - if (cmp == std::partial_ordering::greater) { - return TFoundPosition::Greater(posStart); - } else if (cmp == std::partial_ordering::equivalent) { - return TFoundPosition::Equal(posStart); + if (cond(cmp)) { + return TFoundPosition(posStart, cmp); } } { AFL_VERIFY(guard.InitSortingPosition(posFinish)); auto cmp = position.Compare(forFound); - if (cmp == std::partial_ordering::less) { - return TFoundPosition::Less(posFinish); - } else if (cmp == std::partial_ordering::equivalent) { - return TFoundPosition::Equal(posFinish); + if (!cond(cmp)) { + return std::nullopt; } } - while (posFinish > posStart + 1) { + while (posFinish != posStart + 1) { + AFL_VERIFY(posFinish > posStart + 1)("finish", posFinish)("start", posStart); AFL_VERIFY(guard.InitSortingPosition(0.5 * (posStart + posFinish))); const auto comparision = position.Compare(forFound); - if (comparision == std::partial_ordering::less) { - posStart = position.Position; - } else if (comparision == std::partial_ordering::greater) { + if (cond(comparision)) { posFinish = position.Position; } else { - return TFoundPosition::Equal(position.Position); + posStart = position.Position; } } - AFL_VERIFY(posFinish != posStart); - if (greater) { - AFL_VERIFY(guard.InitSortingPosition(posFinish)); - return TFoundPosition::Greater(posFinish); - } else { - AFL_VERIFY(guard.InitSortingPosition(posStart)); - return TFoundPosition::Less(posStart); - } + AFL_VERIFY(posFinish == posStart + 1)("finish", posFinish)("start", posStart); + AFL_VERIFY(guard.InitSortingPosition(posFinish)); + const auto comparision = position.Compare(forFound); + AFL_VERIFY(cond(comparision)); + return TFoundPosition(posFinish, comparision); } -std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, +std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) { if (!batch || !batch->num_rows()) { return {}; @@ -77,12 +77,11 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi } TRWSortableBatchPosition position = forFound.BuildRWPosition(batch, posStart); - return FindPosition(position, posStart, posFinish, forFound, greater); + return FindBound(position, posStart, posFinish, forFound, greater); } NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(const bool needData, const bool deepCopy) const { - return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort, - deepCopy ? Sorting->BuildCopy(Position) : Sorting, + return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort, deepCopy ? Sorting->BuildCopy(Position) : Sorting, (needData && Data) ? (deepCopy ? Data->BuildCopy(Position) : Data) : nullptr); } @@ -96,9 +95,15 @@ NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::Build } TSortableBatchPosition::TFoundPosition TRWSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) { + AFL_VERIFY(RecordsCount); const ui32 posStart = Position; - auto pos = FindPosition(*this, posStart, ReverseSort ? 0 : (RecordsCount - 1), forFound, true); - AFL_VERIFY(pos)("cursor", DebugJson())("found", forFound.DebugJson()); + AFL_VERIFY(!ReverseSort)("reason", "unimplemented"); + auto pos = FindBound(*this, posStart, RecordsCount - 1, forFound, false); + if (!pos) { + auto guard = CreateAsymmetricAccessGuard(); + AFL_VERIFY(guard.InitSortingPosition(RecordsCount - 1)); + return TFoundPosition(RecordsCount - 1, Compare(forFound)); + } if (ReverseSort) { AFL_VERIFY(Position <= posStart)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true); } else { @@ -133,8 +138,7 @@ TSortableScanData::TSortableScanData( BuildPosition(position); } -TSortableScanData::TSortableScanData( - const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch) { +TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch) { for (auto&& c : batch->columns()) { Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(c)); } diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h index f403dd7fe3..4f5278029d 100644 --- a/ydb/core/formats/arrow/reader/position.h +++ b/ydb/core/formats/arrow/reader/position.h @@ -314,15 +314,24 @@ public: static TFoundPosition Equal(const ui32 pos) { return TFoundPosition(pos); } + + TFoundPosition(const ui32 pos, const std::partial_ordering cmp) + : Position(pos) { + if (cmp == std::partial_ordering::less) { + GreaterIfNotEqual = false; + } else if (cmp == std::partial_ordering::greater) { + GreaterIfNotEqual = true; + } + } }; [[nodiscard]] bool IsAvailablePosition(const i64 position) const { return 0 <= position && position < RecordsCount; } - static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, + static std::optional<TFoundPosition> FindBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition); - static std::optional<TSortableBatchPosition::TFoundPosition> FindPosition(TRWSortableBatchPosition& position, const ui64 posStart, + static std::optional<TSortableBatchPosition::TFoundPosition> FindBound(TRWSortableBatchPosition& position, const ui64 posStart, const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater); const TSortableScanData& GetData() const { diff --git a/ydb/core/formats/arrow/ut/ut_reader.cpp b/ydb/core/formats/arrow/ut/ut_reader.cpp new file mode 100644 index 0000000000..68b1cd2090 --- /dev/null +++ b/ydb/core/formats/arrow/ut/ut_reader.cpp @@ -0,0 +1,58 @@ +#include <ydb/core/formats/arrow/reader/position.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NArrow { + +Y_UNIT_TEST_SUITE(SortableBatchPosition) { + Y_UNIT_TEST(FindPosition) { + std::shared_ptr<arrow::RecordBatch> data; + std::shared_ptr<arrow::Schema> schema = + std::make_shared<arrow::Schema>(arrow::Schema({ std::make_shared<arrow::Field>("class", std::make_shared<arrow::StringType>()), + std::make_shared<arrow::Field>("name", std::make_shared<arrow::StringType>()) })); + { + std::unique_ptr<arrow::RecordBatchBuilder> batchBuilder; + UNIT_ASSERT(arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), &batchBuilder).ok()); + + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("c").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("c").ok()); + + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("a").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("a").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("a").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok()); + + UNIT_ASSERT(batchBuilder->Flush(&data).ok()); + } + + std::shared_ptr<arrow::RecordBatch> search; + { + std::unique_ptr<arrow::RecordBatchBuilder> batchBuilder; + UNIT_ASSERT(arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), &batchBuilder).ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok()); + UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok()); + UNIT_ASSERT(batchBuilder->Flush(&search).ok()); + } + + NMerger::TSortableBatchPosition searchPosition(search, 0, false); + { + auto findPosition = NMerger::TSortableBatchPosition::FindBound(data, searchPosition, false, std::nullopt); + UNIT_ASSERT(!!findPosition); + UNIT_ASSERT_VALUES_EQUAL(findPosition->GetPosition(), 2); + } + + { + auto findPosition = NMerger::TSortableBatchPosition::FindBound(data, searchPosition, true, std::nullopt); + UNIT_ASSERT(!!findPosition); + UNIT_ASSERT_VALUES_EQUAL(findPosition->GetPosition(), 4); + } + } +} + +} // namespace NKikimr::NArrow diff --git a/ydb/core/formats/arrow/ut/ya.make b/ydb/core/formats/arrow/ut/ya.make index 6d82dd0a6d..f3f2f25501 100644 --- a/ydb/core/formats/arrow/ut/ya.make +++ b/ydb/core/formats/arrow/ut/ya.make @@ -33,6 +33,7 @@ SRCS( ut_dictionary.cpp ut_column_filter.cpp ut_hash.cpp + ut_reader.cpp ) END() diff --git a/ydb/core/kqp/ut/olap/helpers/get_value.cpp b/ydb/core/kqp/ut/olap/helpers/get_value.cpp index 617e78da59..5e39202426 100644 --- a/ydb/core/kqp/ut/olap/helpers/get_value.cpp +++ b/ydb/core/kqp/ut/olap/helpers/get_value.cpp @@ -61,9 +61,24 @@ void PrintValue(IOutputStream& out, const NYdb::TValue& v) { out << value.GetBool(); break; } + case NYdb::EPrimitiveType::String: + { + out << value.GetString(); + break; + } + case NYdb::EPrimitiveType::Json: + { + out << value.GetJson(); + break; + } + case NYdb::EPrimitiveType::JsonDocument: + { + out << value.GetJsonDocument(); + break; + } default: { - UNIT_ASSERT_C(false, "PrintValue not iplemented for this type"); + UNIT_ASSERT_C(false, TStringBuilder() << "PrintValue not iplemented for this type: " << (ui64)value.GetPrimitiveType()); } } } diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index a1301d8d28..49618aec95 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1071,7 +1071,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { for (const auto& r : rows) { TInstant ts = GetTimestamp(r.at("timestamp")); UNIT_ASSERT_GE_C(ts, tsPrev, "result is not sorted in ASC order"); - UNIT_ASSERT(results.erase(ts.GetValue())); + UNIT_ASSERT_C(results.erase(ts.GetValue()), Sprintf("%d", ts.GetValue())); tsPrev = ts; } UNIT_ASSERT(rows.size() == 6); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp index 1432002370..9c351d4f78 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp @@ -142,7 +142,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() { } } Merger->PutControlPoint(MergingContext->GetFinish(), false); - Merger->SkipToLowerBound(MergingContext->GetStart(), MergingContext->GetIncludeStart()); + Merger->SkipToBound(MergingContext->GetStart(), MergingContext->GetIncludeStart()); const ui32 originalSourcesCount = Sources.size(); Sources.clear(); |