aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-03 11:36:06 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-03 11:36:06 +0300
commit2366526b03d6e6f274a112c5276300f09d101ea8 (patch)
treee3a065f461c0b34b3b49b0e47dd6890cb97dd57b
parent1818d24064c3177c8ca6226b878f3f69d90965dc (diff)
downloadydb-2366526b03d6e6f274a112c5276300f09d101ea8.tar.gz
correct merge control for internal duplications
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_controller.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.h71
6 files changed, 71 insertions, 24 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 22e319727f..1cf45874a5 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -72,7 +72,7 @@ GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches
std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
const TIndexInfo& indexInfo,
const std::shared_ptr<NArrow::TSortDescription>& description,
- const THashSet<const void*> batchesToDedup) {
+ const THashSet<const void*>& batchesToDedup) {
auto rangesSlices = GroupInKeyRanges(batches, indexInfo);
// Merge slices in ranges
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 6257c1788e..dfe30b754f 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -15,9 +15,11 @@ TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionI
Y_VERIFY(portionInfo.Records.size());
if (portionInfo.CanIntersectOthers()) {
+ AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "intersect_portion");
Owner->SetDuplicationsAvailable(true);
if (portionInfo.CanHaveDups()) {
- SetDuplicationsAvailable(true);
+ AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion");
+ DuplicationsAvailableFlag = true;
}
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index 25f6002a44..d854b4a997 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -33,7 +33,7 @@ private:
ui32 OriginalRecordsCount = 0;
- YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
+ YDB_READONLY_FLAG(DuplicationsAvailable, false);
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
TGranule* Owner;
const TPortionInfo* PortionInfo = nullptr;
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index 58283b6051..3de3003cbe 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -17,9 +17,16 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
if (batch && batch->num_rows()) {
if (batchInfo.IsSortableInGranule()) {
SortableBatches.emplace_back(batch);
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false));
} else {
NonSortableBatches.emplace_back(batch);
+ }
+
+ if (!batchInfo.IsDuplicationsAvailable()) {
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false));
+ } else {
+ AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion_on_ready");
+ Y_VERIFY_DEBUG(NArrow::IsSorted(batch, Owner->GetReadMetadata()->IndexInfo.GetReplaceKey(), false));
+ Y_VERIFY(IsDuplicationsAvailable());
BatchesToDedup.insert(batch.get());
}
}
@@ -91,6 +98,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
if (!batch || !batch->num_rows()) {
return;
}
+ AFL_ERROR(NKikimrServices::KQP_COMPUTE)("event", "add_not_indexed_batch");
Y_VERIFY(NonSortableBatches.empty());
Y_VERIFY(SortableBatches.empty());
Y_VERIFY(!NotIndexedBatch);
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
index a20f94f1c9..0c82375122 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
@@ -63,10 +63,8 @@ bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule&
OnBatchFilterInitialized(*b, context);
batches.pop_front();
}
- if (MergeStream.IsValid()) {
- while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.Next()) {
- --CurrentItemsLimit;
- }
+ while ((batches.empty() || MergeStream.HasRecordsInPool(0)) && CurrentItemsLimit && MergeStream.DrainCurrent()) {
+ --CurrentItemsLimit;
}
if (!CurrentItemsLimit || batches.empty()) {
while (batches.size()) {
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index d88dda1a68..a43345df60 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -84,6 +84,37 @@ private:
return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, 0) * ReverseSortKff < 0;
}
+ class TPosition {
+ private:
+ const TBatchIterator* Owner;
+ ui32 Position = 0;
+ bool DeletedFlag = false;
+ public:
+ bool IsDeleted() const {
+ return DeletedFlag;
+ }
+
+ void TakeIfMoreActual(const TBatchIterator& anotherIterator) {
+ if (NArrow::ColumnsCompare(Owner->VersionColumns, Position, anotherIterator.VersionColumns, anotherIterator.Position) < 0) {
+ Owner = &anotherIterator;
+ Position = anotherIterator.Position;
+ DeletedFlag = Owner->IsDeleted();
+ }
+ }
+
+ TPosition(const TBatchIterator& owner)
+ : Owner(&owner)
+ , Position(Owner->Position)
+ {
+ DeletedFlag = Owner->IsDeleted();
+ }
+
+ int CompareNoVersion(const TBatchIterator& item) const {
+ Y_VERIFY_DEBUG(item.Columns.size() == Owner->Columns.size());
+ return NArrow::ColumnsCompare(Owner->Columns, Position, item.Columns, item.Position);
+ }
+ };
+
int CompareNoVersion(const TBatchIterator& item) const {
Y_VERIFY_DEBUG(item.Columns.size() == Columns.size());
return NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position);
@@ -117,7 +148,7 @@ private:
bool operator<(const TBatchIterator& item) const {
const int result = CompareNoVersion(item) * ReverseSortKff;
if (result == 0) {
- return NArrow::ColumnsCompare(VersionColumns, Position, item.VersionColumns, item.Position) > 0;
+ return NArrow::ColumnsCompare(VersionColumns, Position, item.VersionColumns, item.Position) < 0;
} else {
return result > 0;
}
@@ -157,6 +188,20 @@ private:
std::vector<TBatchIterator> SortHeap;
std::shared_ptr<arrow::Schema> SortSchema;
const bool Reverse;
+
+ TBatchIterator::TPosition DrainCurrentPosition() {
+ Y_VERIFY(SortHeap.size());
+ auto position = TBatchIterator::TPosition(SortHeap.front());
+ bool isFirst = true;
+ while (SortHeap.size() && (isFirst || !position.CompareNoVersion(SortHeap.front()))) {
+ if (!isFirst) {
+ position.TakeIfMoreActual(SortHeap.front());
+ }
+ NextInHeap(true);
+ isFirst = false;
+ }
+ return position;
+ }
public:
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse)
: SortSchema(sortSchema)
@@ -207,24 +252,18 @@ public:
}
}
-
-
- bool Next() {
+ bool DrainCurrent() {
+ if (SortHeap.empty()) {
+ return false;
+ }
while (SortHeap.size()) {
- std::pop_heap(SortHeap.begin(), SortHeap.end());
- TBatchIterator mainIterator = std::move(SortHeap.back());
- SortHeap.pop_back();
- while (SortHeap.size() && !mainIterator.CompareNoVersion(SortHeap.front())) {
- NextInHeap(true);
- }
- const bool isDeleted = mainIterator.IsDeleted();
- SortHeap.emplace_back(std::move(mainIterator));
- NextInHeap(false);
- if (!isDeleted) {
- break;
+ auto currentPosition = DrainCurrentPosition();
+ if (currentPosition.IsDeleted()) {
+ continue;
}
+ return true;
}
- return SortHeap.size();
+ return false;
}
};