diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-03 11:36:06 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-03 11:36:06 +0300 |
commit | 2366526b03d6e6f274a112c5276300f09d101ea8 (patch) | |
tree | e3a065f461c0b34b3b49b0e47dd6890cb97dd57b | |
parent | 1818d24064c3177c8ca6226b878f3f69d90965dc (diff) | |
download | ydb-2366526b03d6e6f274a112c5276300f09d101ea8.tar.gz |
correct merge control for internal duplications
6 files changed, 71 insertions, 24 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 22e319727f..1cf45874a5 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -72,7 +72,7 @@ GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo, const std::shared_ptr<NArrow::TSortDescription>& description, - const THashSet<const void*> batchesToDedup) { + const THashSet<const void*>& batchesToDedup) { auto rangesSlices = GroupInKeyRanges(batches, indexInfo); // Merge slices in ranges diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 6257c1788e..dfe30b754f 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -15,9 +15,11 @@ TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionI Y_VERIFY(portionInfo.Records.size()); if (portionInfo.CanIntersectOthers()) { + AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "intersect_portion"); Owner->SetDuplicationsAvailable(true); if (portionInfo.CanHaveDups()) { - SetDuplicationsAvailable(true); + AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion"); + DuplicationsAvailableFlag = true; } } } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index 25f6002a44..d854b4a997 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -33,7 +33,7 @@ private: ui32 OriginalRecordsCount = 0; - YDB_FLAG_ACCESSOR(DuplicationsAvailable, false); + YDB_READONLY_FLAG(DuplicationsAvailable, false); THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data; TGranule* Owner; const TPortionInfo* PortionInfo = nullptr; diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index 58283b6051..3de3003cbe 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -17,9 +17,16 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco if (batch && batch->num_rows()) { if (batchInfo.IsSortableInGranule()) { SortableBatches.emplace_back(batch); - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false)); } else { NonSortableBatches.emplace_back(batch); + } + + if (!batchInfo.IsDuplicationsAvailable()) { + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false)); + } else { + AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion_on_ready"); + Y_VERIFY_DEBUG(NArrow::IsSorted(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false)); + Y_VERIFY(IsDuplicationsAvailable()); BatchesToDedup.insert(batch.get()); } } @@ -91,6 +98,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { if (!batch || !batch->num_rows()) { return; } + AFL_ERROR(NKikimrServices::KQP_COMPUTE)("event", "add_not_indexed_batch"); Y_VERIFY(NonSortableBatches.empty()); Y_VERIFY(SortableBatches.empty()); Y_VERIFY(!NotIndexedBatch); diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp index a20f94f1c9..0c82375122 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp @@ -63,10 +63,8 @@ bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& OnBatchFilterInitialized(*b, context); batches.pop_front(); } - if (MergeStream.IsValid()) { - while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.Next()) { - --CurrentItemsLimit; - } + while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.DrainCurrent()) { + --CurrentItemsLimit; } if (!CurrentItemsLimit || batches.empty()) { while (batches.size()) { diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index d88dda1a68..a43345df60 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -84,6 +84,37 @@ private: return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, 0) * ReverseSortKff < 0; } + class TPosition { + private: + const TBatchIterator* Owner; + ui32 Position = 0; + bool DeletedFlag = false; + public: + bool IsDeleted() const { + return DeletedFlag; + } + + void TakeIfMoreActual(const TBatchIterator& anotherIterator) { + if (NArrow::ColumnsCompare(Owner->VersionColumns, Position, anotherIterator.VersionColumns, anotherIterator.Position) < 0) { + Owner = &anotherIterator; + Position = anotherIterator.Position; + DeletedFlag = Owner->IsDeleted(); + } + } + + TPosition(const TBatchIterator& owner) + : Owner(&owner) + , Position(Owner->Position) + { + DeletedFlag = Owner->IsDeleted(); + } + + int CompareNoVersion(const TBatchIterator& item) const { + Y_VERIFY_DEBUG(item.Columns.size() == Owner->Columns.size()); + return NArrow::ColumnsCompare(Owner->Columns, Position, item.Columns, item.Position); + } + }; + int CompareNoVersion(const TBatchIterator& item) const { Y_VERIFY_DEBUG(item.Columns.size() == Columns.size()); return NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position); @@ -117,7 +148,7 @@ private: bool operator<(const TBatchIterator& item) const { const int result = CompareNoVersion(item) * ReverseSortKff; if (result == 0) { - return NArrow::ColumnsCompare(VersionColumns, Position, item.VersionColumns, item.Position) > 0; + return NArrow::ColumnsCompare(VersionColumns, Position, item.VersionColumns, item.Position) < 0; } else { return result > 0; } @@ -157,6 +188,20 @@ private: std::vector<TBatchIterator> SortHeap; std::shared_ptr<arrow::Schema> SortSchema; const bool Reverse; + + TBatchIterator::TPosition DrainCurrentPosition() { + Y_VERIFY(SortHeap.size()); + auto position = TBatchIterator::TPosition(SortHeap.front()); + bool isFirst = true; + while (SortHeap.size() && (isFirst || !position.CompareNoVersion(SortHeap.front()))) { + if (!isFirst) { + position.TakeIfMoreActual(SortHeap.front()); + } + NextInHeap(true); + isFirst = false; + } + return position; + } public: TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse) : SortSchema(sortSchema) @@ -207,24 +252,18 @@ public: } } - - - bool Next() { + bool DrainCurrent() { + if (SortHeap.empty()) { + return false; + } while (SortHeap.size()) { - std::pop_heap(SortHeap.begin(), SortHeap.end()); - TBatchIterator mainIterator = std::move(SortHeap.back()); - SortHeap.pop_back(); - while (SortHeap.size() && !mainIterator.CompareNoVersion(SortHeap.front())) { - NextInHeap(true); - } - const bool isDeleted = mainIterator.IsDeleted(); - SortHeap.emplace_back(std::move(mainIterator)); - NextInHeap(false); - if (!isDeleted) { - break; + auto currentPosition = DrainCurrentPosition(); + if (currentPosition.IsDeleted()) { + continue; } + return true; } - return SortHeap.size(); + return false; } }; |