author | Semyon <yentsovsemyon@ydb.tech> | 2025-05-22 13:40:05 +0300
committer | GitHub <noreply@github.com> | 2025-05-22 13:40:05 +0300
commit | 4efc436ed990116e924e258e4b918a15ddbce6d3 (patch)
tree | 102a2e59b11d10473fdafe898fcd7e6d045fbfd5
parent | 67eb9eccb1d9f623dcf3a0fa00a3cab6957bb49f (diff)
download | ydb-4efc436ed990116e924e258e4b918a15ddbce6d3.tar.gz
merge result builder customization on CS (#18481)
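This change decouples TMergePartialStream from the concrete TRecordBatchBuilder: draining is now templated on a MergeResultBuilder concept (declared in merger.h below), every merged or discarded row is reported through AddRecord/SkipRecord, and an optional MaxVersion cursor lets the merger drop rows whose version columns are above a given snapshot. As a reading aid, here is a minimal sketch of a custom builder that satisfies the concept; the TCountingResultBuilder name and its row-counting behaviour are illustrative only and are not part of this commit.

```cpp
#include <ydb/core/formats/arrow/reader/merger.h>

#include <util/system/types.h>

#include <memory>

namespace NKikimr::NArrow::NMerger {

// Hypothetical result builder: instead of materializing a record batch it only
// counts how many rows the merger kept and how many it skipped (deleted rows,
// duplicate keys, rows above MaxVersion).
class TCountingResultBuilder {
private:
    ui64 Taken = 0;
    ui64 Skipped = 0;

public:
    // The merger polls this to decide whether to pause draining; never pause here.
    bool IsBufferExhausted() const {
        return false;
    }
    // Nothing to validate: this builder does not depend on the data schema.
    void ValidateDataSchema(const std::shared_ptr<arrow::Schema>& /*schema*/) const {
    }
    void AddRecord(const TBatchIterator& /*cursor*/) {
        ++Taken;
    }
    void SkipRecord(const TBatchIterator& /*cursor*/) {
        ++Skipped;
    }
    ui64 GetTaken() const {
        return Taken;
    }
    ui64 GetSkipped() const {
        return Skipped;
    }
};

}   // namespace NKikimr::NArrow::NMerger
```

Any such type can be passed to DrainAll/DrainToControlPoint in place of TRecordBatchBuilder, which itself gains AddRecord(const TBatchIterator&) and SkipRecord(const TBatchIterator&) overloads so that it keeps satisfying the same concept.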
-rw-r--r-- | ydb/core/formats/arrow/reader/batch_iterator.h | 10
-rw-r--r-- | ydb/core/formats/arrow/reader/merger.cpp | 82
-rw-r--r-- | ydb/core/formats/arrow/reader/merger.h | 157
-rw-r--r-- | ydb/core/formats/arrow/reader/position.cpp | 13
-rw-r--r-- | ydb/core/formats/arrow/reader/position.h | 19
-rw-r--r-- | ydb/core/formats/arrow/reader/result_builder.cpp | 10
-rw-r--r-- | ydb/core/formats/arrow/reader/result_builder.h | 12
-rw-r--r-- | ydb/core/formats/arrow/ut/ut_arrow.cpp | 98
-rw-r--r-- | ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp | 2
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp | 2
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp | 1
-rw-r--r-- | ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp | 4
12 files changed, 291 insertions, 119 deletions
diff --git a/ydb/core/formats/arrow/reader/batch_iterator.h b/ydb/core/formats/arrow/reader/batch_iterator.h
index d3bb365d570..91d47d78041 100644
--- a/ydb/core/formats/arrow/reader/batch_iterator.h
+++ b/ydb/core/formats/arrow/reader/batch_iterator.h
@@ -11,6 +11,7 @@ private:
     TRWSortableBatchPosition VersionColumns;
     i64 RecordsCount;
     int ReverseSortKff;
+    YDB_READONLY(ui64, SourceId, 0);
     std::shared_ptr<NArrow::TColumnFilter> Filter;
     std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
 
@@ -50,14 +51,17 @@ public:
     }
 
     template <class TDataContainer>
-    TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
-        const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::vector<std::string>& versionColumnNames)
+    TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const arrow::Schema& keySchema,
+        const arrow::Schema& dataSchema, const bool reverseSort, const std::vector<std::string>& versionColumnNames, const ui64 sourceId)
         : ControlPointFlag(false)
-        , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
+        , KeyColumns(batch, 0, keySchema.field_names(), dataSchema.field_names(), reverseSort)
         , VersionColumns(batch, 0, versionColumnNames, {}, false)
        , RecordsCount(batch->num_rows())
         , ReverseSortKff(reverseSort ? -1 : 1)
+        , SourceId(sourceId)
         , Filter(filter) {
+        AFL_VERIFY(KeyColumns.IsSameSortingSchema(keySchema))("batch", KeyColumns.DebugJson())("schema", keySchema.ToString());
+        AFL_VERIFY(KeyColumns.IsSameDataSchema(dataSchema))("batch", KeyColumns.DebugJson())("schema", dataSchema.ToString());
         Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
         Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition()));
         if (Filter) {
diff --git a/ydb/core/formats/arrow/reader/merger.cpp b/ydb/core/formats/arrow/reader/merger.cpp
index bdf212696b8..c12de2936ac 100644
--- a/ydb/core/formats/arrow/reader/merger.cpp
+++ b/ydb/core/formats/arrow/reader/merger.cpp
@@ -6,7 +6,7 @@ namespace NKikimr::NArrow::NMerger {
 
 void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy) {
-    AFL_VERIFY(point.IsSameSortingSchema(SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
+    AFL_VERIFY(point.IsSameSortingSchema(*SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
     Y_ABORT_UNLESS(point.IsReverseSort() == Reverse);
     Y_ABORT_UNLESS(++ControlPoints == 1);
 
@@ -37,39 +37,6 @@ void TMergePartialStream::CheckSequenceInDebug(const TRWSortableBatchPosition& n
 #endif
 }
 
-bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
-    AFL_VERIFY(ControlPoints == 1);
-    Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
-    builder.ValidateDataSchema(DataSchema);
-    bool cpReachedFlag = false;
-    std::shared_ptr<TSortableScanData> resultScanData;
-    ui64 resultPosition;
-    while (SortHeap.Size() && !cpReachedFlag && !builder.IsBufferExhausted()) {
-        if (SortHeap.Current().IsControlPoint()) {
-            auto keyColumns = SortHeap.Current().GetKeyColumns().BuildSortingCursor();
-            RemoveControlPoint();
-            cpReachedFlag = true;
-            if (SortHeap.Empty() || !includeFinish || SortHeap.Current().GetKeyColumns().Compare(keyColumns) == std::partial_ordering::greater) {
-                if (lastResultPosition && resultScanData) {
-                    *lastResultPosition = resultScanData->BuildCursor(resultPosition);
-                }
-                return true;
-            }
-        }
-
-        DrainCurrentPosition(&builder, &resultScanData, &resultPosition);
-    }
-    if (lastResultPosition && resultScanData) {
-        *lastResultPosition = resultScanData->BuildCursor(resultPosition);
-    }
-    return cpReachedFlag;
-}
-
-bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
-    PutControlPoint(readTo, false);
-    return DrainToControlPoint(builder, includeFinish, lastResultPosition);
-}
-
 std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
     std::shared_ptr<arrow::Table> result;
     if (SortHeap.Empty()) {
@@ -143,53 +110,6 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
     return result;
 }
 
-void TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
-    Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
-    while (SortHeap.Size()) {
-        DrainCurrentPosition(&builder, nullptr, nullptr);
-    }
-}
-
-void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition) {
-    Y_ABORT_UNLESS(SortHeap.Size());
-    Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
-    if (!SortHeap.Current().IsDeleted()) {
-//        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_add", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
-        if (builder) {
-            builder->AddRecord(SortHeap.Current().GetKeyColumns());
-        }
-        if (resultScanData && resultPosition) {
-            *resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
-            *resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
-        }
-    } else {
-//        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
-    }
-    CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
-    const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
-    const TSortableScanData* startSorting = SortHeap.Current().GetKeyColumns().GetSorting().get();
-    const TSortableScanData* startVersion = SortHeap.Current().GetVersionColumns().GetSorting().get();
-    bool isFirst = true;
-    while (SortHeap.Size() && (isFirst || SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
-        if (!isFirst) {
-//            AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip1", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
-            auto& anotherIterator = SortHeap.Current();
-            if (PossibleSameVersionFlag) {
-                AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)
-                    ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
-                    ("key", startSorting->BuildCursor(startPosition).DebugJson());
-            } else {
-                AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) == std::partial_ordering::less)
-                    ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
-                    ("key", startSorting->BuildCursor(startPosition).DebugJson());
-            }
-        }
-        SortHeap.Next();
-        isFirst = false;
-    }
-    SortHeap.CleanFinished();
-}
-
 std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const TIntervalPositions& positions, const std::vector<std::shared_ptr<arrow::Field>>& resultFields) {
diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h
index 4950e49bee3..b49e1b6a67e 100644
--- a/ydb/core/formats/arrow/reader/merger.h
+++ b/ydb/core/formats/arrow/reader/merger.h
@@ -1,13 +1,20 @@
 #pragma once
-#include "position.h"
-#include "heap.h"
-#include "result_builder.h"
 #include "batch_iterator.h"
+#include "heap.h"
+#include "position.h"
 
 #include <ydb/core/formats/arrow/arrow_filter.h>
 
 namespace NKikimr::NArrow::NMerger {
 
+template <typename T>
+concept MergeResultBuilder = requires(const T& constT, T& mutT, const std::shared_ptr<arrow::Schema>& schema, const TBatchIterator& cursor) {
+    { constT.IsBufferExhausted() } -> std::same_as<bool>;
+    { constT.ValidateDataSchema(schema) } -> std::same_as<void>;
+    { mutT.AddRecord(cursor) } -> std::same_as<void>;
+    { mutT.SkipRecord(cursor) } -> std::same_as<void>;
+};
+
 class TMergePartialStream {
 private:
 #ifndef NDEBUG
@@ -19,6 +26,7 @@ private:
     std::shared_ptr<arrow::Schema> DataSchema;
     const bool Reverse;
     const std::vector<std::string> VersionColumnNames;
+    std::optional<TCursor> MaxVersion;
     ui32 ControlPoints = 0;
 
     TSortingHeap<TBatchIterator> SortHeap;
@@ -34,19 +42,92 @@ private:
         return result;
     }
 
-    void DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition);
+    template <MergeResultBuilder TBuilder>
+    [[nodiscard]] bool DrainCurrentPosition(TBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition) {
+        Y_ABORT_UNLESS(SortHeap.Size());
+        Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
+        CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
+
+        const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
+        const TSortableScanData* startSorting = SortHeap.Current().GetKeyColumns().GetSorting().get();
+        const TSortableScanData* startVersion = SortHeap.Current().GetVersionColumns().GetSorting().get();
+
+        if (MaxVersion) {
+            bool skippedPk = false;
+            while (SortHeap.Size() && SortHeap.Current().GetVersionColumns().Compare(*MaxVersion) == std::partial_ordering::greater && !skippedPk) {
+                if (builder) {
+                    builder->SkipRecord(SortHeap.Current());
+                }
+                SortHeap.Next();
+                if (SortHeap.Empty() ||
+                    SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) != std::partial_ordering::equivalent) {
+                    skippedPk = true;
+                }
+            }
+            if (skippedPk) {
+                SortHeap.CleanFinished();
+                return false;
+            }
+        }
+
+        bool foundResult = false;
+        if (!SortHeap.Current().IsDeleted()) {
+            foundResult = true;
+            //            AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_add", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
+            if (builder) {
+                builder->AddRecord(SortHeap.Current());
+            }
+            if (resultScanData && resultPosition) {
+                *resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
+                *resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
+            }
+        } else {
+            //            AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
+            if (builder) {
+                builder->SkipRecord(SortHeap.Current());
+            }
+        }
+        SortHeap.Next();
+
+        while (
+            SortHeap.Size() && (SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
+            if (builder) {
+                builder->SkipRecord(SortHeap.Current());
+            }
+            //            AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip1", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
+            auto& anotherIterator = SortHeap.Current();
+            if (PossibleSameVersionFlag) {
+                AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)
+                ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())(
+                    "key", startSorting->BuildCursor(startPosition).DebugJson());
+            } else {
+                AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) == std::partial_ordering::less)
+                ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())(
+                    "key", startSorting->BuildCursor(startPosition).DebugJson());
+            }
+            SortHeap.Next();
+        }
+        SortHeap.CleanFinished();
+        return foundResult;
+    }
     void CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition);
-    bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish,
-        std::optional<TCursor>* lastResultPosition = nullptr);
+
+    template <MergeResultBuilder TBuilder>
+    bool DrainCurrentTo(TBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish,
+        std::optional<TCursor>* lastResultPosition = nullptr) {
+        PutControlPoint(readTo, false);
+        return DrainToControlPoint(builder, includeFinish, lastResultPosition);
+    }
 
 public:
-    TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
+    TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse,
+        const std::vector<std::string>& versionColumnNames, const std::optional<TCursor>& maxVersion)
         : SortSchema(sortSchema)
         , DataSchema(dataSchema)
         , Reverse(reverse)
         , VersionColumnNames(versionColumnNames)
-    {
+        , MaxVersion(maxVersion) {
         Y_ABORT_UNLESS(SortSchema);
         Y_ABORT_UNLESS(SortSchema->num_fields());
         Y_ABORT_UNLESS(!DataSchema || DataSchema->num_fields());
@@ -78,25 +159,67 @@ public:
     }
 
     template <class TDataContainer>
-    void AddSource(const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter) {
+    void AddSource(const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter,
+        const std::optional<ui64> sourceIdExt = std::nullopt) {
+        const ui64 sourceId = sourceIdExt.value_or(SortHeap.Size());
         if (!batch || !batch->num_rows()) {
             return;
         }
-//        Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
+        //        Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
         const bool isDenyFilter = filter && filter->IsTotalDenyFilter();
         auto filterImpl = (!filter || filter->IsTotalAllowFilter()) ? nullptr : filter;
-        SortHeap.Push(TBatchIterator(batch, filterImpl, SortSchema->field_names(), (!isDenyFilter && DataSchema) ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
+        static const arrow::Schema emptySchema = arrow::Schema(arrow::FieldVector());
+        TBatchIterator iterator(
+            batch, filterImpl, *SortSchema, (!isDenyFilter && DataSchema) ? *DataSchema : emptySchema, Reverse, VersionColumnNames, sourceId);
+        if (MaxVersion) {
+            MaxVersion->ValidateSchema(*iterator.GetVersionColumns().GetSorting());
+        }
+        SortHeap.Push(std::move(iterator));
     }
 
     bool IsEmpty() const {
         return !SortHeap.Size();
     }
 
-    void DrainAll(TRecordBatchBuilder& builder);
-    std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
-    bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
-    std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const TIntervalPositions& positions,
-        const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
+    template <MergeResultBuilder TBuilder>
+    void DrainAll(TBuilder& builder) {
+        builder.ValidateDataSchema(DataSchema);
+        while (SortHeap.Size()) {
+            Y_UNUSED(DrainCurrentPosition(&builder, nullptr, nullptr));
+        }
+    }
+    std::shared_ptr<arrow::Table> SingleSourceDrain(
+        const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
+    std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(
+        const TIntervalPositions& positions, const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
+
+    template <MergeResultBuilder TBuilder>
+    bool DrainToControlPoint(TBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr) {
+        AFL_VERIFY(ControlPoints == 1);
+        builder.ValidateDataSchema(DataSchema);
+        bool cpReachedFlag = false;
+        std::shared_ptr<TSortableScanData> resultScanData;
+        ui64 resultPosition;
+        while (SortHeap.Size() && !cpReachedFlag && !builder.IsBufferExhausted()) {
+            if (SortHeap.Current().IsControlPoint()) {
+                auto keyColumns = SortHeap.Current().GetKeyColumns().BuildSortingCursor();
+                RemoveControlPoint();
+                cpReachedFlag = true;
+                if (SortHeap.Empty() || !includeFinish ||
+                    SortHeap.Current().GetKeyColumns().Compare(keyColumns) == std::partial_ordering::greater) {
+                    if (lastResultPosition && resultScanData) {
+                        *lastResultPosition = resultScanData->BuildCursor(resultPosition);
+                    }
+                    return true;
+                }
+            }
+            Y_UNUSED(DrainCurrentPosition(&builder, &resultScanData, &resultPosition));
+        }
+        if (lastResultPosition && resultScanData) {
+            *lastResultPosition = resultScanData->BuildCursor(resultPosition);
+        }
+        return cpReachedFlag;
+    }
 };
 
-}
+}   // namespace NKikimr::NArrow::NMerger
diff --git a/ydb/core/formats/arrow/reader/position.cpp b/ydb/core/formats/arrow/reader/position.cpp
index 02ddbaf2e26..ae8ecdc8317 100644
--- a/ydb/core/formats/arrow/reader/position.cpp
+++ b/ydb/core/formats/arrow/reader/position.cpp
@@ -32,14 +32,14 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
     };
 
     {
-        AFL_VERIFY(guard.InitSortingPosition(posStart));
+        AFL_VERIFY(guard.InitSortingPosition(posStart))("start", posStart)("finish", posFinish);
         auto cmp = position.Compare(forFound);
         if (cond(cmp)) {
            return TFoundPosition(posStart, cmp);
         }
     }
     {
-        AFL_VERIFY(guard.InitSortingPosition(posFinish));
+        AFL_VERIFY(guard.InitSortingPosition(posFinish))("start", posStart)("finish", posFinish);
         auto cmp = position.Compare(forFound);
         if (!cond(cmp)) {
             return std::nullopt;
@@ -266,6 +266,15 @@ std::partial_ordering TCursor::Compare(const TCursor& item) const {
     return std::partial_ordering::equivalent;
 }
 
+void TCursor::ValidateSchema(const TSortableScanData& position) const {
+    AFL_VERIFY(position.GetFields().size() == PositionAddress.size());
+    for (ui64 i = 0; i < PositionAddress.size(); ++i) {
+        const auto& posType = position.GetFields()[i]->type();
+        const auto& cursorType = PositionAddress[i].GetArray()->type();
+        AFL_VERIFY(posType->Equals(cursorType))("pos", posType->ToString())("cursor", cursorType->ToString());
+    }
+}
+
 void TCursor::AppendPositionTo(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, ui64* recordSize) const {
     AFL_VERIFY(builders.size() == PositionAddress.size());
     for (ui32 i = 0; i < PositionAddress.size(); ++i) {
diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h
index 973abfc92ac..5508fb0d3b4 100644
--- a/ydb/core/formats/arrow/reader/position.h
+++ b/ydb/core/formats/arrow/reader/position.h
@@ -56,6 +56,8 @@ public:
     std::partial_ordering Compare(const TSortableScanData& item, const ui64 itemPosition) const;
     std::partial_ordering Compare(const TCursor& item) const;
+
+    void ValidateSchema(const TSortableScanData& position) const;
 };
 
 class TSortableScanData {
@@ -175,15 +177,15 @@ public:
         return arrow::Table::Make(std::make_shared<arrow::Schema>(Fields), slicedArrays, count);
     }
 
-    bool IsSameSchema(const std::shared_ptr<arrow::Schema>& schema) const {
-        if (Fields.size() != (size_t)schema->num_fields()) {
+    bool IsSameSchema(const arrow::Schema& schema) const {
+        if (Fields.size() != (size_t)schema.num_fields()) {
             return false;
         }
         for (ui32 i = 0; i < Fields.size(); ++i) {
-            if (!Fields[i]->type()->Equals(schema->field(i)->type())) {
+            if (!Fields[i]->type()->Equals(schema.field(i)->type())) {
                 return false;
             }
-            if (Fields[i]->name() != schema->field(i)->name()) {
+            if (Fields[i]->name() != schema.field(i)->name()) {
                 return false;
             }
         }
@@ -360,10 +362,17 @@ public:
 
     TRWSortableBatchPosition BuildRWPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const;
 
-    bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) const {
+    bool IsSameSortingSchema(const arrow::Schema& schema) const {
         return Sorting->IsSameSchema(schema);
     }
+    bool IsSameDataSchema(const arrow::Schema& schema) const {
+        if (!Data) {
+            return schema.num_fields() == 0;
+        }
+        return Data->IsSameSchema(schema);
+    }
+
     template <class TRecords>
     TSortableBatchPosition(const std::shared_ptr<TRecords>& batch, const ui32 position, const std::vector<std::string>& sortingColumns,
         const std::vector<std::string>& dataColumns, const bool reverseSort)
diff --git a/ydb/core/formats/arrow/reader/result_builder.cpp b/ydb/core/formats/arrow/reader/result_builder.cpp
index 795e693fe2a..ec942e7befa 100644
--- a/ydb/core/formats/arrow/reader/result_builder.cpp
+++ b/ydb/core/formats/arrow/reader/result_builder.cpp
@@ -9,10 +9,17 @@
 
 namespace NKikimr::NArrow::NMerger {
 
-void TRecordBatchBuilder::ValidateDataSchema(const std::shared_ptr<arrow::Schema>& schema) {
+void TRecordBatchBuilder::ValidateDataSchema(const std::shared_ptr<arrow::Schema>& schema) const {
     AFL_VERIFY(IsSameFieldsSequence(schema->fields(), Fields));
 }
 
+void TRecordBatchBuilder::AddRecord(const TBatchIterator& cursor) {
+    AddRecord(cursor.GetKeyColumns());
+}
+
+void TRecordBatchBuilder::SkipRecord(const TBatchIterator& /*cursor*/) {
+}
+
 void TRecordBatchBuilder::AddRecord(const TCursor& position) {
     //    AFL_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields));
     //    AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "record_add_on_read")("record", position.DebugJson());
@@ -58,6 +65,7 @@ TRecordBatchBuilder::TRecordBatchBuilder(const std::vector<std::shared_ptr<arrow
             NArrow::TStatusValidator::Validate(Builders.back()->Reserve(*rowsCountExpectation));
         }
     }
+    AFL_VERIFY(Fields.size() == Builders.size());
 }
 
 std::shared_ptr<arrow::RecordBatch> TRecordBatchBuilder::Finalize() {
diff --git a/ydb/core/formats/arrow/reader/result_builder.h b/ydb/core/formats/arrow/reader/result_builder.h
index 786c68f1ed7..de1c9df25ad 100644
--- a/ydb/core/formats/arrow/reader/result_builder.h
+++ b/ydb/core/formats/arrow/reader/result_builder.h
@@ -1,8 +1,13 @@
 #pragma once
 #include "position.h"
+
+#include <ydb/core/formats/arrow/reader/merger.h>
+
 #include <ydb/library/accessor/accessor.h>
+
 #include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
 #include <util/system/types.h>
+
 #include <optional>
 
 namespace NKikimr::NArrow::NMerger {
@@ -15,7 +20,7 @@ private:
     YDB_ACCESSOR_DEF(std::optional<ui32>, MemoryBufferLimit);
     ui64 CurrentBytesUsed = 0;
 
-    bool IsSameFieldsSequence(const std::vector<std::shared_ptr<arrow::Field>>& f1, const std::vector<std::shared_ptr<arrow::Field>>& f2);
+    static bool IsSameFieldsSequence(const std::vector<std::shared_ptr<arrow::Field>>& f1, const std::vector<std::shared_ptr<arrow::Field>>& f2);
 
 public:
 
@@ -60,7 +65,8 @@ public:
     }
     void AddRecord(const TCursor& position);
     void AddRecord(const TRWSortableBatchPosition& position);
-    void ValidateDataSchema(const std::shared_ptr<arrow::Schema>& schema);
+    void ValidateDataSchema(const std::shared_ptr<arrow::Schema>& schema) const;
+    void AddRecord(const TBatchIterator& cursor);
+    void SkipRecord(const TBatchIterator& cursor);
 };
-
 }
diff --git a/ydb/core/formats/arrow/ut/ut_arrow.cpp b/ydb/core/formats/arrow/ut/ut_arrow.cpp
index bc46b4f329f..ccbf117a5ee 100644
--- a/ydb/core/formats/arrow/ut/ut_arrow.cpp
+++ b/ydb/core/formats/arrow/ut/ut_arrow.cpp
@@ -691,7 +691,8 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
         {
             NArrow::NMerger::TRecordBatchBuilder builder(batch->schema()->fields());
             const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()};
-            auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), false, vColumns);
+            auto merger =
+                std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), false, vColumns, std::nullopt);
             for (auto&& i : batches) {
                 merger->AddSource(i, nullptr);
             }
@@ -718,7 +719,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
         {
             NArrow::NMerger::TRecordBatchBuilder builder(batch->schema()->fields());
             const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()};
-            auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), true, vColumns);
+            auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), true, vColumns, std::nullopt);
             for (auto&& i : batches) {
                 merger->AddSource(i, nullptr);
             }
@@ -744,7 +745,8 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
         {
            NArrow::NMerger::TRecordBatchBuilder builder(batches[0]->schema()->fields());
             const std::vector<std::string> vColumns = {"snap"};
-            auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns);
+            auto merger =
+                std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns, std::nullopt);
             for (auto&& i : batches) {
                 merger->AddSource(i, nullptr);
             }
@@ -762,6 +764,96 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
         UNIT_ASSERT_VALUES_EQUAL(counts[2], 200);
         UNIT_ASSERT_VALUES_EQUAL(counts[3], 400);
     }
+
+    Y_UNIT_TEST(MaxVersionFilter) {
+        std::shared_ptr<arrow::RecordBatch> batch = ExtractBatch(MakeTable1000());
+        UNIT_ASSERT(CheckSorted1000(batch));
+
+        std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+        batches.push_back(AddSnapColumn(batch->Slice(0, 400), 0));
+        batches.push_back(AddSnapColumn(batch->Slice(200, 400), 1));
+        batches.push_back(AddSnapColumn(batch->Slice(400, 400), 2));
+        batches.push_back(AddSnapColumn(batch->Slice(600, 400), 3));
+
+        std::shared_ptr<arrow::RecordBatch> maxVersion =
+            arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(arrow::FieldVector()), 1, std::vector<std::shared_ptr<arrow::Array>>());
+        maxVersion = AddSnapColumn(maxVersion, 1);
+        NArrow::NMerger::TCursor maxVersionCursor(arrow::Table::FromRecordBatches({ maxVersion }).ValueOrDie(), 0, { "snap" });
+
+        std::shared_ptr<arrow::RecordBatch> sorted;
+        {
+            NArrow::NMerger::TRecordBatchBuilder builder(batches[0]->schema()->fields());
+            const std::vector<std::string> vColumns = { "snap" };
+            auto merger =
+                std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns, maxVersionCursor);
+            for (auto&& i : batches) {
+                merger->AddSource(i, nullptr);
+            }
+            merger->DrainAll(builder);
+            sorted = builder.Finalize();
+        }
+
+        UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 600);
+        UNIT_ASSERT(CheckSorted1000(sorted));
+        UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted, batch->schema()));
+
+        auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted->GetColumnByName("snap")));
+        UNIT_ASSERT_VALUES_EQUAL(counts[0], 200);
+        UNIT_ASSERT_VALUES_EQUAL(counts[1], 400);
+    }
+
+    Y_UNIT_TEST(EqualKeysVersionFilter) {
+        std::vector<std::shared_ptr<arrow::RecordBatch>> batchesByKey(3);
+        for (ui64 i = 0; i < batchesByKey.size(); ++i) {
+            batchesByKey[i] = arrow::RecordBatch::Make(
+                std::make_shared<arrow::Schema>(arrow::FieldVector()), 1, std::vector<std::shared_ptr<arrow::Array>>());
+            batchesByKey[i] = batchesByKey[i]
+                                  ->AddColumn(batchesByKey[i]->num_columns(), "key", NArrow::MakeUI64Array(i, batchesByKey[i]->num_rows()))
+                                  .ValueOrDie();
+        }
+
+        std::shared_ptr<arrow::Schema> sortingSchema = std::make_shared<arrow::Schema>(*batchesByKey.front()->schema());
+
+        std::shared_ptr<arrow::RecordBatch> maxVersion =
+            arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(arrow::FieldVector()), 1, std::vector<std::shared_ptr<arrow::Array>>());
+        maxVersion = AddSnapColumn(maxVersion, 1);
+        NArrow::NMerger::TCursor maxVersionCursor(arrow::Table::FromRecordBatches({ maxVersion }).ValueOrDie(), 0, { "snap" });
+
+        std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+        batches.push_back(AddSnapColumn(batchesByKey[0], 1));
+        batches.push_back(AddSnapColumn(batchesByKey[1], 1));
+        batches.push_back(AddSnapColumn(batchesByKey[1], 1));
+        batches.push_back(AddSnapColumn(batchesByKey[2], 1));
+        batches.push_back(AddSnapColumn(batchesByKey[2], 2));
+
+        std::shared_ptr<arrow::RecordBatch> sorted;
+        {
+            NArrow::NMerger::TRecordBatchBuilder builder(batches[0]->schema()->fields());
+            const std::vector<std::string> vColumns = { "snap" };
+            auto merger =
+                std::make_shared<NArrow::NMerger::TMergePartialStream>(sortingSchema, batches[0]->schema(), false, vColumns, maxVersionCursor);
+            for (auto&& i : batches) {
+                merger->AddSource(i, nullptr);
+            }
+            merger->DrainAll(builder);
+            sorted = builder.Finalize();
+        }
+
+        UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 3);
+        UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted, batches[0]->schema()));
+
+        {
+            auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted->GetColumnByName("key")));
+            UNIT_ASSERT_VALUES_EQUAL(counts[0], 1);
+            UNIT_ASSERT_VALUES_EQUAL(counts[1], 1);
+            UNIT_ASSERT_VALUES_EQUAL(counts[2], 1);
+        }
+
+        {
+            auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted->GetColumnByName("snap")));
+            UNIT_ASSERT_VALUES_EQUAL(counts[1], 3);
+        }
+    }
 }
 }
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
index f0ecb83428b..c3a38a045e7 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
@@ -205,7 +205,7 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
     IIndexInfo::AddSnapshotFields(indexFields);
     auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
     NArrow::NMerger::TMergePartialStream mergeStream(
-        resultFiltered->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames());
+        resultFiltered->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames(), std::nullopt);
 
     ui32 idx = 0;
     for (auto&& batch : Batches) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
index 2636db28693..592fd546d0c 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
@@ -9,7 +9,7 @@ namespace NKikimr::NOlap::NReader::NPlain {
 
 std::unique_ptr<NArrow::NMerger::TMergePartialStream> TSpecialReadContext::BuildMerger() const {
     return std::make_unique<NArrow::NMerger::TMergePartialStream>(GetReadMetadata()->GetReplaceKey(), GetProgramInputColumns()->GetSchema(),
-        GetCommonContext()->IsReverse(), IIndexInfo::GetSnapshotColumnNames());
+        GetCommonContext()->IsReverse(), IIndexInfo::GetSnapshotColumnNames(), std::nullopt);
 }
 
 ui64 TSpecialReadContext::GetMemoryForSources(const THashMap<ui32, std::shared_ptr<IDataSource>>& sources) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
index 9ae0c44e7c0..a27739ffcdf 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
@@ -3,6 +3,7 @@
 #include "source.h"
 
 #include <ydb/core/formats/arrow/program/collection.h>
+#include <ydb/core/formats/arrow/reader/result_builder.h>
 #include <ydb/core/formats/arrow/serializer/native.h>
 
 #include <ydb/core/tx/conveyor/usage/service.h>
diff --git a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
index 9b9b0e351b5..f697f6d3a4c 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
+++ b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
@@ -157,8 +157,8 @@ public:
         if (!dataSchema) {
             dataSchema = indexInfo.GetColumnsSchemaByOrderedIndexes(indexes);
         }
-        NArrow::NMerger::TMergePartialStream stream(
-            context.GetActualSchema()->GetIndexInfo().GetReplaceKey(), dataSchema, false, { IIndexInfo::GetWriteIdField()->name() });
+        NArrow::NMerger::TMergePartialStream stream(context.GetActualSchema()->GetIndexInfo().GetReplaceKey(), dataSchema, false,
+            { IIndexInfo::GetWriteIdField()->name() }, std::nullopt);
         for (auto&& i : containers) {
             stream.AddSource(i, nullptr);
         }