aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSemyon <yentsovsemyon@ydb.tech>2025-04-07 16:09:48 +0300
committerGitHub <noreply@github.com>2025-04-07 13:09:48 +0000
commit09125181749c9bb85e6d28f0b4bd7860fd150147 (patch)
tree014fff8b94627640b60c36c268cf37b89f9dc470
parent8f3a9de2105706f8728f66f9cbf9ce67138acdf3 (diff)
downloadydb-09125181749c9bb85e6d28f0b4bd7860fd150147.tar.gz
find bounds in CS containers with duplicates (#16723)
-rw-r--r--ydb/core/formats/arrow/reader/merger.cpp6
-rw-r--r--ydb/core/formats/arrow/reader/merger.h2
-rw-r--r--ydb/core/formats/arrow/reader/position.cpp66
-rw-r--r--ydb/core/formats/arrow/reader/position.h13
-rw-r--r--ydb/core/formats/arrow/ut/ut_reader.cpp58
-rw-r--r--ydb/core/formats/arrow/ut/ya.make1
-rw-r--r--ydb/core/kqp/ut/olap/helpers/get_value.cpp17
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp2
9 files changed, 127 insertions, 40 deletions
diff --git a/ydb/core/formats/arrow/reader/merger.cpp b/ydb/core/formats/arrow/reader/merger.cpp
index 06b5d2be4b..bdf212696b 100644
--- a/ydb/core/formats/arrow/reader/merger.cpp
+++ b/ydb/core/formats/arrow/reader/merger.cpp
@@ -214,7 +214,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
return result;
}
-void TMergePartialStream::SkipToLowerBound(const TSortableBatchPosition& pos, const bool include) {
+void TMergePartialStream::SkipToBound(const TSortableBatchPosition& pos, const bool lower) {
if (SortHeap.Empty()) {
return;
}
@@ -224,13 +224,13 @@ void TMergePartialStream::SkipToLowerBound(const TSortableBatchPosition& pos, co
if (cmpResult == std::partial_ordering::greater) {
break;
}
- if (cmpResult == std::partial_ordering::equivalent && include) {
+ if (cmpResult == std::partial_ordering::equivalent && lower) {
break;
}
const TSortableBatchPosition::TFoundPosition skipPos = SortHeap.MutableCurrent().SkipToLower(pos);
AFL_DEBUG(NKikimrServices::ARROW_HELPER)("pos", pos.DebugJson().GetStringRobust())("heap", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
if (skipPos.IsEqual()) {
- if (!include && !SortHeap.MutableCurrent().Next()) {
+ if (!lower && !SortHeap.MutableCurrent().Next()) {
SortHeap.RemoveTop();
} else {
SortHeap.UpdateTop();
diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h
index c30aba0f38..4950e49bee 100644
--- a/ydb/core/formats/arrow/reader/merger.h
+++ b/ydb/core/formats/arrow/reader/merger.h
@@ -53,7 +53,7 @@ public:
}
void PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy);
- void SkipToLowerBound(const TSortableBatchPosition& pos, const bool include);
+ void SkipToBound(const TSortableBatchPosition& pos, const bool lower);
void SetPossibleSameVersion(const bool value) {
PossibleSameVersionFlag = value;
diff --git a/ydb/core/formats/arrow/reader/position.cpp b/ydb/core/formats/arrow/reader/position.cpp
index e4f4d799a5..81e576d9fa 100644
--- a/ydb/core/formats/arrow/reader/position.cpp
+++ b/ydb/core/formats/arrow/reader/position.cpp
@@ -18,51 +18,51 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const {
return result;
}
-std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(TRWSortableBatchPosition& position,
- const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) {
+std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindBound(TRWSortableBatchPosition& position,
+ const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool upper) {
ui64 posStart = posStartExt;
ui64 posFinish = posFinishExt;
auto guard = position.CreateAsymmetricAccessGuard();
+ const auto cond = upper ?
+ [](const std::partial_ordering cmp) {
+ return cmp == std::partial_ordering::greater;
+ } :
+ [](const std::partial_ordering cmp) {
+ return cmp == std::partial_ordering::greater || cmp == std::partial_ordering::equivalent;
+ };
+
{
AFL_VERIFY(guard.InitSortingPosition(posStart));
auto cmp = position.Compare(forFound);
- if (cmp == std::partial_ordering::greater) {
- return TFoundPosition::Greater(posStart);
- } else if (cmp == std::partial_ordering::equivalent) {
- return TFoundPosition::Equal(posStart);
+ if (cond(cmp)) {
+ return TFoundPosition(posStart, cmp);
}
}
{
AFL_VERIFY(guard.InitSortingPosition(posFinish));
auto cmp = position.Compare(forFound);
- if (cmp == std::partial_ordering::less) {
- return TFoundPosition::Less(posFinish);
- } else if (cmp == std::partial_ordering::equivalent) {
- return TFoundPosition::Equal(posFinish);
+ if (!cond(cmp)) {
+ return std::nullopt;
}
}
- while (posFinish > posStart + 1) {
+ while (posFinish != posStart + 1) {
+ AFL_VERIFY(posFinish > posStart + 1)("finish", posFinish)("start", posStart);
AFL_VERIFY(guard.InitSortingPosition(0.5 * (posStart + posFinish)));
const auto comparision = position.Compare(forFound);
- if (comparision == std::partial_ordering::less) {
- posStart = position.Position;
- } else if (comparision == std::partial_ordering::greater) {
+ if (cond(comparision)) {
posFinish = position.Position;
} else {
- return TFoundPosition::Equal(position.Position);
+ posStart = position.Position;
}
}
- AFL_VERIFY(posFinish != posStart);
- if (greater) {
- AFL_VERIFY(guard.InitSortingPosition(posFinish));
- return TFoundPosition::Greater(posFinish);
- } else {
- AFL_VERIFY(guard.InitSortingPosition(posStart));
- return TFoundPosition::Less(posStart);
- }
+ AFL_VERIFY(posFinish == posStart + 1)("finish", posFinish)("start", posStart);
+ AFL_VERIFY(guard.InitSortingPosition(posFinish));
+ const auto comparision = position.Compare(forFound);
+ AFL_VERIFY(cond(comparision));
+ return TFoundPosition(posFinish, comparision);
}
-std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch,
+std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindBound(const std::shared_ptr<arrow::RecordBatch>& batch,
const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) {
if (!batch || !batch->num_rows()) {
return {};
@@ -77,12 +77,11 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
}
TRWSortableBatchPosition position = forFound.BuildRWPosition(batch, posStart);
- return FindPosition(position, posStart, posFinish, forFound, greater);
+ return FindBound(position, posStart, posFinish, forFound, greater);
}
NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(const bool needData, const bool deepCopy) const {
- return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort,
- deepCopy ? Sorting->BuildCopy(Position) : Sorting,
+ return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort, deepCopy ? Sorting->BuildCopy(Position) : Sorting,
(needData && Data) ? (deepCopy ? Data->BuildCopy(Position) : Data) : nullptr);
}
@@ -96,9 +95,15 @@ NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::Build
}
TSortableBatchPosition::TFoundPosition TRWSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) {
+ AFL_VERIFY(RecordsCount);
const ui32 posStart = Position;
- auto pos = FindPosition(*this, posStart, ReverseSort ? 0 : (RecordsCount - 1), forFound, true);
- AFL_VERIFY(pos)("cursor", DebugJson())("found", forFound.DebugJson());
+ AFL_VERIFY(!ReverseSort)("reason", "unimplemented");
+ auto pos = FindBound(*this, posStart, RecordsCount - 1, forFound, false);
+ if (!pos) {
+ auto guard = CreateAsymmetricAccessGuard();
+ AFL_VERIFY(guard.InitSortingPosition(RecordsCount - 1));
+ return TFoundPosition(RecordsCount - 1, Compare(forFound));
+ }
if (ReverseSort) {
AFL_VERIFY(Position <= posStart)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true);
} else {
@@ -133,8 +138,7 @@ TSortableScanData::TSortableScanData(
BuildPosition(position);
}
-TSortableScanData::TSortableScanData(
- const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch) {
+TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch) {
for (auto&& c : batch->columns()) {
Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(c));
}
diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h
index f403dd7fe3..4f5278029d 100644
--- a/ydb/core/formats/arrow/reader/position.h
+++ b/ydb/core/formats/arrow/reader/position.h
@@ -314,15 +314,24 @@ public:
static TFoundPosition Equal(const ui32 pos) {
return TFoundPosition(pos);
}
+
+ TFoundPosition(const ui32 pos, const std::partial_ordering cmp)
+ : Position(pos) {
+ if (cmp == std::partial_ordering::less) {
+ GreaterIfNotEqual = false;
+ } else if (cmp == std::partial_ordering::greater) {
+ GreaterIfNotEqual = true;
+ }
+ }
};
[[nodiscard]] bool IsAvailablePosition(const i64 position) const {
return 0 <= position && position < RecordsCount;
}
- static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound,
+ static std::optional<TFoundPosition> FindBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound,
const bool needGreater, const std::optional<ui32> includedStartPosition);
- static std::optional<TSortableBatchPosition::TFoundPosition> FindPosition(TRWSortableBatchPosition& position, const ui64 posStart,
+ static std::optional<TSortableBatchPosition::TFoundPosition> FindBound(TRWSortableBatchPosition& position, const ui64 posStart,
const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater);
const TSortableScanData& GetData() const {
diff --git a/ydb/core/formats/arrow/ut/ut_reader.cpp b/ydb/core/formats/arrow/ut/ut_reader.cpp
new file mode 100644
index 0000000000..68b1cd2090
--- /dev/null
+++ b/ydb/core/formats/arrow/ut/ut_reader.cpp
@@ -0,0 +1,58 @@
+#include <ydb/core/formats/arrow/reader/position.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NArrow {
+
+Y_UNIT_TEST_SUITE(SortableBatchPosition) {
+ Y_UNIT_TEST(FindPosition) {
+ std::shared_ptr<arrow::RecordBatch> data;
+ std::shared_ptr<arrow::Schema> schema =
+ std::make_shared<arrow::Schema>(arrow::Schema({ std::make_shared<arrow::Field>("class", std::make_shared<arrow::StringType>()),
+ std::make_shared<arrow::Field>("name", std::make_shared<arrow::StringType>()) }));
+ {
+ std::unique_ptr<arrow::RecordBatchBuilder> batchBuilder;
+ UNIT_ASSERT(arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), &batchBuilder).ok());
+
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("c").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("c").ok());
+
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("a").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("a").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("a").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok());
+
+ UNIT_ASSERT(batchBuilder->Flush(&data).ok());
+ }
+
+ std::shared_ptr<arrow::RecordBatch> search;
+ {
+ std::unique_ptr<arrow::RecordBatchBuilder> batchBuilder;
+ UNIT_ASSERT(arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), &batchBuilder).ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
+ UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok());
+ UNIT_ASSERT(batchBuilder->Flush(&search).ok());
+ }
+
+ NMerger::TSortableBatchPosition searchPosition(search, 0, false);
+ {
+ auto findPosition = NMerger::TSortableBatchPosition::FindBound(data, searchPosition, false, std::nullopt);
+ UNIT_ASSERT(!!findPosition);
+ UNIT_ASSERT_VALUES_EQUAL(findPosition->GetPosition(), 2);
+ }
+
+ {
+ auto findPosition = NMerger::TSortableBatchPosition::FindBound(data, searchPosition, true, std::nullopt);
+ UNIT_ASSERT(!!findPosition);
+ UNIT_ASSERT_VALUES_EQUAL(findPosition->GetPosition(), 4);
+ }
+ }
+}
+
+} // namespace NKikimr::NArrow
diff --git a/ydb/core/formats/arrow/ut/ya.make b/ydb/core/formats/arrow/ut/ya.make
index 6d82dd0a6d..f3f2f25501 100644
--- a/ydb/core/formats/arrow/ut/ya.make
+++ b/ydb/core/formats/arrow/ut/ya.make
@@ -33,6 +33,7 @@ SRCS(
ut_dictionary.cpp
ut_column_filter.cpp
ut_hash.cpp
+ ut_reader.cpp
)
END()
diff --git a/ydb/core/kqp/ut/olap/helpers/get_value.cpp b/ydb/core/kqp/ut/olap/helpers/get_value.cpp
index 617e78da59..5e39202426 100644
--- a/ydb/core/kqp/ut/olap/helpers/get_value.cpp
+++ b/ydb/core/kqp/ut/olap/helpers/get_value.cpp
@@ -61,9 +61,24 @@ void PrintValue(IOutputStream& out, const NYdb::TValue& v) {
out << value.GetBool();
break;
}
+ case NYdb::EPrimitiveType::String:
+ {
+ out << value.GetString();
+ break;
+ }
+ case NYdb::EPrimitiveType::Json:
+ {
+ out << value.GetJson();
+ break;
+ }
+ case NYdb::EPrimitiveType::JsonDocument:
+ {
+ out << value.GetJsonDocument();
+ break;
+ }
default:
{
- UNIT_ASSERT_C(false, "PrintValue not iplemented for this type");
+ UNIT_ASSERT_C(false, TStringBuilder() << "PrintValue not iplemented for this type: " << (ui64)value.GetPrimitiveType());
}
}
}
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index a1301d8d28..49618aec95 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -1071,7 +1071,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
for (const auto& r : rows) {
TInstant ts = GetTimestamp(r.at("timestamp"));
UNIT_ASSERT_GE_C(ts, tsPrev, "result is not sorted in ASC order");
- UNIT_ASSERT(results.erase(ts.GetValue()));
+ UNIT_ASSERT_C(results.erase(ts.GetValue()), Sprintf("%d", ts.GetValue()));
tsPrev = ts;
}
UNIT_ASSERT(rows.size() == 6);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
index 1432002370..9c351d4f78 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
@@ -142,7 +142,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
}
}
Merger->PutControlPoint(MergingContext->GetFinish(), false);
- Merger->SkipToLowerBound(MergingContext->GetStart(), MergingContext->GetIncludeStart());
+ Merger->SkipToBound(MergingContext->GetStart(), MergingContext->GetIncludeStart());
const ui32 originalSourcesCount = Sources.size();
Sources.clear();