diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-14 21:13:44 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-14 21:30:51 +0300 |
commit | 306a5ee7310d56c5b809e91bad189e67836f5875 (patch) | |
tree | dfe4c927199fe5c135b1281379f58556c033e549 | |
parent | b8a327d16e258b2d459ec0c25fe08cc456b066b6 (diff) | |
download | ydb-306a5ee7310d56c5b809e91bad189e67836f5875.tar.gz |
KIKIMR-19216: improve merging
5 files changed, 156 insertions, 107 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 3eb05ff1e0..b5a83d14bd 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -60,9 +60,9 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), portionRecordIndexField, column->BuildArray(batch->num_rows()))); } Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey())); - mergeStream.AddPoolSource({}, batch, nullptr); + mergeStream.AddSource(batch, nullptr); } - batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields, true); + batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields); } Y_ABORT_UNLESS(batchResults.size()); @@ -208,11 +208,8 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter return isSuccess ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL; } -void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position) { - if (CheckPoints.size()) { - AFL_VERIFY(CheckPoints.back().Compare(position) == std::partial_ordering::less); - } - CheckPoints.emplace_back(position); +void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include) { + AFL_VERIFY(CheckPoints.emplace(position, include).second); } } diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index 2a5309646c..1ecd56e5a5 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -8,7 +8,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { private: using TBase = TCompactColumnEngineChanges; virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; - std::vector<NIndexedReader::TSortableBatchPosition> CheckPoints; + std::map<NIndexedReader::TSortableBatchPosition, bool> CheckPoints; protected: virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override; virtual TPortionMeta::EProduced GetResultProducedClass() const override { @@ -19,7 +19,7 @@ protected: public: using TBase::TBase; - void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position); + void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include = true); virtual TString TypeString() const override { return StaticTypeName(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 522d2a2eb2..d17e3de001 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -16,7 +16,7 @@ void TFetchingInterval::ConstructResult() { if (i->GetStart().Compare(Start) == std::partial_ordering::equivalent && !i->IsMergingStarted()) { auto rb = i->GetBatch(); if (rb) { - Merger->AddPoolSource({}, rb, i->GetFilterStageData().GetNotAppliedEarlyFilter()); + Merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter()); } i->StartMerging(); } diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp index d2c1676aa8..9141a15c0d 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp @@ -5,43 +5,28 @@ namespace NKikimr::NOlap::NIndexedReader { void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) { Y_ABORT_UNLESS(point); - Y_ABORT_UNLESS(point->IsSameSortingSchema(SortSchema)); + AFL_VERIFY(point->IsSameSortingSchema(SortSchema))("point", point->DebugJson())("schema", SortSchema->ToString()); Y_ABORT_UNLESS(point->IsReverseSort() == Reverse); Y_ABORT_UNLESS(++ControlPoints == 1); - SortHeap.emplace_back(TBatchIterator(*point)); - std::push_heap(SortHeap.begin(), SortHeap.end()); + SortHeap.Push(TBatchIterator(*point)); } -void TMergePartialStream::AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) { +void TMergePartialStream::AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) { if (!batch || !batch->num_rows()) { return; } Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema)); - if (!poolId) { - AddNewToHeap(poolId, batch, filter, true); - } else { - auto it = BatchPools.find(*poolId); - if (it == BatchPools.end()) { - it = BatchPools.emplace(*poolId, std::deque<TIteratorData>()).first; - } - it->second.emplace_back(batch, filter); - if (it->second.size() == 1) { - AddNewToHeap(poolId, batch, filter, true); - } - } + AddNewToHeap(batch, filter); } -void TMergePartialStream::AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) { +void TMergePartialStream::AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) { if (!filter || filter->IsTotalAllowFilter()) { - SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId)); + SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse)); } else if (filter->IsTotalDenyFilter()) { return; } else { - SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId)); - } - if (restoreHeap) { - std::push_heap(SortHeap.begin(), SortHeap.end()); + SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse)); } } @@ -49,8 +34,8 @@ void TMergePartialStream::RemoveControlPoint() { Y_ABORT_UNLESS(ControlPoints == 1); Y_ABORT_UNLESS(ControlPointEnriched()); Y_ABORT_UNLESS(-- ControlPoints == 0); - std::pop_heap(SortHeap.begin(), SortHeap.end()); - SortHeap.pop_back(); + Y_ABORT_UNLESS(SortHeap.Current().IsControlPoint()); + SortHeap.RemoveTop(); } void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition) { @@ -73,11 +58,11 @@ bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSo Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount()); PutControlPoint(std::make_shared<TSortableBatchPosition>(readTo)); bool cpReachedFlag = false; - while (SortHeap.size() && !cpReachedFlag) { - if (SortHeap.front().IsControlPoint()) { + while (SortHeap.Size() && !cpReachedFlag) { + if (SortHeap.Current().IsControlPoint()) { RemoveControlPoint(); cpReachedFlag = true; - if (SortHeap.empty() || !includeFinish || SortHeap.front().GetKeyColumns().Compare(readTo) == std::partial_ordering::greater) { + if (SortHeap.Empty() || !includeFinish || SortHeap.Current().GetKeyColumns().Compare(readTo) == std::partial_ordering::greater) { return true; } } @@ -92,7 +77,7 @@ bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSo bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) { Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount()); - while (SortHeap.size()) { + while (SortHeap.Size()) { if (auto currentPosition = DrainCurrentPosition()) { CheckSequenceInDebug(*currentPosition); builder.AddRecord(*currentPosition); @@ -102,19 +87,19 @@ bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) { } std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() { - Y_ABORT_UNLESS(SortHeap.size()); - Y_ABORT_UNLESS(!SortHeap.front().IsControlPoint()); - TSortableBatchPosition result = SortHeap.front().GetKeyColumns(); - TSortableBatchPosition resultVersion = SortHeap.front().GetVersionColumns(); + Y_ABORT_UNLESS(SortHeap.Size()); + Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint()); + TSortableBatchPosition result = SortHeap.Current().GetKeyColumns(); + TSortableBatchPosition resultVersion = SortHeap.Current().GetVersionColumns(); bool isFirst = true; - const bool deletedFlag = SortHeap.front().IsDeleted(); - while (SortHeap.size() && (isFirst || result.Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) { - auto& anotherIterator = SortHeap.front(); + const bool deletedFlag = SortHeap.Current().IsDeleted(); + while (SortHeap.Size() && (isFirst || result.Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::equivalent)) { + auto& anotherIterator = SortHeap.Current(); if (!isFirst) { - AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson()) + AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) != std::partial_ordering::less)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson()) ("key", result.DebugJson()); } - NextInHeap(true); + SortHeap.Next(); isFirst = false; } if (deletedFlag) { @@ -123,13 +108,13 @@ std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition( return result; } -std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::vector<TSortableBatchPosition>& positions, - const std::vector<std::shared_ptr<arrow::Field>>& resultFields, const bool includePositions) +std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions, + const std::vector<std::shared_ptr<arrow::Field>>& resultFields) { std::vector<std::shared_ptr<arrow::RecordBatch>> result; for (auto&& i : positions) { NIndexedReader::TRecordBatchBuilder indexesBuilder(resultFields); - DrainCurrentTo(indexesBuilder, i, includePositions); + DrainCurrentTo(indexesBuilder, i.first, i.second); result.emplace_back(indexesBuilder.Finalize()); if (result.back()->num_rows() == 0) { result.pop_back(); @@ -147,11 +132,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const { NJson::TJsonValue result; result["is_cp"] = IsControlPoint(); - if (PoolId) { - result["pool_id"] = *PoolId; - } else { - result["pool_id"] = "absent"; - } result["key"] = KeyColumns.DebugJson(); return result; } diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index c5d52219f2..c7884bd7ab 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -14,6 +14,120 @@ namespace NKikimr::NOlap::NIndexedReader { class TRecordBatchBuilder; +template <class TSortCursor> +class TSortingHeap { +public: + TSortingHeap() = default; + + template <typename TCursors> + TSortingHeap(TCursors& cursors, bool notNull) { + Queue.reserve(cursors.size()); + for (auto& cur : cursors) { + if (!cur.Empty()) { + Queue.emplace_back(TSortCursor(&cur, notNull)); + } + } + std::make_heap(Queue.begin(), Queue.end()); + } + + const TSortCursor& Current() const { return Queue.front(); } + size_t Size() const { return Queue.size(); } + bool Empty() const { return Queue.empty(); } + TSortCursor& NextChild() { return Queue[NextChildIndex()]; } + + void Next() { + Y_ABORT_UNLESS(Size()); + + if (Queue.front().Next()) { + UpdateTop(); + } else { + RemoveTop(); + } + } + + void RemoveTop() { + std::pop_heap(Queue.begin(), Queue.end()); + Queue.pop_back(); + NextIdx = 0; + } + + void Push(TSortCursor&& cursor) { + Queue.emplace_back(cursor); + std::push_heap(Queue.begin(), Queue.end()); + NextIdx = 0; + } + + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_ARRAY; + for (auto&& i : Queue) { + result.AppendValue(i.DebugJson()); + } + return result; + } + +private: + std::vector<TSortCursor> Queue; + /// Cache comparison between first and second child if the order in queue has not been changed. + size_t NextIdx = 0; + + size_t NextChildIndex() { + if (NextIdx == 0) { + NextIdx = 1; + if (Queue.size() > 2 && Queue[1] < Queue[2]) { + ++NextIdx; + } + } + + return NextIdx; + } + + /// This is adapted version of the function __sift_down from libc++. + /// Why cannot simply use std::priority_queue? + /// - because it doesn't support updating the top element and requires pop and push instead. + /// Also look at "Boost.Heap" library. + void UpdateTop() { + size_t size = Queue.size(); + if (size < 2) + return; + + auto begin = Queue.begin(); + + size_t child_idx = NextChildIndex(); + auto child_it = begin + child_idx; + + /// Check if we are in order. + if (*child_it < *begin) + return; + + NextIdx = 0; + + auto curr_it = begin; + auto top(std::move(*begin)); + do { + /// We are not in heap-order, swap the parent with it's largest child. + *curr_it = std::move(*child_it); + curr_it = child_it; + + // recompute the child based off of the updated parent + child_idx = 2 * child_idx + 1; + + if (child_idx >= size) + break; + + child_it = begin + child_idx; + + if ((child_idx + 1) < size && *child_it < *(child_it + 1)) { + /// Right child exists and is greater than left child. + ++child_it; + ++child_idx; + } + + /// Check if we are in order. + } while (!(*child_it < top)); + *curr_it = std::move(top); + } +}; + class TMergePartialStream { private: #ifndef NDEBUG @@ -26,7 +140,6 @@ private: TSortableBatchPosition VersionColumns; i64 RecordsCount; int ReverseSortKff; - YDB_OPT(ui32, PoolId); std::shared_ptr<NArrow::TColumnFilter> Filter; std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator; @@ -62,13 +175,12 @@ private: } TBatchIterator(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, - const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::optional<ui32> poolId) + const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort) : ControlPointFlag(false) , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort) , VersionColumns(batch, 0, TIndexInfo::GetSpecialColumnNames(), {}, false) , RecordsCount(batch->num_rows()) , ReverseSortKff(reverseSort ? -1 : 1) - , PoolId(poolId) , Filter(filter) { Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition())); @@ -136,43 +248,11 @@ private: result["current"] = CurrentKeyColumns->DebugJson(); } #endif - for (auto&& i : SortHeap) { - result["heap"].AppendValue(i.DebugJson()); - } + result.InsertValue("heap", SortHeap.DebugJson()); return result; } - bool NextInHeap(const bool needPop) { - if (SortHeap.empty()) { - return false; - } - if (needPop) { - std::pop_heap(SortHeap.begin(), SortHeap.end()); - } - if (SortHeap.back().Next()) { - std::push_heap(SortHeap.begin(), SortHeap.end()); - } else if (!SortHeap.back().HasPoolId()) { - SortHeap.pop_back(); - } else { - auto it = BatchPools.find(SortHeap.back().GetPoolIdUnsafe()); - Y_ABORT_UNLESS(it->second.size()); - if (it->second.size() == 1) { - BatchPools.erase(it); - SortHeap.pop_back(); - } else { - it->second.pop_front(); - TBatchIterator oldIterator = std::move(SortHeap.back()); - SortHeap.pop_back(); - AddNewToHeap(SortHeap.back().GetPoolIdUnsafe(), it->second.front().GetBatch(), it->second.front().GetFilter(), false); - oldIterator.CheckNextBatch(SortHeap.back()); - std::push_heap(SortHeap.begin(), SortHeap.end()); - } - } - return SortHeap.size(); - } - - THashMap<ui32, std::deque<TIteratorData>> BatchPools; - std::vector<TBatchIterator> SortHeap; + TSortingHeap<TBatchIterator> SortHeap; std::shared_ptr<arrow::Schema> SortSchema; std::shared_ptr<arrow::Schema> DataSchema; const bool Reverse; @@ -180,7 +260,7 @@ private: std::optional<TSortableBatchPosition> DrainCurrentPosition(); - void AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap); + void AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter); void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition); public: TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse) @@ -193,15 +273,7 @@ public: } bool IsValid() const { - return SortHeap.size(); - } - - bool HasRecordsInPool(const ui32 poolId) const { - auto it = BatchPools.find(poolId); - if (it == BatchPools.end()) { - return false; - } - return it->second.size(); + return SortHeap.Size(); } void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point); @@ -209,19 +281,19 @@ public: void RemoveControlPoint(); bool ControlPointEnriched() const { - return SortHeap.size() && SortHeap.front().IsControlPoint(); + return SortHeap.Size() && SortHeap.Current().IsControlPoint(); } - void AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter); + void AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter); bool IsEmpty() const { - return SortHeap.empty(); + return !SortHeap.Size(); } bool DrainAll(TRecordBatchBuilder& builder); bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish); - std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const std::vector<TSortableBatchPosition>& positions, - const std::vector<std::shared_ptr<arrow::Field>>& resultFields, const bool includePositions); + std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions, + const std::vector<std::shared_ptr<arrow::Field>>& resultFields); }; class TRecordBatchBuilder { |