diff options
| author | ivanmorozov <[email protected]> | 2023-08-03 19:16:55 +0300 |
|---|---|---|
| committer | ivanmorozov <[email protected]> | 2023-08-03 19:16:55 +0300 |
| commit | 358091205166a3237041ce0c7c601bc67adc1239 (patch) | |
| tree | 12f8a0ab46aa139bcf66d6b5489c454d4737c76e | |
| parent | fe3ee098474d0e60e87a3e3cd2509f0d4806c91b (diff) | |
KIKIMR-18937: async deduplication in special CPU pool
10 files changed, 159 insertions, 86 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index a3ac5d2e0a6..4ae43fe7b31 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -36,77 +36,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SliceBatch(const std::shared_pt return result; }; -std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> -GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo) { - std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo] - rangesSlices.reserve(batches.size()); - { - TMap<NArrow::TReplaceKey, std::vector<std::shared_ptr<arrow::RecordBatch>>> points; - - for (auto& batch : batches) { - auto compositeKey = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); - Y_VERIFY(compositeKey && compositeKey->num_rows() > 0); - auto keyColumns = std::make_shared<NArrow::TArrayVec>(compositeKey->columns()); - - NArrow::TReplaceKey min(keyColumns, 0); - NArrow::TReplaceKey max(keyColumns, compositeKey->num_rows() - 1); - - points[min].push_back(batch); // insert start - points[max].push_back({}); // insert end - } - - int sum = 0; - for (auto& [key, vec] : points) { - if (!sum) { // count(start) == count(end), start new range - rangesSlices.push_back({}); - rangesSlices.back().reserve(batches.size()); - } - - for (auto& batch : vec) { - if (batch) { - ++sum; - rangesSlices.back().push_back(batch); - } else { - --sum; - } - } - } - } - return rangesSlices; -} - -std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const TIndexInfo& indexInfo, - const std::shared_ptr<NArrow::TSortDescription>& description, - const THashSet<const void*>& batchesToDedup) { - auto rangesSlices = GroupInKeyRanges(batches, indexInfo); - - // Merge slices in ranges - std::vector<std::shared_ptr<arrow::RecordBatch>> out; - out.reserve(rangesSlices.size()); - for (auto& slices : rangesSlices) { - if (slices.empty()) { - continue; - } - - // Do not merge slice if it's alone in its key range - if (slices.size() == 1) { - auto batch = slices[0]; - if (batchesToDedup.count(batch.get())) { - NArrow::DedupSortedBatch(batch, description->ReplaceKey, out); - } else { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); - out.push_back(batch); - } - continue; - } - auto batch = NArrow::CombineSortedBatches(slices, description); - out.push_back(batch); - } - - return out; -} - } void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) { @@ -274,7 +203,6 @@ std::vector<TPartialReadResult> TIndexedReadData::ReadyToOut(const i64 maxRowsIn Y_VERIFY(SortReplaceDescription); Y_VERIFY(GranulesContext); - auto& indexInfo = ReadMetadata->GetIndexInfo(); std::vector<NIndexedReader::TGranule::TPtr> ready = GranulesContext->DetachReadyInOrder(); std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out; out.reserve(ready.size() + 1); @@ -292,17 +220,7 @@ std::vector<TPartialReadResult> TIndexedReadData::ReadyToOut(const i64 maxRowsIn if (inGranule.empty()) { continue; } - - if (granule->IsDuplicationsAvailable()) { - for (auto& batch : inGranule) { - Y_VERIFY(batch->num_rows()); - Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey)); - } - auto deduped = SpecialMergeSorted(inGranule, indexInfo, SortReplaceDescription, granule->GetBatchesToDedup()); - out.emplace_back(std::move(deduped)); - } else { - out.emplace_back(std::move(inGranule)); - } + out.emplace_back(std::move(inGranule)); } // Append not indexed data (less then first granule) after granules for DESC sorting diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index bd3a1e2c7fb..0c0c533069f 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -46,6 +46,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index da51f9cbc27..36fdaa3c97e 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -47,6 +47,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index da51f9cbc27..36fdaa3c97e 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -47,6 +47,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index bd3a1e2c7fb..0c0c533069f 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -46,6 +46,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index f46d8aabe6c..d20b7405cc2 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -1,4 +1,5 @@ #include "granule.h" +#include "granule_preparation.h" #include "filling_context.h" #include <ydb/core/tx/columnshard/engines/portion_info.h> #include <ydb/core/tx/columnshard/engines/indexed_read_data.h> @@ -131,11 +132,25 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { Owner->Wakeup(*this); } +void TGranule::OnGranuleDataPrepared(std::vector<std::shared_ptr<arrow::RecordBatch>>&& data) { + ReadyFlag = true; + RecordBatches = data; + Owner->OnGranuleReady(GranuleId); +} + void TGranule::CheckReady() { if (WaitBatches.empty() && NotIndexedBatchReadyFlag) { - ReadyFlag = true; - ACFL_DEBUG("event", "granule_ready")("predicted_size", RawDataSize)("real_size", RawDataSizeReal); - Owner->OnGranuleReady(GranuleId); + + if (RecordBatches.empty() || !IsDuplicationsAvailable()) { + ReadyFlag = true; + ACFL_DEBUG("event", "granule_ready")("predicted_size", RawDataSize)("real_size", RawDataSizeReal); + Owner->OnGranuleReady(GranuleId); + } else { + ACFL_DEBUG("event", "granule_preparation")("predicted_size", RawDataSize)("real_size", RawDataSizeReal); + std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = std::move(RecordBatches); + auto processor = Owner->GetTasksProcessor(); + processor.Add(*Owner, std::make_shared<TTaskGranulePreparation>(std::move(inGranule), std::move(BatchesToDedup), GranuleId, Owner->GetReadMetadata(), processor.GetObject())); + } } } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index 95754c8cb1c..89e0ae19879 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -59,6 +59,8 @@ public: return GranuleId; } + void OnGranuleDataPrepared(std::vector<std::shared_ptr<arrow::RecordBatch>>&& data); + const THashSet<const void*>& GetBatchesToDedup() const noexcept { return BatchesToDedup; } diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp new file mode 100644 index 00000000000..db05b35257d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp @@ -0,0 +1,87 @@ +#include "granule_preparation.h" + +namespace NKikimr::NOlap::NIndexedReader { + +std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TTaskGranulePreparation::GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo) { + std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo] + rangesSlices.reserve(batches.size()); + { + TMap<NArrow::TReplaceKey, std::vector<std::shared_ptr<arrow::RecordBatch>>> points; + + for (auto& batch : batches) { + auto compositeKey = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); + Y_VERIFY(compositeKey && compositeKey->num_rows() > 0); + auto keyColumns = std::make_shared<NArrow::TArrayVec>(compositeKey->columns()); + + NArrow::TReplaceKey min(keyColumns, 0); + NArrow::TReplaceKey max(keyColumns, compositeKey->num_rows() - 1); + + points[min].push_back(batch); // insert start + points[max].push_back({}); // insert end + } + + int sum = 0; + for (auto& [key, vec] : points) { + if (!sum) { // count(start) == count(end), start new range + rangesSlices.push_back({}); + rangesSlices.back().reserve(batches.size()); + } + + for (auto& batch : vec) { + if (batch) { + ++sum; + rangesSlices.back().push_back(batch); + } else { + --sum; + } + } + } + } + return rangesSlices; +} + +std::vector<std::shared_ptr<arrow::RecordBatch>> TTaskGranulePreparation::SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo, const std::shared_ptr<NArrow::TSortDescription>& description, const THashSet<const void*>& batchesToDedup) { + auto rangesSlices = GroupInKeyRanges(batches, indexInfo); + + // Merge slices in ranges + std::vector<std::shared_ptr<arrow::RecordBatch>> out; + out.reserve(rangesSlices.size()); + for (auto& slices : rangesSlices) { + if (slices.empty()) { + continue; + } + + // Do not merge slice if it's alone in its key range + if (slices.size() == 1) { + auto batch = slices[0]; + if (batchesToDedup.count(batch.get())) { + NArrow::DedupSortedBatch(batch, description->ReplaceKey, out); + } else { + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); + out.push_back(batch); + } + continue; + } + auto batch = NArrow::CombineSortedBatches(slices, description); + out.push_back(batch); + } + + return out; +} + +bool TTaskGranulePreparation::DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const { + indexedDataRead.GetGranuleVerified(GranuleId)->OnGranuleDataPrepared(std::move(BatchesInGranule)); + return true; +} + +bool TTaskGranulePreparation::DoExecuteImpl() { + auto& indexInfo = ReadMetadata->GetIndexInfo(); + for (auto& batch : BatchesInGranule) { + Y_VERIFY(batch->num_rows()); + Y_VERIFY_DEBUG(NArrow::IsSorted(batch, indexInfo.SortReplaceDescription()->ReplaceKey)); + } + BatchesInGranule = SpecialMergeSorted(BatchesInGranule, indexInfo, indexInfo.SortReplaceDescription(), BatchesToDedup); + return true; +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h new file mode 100644 index 00000000000..7b893f89fbd --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h @@ -0,0 +1,46 @@ +#pragma once +#include "conveyor_task.h" +#include "filling_context.h" +#include "read_metadata.h" +#include <ydb/core/formats/arrow/sort_cursor.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +namespace NKikimr::NOlap::NIndexedReader { + +class TTaskGranulePreparation: public NColumnShard::IDataTasksProcessor::ITask { +private: + using TBase = NColumnShard::IDataTasksProcessor::ITask; + mutable std::vector<std::shared_ptr<arrow::RecordBatch>> BatchesInGranule; + THashSet<const void*> BatchesToDedup; + const ui64 GranuleId; + NOlap::TReadMetadata::TConstPtr ReadMetadata; + + static std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> + GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo); + + static std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, + const TIndexInfo& indexInfo, + const std::shared_ptr<NArrow::TSortDescription>& description, + const THashSet<const void*>& batchesToDedup); + +protected: + virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const override; + virtual bool DoExecuteImpl() override; +public: + + virtual TString GetTaskClassIdentifier() const override { + return "Reading::GranulePreparation"; + } + + TTaskGranulePreparation(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches, THashSet<const void*>&& batchesToDedup, + const ui64 granuleId, NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::IDataTasksProcessor::TPtr processor) + : TBase(processor) + , BatchesInGranule(std::move(batches)) + , BatchesToDedup(std::move(batchesToDedup)) + , GranuleId(granuleId) + , ReadMetadata(readMetadata) { + + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make index 4894dbca62c..a5418d99cc7 100644 --- a/ydb/core/tx/columnshard/engines/reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/ya.make @@ -14,6 +14,7 @@ SRCS( read_filter_merger.cpp read_metadata.cpp read_context.cpp + granule_preparation.cpp ) PEERDIR( |
