| author | chertus <azuikov@ydb.tech> | 2023-03-17 15:11:28 +0300 |
|---|---|---|
| committer | chertus <azuikov@ydb.tech> | 2023-03-17 15:11:28 +0300 |
| commit | d20dfd4837a75ee004cde1ad32f1593453a24d9e | |
| tree | 35eed235e0a6372b4bab908ba7dce1f8294b7b50 | |
| parent | ccc1f1bc46a5282af7e0372de86bc26b71adc462 | |
| download | ydb-d20dfd4837a75ee004cde1ad32f1593453a24d9e.tar.gz | |
rewrite merge dups logic in TIndexedReadData
-rw-r--r-- | ydb/core/tx/columnshard/engines/indexed_read_data.cpp | 184 |
-rw-r--r-- | ydb/core/tx/columnshard/engines/indexed_read_data.h | 13 |
2 files changed, 101 insertions, 96 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 323bd54bb4..716e099cfc 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -19,6 +19,11 @@ void SliceBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
                 const int64_t maxRowsInBatch,
                 std::vector<std::shared_ptr<arrow::RecordBatch>>& result) {
+    if (batch->num_rows() <= maxRowsInBatch) {
+        result.push_back(batch);
+        return;
+    }
+
     int64_t offset = 0;
     while (offset < batch->num_rows()) {
         int64_t rows = std::min<int64_t>(maxRowsInBatch, batch->num_rows() - offset);
@@ -27,43 +32,12 @@ void SliceBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
     }
 };
 
-std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& src,
-                                                                    const TIndexInfo& indexInfo,
-                                                                    const std::shared_ptr<NArrow::TSortDescription>& description,
-                                                                    const int64_t maxRowsInBatch) {
-    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-    batches.reserve(src.size());
-    ui64 size = 0;
-    for (auto& batch : src) {
-        if (!batch->num_rows()) {
-            continue;
-        }
-        Y_VERIFY_DEBUG(NArrow::IsSorted(batch, description->ReplaceKey));
-
-        size += batch->num_rows();
-        batches.push_back(batch);
-    }
-    if (batches.empty()) {
-        return {};
-    }
-
-#if 1 // Optimization [remove portion's dups]
-    if (batches.size() == 1) {
-        if (NArrow::IsSortedAndUnique(batches[0], description->ReplaceKey)) {
-            std::vector<std::shared_ptr<arrow::RecordBatch>> out;
-            SliceBatch(batches[0], maxRowsInBatch, out);
-            return out;
-        } else {
-            return NArrow::MergeSortedBatches(batches, description, size);
-        }
-    }
-#endif
-
-#if 1 // Optimization [special merge], requires [remove portion's dups]
-    TVector<TVector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo]
+std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>
+GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo) {
+    std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo]
     rangesSlices.reserve(batches.size());
     {
-        TMap<TMark, TVector<std::shared_ptr<arrow::RecordBatch>>> points;
+        TMap<TMark, std::vector<std::shared_ptr<arrow::RecordBatch>>> points;
 
         for (auto& batch : batches) {
             std::shared_ptr<arrow::Array> keyColumn = GetFirstPKColumn(indexInfo, batch);
@@ -93,6 +67,14 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
             }
         }
     }
+    return rangesSlices;
+}
+
+std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
+                                                                    const TIndexInfo& indexInfo,
+                                                                    const std::shared_ptr<NArrow::TSortDescription>& description,
+                                                                    const THashSet<const void*> batchesToDedup) {
+    auto rangesSlices = GroupInKeyRanges(batches, indexInfo);
 
     // Merge slices in ranges
     std::vector<std::shared_ptr<arrow::RecordBatch>> out;
@@ -104,23 +86,23 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
 
         // The core of optimization: do not merge slice if it's alone in its key range
         if (slices.size() == 1) {
-            if (NArrow::IsSortedAndUnique(slices[0], description->ReplaceKey)) {
-                // Split big batch into smaller batches if needed
-                SliceBatch(slices[0], maxRowsInBatch, out);
-                continue;
+            auto batch = slices[0];
+            if (batchesToDedup.count(batch.get())) {
+                if (!NArrow::IsSortedAndUnique(batch, description->ReplaceKey)) {
+                    batch = NArrow::CombineSortedBatches({batch}, description);
+                    Y_VERIFY(batch);
+                }
             }
+            Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey));
+            out.push_back(batch);
+            continue;
         }
 
-        auto merged = NArrow::MergeSortedBatches(slices, description, maxRowsInBatch);
-        Y_VERIFY(merged.size() >= 1);
-        out.insert(out.end(), merged.begin(), merged.end());
+        auto batch = NArrow::CombineSortedBatches(slices, description);
+        out.push_back(batch);
     }
 
     return out;
-#else
-    Y_UNUSED(indexInfo);
-    return NArrow::MergeSortedBatches(batches, description, size);
-#endif
 }
 
 }
@@ -174,9 +156,10 @@ THashMap<TBlobRange, ui64> TIndexedReadData::InitRead(ui32 inputBatch, bool inGr
         ++GranuleWaits[granule];
     }
 
-    // If no PK dups in portions we could use optimized version of merge
+    // If there's no PK dups in granule we could use optimized version of merge
    if (portionInfo.CanHaveDups()) {
-        PortionsWithSelfDups.emplace(granule);
+        GranulesWithDups.emplace(granule);
+        PortionsWithDups.emplace(portion);
    }
 
    for (const NOlap::TColumnRecord& rec : portionInfo.Records) {
@@ -249,6 +232,10 @@ std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32
     auto portion = portionInfo.Assemble(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data);
     Y_VERIFY(portion);
+
+    /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey.
+    /// It's not OK to apply predicate before replacing key duplicates otherwise.
+    /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
     auto batch = NOlap::FilterPortion(portion, *ReadMetadata);
     Y_VERIFY(batch);
 
@@ -321,6 +308,8 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
 }
 
 TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxRowsInBatch) {
+    Y_VERIFY(SortReplaceDescription);
+
     if (NotIndexed.size() != ReadyNotIndexed) {
         // Wait till we have all not indexed data so we could replace keys in granules
         return {};
@@ -329,7 +318,9 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
     // First time extract OutNotIndexed data
     if (NotIndexed.size()) {
         /// @note not indexed data could contain data out of indexed granules
-        OutNotIndexed = SplitByGranules(std::move(NotIndexed));
+        Y_VERIFY(!TsGranules.empty());
+        auto mergedBatch = MergeNotIndexed(std::move(NotIndexed)); // merged has no dups
+        OutNotIndexed = SliceIntoGranules(mergedBatch, TsGranules, IndexInfo());
         NotIndexed.clear();
         ReadyNotIndexed = 0;
     }
@@ -340,16 +331,16 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
         ui64 granule = BatchGranule(batchNo);
         if (ReadyGranules.count(granule)) {
             Y_VERIFY(batch);
-            ui64 portion = BatchPortion[batchNo];
-#if 1 // Optimization [remove portion's dups]
-            // There could be PK self dups if portion is result of insert (same PK, different snapshot). Remove them.
-            if (batch->num_rows() && PortionsWithSelfDups.count(portion)) {
-                auto merged = NArrow::MergeSortedBatches({batch}, SortReplaceDescription, batch->num_rows());
-                Y_VERIFY(merged.size() == 1);
-                batch = merged[0];
+            if (batch->num_rows()) {
+                ui64 portion = BatchPortion[batchNo];
+                if (PortionsWithDups.count(portion)) {
+                    Y_VERIFY(GranulesWithDups.count(granule));
+                    BatchesToDedup.insert(batch.get());
+                } else {
+                    Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, IndexInfo().GetReplaceKey(), false));
+                }
+                ReadyGranules[granule].push_back(batch);
             }
-#endif
-            ReadyGranules[granule].emplace(portion, batch);
             ready.push_back(batchNo);
         }
     }
@@ -371,8 +362,8 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
 }
 
 template <typename TCont>
-static TVector<ui64> GetReadyInOrder(const TCont& ready, TDeque<ui64>& order) {
-    TVector<ui64> out;
+static std::vector<ui64> GetReadyInOrder(const TCont& ready, TDeque<ui64>& order) {
+    std::vector<ui64> out;
     out.reserve(ready.size());
 
     if (order.empty()) {
@@ -394,8 +385,10 @@ static TVector<ui64> GetReadyInOrder(const TCont& ready, TDeque<ui64>& order) {
 }
 
 /// @return batches that are not blocked by others
-TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::ReadyToOut() {
-    TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
+std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::ReadyToOut() {
+    Y_VERIFY(SortReplaceDescription);
+
+    std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
     out.reserve(ReadyGranules.size() + 1);
 
     // Prepend not indexed data (less then first granule) before granules for ASC sorting
@@ -405,33 +398,41 @@ TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::Read
         OutNotIndexed.erase(0);
     }
 
-    TVector<ui64> ready = GetReadyInOrder(ReadyGranules, GranulesOutOrder);
+    std::vector<ui64> ready = GetReadyInOrder(ReadyGranules, GranulesOutOrder);
     for (ui64 granule : ready) {
-        auto& map = ReadyGranules[granule];
-        std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule;
-
-        // Add indexed granule data
-        for (auto& [portion, batch] : map) {
-            // batch could be empty cause of prefiltration
-            if (batch->num_rows()) {
-                inGranule.push_back(batch);
-            }
-        }
+        std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = std::move(ReadyGranules[granule]);
+        ReadyGranules.erase(granule);
+        bool canHaveDups = GranulesWithDups.count(granule);
 
         // Append not indexed data to granules
         if (OutNotIndexed.count(granule)) {
             auto batch = OutNotIndexed[granule];
-            if (batch->num_rows()) { // TODO: check why it could be empty
+            if (batch && batch->num_rows()) { // TODO: check why it could be empty
                 inGranule.push_back(batch);
+                canHaveDups = true;
             }
             OutNotIndexed.erase(granule);
         }
 
         if (inGranule.empty()) {
-            inGranule.push_back(NArrow::MakeEmptyBatch(ReadMetadata->ResultSchema));
+            continue;
+        }
+
+        if (canHaveDups) {
+            for (auto& batch : inGranule) {
+                Y_VERIFY(batch->num_rows());
+                Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey));
+            }
+#if 1 // optimization
+            auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, BatchesToDedup);
+            out.emplace_back(std::move(deduped));
+#else
+            out.push_back();
+            out.back().emplace_back(CombineSortedBatches(inGranule, SortReplaceDescription));
+#endif
+        } else {
+            out.emplace_back(std::move(inGranule));
         }
-
-        out.push_back(std::move(inGranule));
-        ReadyGranules.erase(granule);
     }
 
     // Append not indexed data (less then first granule) after granules for DESC sorting
@@ -444,8 +445,8 @@ TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::Read
     return out;
 }
 
-THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-TIndexedReadData::SplitByGranules(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const {
+std::shared_ptr<arrow::RecordBatch>
+TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const {
     Y_VERIFY(ReadMetadata->IsSorted());
     Y_VERIFY(IndexInfo().GetSortingKey());
 
@@ -471,17 +472,12 @@ TIndexedReadData::SplitByGranules(std::vector<std::shared_ptr<arrow::RecordBatch
     auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription());
     Y_VERIFY(merged);
     Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey()));
-
-    Y_VERIFY(!TsGranules.empty());
-    return SliceIntoGranules(merged, TsGranules, indexInfo);
+    return merged;
 }
 
 TVector<TPartialReadResult>
-TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, const int64_t maxRowsInBatch) const {
-    /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey.
-    /// It's not OK to apply predicate before replacing key duplicates otherwise.
-    /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
-
+TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules,
+                             int64_t maxRowsInBatch) const {
     Y_VERIFY(ReadMetadata->IsSorted());
     Y_VERIFY(SortReplaceDescription);
 
@@ -489,14 +485,22 @@ TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBa
 
     bool isDesc = ReadMetadata->IsDescSorted();
 
-    for (auto& vec : granules) {
-        auto batches = SpecialMergeSorted(vec, IndexInfo(), SortReplaceDescription, maxRowsInBatch);
+    for (auto& batches : granules) {
         if (batches.empty()) {
             continue;
         }
 
+        {
+            std::vector<std::shared_ptr<arrow::RecordBatch>> splitted;
+            splitted.reserve(batches.size());
+            for (auto& batch : batches) {
+                SliceBatch(batch, maxRowsInBatch, splitted);
+            }
+            batches.swap(splitted);
+        }
+
         if (isDesc) {
-            TVector<std::shared_ptr<arrow::RecordBatch>> reversed;
+            std::vector<std::shared_ptr<arrow::RecordBatch>> reversed;
             reversed.reserve(batches.size());
             for (int i = batches.size() - 1; i >= 0; --i) {
                 auto& batch = batches[i];
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 92d5278950..ab167ef694 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -221,14 +221,16 @@ private:
     THashMap<TBlobRange, ui32> IndexedBlobs; // blobId -> batchNo
     ui32 ReadyNotIndexed{0};
     THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append
-    THashMap<ui64, TMap<ui64, std::shared_ptr<arrow::RecordBatch>>> ReadyGranules; // granule -> portions
+    THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> ReadyGranules; // granule -> portions data
     THashMap<ui64, ui32> PortionBatch; // portion -> batch
     TVector<ui64> BatchPortion; // batch -> portion
     THashMap<ui64, ui64> PortionGranule; // portion -> granule
     THashMap<ui64, ui32> GranuleWaits; // granule -> num portions to wait
     TDeque<ui64> GranulesOutOrder;
     TMap<TColumnEngineForLogs::TMark, ui64> TsGranules; // ts (key) -> granule
-    THashSet<ui64> PortionsWithSelfDups;
+    THashSet<ui64> GranulesWithDups;
+    THashSet<ui64> PortionsWithDups;
+    THashSet<const void*> BatchesToDedup;
     std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
 
     const TIndexInfo& IndexInfo() const {
@@ -251,12 +253,11 @@ private:
         const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const;
     std::shared_ptr<arrow::RecordBatch> AssembleIndexedBatch(ui32 batchNo);
     void UpdateGranuleWaits(ui32 batchNo);
-    THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SplitByGranules(
+    std::shared_ptr<arrow::RecordBatch> MergeNotIndexed(
         std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const;
-    TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>> ReadyToOut();
+    std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> ReadyToOut();
     TVector<TPartialReadResult> MakeResult(
-        TVector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules,
-        const int64_t maxRowsInBatch) const;
+        std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, int64_t maxRowsInBatch) const;
 };
 
 }
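For readers skimming the patch, the idea behind the new `GroupInKeyRanges` + `SpecialMergeSorted` pair can be illustrated outside of Arrow. The sketch below is not the YDB code: plain `int` keys stand in for `ReplaceKey` tuples, a sorted `std::vector<int>` stands in for a `RecordBatch`, the hypothetical `MergeWithReplace` stands in for `NArrow::CombineSortedBatches`, and the grouping uses a simple sort-and-sweep over [min, max] key ranges rather than the patch's `TMark` point map. It only demonstrates why batches whose key ranges do not overlap can skip the merge entirely.

```cpp
#include <algorithm>
#include <cstdio>
#include <vector>

using Batch = std::vector<int>; // a "record batch": rows sorted by primary key

// Group sorted batches into connected components of overlapping key ranges.
std::vector<std::vector<Batch>> GroupInKeyRanges(std::vector<Batch> batches) {
    // Drop empty batches, then sort by first key so overlaps become adjacent.
    batches.erase(std::remove_if(batches.begin(), batches.end(),
                                 [](const Batch& b) { return b.empty(); }),
                  batches.end());
    std::sort(batches.begin(), batches.end(),
              [](const Batch& a, const Batch& b) { return a.front() < b.front(); });

    std::vector<std::vector<Batch>> ranges;
    int rangeMax = 0;
    for (auto& batch : batches) {
        // Start a new range unless this batch overlaps the current one.
        if (ranges.empty() || batch.front() > rangeMax) {
            ranges.push_back({});
            rangeMax = batch.back();
        } else {
            rangeMax = std::max(rangeMax, batch.back());
        }
        ranges.back().push_back(std::move(batch));
    }
    return ranges;
}

// Stand-in for CombineSortedBatches with a replace sort description:
// merge sorted inputs, keeping a single row per key.
Batch MergeWithReplace(const std::vector<Batch>& slices) {
    Batch merged;
    for (const auto& slice : slices) {
        merged.insert(merged.end(), slice.begin(), slice.end());
    }
    std::sort(merged.begin(), merged.end());
    merged.erase(std::unique(merged.begin(), merged.end()), merged.end());
    return merged;
}

std::vector<Batch> SpecialMergeSorted(std::vector<Batch> batches) {
    std::vector<Batch> out;
    for (auto& slices : GroupInKeyRanges(std::move(batches))) {
        if (slices.size() == 1) {
            // The core of the optimization: a batch alone in its key range
            // cannot contain cross-batch duplicates, so no merge is needed.
            out.push_back(std::move(slices[0]));
        } else {
            out.push_back(MergeWithReplace(slices));
        }
    }
    return out;
}

int main() {
    // {1,2,3} and {2,4} overlap and get merged with dedup;
    // {10,11} is alone in its key range and is passed through untouched.
    for (const Batch& batch : SpecialMergeSorted({{1, 2, 3}, {2, 4}, {10, 11}})) {
        for (int key : batch) {
            std::printf("%d ", key);
        }
        std::printf("\n");
    }
    // Output:
    // 1 2 3 4
    // 10 11
}
```

The patched YDB code applies the same test per granule, with one refinement the sketch glosses over: a batch alone in its key range is emitted as-is unless it was flagged in `BatchesToDedup` (a portion that may contain self-duplicates), in which case it alone is run through `CombineSortedBatches` first; only overlapping slices pay for a full merge-with-replace.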