author     chertus <azuikov@ydb.tech>  2023-03-17 15:11:28 +0300
committer  chertus <azuikov@ydb.tech>  2023-03-17 15:11:28 +0300
commit     d20dfd4837a75ee004cde1ad32f1593453a24d9e (patch)
tree       35eed235e0a6372b4bab908ba7dce1f8294b7b50
parent     ccc1f1bc46a5282af7e0372de86bc26b71adc462 (diff)
download   ydb-d20dfd4837a75ee004cde1ad32f1593453a24d9e.tar.gz
rewrite merge dups logic in TIndexedReadData
 ydb/core/tx/columnshard/engines/indexed_read_data.cpp | 184
 ydb/core/tx/columnshard/engines/indexed_read_data.h   |  13
 2 files changed, 101 insertions(+), 96 deletions(-)
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 323bd54bb4..716e099cfc 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -19,6 +19,11 @@ void SliceBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
const int64_t maxRowsInBatch,
std::vector<std::shared_ptr<arrow::RecordBatch>>& result)
{
+ if (batch->num_rows() <= maxRowsInBatch) {
+ result.push_back(batch);
+ return;
+ }
+
int64_t offset = 0;
while (offset < batch->num_rows()) {
int64_t rows = std::min<int64_t>(maxRowsInBatch, batch->num_rows() - offset);
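
The hunk above adds a fast path to SliceBatch: a batch that already fits into maxRowsInBatch is forwarded untouched instead of going through the slicing loop. A minimal standalone sketch of the same control flow, with std::vector<int> standing in for arrow::RecordBatch (illustrative only, not YDB code):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    using Batch = std::vector<int>; // stand-in for std::shared_ptr<arrow::RecordBatch>

    // Split `batch` into chunks of at most maxRowsInBatch rows each.
    void SliceBatch(const Batch& batch, int64_t maxRowsInBatch, std::vector<Batch>& result) {
        // Fast path added by this commit: a small batch is passed through as-is.
        if (static_cast<int64_t>(batch.size()) <= maxRowsInBatch) {
            result.push_back(batch);
            return;
        }
        int64_t offset = 0;
        while (offset < static_cast<int64_t>(batch.size())) {
            const int64_t rows = std::min<int64_t>(maxRowsInBatch, batch.size() - offset);
            result.emplace_back(batch.begin() + offset, batch.begin() + offset + rows);
            offset += rows;
        }
    }
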
@@ -27,43 +32,12 @@ void SliceBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
}
};
-std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& src,
- const TIndexInfo& indexInfo,
- const std::shared_ptr<NArrow::TSortDescription>& description,
- const int64_t maxRowsInBatch) {
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.reserve(src.size());
- ui64 size = 0;
- for (auto& batch : src) {
- if (!batch->num_rows()) {
- continue;
- }
- Y_VERIFY_DEBUG(NArrow::IsSorted(batch, description->ReplaceKey));
-
- size += batch->num_rows();
- batches.push_back(batch);
- }
- if (batches.empty()) {
- return {};
- }
-
-#if 1 // Optimization [remove portion's dups]
- if (batches.size() == 1) {
- if (NArrow::IsSortedAndUnique(batches[0], description->ReplaceKey)) {
- std::vector<std::shared_ptr<arrow::RecordBatch>> out;
- SliceBatch(batches[0], maxRowsInBatch, out);
- return out;
- } else {
- return NArrow::MergeSortedBatches(batches, description, size);
- }
- }
-#endif
-
-#if 1 // Optimization [special merge], requires [remove portion's dups]
- TVector<TVector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo]
+std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>
+GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo) {
+ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo]
rangesSlices.reserve(batches.size());
{
- TMap<TMark, TVector<std::shared_ptr<arrow::RecordBatch>>> points;
+ TMap<TMark, std::vector<std::shared_ptr<arrow::RecordBatch>>> points;
for (auto& batch : batches) {
std::shared_ptr<arrow::Array> keyColumn = GetFirstPKColumn(indexInfo, batch);
@@ -93,6 +67,14 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
}
}
}
+ return rangesSlices;
+}
+
+std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
+ const TIndexInfo& indexInfo,
+ const std::shared_ptr<NArrow::TSortDescription>& description,
+ const THashSet<const void*> batchesToDedup) {
+ auto rangesSlices = GroupInKeyRanges(batches, indexInfo);
// Merge slices in ranges
std::vector<std::shared_ptr<arrow::RecordBatch>> out;
@@ -104,23 +86,23 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
// The core of optimization: do not merge slice if it's alone in its key range
if (slices.size() == 1) {
- if (NArrow::IsSortedAndUnique(slices[0], description->ReplaceKey)) {
- // Split big batch into smaller batches if needed
- SliceBatch(slices[0], maxRowsInBatch, out);
- continue;
+ auto batch = slices[0];
+ if (batchesToDedup.count(batch.get())) {
+ if (!NArrow::IsSortedAndUnique(batch, description->ReplaceKey)) {
+ batch = NArrow::CombineSortedBatches({batch}, description);
+ Y_VERIFY(batch);
+ }
}
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey));
+ out.push_back(batch);
+ continue;
}
- auto merged = NArrow::MergeSortedBatches(slices, description, maxRowsInBatch);
- Y_VERIFY(merged.size() >= 1);
- out.insert(out.end(), merged.begin(), merged.end());
+ auto batch = NArrow::CombineSortedBatches(slices, description);
+ out.push_back(batch);
}
return out;
-#else
- Y_UNUSED(indexInfo);
- return NArrow::MergeSortedBatches(batches, description, size);
-#endif
}
}
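
Taken together, the hunks above split the old SpecialMergeSorted into two steps: GroupInKeyRanges partitions the sorted batches into disjoint key ranges, and only ranges holding more than one slice (or slices explicitly flagged for dedup) pay for a merge. A compact standalone sketch of that idea over integer keys (simplified types; the real code works on the first ReplaceKey column of arrow batches):

    #include <algorithm>
    #include <vector>

    using Batch = std::vector<int>; // stand-in: non-empty rows, sorted by key

    // Group batches whose [front, back] key intervals overlap (cf. GroupInKeyRanges).
    std::vector<std::vector<Batch>> GroupInKeyRanges(std::vector<Batch> batches) {
        std::sort(batches.begin(), batches.end(),
                  [](const Batch& a, const Batch& b) { return a.front() < b.front(); });
        std::vector<std::vector<Batch>> ranges;
        int rangeMax = 0;
        for (auto& b : batches) {
            if (ranges.empty() || b.front() > rangeMax) {
                ranges.emplace_back(); // disjoint from the previous key range
                rangeMax = b.back();
            } else {
                rangeMax = std::max(rangeMax, b.back());
            }
            ranges.back().push_back(std::move(b));
        }
        return ranges;
    }

    // The core of the optimization: a batch alone in its key range cannot collide
    // with other batches, so it skips the merge entirely. (The real code still
    // dedups a lone batch if it was flagged in BatchesToDedup.)
    std::vector<Batch> SpecialMergeSorted(const std::vector<Batch>& src) {
        std::vector<Batch> out;
        for (auto& range : GroupInKeyRanges(src)) {
            if (range.size() == 1) {
                out.push_back(std::move(range[0]));
                continue;
            }
            Batch merged; // the real code k-way merges; sort+unique is enough here
            for (auto& b : range) merged.insert(merged.end(), b.begin(), b.end());
            std::sort(merged.begin(), merged.end());
            merged.erase(std::unique(merged.begin(), merged.end()), merged.end());
            out.push_back(std::move(merged));
        }
        return out;
    }
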
@@ -174,9 +156,10 @@ THashMap<TBlobRange, ui64> TIndexedReadData::InitRead(ui32 inputBatch, bool inGr
++GranuleWaits[granule];
}
- // If no PK dups in portions we could use optimized version of merge
+ // If there are no PK dups in the granule, we can use the optimized version of merge
if (portionInfo.CanHaveDups()) {
- PortionsWithSelfDups.emplace(granule);
+ GranulesWithDups.emplace(granule);
+ PortionsWithDups.emplace(portion);
}
for (const NOlap::TColumnRecord& rec : portionInfo.Records) {
@@ -249,6 +232,10 @@ std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32
auto portion = portionInfo.Assemble(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data);
Y_VERIFY(portion);
+
+ /// @warning The replace logic is correct only under the assumption that the predicate is applied over a part of ReplaceKey.
+ /// Otherwise it is not OK to apply the predicate before replacing key duplicates.
+ /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
auto batch = NOlap::FilterPortion(portion, *ReadMetadata);
Y_VERIFY(batch);
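
The moved @warning deserves a concrete reading: filtering before dedup is only safe when duplicates pass or fail the predicate together, which holds when the predicate looks only at (a part of) the ReplaceKey. A toy illustration of how a predicate over a non-key column would break replace semantics (hypothetical types, not YDB code):

    #include <cassert>
    #include <map>
    #include <string>
    #include <vector>

    struct Row { int pk; int snapshot; std::string value; };

    // Replace semantics: keep the newest snapshot per primary key.
    std::vector<Row> Dedup(const std::vector<Row>& rows) {
        std::map<int, Row> last;
        for (const Row& r : rows)
            if (!last.count(r.pk) || last[r.pk].snapshot < r.snapshot)
                last[r.pk] = r;
        std::vector<Row> out;
        for (auto& [pk, r] : last)
            out.push_back(r);
        return out;
    }

    int main() {
        std::vector<Row> rows = {{1, 10, "a"}, {1, 20, "b"}}; // dups: same pk
        // A predicate over pk (e.g. pk != 1) drops or keeps both rows, so
        // filter-then-dedup equals dedup-then-filter. A predicate over `value`
        // (e.g. value == "a") applied first would surface the stale row
        // {1, 10, "a"}, while dedup-first correctly leaves nothing.
        assert(Dedup(rows).size() == 1 && Dedup(rows)[0].snapshot == 20);
        return 0;
    }
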
@@ -321,6 +308,8 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
}
TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxRowsInBatch) {
+ Y_VERIFY(SortReplaceDescription);
+
if (NotIndexed.size() != ReadyNotIndexed) {
// Wait till we have all not indexed data so we could replace keys in granules
return {};
@@ -329,7 +318,9 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
// First time extract OutNotIndexed data
if (NotIndexed.size()) {
/// @note not indexed data could contain data out of indexed granules
- OutNotIndexed = SplitByGranules(std::move(NotIndexed));
+ Y_VERIFY(!TsGranules.empty());
+ auto mergedBatch = MergeNotIndexed(std::move(NotIndexed)); // merged has no dups
+ OutNotIndexed = SliceIntoGranules(mergedBatch, TsGranules, IndexInfo());
NotIndexed.clear();
ReadyNotIndexed = 0;
}
@@ -340,16 +331,16 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
ui64 granule = BatchGranule(batchNo);
if (ReadyGranules.count(granule)) {
Y_VERIFY(batch);
- ui64 portion = BatchPortion[batchNo];
-#if 1 // Optimization [remove portion's dups]
- // There could be PK self dups if portion is result of insert (same PK, different snapshot). Remove them.
- if (batch->num_rows() && PortionsWithSelfDups.count(portion)) {
- auto merged = NArrow::MergeSortedBatches({batch}, SortReplaceDescription, batch->num_rows());
- Y_VERIFY(merged.size() == 1);
- batch = merged[0];
+ if (batch->num_rows()) {
+ ui64 portion = BatchPortion[batchNo];
+ if (PortionsWithDups.count(portion)) {
+ Y_VERIFY(GranulesWithDups.count(granule));
+ BatchesToDedup.insert(batch.get());
+ } else {
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, IndexInfo().GetReplaceKey(), false));
+ }
+ ReadyGranules[granule].push_back(batch);
}
-#endif
- ReadyGranules[granule].emplace(portion, batch);
ready.push_back(batchNo);
}
}
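
The hunk above replaces the eager per-portion merge with bookkeeping: a non-empty batch from a dup-prone portion is only flagged (by raw pointer) in BatchesToDedup, and the actual dedup is deferred to the per-granule merge in ReadyToOut(). A sketch of that flagging step (simplified types, names as in the diff):

    #include <memory>
    #include <unordered_set>
    #include <vector>

    struct Batch { std::vector<int> rows; };

    // Raw pointers serve as stable batch identities for the later merge pass.
    std::unordered_set<const void*> BatchesToDedup;

    void OnIndexedBatchReady(const std::shared_ptr<Batch>& batch,
                             bool portionCanHaveDups,
                             std::vector<std::shared_ptr<Batch>>& granuleBatches) {
        if (batch->rows.empty())
            return; // empty batches are now dropped here, not in ReadyToOut()
        if (portionCanHaveDups)
            BatchesToDedup.insert(batch.get()); // defer dedup to the granule merge
        granuleBatches.push_back(batch);
    }
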
@@ -371,8 +362,8 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
}
template <typename TCont>
-static TVector<ui64> GetReadyInOrder(const TCont& ready, TDeque<ui64>& order) {
- TVector<ui64> out;
+static std::vector<ui64> GetReadyInOrder(const TCont& ready, TDeque<ui64>& order) {
+ std::vector<ui64> out;
out.reserve(ready.size());
if (order.empty()) {
@@ -394,8 +385,10 @@ static TVector<ui64> GetReadyInOrder(const TCont& ready, TDeque<ui64>& order) {
}
/// @return batches that are not blocked by others
-TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::ReadyToOut() {
- TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
+std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::ReadyToOut() {
+ Y_VERIFY(SortReplaceDescription);
+
+ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
out.reserve(ReadyGranules.size() + 1);
// Prepend not indexed data (less then first granule) before granules for ASC sorting
@@ -405,33 +398,41 @@ TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::Read
OutNotIndexed.erase(0);
}
- TVector<ui64> ready = GetReadyInOrder(ReadyGranules, GranulesOutOrder);
+ std::vector<ui64> ready = GetReadyInOrder(ReadyGranules, GranulesOutOrder);
for (ui64 granule : ready) {
- auto& map = ReadyGranules[granule];
- std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule;
-
- // Add indexed granule data
- for (auto& [portion, batch] : map) {
- // batch could be empty cause of prefiltration
- if (batch->num_rows()) {
- inGranule.push_back(batch);
- }
- }
+ std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = std::move(ReadyGranules[granule]);
+ ReadyGranules.erase(granule);
+ bool canHaveDups = GranulesWithDups.count(granule);
// Append not indexed data to granules
if (OutNotIndexed.count(granule)) {
auto batch = OutNotIndexed[granule];
- if (batch->num_rows()) { // TODO: check why it could be empty
+ if (batch && batch->num_rows()) { // TODO: check why it could be empty
inGranule.push_back(batch);
+ canHaveDups = true;
}
OutNotIndexed.erase(granule);
}
if (inGranule.empty()) {
- inGranule.push_back(NArrow::MakeEmptyBatch(ReadMetadata->ResultSchema));
+ continue;
+ }
+
+ if (canHaveDups) {
+ for (auto& batch : inGranule) {
+ Y_VERIFY(batch->num_rows());
+ Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey));
+ }
+#if 1 // optimization
+ auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, BatchesToDedup);
+ out.emplace_back(std::move(deduped));
+#else
+ out.emplace_back();
+ out.back().emplace_back(CombineSortedBatches(inGranule, SortReplaceDescription));
+#endif
+ } else {
+ out.emplace_back(std::move(inGranule));
}
- out.push_back(std::move(inGranule));
- ReadyGranules.erase(granule);
}
// Append not indexed data (less then first granule) after granules for DESC sorting
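
The reworked loop above decides per granule whether a merge is needed at all: a granule is merge-sorted only if it was marked in GranulesWithDups or had not-indexed data appended; otherwise its batches pass through untouched. A condensed sketch of that decision (simplified types; SpecialMergeSorted as sketched earlier):

    #include <unordered_map>
    #include <unordered_set>
    #include <utility>
    #include <vector>

    using ui64 = unsigned long long;
    using Batch = std::vector<int>; // stand-in, sorted by key

    std::vector<Batch> SpecialMergeSorted(const std::vector<Batch>&); // as sketched above

    // Per-granule output step of ReadyToOut(): merge only where dups are possible.
    std::vector<std::vector<Batch>> DrainReady(
            const std::vector<ui64>& readyInOrder,
            std::unordered_map<ui64, std::vector<Batch>>& readyGranules,
            std::unordered_map<ui64, Batch>& outNotIndexed,
            const std::unordered_set<ui64>& granulesWithDups) {
        std::vector<std::vector<Batch>> out;
        for (ui64 granule : readyInOrder) {
            auto inGranule = std::move(readyGranules[granule]);
            readyGranules.erase(granule);
            bool canHaveDups = granulesWithDups.count(granule) > 0;
            if (auto it = outNotIndexed.find(granule); it != outNotIndexed.end()) {
                if (!it->second.empty()) {
                    inGranule.push_back(std::move(it->second));
                    canHaveDups = true; // fresh rows may collide with indexed rows
                }
                outNotIndexed.erase(it);
            }
            if (inGranule.empty())
                continue; // empty granules no longer emit an empty batch
            if (canHaveDups)
                out.push_back(SpecialMergeSorted(inGranule));
            else
                out.push_back(std::move(inGranule));
        }
        return out;
    }
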
@@ -444,8 +445,8 @@ TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::Read
return out;
}
-THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-TIndexedReadData::SplitByGranules(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const {
+std::shared_ptr<arrow::RecordBatch>
+TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const {
Y_VERIFY(ReadMetadata->IsSorted());
Y_VERIFY(IndexInfo().GetSortingKey());
@@ -471,17 +472,12 @@ TIndexedReadData::SplitByGranules(std::vector<std::shared_ptr<arrow::RecordBatch
auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription());
Y_VERIFY(merged);
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey()));
-
- Y_VERIFY(!TsGranules.empty());
- return SliceIntoGranules(merged, TsGranules, indexInfo);
+ return merged;
}
TVector<TPartialReadResult>
-TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, const int64_t maxRowsInBatch) const {
- /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey.
- /// It's not OK to apply predicate before replacing key duplicates otherwise.
- /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
-
+TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules,
+ int64_t maxRowsInBatch) const {
Y_VERIFY(ReadMetadata->IsSorted());
Y_VERIFY(SortReplaceDescription);
@@ -489,14 +485,22 @@ TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBa
bool isDesc = ReadMetadata->IsDescSorted();
- for (auto& vec : granules) {
- auto batches = SpecialMergeSorted(vec, IndexInfo(), SortReplaceDescription, maxRowsInBatch);
+ for (auto& batches : granules) {
if (batches.empty()) {
continue;
}
+ {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> splitted;
+ splitted.reserve(batches.size());
+ for (auto& batch : batches) {
+ SliceBatch(batch, maxRowsInBatch, splitted);
+ }
+ batches.swap(splitted);
+ }
+
if (isDesc) {
- TVector<std::shared_ptr<arrow::RecordBatch>> reversed;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> reversed;
reversed.reserve(batches.size());
for (int i = batches.size() - 1; i >= 0; --i) {
auto& batch = batches[i];
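
MakeResult() now owns row-count limiting: merged per-granule batches are re-sliced into chunks of at most maxRowsInBatch rows here, instead of inside SpecialMergeSorted. A small sketch of that post-merge split (reusing the SliceBatch sketch from above):

    #include <cstdint>
    #include <vector>

    using Batch = std::vector<int>;
    void SliceBatch(const Batch&, int64_t, std::vector<Batch>&); // as sketched above

    // Post-merge step in MakeResult(): enforce the row limit on every batch.
    std::vector<Batch> LimitRows(const std::vector<Batch>& batches, int64_t maxRowsInBatch) {
        std::vector<Batch> splitted;
        splitted.reserve(batches.size());
        for (const Batch& batch : batches)
            SliceBatch(batch, maxRowsInBatch, splitted);
        return splitted;
    }
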
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 92d5278950..ab167ef694 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -221,14 +221,16 @@ private:
THashMap<TBlobRange, ui32> IndexedBlobs; // blobId -> batchNo
ui32 ReadyNotIndexed{0};
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append
- THashMap<ui64, TMap<ui64, std::shared_ptr<arrow::RecordBatch>>> ReadyGranules; // granule -> portions
+ THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> ReadyGranules; // granule -> portions' data
THashMap<ui64, ui32> PortionBatch; // portion -> batch
TVector<ui64> BatchPortion; // batch -> portion
THashMap<ui64, ui64> PortionGranule; // portion -> granule
THashMap<ui64, ui32> GranuleWaits; // granule -> num portions to wait
TDeque<ui64> GranulesOutOrder;
TMap<TColumnEngineForLogs::TMark, ui64> TsGranules; // ts (key) -> granule
- THashSet<ui64> PortionsWithSelfDups;
+ THashSet<ui64> GranulesWithDups;
+ THashSet<ui64> PortionsWithDups;
+ THashSet<const void*> BatchesToDedup;
std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
const TIndexInfo& IndexInfo() const {
@@ -251,12 +253,11 @@ private:
const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const;
std::shared_ptr<arrow::RecordBatch> AssembleIndexedBatch(ui32 batchNo);
void UpdateGranuleWaits(ui32 batchNo);
- THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SplitByGranules(
+ std::shared_ptr<arrow::RecordBatch> MergeNotIndexed(
std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const;
- TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> ReadyToOut();
+ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> ReadyToOut();
TVector<TPartialReadResult> MakeResult(
- TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules,
- const int64_t maxRowsInBatch) const;
+ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, int64_t maxRowsInBatch) const;
};
}
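
For reference, a condensed view of the bookkeeping this commit leaves in TIndexedReadData (member names from the header diff; container types simplified to standard ones):

    #include <memory>
    #include <unordered_map>
    #include <unordered_set>
    #include <vector>

    using ui64 = unsigned long long;
    struct RecordBatch; // stand-in for arrow::RecordBatch

    struct TIndexedReadDataDedupState {
        // granule -> plain list of batches; replaces the old per-portion TMap,
        // since ReadyToOut() no longer needs portion-keyed lookup.
        std::unordered_map<ui64, std::vector<std::shared_ptr<RecordBatch>>> ReadyGranules;
        std::unordered_set<ui64> GranulesWithDups;      // granules that need a merge pass
        std::unordered_set<ui64> PortionsWithDups;      // portions whose batches may self-dup
        std::unordered_set<const void*> BatchesToDedup; // batches to dedup even if alone in a key range
    };
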