summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ivanmorozov <[email protected]> 2023-08-03 19:16:55 +0300
committer ivanmorozov <[email protected]> 2023-08-03 19:16:55 +0300
commit 358091205166a3237041ce0c7c601bc67adc1239 (patch)
tree 12f8a0ab46aa139bcf66d6b5489c454d4737c76e
parent fe3ee098474d0e60e87a3e3cd2509f0d4806c91b (diff)
KIKIMR-18937: async deduplication in special CPU pool
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.cpp 84
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.cpp 21
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp 87
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule_preparation.h 46
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/ya.make 1
10 files changed, 159 insertions, 86 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index a3ac5d2e0a6..4ae43fe7b31 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -36,77 +36,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SliceBatch(const std::shared_pt
return result;
};
-std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>
-GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo) {
- std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo]
- rangesSlices.reserve(batches.size());
- {
- TMap<NArrow::TReplaceKey, std::vector<std::shared_ptr<arrow::RecordBatch>>> points;
-
- for (auto& batch : batches) {
- auto compositeKey = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
- Y_VERIFY(compositeKey && compositeKey->num_rows() > 0);
- auto keyColumns = std::make_shared<NArrow::TArrayVec>(compositeKey->columns());
-
- NArrow::TReplaceKey min(keyColumns, 0);
- NArrow::TReplaceKey max(keyColumns, compositeKey->num_rows() - 1);
-
- points[min].push_back(batch); // insert start
- points[max].push_back({}); // insert end
- }
-
- int sum = 0;
- for (auto& [key, vec] : points) {
- if (!sum) { // count(start) == count(end), start new range
- rangesSlices.push_back({});
- rangesSlices.back().reserve(batches.size());
- }
-
- for (auto& batch : vec) {
- if (batch) {
- ++sum;
- rangesSlices.back().push_back(batch);
- } else {
- --sum;
- }
- }
- }
- }
- return rangesSlices;
-}
-
-std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const TIndexInfo& indexInfo,
- const std::shared_ptr<NArrow::TSortDescription>& description,
- const THashSet<const void*>& batchesToDedup) {
- auto rangesSlices = GroupInKeyRanges(batches, indexInfo);
-
- // Merge slices in ranges
- std::vector<std::shared_ptr<arrow::RecordBatch>> out;
- out.reserve(rangesSlices.size());
- for (auto& slices : rangesSlices) {
- if (slices.empty()) {
- continue;
- }
-
- // Do not merge slice if it's alone in its key range
- if (slices.size() == 1) {
- auto batch = slices[0];
- if (batchesToDedup.count(batch.get())) {
- NArrow::DedupSortedBatch(batch, description->ReplaceKey, out);
- } else {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey));
- out.push_back(batch);
- }
- continue;
- }
- auto batch = NArrow::CombineSortedBatches(slices, description);
- out.push_back(batch);
- }
-
- return out;
-}
-
}
void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) {
@@ -274,7 +203,6 @@ std::vector<TPartialReadResult> TIndexedReadData::ReadyToOut(const i64 maxRowsIn
Y_VERIFY(SortReplaceDescription);
Y_VERIFY(GranulesContext);
- auto& indexInfo = ReadMetadata->GetIndexInfo();
std::vector<NIndexedReader::TGranule::TPtr> ready = GranulesContext->DetachReadyInOrder();
std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
out.reserve(ready.size() + 1);
@@ -292,17 +220,7 @@ std::vector<TPartialReadResult> TIndexedReadData::ReadyToOut(const i64 maxRowsIn
if (inGranule.empty()) {
continue;
}
-
- if (granule->IsDuplicationsAvailable()) {
- for (auto& batch : inGranule) {
- Y_VERIFY(batch->num_rows());
- Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortReplaceDescription->ReplaceKey));
- }
- auto deduped = SpecialMergeSorted(inGranule, indexInfo, SortReplaceDescription, granule->GetBatchesToDedup());
- out.emplace_back(std::move(deduped));
- } else {
- out.emplace_back(std::move(inGranule));
- }
+ out.emplace_back(std::move(inGranule));
}
// Append not indexed data (less then first granule) after granules for DESC sorting
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index bd3a1e2c7fb..0c0c533069f 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -46,6 +46,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index da51f9cbc27..36fdaa3c97e 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -47,6 +47,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index da51f9cbc27..36fdaa3c97e 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -47,6 +47,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index bd3a1e2c7fb..0c0c533069f 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -46,6 +46,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index f46d8aabe6c..d20b7405cc2 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -1,4 +1,5 @@
#include "granule.h"
+#include "granule_preparation.h"
#include "filling_context.h"
#include <ydb/core/tx/columnshard/engines/portion_info.h>
#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
@@ -131,11 +132,25 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
Owner->Wakeup(*this);
}
+void TGranule::OnGranuleDataPrepared(std::vector<std::shared_ptr<arrow::RecordBatch>>&& data) { // Stores the batches passed in `data` as this granule's RecordBatches, marks the granule ready and notifies Owner.
+ ReadyFlag = true;
+ RecordBatches = std::move(data); // `data` is a named rvalue reference, i.e. an lvalue expression: without std::move the whole vector (and every shared_ptr in it) would be copied
+ Owner->OnGranuleReady(GranuleId); // reported only after RecordBatches holds the new data
+}
+
void TGranule::CheckReady() {
if (WaitBatches.empty() && NotIndexedBatchReadyFlag) {
- ReadyFlag = true;
- ACFL_DEBUG("event", "granule_ready")("predicted_size", RawDataSize)("real_size", RawDataSizeReal);
- Owner->OnGranuleReady(GranuleId);
+
+ if (RecordBatches.empty() || !IsDuplicationsAvailable()) { // nothing to deduplicate: the granule is reported ready right here
+ ReadyFlag = true;
+ ACFL_DEBUG("event", "granule_ready")("predicted_size", RawDataSize)("real_size", RawDataSizeReal);
+ Owner->OnGranuleReady(GranuleId);
+ } else { // otherwise readiness is deferred: ReadyFlag is not set on this path, a preparation task is queued instead
+ ACFL_DEBUG("event", "granule_preparation")("predicted_size", RawDataSize)("real_size", RawDataSizeReal);
+ std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = std::move(RecordBatches); // the task takes over the batches; RecordBatches is left moved-from
+ auto processor = Owner->GetTasksProcessor();
+ processor.Add(*Owner, std::make_shared<TTaskGranulePreparation>(std::move(inGranule), std::move(BatchesToDedup), GranuleId, Owner->GetReadMetadata(), processor.GetObject())); // NOTE(review): BatchesToDedup is moved out as well, so GetBatchesToDedup() observes a moved-from set afterwards - confirm no later reader relies on it
+ }
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index 95754c8cb1c..89e0ae19879 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -59,6 +59,8 @@ public:
return GranuleId;
}
+ void OnGranuleDataPrepared(std::vector<std::shared_ptr<arrow::RecordBatch>>&& data);
+
const THashSet<const void*>& GetBatchesToDedup() const noexcept {
return BatchesToDedup;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
new file mode 100644
index 00000000000..db05b35257d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
@@ -0,0 +1,87 @@
+#include "granule_preparation.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TTaskGranulePreparation::GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo) { // Sweeps over the [first-row key, last-row key] interval of every batch and puts batches whose intervals chain together into the same range.
+ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo]
+ rangesSlices.reserve(batches.size());
+ {
+ TMap<NArrow::TReplaceKey, std::vector<std::shared_ptr<arrow::RecordBatch>>> points; // key -> markers at that key: a non-null batch marks an interval start, a null pointer marks an interval end
+
+ for (auto& batch : batches) {
+ auto compositeKey = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
+ Y_VERIFY(compositeKey && compositeKey->num_rows() > 0); // aborts when the key columns could not be extracted or the batch has no rows
+ auto keyColumns = std::make_shared<NArrow::TArrayVec>(compositeKey->columns());
+
+ NArrow::TReplaceKey min(keyColumns, 0); // key of the first row; NOTE(review): treating it as the minimum presumably relies on the batch being sorted by the replace key - confirm with callers
+ NArrow::TReplaceKey max(keyColumns, compositeKey->num_rows() - 1); // key of the last row
+
+ points[min].push_back(batch); // insert start
+ points[max].push_back({}); // insert end
+ }
+
+ int sum = 0; // intervals currently open: starts seen minus ends seen
+ for (auto& [key, vec] : points) { // NOTE(review): the sweep assumes TMap iterates keys in ascending order - confirm
+ if (!sum) { // count(start) == count(end), start new range
+ rangesSlices.push_back({});
+ rangesSlices.back().reserve(batches.size());
+ }
+
+ for (auto& batch : vec) { // all markers of one key are consumed before the next "!sum" check, so intervals that meet at this key stay in one range
+ if (batch) {
+ ++sum;
+ rangesSlices.back().push_back(batch);
+ } else {
+ --sum;
+ }
+ }
+ }
+ }
+ return rangesSlices;
+}
+
+std::vector<std::shared_ptr<arrow::RecordBatch>> TTaskGranulePreparation::SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo, const std::shared_ptr<NArrow::TSortDescription>& description, const THashSet<const void*>& batchesToDedup) { // Builds the output batches range by range: a batch alone in its key range is passed through (deduplicated if listed in batchesToDedup), several batches in one range are combined.
+ auto rangesSlices = GroupInKeyRanges(batches, indexInfo);
+
+ // Merge slices in ranges
+ std::vector<std::shared_ptr<arrow::RecordBatch>> out;
+ out.reserve(rangesSlices.size()); // NOTE(review): an estimate only - DedupSortedBatch receives "out" and may add a different number of batches; confirm
+ for (auto& slices : rangesSlices) {
+ if (slices.empty()) {
+ continue;
+ }
+
+ // Do not merge slice if it's alone in its key range
+ if (slices.size() == 1) {
+ auto batch = slices[0];
+ if (batchesToDedup.count(batch.get())) { // the set is keyed by the address of the RecordBatch object
+ NArrow::DedupSortedBatch(batch, description->ReplaceKey, out); // NOTE(review): presumably appends the deduplicated result to "out" - confirm against NArrow::DedupSortedBatch
+ } else {
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); // debug-only check that a batch not flagged for dedup is already sorted and unique
+ out.push_back(batch);
+ }
+ continue;
+ }
+ auto batch = NArrow::CombineSortedBatches(slices, description); // several batches share this key range: combine them according to "description"
+ out.push_back(batch);
+ }
+
+ return out;
+}
+
+bool TTaskGranulePreparation::DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const { // Hands the batches held by this task over to the granule identified by GranuleId; always returns true.
+ indexedDataRead.GetGranuleVerified(GranuleId)->OnGranuleDataPrepared(std::move(BatchesInGranule)); // moving a member out of a const method compiles only because BatchesInGranule is declared mutable; the task keeps a moved-from vector afterwards
+ return true;
+}
+
+bool TTaskGranulePreparation::DoExecuteImpl() { // Checks the input batches, then replaces BatchesInGranule with the result of SpecialMergeSorted; always returns true.
+ auto& indexInfo = ReadMetadata->GetIndexInfo();
+ for (auto& batch : BatchesInGranule) {
+ Y_VERIFY(batch->num_rows()); // every input batch must be non-empty
+ Y_VERIFY_DEBUG(NArrow::IsSorted(batch, indexInfo.SortReplaceDescription()->ReplaceKey)); // debug-only: each input batch must already be sorted by the replace key
+ }
+ BatchesInGranule = SpecialMergeSorted(BatchesInGranule, indexInfo, indexInfo.SortReplaceDescription(), BatchesToDedup); // the input is read through a const reference and the member is overwritten only with the returned vector, after the call has finished
+ return true;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h
new file mode 100644
index 00000000000..7b893f89fbd
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h
@@ -0,0 +1,46 @@
+#pragma once
+#include "conveyor_task.h"
+#include "filling_context.h"
+#include "read_metadata.h"
+#include <ydb/core/formats/arrow/sort_cursor.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TTaskGranulePreparation: public NColumnShard::IDataTasksProcessor::ITask { // Task that merges/deduplicates the record batches of one granule (DoExecuteImpl) and then hands the result back to that granule (DoApply).
+private:
+ using TBase = NColumnShard::IDataTasksProcessor::ITask;
+ mutable std::vector<std::shared_ptr<arrow::RecordBatch>> BatchesInGranule; // input batches, overwritten with the merged result by DoExecuteImpl; mutable because the const DoApply moves the result out
+ THashSet<const void*> BatchesToDedup; // addresses of the RecordBatch objects that must be deduplicated even when alone in their key range
+ const ui64 GranuleId; // granule that receives the prepared batches in DoApply
+ NOlap::TReadMetadata::TConstPtr ReadMetadata; // source of the index info (replace key, sort description) used by DoExecuteImpl
+
+ static std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>
+ GroupInKeyRanges(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TIndexInfo& indexInfo); // groups batches by their first-row/last-row key intervals; result is indexed [rangeNo][sliceNo]
+
+ static std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, // combines the batches of each key range; lone batches pass through or are deduplicated
+ const TIndexInfo& indexInfo,
+ const std::shared_ptr<NArrow::TSortDescription>& description,
+ const THashSet<const void*>& batchesToDedup);
+
+protected:
+ virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const override; // passes BatchesInGranule to the granule with GranuleId
+ virtual bool DoExecuteImpl() override; // verifies the inputs and runs SpecialMergeSorted over BatchesInGranule
+public:
+
+ virtual TString GetTaskClassIdentifier() const override {
+ return "Reading::GranulePreparation";
+ }
+
+ TTaskGranulePreparation(std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches, THashSet<const void*>&& batchesToDedup, // takes ownership of the batches and of the dedup set
+ const ui64 granuleId, NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::IDataTasksProcessor::TPtr processor)
+ : TBase(processor)
+ , BatchesInGranule(std::move(batches))
+ , BatchesToDedup(std::move(batchesToDedup))
+ , GranuleId(granuleId)
+ , ReadMetadata(std::move(readMetadata)) { // by-value sink parameter: move it into the member like the other arguments instead of copying the pointer a second time
+
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make
index 4894dbca62c..a5418d99cc7 100644
--- a/ydb/core/tx/columnshard/engines/reader/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/ya.make
@@ -14,6 +14,7 @@ SRCS(
read_filter_merger.cpp
read_metadata.cpp
read_context.cpp
+ granule_preparation.cpp
)
PEERDIR(