author      Semyon <yentsovsemyon@ydb.tech>            2025-05-22 13:40:05 +0300
committer   GitHub <noreply@github.com>                2025-05-22 13:40:05 +0300
commit      4efc436ed990116e924e258e4b918a15ddbce6d3 (patch)
tree        102a2e59b11d10473fdafe898fcd7e6d045fbfd5
parent      67eb9eccb1d9f623dcf3a0fa00a3cab6957bb49f (diff)
download    ydb-4efc436ed990116e924e258e4b918a15ddbce6d3.tar.gz
merge result builder customization on CS (#18481)
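This change templates the drain methods of TMergePartialStream on a MergeResultBuilder concept (defined in merger.h below) instead of hard-coding TRecordBatchBuilder, and the stream now accepts an optional MaxVersion cursor so rows whose version columns exceed it are skipped during the merge. As a rough sketch of what the concept demands from a custom builder (the class name and include list are illustrative only; the four required members come straight from the concept definition):

    #include <ydb/core/formats/arrow/reader/merger.h>

    #include <util/system/types.h>

    // Hypothetical builder that only counts rows; it exists solely to show the
    // interface the MergeResultBuilder concept requires.
    class TRowCountingBuilder {
    private:
        ui64 Added = 0;
        ui64 Skipped = 0;

    public:
        bool IsBufferExhausted() const {
            return false;   // never asks the merger to pause the drain
        }
        void ValidateDataSchema(const std::shared_ptr<arrow::Schema>& /*schema*/) const {
            // nothing to validate: this builder materializes no columns
        }
        void AddRecord(const NKikimr::NArrow::NMerger::TBatchIterator& /*cursor*/) {
            ++Added;
        }
        void SkipRecord(const NKikimr::NArrow::NMerger::TBatchIterator& /*cursor*/) {
            ++Skipped;
        }
        ui64 GetAdded() const {
            return Added;
        }
    };

    static_assert(NKikimr::NArrow::NMerger::MergeResultBuilder<TRowCountingBuilder>);

TRecordBatchBuilder itself gains AddRecord(const TBatchIterator&) and SkipRecord(const TBatchIterator&) overloads in result_builder.h/.cpp so it continues to satisfy the concept.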
-rw-r--r--  ydb/core/formats/arrow/reader/batch_iterator.h  10
-rw-r--r--  ydb/core/formats/arrow/reader/merger.cpp  82
-rw-r--r--  ydb/core/formats/arrow/reader/merger.h  157
-rw-r--r--  ydb/core/formats/arrow/reader/position.cpp  13
-rw-r--r--  ydb/core/formats/arrow/reader/position.h  19
-rw-r--r--  ydb/core/formats/arrow/reader/result_builder.cpp  10
-rw-r--r--  ydb/core/formats/arrow/reader/result_builder.h  12
-rw-r--r--  ydb/core/formats/arrow/ut/ut_arrow.cpp  98
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp  2
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp  2
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp  1
-rw-r--r--  ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp  4
12 files changed, 291 insertions, 119 deletions
diff --git a/ydb/core/formats/arrow/reader/batch_iterator.h b/ydb/core/formats/arrow/reader/batch_iterator.h
index d3bb365d570..91d47d78041 100644
--- a/ydb/core/formats/arrow/reader/batch_iterator.h
+++ b/ydb/core/formats/arrow/reader/batch_iterator.h
@@ -11,6 +11,7 @@ private:
TRWSortableBatchPosition VersionColumns;
i64 RecordsCount;
int ReverseSortKff;
+ YDB_READONLY(ui64, SourceId, 0);
std::shared_ptr<NArrow::TColumnFilter> Filter;
std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
@@ -50,14 +51,17 @@ public:
}
template <class TDataContainer>
- TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
- const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::vector<std::string>& versionColumnNames)
+ TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const arrow::Schema& keySchema,
+ const arrow::Schema& dataSchema, const bool reverseSort, const std::vector<std::string>& versionColumnNames, const ui64 sourceId)
: ControlPointFlag(false)
- , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
+ , KeyColumns(batch, 0, keySchema.field_names(), dataSchema.field_names(), reverseSort)
, VersionColumns(batch, 0, versionColumnNames, {}, false)
, RecordsCount(batch->num_rows())
, ReverseSortKff(reverseSort ? -1 : 1)
+ , SourceId(sourceId)
, Filter(filter) {
+ AFL_VERIFY(KeyColumns.IsSameSortingSchema(keySchema))("batch", KeyColumns.DebugJson())("schema", keySchema.ToString());
+ AFL_VERIFY(KeyColumns.IsSameDataSchema(dataSchema))("batch", KeyColumns.DebugJson())("schema", dataSchema.ToString());
Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition()));
if (Filter) {
diff --git a/ydb/core/formats/arrow/reader/merger.cpp b/ydb/core/formats/arrow/reader/merger.cpp
index bdf212696b8..c12de2936ac 100644
--- a/ydb/core/formats/arrow/reader/merger.cpp
+++ b/ydb/core/formats/arrow/reader/merger.cpp
@@ -6,7 +6,7 @@
namespace NKikimr::NArrow::NMerger {
void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy) {
- AFL_VERIFY(point.IsSameSortingSchema(SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
+ AFL_VERIFY(point.IsSameSortingSchema(*SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
Y_ABORT_UNLESS(point.IsReverseSort() == Reverse);
Y_ABORT_UNLESS(++ControlPoints == 1);
@@ -37,39 +37,6 @@ void TMergePartialStream::CheckSequenceInDebug(const TRWSortableBatchPosition& n
#endif
}
-bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
- AFL_VERIFY(ControlPoints == 1);
- Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
- builder.ValidateDataSchema(DataSchema);
- bool cpReachedFlag = false;
- std::shared_ptr<TSortableScanData> resultScanData;
- ui64 resultPosition;
- while (SortHeap.Size() && !cpReachedFlag && !builder.IsBufferExhausted()) {
- if (SortHeap.Current().IsControlPoint()) {
- auto keyColumns = SortHeap.Current().GetKeyColumns().BuildSortingCursor();
- RemoveControlPoint();
- cpReachedFlag = true;
- if (SortHeap.Empty() || !includeFinish || SortHeap.Current().GetKeyColumns().Compare(keyColumns) == std::partial_ordering::greater) {
- if (lastResultPosition && resultScanData) {
- *lastResultPosition = resultScanData->BuildCursor(resultPosition);
- }
- return true;
- }
- }
-
- DrainCurrentPosition(&builder, &resultScanData, &resultPosition);
- }
- if (lastResultPosition && resultScanData) {
- *lastResultPosition = resultScanData->BuildCursor(resultPosition);
- }
- return cpReachedFlag;
-}
-
-bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
- PutControlPoint(readTo, false);
- return DrainToControlPoint(builder, includeFinish, lastResultPosition);
-}
-
std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
std::shared_ptr<arrow::Table> result;
if (SortHeap.Empty()) {
@@ -143,53 +110,6 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
return result;
}
-void TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
- Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
- while (SortHeap.Size()) {
- DrainCurrentPosition(&builder, nullptr, nullptr);
- }
-}
-
-void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition) {
- Y_ABORT_UNLESS(SortHeap.Size());
- Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
- if (!SortHeap.Current().IsDeleted()) {
-// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_add", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
- if (builder) {
- builder->AddRecord(SortHeap.Current().GetKeyColumns());
- }
- if (resultScanData && resultPosition) {
- *resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
- *resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
- }
- } else {
-// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
- }
- CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
- const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
- const TSortableScanData* startSorting = SortHeap.Current().GetKeyColumns().GetSorting().get();
- const TSortableScanData* startVersion = SortHeap.Current().GetVersionColumns().GetSorting().get();
- bool isFirst = true;
- while (SortHeap.Size() && (isFirst || SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
- if (!isFirst) {
-// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip1", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
- auto& anotherIterator = SortHeap.Current();
- if (PossibleSameVersionFlag) {
- AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)
- ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
- ("key", startSorting->BuildCursor(startPosition).DebugJson());
- } else {
- AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) == std::partial_ordering::less)
- ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
- ("key", startSorting->BuildCursor(startPosition).DebugJson());
- }
- }
- SortHeap.Next();
- isFirst = false;
- }
- SortHeap.CleanFinished();
-}
-
std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const TIntervalPositions& positions,
const std::vector<std::shared_ptr<arrow::Field>>& resultFields)
{
diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h
index 4950e49bee3..b49e1b6a67e 100644
--- a/ydb/core/formats/arrow/reader/merger.h
+++ b/ydb/core/formats/arrow/reader/merger.h
@@ -1,13 +1,20 @@
#pragma once
-#include "position.h"
-#include "heap.h"
-#include "result_builder.h"
#include "batch_iterator.h"
+#include "heap.h"
+#include "position.h"
#include <ydb/core/formats/arrow/arrow_filter.h>
namespace NKikimr::NArrow::NMerger {
+template <typename T>
+concept MergeResultBuilder = requires(const T& constT, T& mutT, const std::shared_ptr<arrow::Schema>& schema, const TBatchIterator& cursor) {
+ { constT.IsBufferExhausted() } -> std::same_as<bool>;
+ { constT.ValidateDataSchema(schema) } -> std::same_as<void>;
+ { mutT.AddRecord(cursor) } -> std::same_as<void>;
+ { mutT.SkipRecord(cursor) } -> std::same_as<void>;
+};
+
class TMergePartialStream {
private:
#ifndef NDEBUG
@@ -19,6 +26,7 @@ private:
std::shared_ptr<arrow::Schema> DataSchema;
const bool Reverse;
const std::vector<std::string> VersionColumnNames;
+ std::optional<TCursor> MaxVersion;
ui32 ControlPoints = 0;
TSortingHeap<TBatchIterator> SortHeap;
@@ -34,19 +42,92 @@ private:
return result;
}
- void DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition);
+ template <MergeResultBuilder TBuilder>
+ [[nodiscard]] bool DrainCurrentPosition(TBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition) {
+ Y_ABORT_UNLESS(SortHeap.Size());
+ Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
+ CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
+
+ const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
+ const TSortableScanData* startSorting = SortHeap.Current().GetKeyColumns().GetSorting().get();
+ const TSortableScanData* startVersion = SortHeap.Current().GetVersionColumns().GetSorting().get();
+
+ if (MaxVersion) {
+ bool skippedPk = false;
+ while (SortHeap.Size() && SortHeap.Current().GetVersionColumns().Compare(*MaxVersion) == std::partial_ordering::greater && !skippedPk) {
+ if (builder) {
+ builder->SkipRecord(SortHeap.Current());
+ }
+ SortHeap.Next();
+ if (SortHeap.Empty() ||
+ SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) != std::partial_ordering::equivalent) {
+ skippedPk = true;
+ }
+ }
+ if (skippedPk) {
+ SortHeap.CleanFinished();
+ return false;
+ }
+ }
+
+ bool foundResult = false;
+ if (!SortHeap.Current().IsDeleted()) {
+ foundResult = true;
+ // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_add", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
+ if (builder) {
+ builder->AddRecord(SortHeap.Current());
+ }
+ if (resultScanData && resultPosition) {
+ *resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
+ *resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
+ }
+ } else {
+ // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
+ if (builder) {
+ builder->SkipRecord(SortHeap.Current());
+ }
+ }
+ SortHeap.Next();
+
+ while (
+ SortHeap.Size() && (SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
+ if (builder) {
+ builder->SkipRecord(SortHeap.Current());
+ }
+ // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip1", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
+ auto& anotherIterator = SortHeap.Current();
+ if (PossibleSameVersionFlag) {
+ AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)
+ ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())(
+ "key", startSorting->BuildCursor(startPosition).DebugJson());
+ } else {
+ AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) == std::partial_ordering::less)
+ ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())(
+ "key", startSorting->BuildCursor(startPosition).DebugJson());
+ }
+ SortHeap.Next();
+ }
+ SortHeap.CleanFinished();
+ return foundResult;
+ }
void CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition);
- bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish,
- std::optional<TCursor>* lastResultPosition = nullptr);
+
+ template <MergeResultBuilder TBuilder>
+ bool DrainCurrentTo(TBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish,
+ std::optional<TCursor>* lastResultPosition = nullptr) {
+ PutControlPoint(readTo, false);
+ return DrainToControlPoint(builder, includeFinish, lastResultPosition);
+ }
public:
- TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
+ TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse,
+ const std::vector<std::string>& versionColumnNames, const std::optional<TCursor>& maxVersion)
: SortSchema(sortSchema)
, DataSchema(dataSchema)
, Reverse(reverse)
, VersionColumnNames(versionColumnNames)
- {
+ , MaxVersion(maxVersion) {
Y_ABORT_UNLESS(SortSchema);
Y_ABORT_UNLESS(SortSchema->num_fields());
Y_ABORT_UNLESS(!DataSchema || DataSchema->num_fields());
@@ -78,25 +159,67 @@ public:
}
template <class TDataContainer>
- void AddSource(const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter) {
+ void AddSource(const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter,
+ const std::optional<ui64> sourceIdExt = std::nullopt) {
+ const ui64 sourceId = sourceIdExt.value_or(SortHeap.Size());
if (!batch || !batch->num_rows()) {
return;
}
-// Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
+ // Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
const bool isDenyFilter = filter && filter->IsTotalDenyFilter();
auto filterImpl = (!filter || filter->IsTotalAllowFilter()) ? nullptr : filter;
- SortHeap.Push(TBatchIterator(batch, filterImpl, SortSchema->field_names(), (!isDenyFilter && DataSchema) ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
+ static const arrow::Schema emptySchema = arrow::Schema(arrow::FieldVector());
+ TBatchIterator iterator(
+ batch, filterImpl, *SortSchema, (!isDenyFilter && DataSchema) ? *DataSchema : emptySchema, Reverse, VersionColumnNames, sourceId);
+ if (MaxVersion) {
+ MaxVersion->ValidateSchema(*iterator.GetVersionColumns().GetSorting());
+ }
+ SortHeap.Push(std::move(iterator));
}
bool IsEmpty() const {
return !SortHeap.Size();
}
- void DrainAll(TRecordBatchBuilder& builder);
- std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
- bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
- std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const TIntervalPositions& positions,
- const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
+ template <MergeResultBuilder TBuilder>
+ void DrainAll(TBuilder& builder) {
+ builder.ValidateDataSchema(DataSchema);
+ while (SortHeap.Size()) {
+ Y_UNUSED(DrainCurrentPosition(&builder, nullptr, nullptr));
+ }
+ }
+ std::shared_ptr<arrow::Table> SingleSourceDrain(
+ const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
+ std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(
+ const TIntervalPositions& positions, const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
+
+ template <MergeResultBuilder TBuilder>
+ bool DrainToControlPoint(TBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr) {
+ AFL_VERIFY(ControlPoints == 1);
+ builder.ValidateDataSchema(DataSchema);
+ bool cpReachedFlag = false;
+ std::shared_ptr<TSortableScanData> resultScanData;
+ ui64 resultPosition;
+ while (SortHeap.Size() && !cpReachedFlag && !builder.IsBufferExhausted()) {
+ if (SortHeap.Current().IsControlPoint()) {
+ auto keyColumns = SortHeap.Current().GetKeyColumns().BuildSortingCursor();
+ RemoveControlPoint();
+ cpReachedFlag = true;
+ if (SortHeap.Empty() || !includeFinish ||
+ SortHeap.Current().GetKeyColumns().Compare(keyColumns) == std::partial_ordering::greater) {
+ if (lastResultPosition && resultScanData) {
+ *lastResultPosition = resultScanData->BuildCursor(resultPosition);
+ }
+ return true;
+ }
+ }
+ Y_UNUSED(DrainCurrentPosition(&builder, &resultScanData, &resultPosition));
+ }
+ if (lastResultPosition && resultScanData) {
+ *lastResultPosition = resultScanData->BuildCursor(resultPosition);
+ }
+ return cpReachedFlag;
+ }
};
-}
+} // namespace NKikimr::NArrow::NMerger
diff --git a/ydb/core/formats/arrow/reader/position.cpp b/ydb/core/formats/arrow/reader/position.cpp
index 02ddbaf2e26..ae8ecdc8317 100644
--- a/ydb/core/formats/arrow/reader/position.cpp
+++ b/ydb/core/formats/arrow/reader/position.cpp
@@ -32,14 +32,14 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
};
{
- AFL_VERIFY(guard.InitSortingPosition(posStart));
+ AFL_VERIFY(guard.InitSortingPosition(posStart))("start", posStart)("finish", posFinish);
auto cmp = position.Compare(forFound);
if (cond(cmp)) {
return TFoundPosition(posStart, cmp);
}
}
{
- AFL_VERIFY(guard.InitSortingPosition(posFinish));
+ AFL_VERIFY(guard.InitSortingPosition(posFinish))("start", posStart)("finish", posFinish);
auto cmp = position.Compare(forFound);
if (!cond(cmp)) {
return std::nullopt;
@@ -266,6 +266,15 @@ std::partial_ordering TCursor::Compare(const TCursor& item) const {
return std::partial_ordering::equivalent;
}
+void TCursor::ValidateSchema(const TSortableScanData& position) const {
+ AFL_VERIFY(position.GetFields().size() == PositionAddress.size());
+ for (ui64 i = 0; i < PositionAddress.size(); ++i) {
+ const auto& posType = position.GetFields()[i]->type();
+ const auto& cursorType = PositionAddress[i].GetArray()->type();
+ AFL_VERIFY(posType->Equals(cursorType))("pos", posType->ToString())("cursor", cursorType->ToString());
+ }
+}
+
void TCursor::AppendPositionTo(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, ui64* recordSize) const {
AFL_VERIFY(builders.size() == PositionAddress.size());
for (ui32 i = 0; i < PositionAddress.size(); ++i) {
diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h
index 973abfc92ac..5508fb0d3b4 100644
--- a/ydb/core/formats/arrow/reader/position.h
+++ b/ydb/core/formats/arrow/reader/position.h
@@ -56,6 +56,8 @@ public:
std::partial_ordering Compare(const TSortableScanData& item, const ui64 itemPosition) const;
std::partial_ordering Compare(const TCursor& item) const;
+
+ void ValidateSchema(const TSortableScanData& position) const;
};
class TSortableScanData {
@@ -175,15 +177,15 @@ public:
return arrow::Table::Make(std::make_shared<arrow::Schema>(Fields), slicedArrays, count);
}
- bool IsSameSchema(const std::shared_ptr<arrow::Schema>& schema) const {
- if (Fields.size() != (size_t)schema->num_fields()) {
+ bool IsSameSchema(const arrow::Schema& schema) const {
+ if (Fields.size() != (size_t)schema.num_fields()) {
return false;
}
for (ui32 i = 0; i < Fields.size(); ++i) {
- if (!Fields[i]->type()->Equals(schema->field(i)->type())) {
+ if (!Fields[i]->type()->Equals(schema.field(i)->type())) {
return false;
}
- if (Fields[i]->name() != schema->field(i)->name()) {
+ if (Fields[i]->name() != schema.field(i)->name()) {
return false;
}
}
@@ -360,10 +362,17 @@ public:
TRWSortableBatchPosition BuildRWPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const;
- bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) const {
+ bool IsSameSortingSchema(const arrow::Schema& schema) const {
return Sorting->IsSameSchema(schema);
}
+ bool IsSameDataSchema(const arrow::Schema& schema) const {
+ if (!Data) {
+ return schema.num_fields() == 0;
+ }
+ return Data->IsSameSchema(schema);
+ }
+
template <class TRecords>
TSortableBatchPosition(const std::shared_ptr<TRecords>& batch, const ui32 position, const std::vector<std::string>& sortingColumns,
const std::vector<std::string>& dataColumns, const bool reverseSort)
diff --git a/ydb/core/formats/arrow/reader/result_builder.cpp b/ydb/core/formats/arrow/reader/result_builder.cpp
index 795e693fe2a..ec942e7befa 100644
--- a/ydb/core/formats/arrow/reader/result_builder.cpp
+++ b/ydb/core/formats/arrow/reader/result_builder.cpp
@@ -9,10 +9,17 @@
namespace NKikimr::NArrow::NMerger {
-void TRecordBatchBuilder::ValidateDataSchema(const std::shared_ptr<arrow::Schema>& schema) {
+void TRecordBatchBuilder::ValidateDataSchema(const std::shared_ptr<arrow::Schema>& schema) const {
AFL_VERIFY(IsSameFieldsSequence(schema->fields(), Fields));
}
+void TRecordBatchBuilder::AddRecord(const TBatchIterator& cursor) {
+ AddRecord(cursor.GetKeyColumns());
+}
+
+void TRecordBatchBuilder::SkipRecord(const TBatchIterator& /*cursor*/) {
+}
+
void TRecordBatchBuilder::AddRecord(const TCursor& position) {
// AFL_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields));
// AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "record_add_on_read")("record", position.DebugJson());
@@ -58,6 +65,7 @@ TRecordBatchBuilder::TRecordBatchBuilder(const std::vector<std::shared_ptr<arrow
NArrow::TStatusValidator::Validate(Builders.back()->Reserve(*rowsCountExpectation));
}
}
+ AFL_VERIFY(Fields.size() == Builders.size());
}
std::shared_ptr<arrow::RecordBatch> TRecordBatchBuilder::Finalize() {
diff --git a/ydb/core/formats/arrow/reader/result_builder.h b/ydb/core/formats/arrow/reader/result_builder.h
index 786c68f1ed7..de1c9df25ad 100644
--- a/ydb/core/formats/arrow/reader/result_builder.h
+++ b/ydb/core/formats/arrow/reader/result_builder.h
@@ -1,8 +1,13 @@
#pragma once
#include "position.h"
+
+#include <ydb/core/formats/arrow/reader/merger.h>
+
#include <ydb/library/accessor/accessor.h>
+
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
#include <util/system/types.h>
+
#include <optional>
namespace NKikimr::NArrow::NMerger {
@@ -15,7 +20,7 @@ private:
YDB_ACCESSOR_DEF(std::optional<ui32>, MemoryBufferLimit);
ui64 CurrentBytesUsed = 0;
- bool IsSameFieldsSequence(const std::vector<std::shared_ptr<arrow::Field>>& f1, const std::vector<std::shared_ptr<arrow::Field>>& f2);
+ static bool IsSameFieldsSequence(const std::vector<std::shared_ptr<arrow::Field>>& f1, const std::vector<std::shared_ptr<arrow::Field>>& f2);
public:
@@ -60,7 +65,8 @@ public:
}
void AddRecord(const TCursor& position);
void AddRecord(const TRWSortableBatchPosition& position);
- void ValidateDataSchema(const std::shared_ptr<arrow::Schema>& schema);
+ void ValidateDataSchema(const std::shared_ptr<arrow::Schema>& schema) const;
+ void AddRecord(const TBatchIterator& cursor);
+ void SkipRecord(const TBatchIterator& cursor);
};
-
}
diff --git a/ydb/core/formats/arrow/ut/ut_arrow.cpp b/ydb/core/formats/arrow/ut/ut_arrow.cpp
index bc46b4f329f..ccbf117a5ee 100644
--- a/ydb/core/formats/arrow/ut/ut_arrow.cpp
+++ b/ydb/core/formats/arrow/ut/ut_arrow.cpp
@@ -691,7 +691,8 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
{
NArrow::NMerger::TRecordBatchBuilder builder(batch->schema()->fields());
const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()};
- auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), false, vColumns);
+ auto merger =
+ std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), false, vColumns, std::nullopt);
for (auto&& i : batches) {
merger->AddSource(i, nullptr);
}
@@ -718,7 +719,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
{
NArrow::NMerger::TRecordBatchBuilder builder(batch->schema()->fields());
const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()};
- auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), true, vColumns);
+ auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), true, vColumns, std::nullopt);
for (auto&& i : batches) {
merger->AddSource(i, nullptr);
}
@@ -744,7 +745,8 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
{
NArrow::NMerger::TRecordBatchBuilder builder(batches[0]->schema()->fields());
const std::vector<std::string> vColumns = {"snap"};
- auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns);
+ auto merger =
+ std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns, std::nullopt);
for (auto&& i : batches) {
merger->AddSource(i, nullptr);
}
@@ -762,6 +764,96 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
UNIT_ASSERT_VALUES_EQUAL(counts[2], 200);
UNIT_ASSERT_VALUES_EQUAL(counts[3], 400);
}
+
+ Y_UNIT_TEST(MaxVersionFilter) {
+ std::shared_ptr<arrow::RecordBatch> batch = ExtractBatch(MakeTable1000());
+ UNIT_ASSERT(CheckSorted1000(batch));
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ batches.push_back(AddSnapColumn(batch->Slice(0, 400), 0));
+ batches.push_back(AddSnapColumn(batch->Slice(200, 400), 1));
+ batches.push_back(AddSnapColumn(batch->Slice(400, 400), 2));
+ batches.push_back(AddSnapColumn(batch->Slice(600, 400), 3));
+
+ std::shared_ptr<arrow::RecordBatch> maxVersion =
+ arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(arrow::FieldVector()), 1, std::vector<std::shared_ptr<arrow::Array>>());
+ maxVersion = AddSnapColumn(maxVersion, 1);
+ NArrow::NMerger::TCursor maxVersionCursor(arrow::Table::FromRecordBatches({ maxVersion }).ValueOrDie(), 0, { "snap" });
+
+ std::shared_ptr<arrow::RecordBatch> sorted;
+ {
+ NArrow::NMerger::TRecordBatchBuilder builder(batches[0]->schema()->fields());
+ const std::vector<std::string> vColumns = { "snap" };
+ auto merger =
+ std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns, maxVersionCursor);
+ for (auto&& i : batches) {
+ merger->AddSource(i, nullptr);
+ }
+ merger->DrainAll(builder);
+ sorted = builder.Finalize();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 600);
+ UNIT_ASSERT(CheckSorted1000(sorted));
+ UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted, batch->schema()));
+
+ auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted->GetColumnByName("snap")));
+ UNIT_ASSERT_VALUES_EQUAL(counts[0], 200);
+ UNIT_ASSERT_VALUES_EQUAL(counts[1], 400);
+ }
+
+ Y_UNIT_TEST(EqualKeysVersionFilter) {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batchesByKey(3);
+ for (ui64 i = 0; i < batchesByKey.size(); ++i) {
+ batchesByKey[i] = arrow::RecordBatch::Make(
+ std::make_shared<arrow::Schema>(arrow::FieldVector()), 1, std::vector<std::shared_ptr<arrow::Array>>());
+ batchesByKey[i] = batchesByKey[i]
+ ->AddColumn(batchesByKey[i]->num_columns(), "key", NArrow::MakeUI64Array(i, batchesByKey[i]->num_rows()))
+ .ValueOrDie();
+ }
+
+ std::shared_ptr<arrow::Schema> sortingSchema = std::make_shared<arrow::Schema>(*batchesByKey.front()->schema());
+
+ std::shared_ptr<arrow::RecordBatch> maxVersion =
+ arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(arrow::FieldVector()), 1, std::vector<std::shared_ptr<arrow::Array>>());
+ maxVersion = AddSnapColumn(maxVersion, 1);
+ NArrow::NMerger::TCursor maxVersionCursor(arrow::Table::FromRecordBatches({ maxVersion }).ValueOrDie(), 0, { "snap" });
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ batches.push_back(AddSnapColumn(batchesByKey[0], 1));
+ batches.push_back(AddSnapColumn(batchesByKey[1], 1));
+ batches.push_back(AddSnapColumn(batchesByKey[1], 1));
+ batches.push_back(AddSnapColumn(batchesByKey[2], 1));
+ batches.push_back(AddSnapColumn(batchesByKey[2], 2));
+
+ std::shared_ptr<arrow::RecordBatch> sorted;
+ {
+ NArrow::NMerger::TRecordBatchBuilder builder(batches[0]->schema()->fields());
+ const std::vector<std::string> vColumns = { "snap" };
+ auto merger =
+ std::make_shared<NArrow::NMerger::TMergePartialStream>(sortingSchema, batches[0]->schema(), false, vColumns, maxVersionCursor);
+ for (auto&& i : batches) {
+ merger->AddSource(i, nullptr);
+ }
+ merger->DrainAll(builder);
+ sorted = builder.Finalize();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(sorted->num_rows(), 3);
+ UNIT_ASSERT(NArrow::IsSortedAndUnique(sorted, batches[0]->schema()));
+
+ {
+ auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted->GetColumnByName("key")));
+ UNIT_ASSERT_VALUES_EQUAL(counts[0], 1);
+ UNIT_ASSERT_VALUES_EQUAL(counts[1], 1);
+ UNIT_ASSERT_VALUES_EQUAL(counts[2], 1);
+ }
+
+ {
+ auto counts = CountValues(std::static_pointer_cast<arrow::UInt64Array>(sorted->GetColumnByName("snap")));
+ UNIT_ASSERT_VALUES_EQUAL(counts[1], 3);
+ }
+ }
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
index f0ecb83428b..c3a38a045e7 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
@@ -205,7 +205,7 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
IIndexInfo::AddSnapshotFields(indexFields);
auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
NArrow::NMerger::TMergePartialStream mergeStream(
- resultFiltered->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames());
+ resultFiltered->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames(), std::nullopt);
ui32 idx = 0;
for (auto&& batch : Batches) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
index 2636db28693..592fd546d0c 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
@@ -9,7 +9,7 @@ namespace NKikimr::NOlap::NReader::NPlain {
std::unique_ptr<NArrow::NMerger::TMergePartialStream> TSpecialReadContext::BuildMerger() const {
return std::make_unique<NArrow::NMerger::TMergePartialStream>(GetReadMetadata()->GetReplaceKey(), GetProgramInputColumns()->GetSchema(),
- GetCommonContext()->IsReverse(), IIndexInfo::GetSnapshotColumnNames());
+ GetCommonContext()->IsReverse(), IIndexInfo::GetSnapshotColumnNames(), std::nullopt);
}
ui64 TSpecialReadContext::GetMemoryForSources(const THashMap<ui32, std::shared_ptr<IDataSource>>& sources) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
index 9ae0c44e7c0..a27739ffcdf 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
@@ -3,6 +3,7 @@
#include "source.h"
#include <ydb/core/formats/arrow/program/collection.h>
+#include <ydb/core/formats/arrow/reader/result_builder.h>
#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/tx/conveyor/usage/service.h>
diff --git a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
index 9b9b0e351b5..f697f6d3a4c 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
+++ b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
@@ -157,8 +157,8 @@ public:
if (!dataSchema) {
dataSchema = indexInfo.GetColumnsSchemaByOrderedIndexes(indexes);
}
- NArrow::NMerger::TMergePartialStream stream(
- context.GetActualSchema()->GetIndexInfo().GetReplaceKey(), dataSchema, false, { IIndexInfo::GetWriteIdField()->name() });
+ NArrow::NMerger::TMergePartialStream stream(context.GetActualSchema()->GetIndexInfo().GetReplaceKey(), dataSchema, false,
+ { IIndexInfo::GetWriteIdField()->name() }, std::nullopt);
for (auto&& i : containers) {
stream.AddSource(i, nullptr);
}
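For orientation, a minimal usage sketch of the new maxVersion parameter, modeled on the MaxVersionFilter and EqualKeysVersionFilter tests above; the schemas and the helper lambda here are illustrative, while the merger, builder, and cursor calls come from this change:

    using namespace NKikimr::NArrow::NMerger;

    // Sort key: "key" (ui64); version column: "snap" (ui64).
    const auto keyField = arrow::field("key", arrow::uint64());
    const auto snapField = arrow::field("snap", arrow::uint64());
    const auto sortSchema = std::make_shared<arrow::Schema>(arrow::FieldVector{ keyField });
    const auto dataSchema = std::make_shared<arrow::Schema>(arrow::FieldVector{ keyField, snapField });

    // Single-row source batch with the given key and version.
    const auto makeBatch = [&](const ui64 key, const ui64 snap) {
        return arrow::RecordBatch::Make(
            dataSchema, 1, { NKikimr::NArrow::MakeUI64Array(key, 1), NKikimr::NArrow::MakeUI64Array(snap, 1) });
    };

    // Maximum visible version is 1: anything newer must be dropped by the merger.
    const auto versionBatch = arrow::RecordBatch::Make(
        std::make_shared<arrow::Schema>(arrow::FieldVector{ snapField }), 1, { NKikimr::NArrow::MakeUI64Array(1, 1) });
    const TCursor maxVersion(arrow::Table::FromRecordBatches({ versionBatch }).ValueOrDie(), 0, { "snap" });

    TRecordBatchBuilder builder(dataSchema->fields());
    TMergePartialStream merger(sortSchema, dataSchema, /*reverse=*/false, { "snap" }, maxVersion);
    merger.AddSource(makeBatch(42, 1), nullptr);
    merger.AddSource(makeBatch(42, 2), nullptr);   // same key, newer version -> skipped
    merger.DrainAll(builder);
    const auto merged = builder.Finalize();        // one row survives: key=42, snap=1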