aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-21 09:49:05 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-21 10:07:17 +0300
commitae9bb2cf946979d0a6a8b3e4a2440cdea6622699 (patch)
tree6fbc8d64e347267677069071068a55c92805fff4
parent583e213795443eeca98ba562a63112bcae9a5555 (diff)
downloadydb-ae9bb2cf946979d0a6a8b3e4a2440cdea6622699.tar.gz
KIKIMR-19213: remove memory copy on chunk merging; use separate chunks.
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.h1
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp73
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.h38
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp22
5 files changed, 87 insertions, 51 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 77e85fa97b..14a6887295 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -22,6 +22,10 @@ std::optional<NOlap::TPartialReadResult> TColumnShardScanIterator::GetBatch() {
return ReadyResults.pop_front();
}
+void TColumnShardScanIterator::PrepareResults() {
+ FillReadyResults();
+}
+
std::shared_ptr<NOlap::NBlobOperations::NRead::ITask> TColumnShardScanIterator::GetNextTaskToRead() {
return IndexedData->ExtractNextReadTask(ReadyResults.size());
}
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 905d31fb7e..2664631c23 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -101,6 +101,7 @@ public:
}
std::optional<NOlap::TPartialReadResult> GetBatch() override;
+ virtual void PrepareResults() override;
virtual std::shared_ptr<NOlap::NBlobOperations::NRead::ITask> GetNextTaskToRead() override;
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index ea1aad6abe..59b6d22c16 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -186,13 +186,14 @@ private:
Y_ABORT_UNLESS(!Finished);
Y_ABORT_UNLESS(ScanIterator);
- if (!ChunksLimiter.HasMore()) {
- ACFL_DEBUG("stage", "bytes limit exhausted")("limit", ChunksLimiter.DebugString());
+ if (ScanIterator->Finished()) {
+ ACFL_DEBUG("stage", "scan iterator is finished")("iterator", ScanIterator->DebugString());
return false;
}
- if (ScanIterator->Finished()) {
- ACFL_DEBUG("stage", "scan iterator is finished")("iterator", ScanIterator->DebugString());
+ if (!ChunksLimiter.HasMore()) {
+ ScanIterator->PrepareResults();
+ ACFL_DEBUG("stage", "bytes limit exhausted")("limit", ChunksLimiter.DebugString());
return false;
}
@@ -1007,3 +1008,67 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext
}
}
+
+namespace NKikimr::NOlap {
+
+class TCurrentBatch {
+private:
+ std::vector<std::shared_ptr<arrow::RecordBatch>> Batches;
+ ui32 RecordsCount = 0;
+public:
+ void AddChunk(const std::shared_ptr<arrow::RecordBatch>& chunk) {
+ AFL_VERIFY(chunk);
+ AFL_VERIFY(chunk->num_rows());
+ Batches.emplace_back(chunk);
+ RecordsCount += chunk->num_rows();
+ }
+
+ ui32 GetRecordsCount() const {
+ return RecordsCount;
+ }
+
+ void FillResult(std::vector<TPartialReadResult>& result, const bool mergePartsToMax) const {
+ AFL_VERIFY(Batches.size());
+ if (mergePartsToMax) {
+ auto res = NArrow::CombineBatches(Batches);
+ AFL_VERIFY(res);
+ result.emplace_back(TPartialReadResult(nullptr, res));
+ } else {
+ for (auto&& i : Batches) {
+ result.emplace_back(TPartialReadResult(nullptr, i));
+ }
+ }
+ }
+};
+
+std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(const std::vector<TPartialReadResult>& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax) {
+ TCurrentBatch currentBatch;
+ std::vector<TCurrentBatch> resultBatches;
+ for (auto&& i : resultsExt) {
+ std::shared_ptr<arrow::RecordBatch> currentBatchSplitting = i.ResultBatch;
+ while (currentBatchSplitting && currentBatchSplitting->num_rows()) {
+ const ui32 currentRecordsCount = currentBatch.GetRecordsCount();
+ if (currentRecordsCount + currentBatchSplitting->num_rows() < maxRecordsInResult) {
+ currentBatch.AddChunk(currentBatchSplitting);
+ currentBatchSplitting = nullptr;
+ } else {
+ auto currentSlice = currentBatchSplitting->Slice(0, maxRecordsInResult - currentRecordsCount);
+ currentBatch.AddChunk(currentSlice);
+ resultBatches.emplace_back(std::move(currentBatch));
+ currentBatch = TCurrentBatch();
+ currentBatchSplitting = currentBatchSplitting->Slice(maxRecordsInResult - currentRecordsCount);
+ }
+ }
+ }
+ if (currentBatch.GetRecordsCount()) {
+ resultBatches.emplace_back(std::move(currentBatch));
+ }
+
+ std::vector<TPartialReadResult> result;
+ for (auto&& i : resultBatches) {
+ i.FillResult(result, mergePartsToMax);
+ }
+ return result;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index 22109c1796..5ec177b6ae 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -47,40 +47,7 @@ public:
}
}
- static std::vector<TPartialReadResult> SplitResults(const std::vector<TPartialReadResult>& resultsExt, const ui32 maxRecordsInResult) {
- std::vector<TPartialReadResult> result;
- std::shared_ptr<arrow::RecordBatch> currentBatch;
- for (auto&& i : resultsExt) {
- std::shared_ptr<arrow::RecordBatch> currentBatchSplitting = i.ResultBatch;
- while (currentBatchSplitting && currentBatchSplitting->num_rows()) {
- const ui32 currentRecordsCount = currentBatch ? currentBatch->num_rows() : 0;
- if (currentRecordsCount + currentBatchSplitting->num_rows() < maxRecordsInResult) {
- if (!currentBatch) {
- currentBatch = currentBatchSplitting;
- } else {
- currentBatch = NArrow::CombineBatches({currentBatch, currentBatchSplitting});
- }
- currentBatchSplitting = nullptr;
- } else {
- auto currentSlice = currentBatchSplitting->Slice(0, maxRecordsInResult - currentRecordsCount);
- AFL_VERIFY(currentSlice);
- if (!currentBatch) {
- currentBatch = currentSlice;
- } else {
- currentBatch = NArrow::CombineBatches({currentBatch, currentSlice});
- AFL_VERIFY(currentBatch);
- }
- result.emplace_back(TPartialReadResult(nullptr, currentBatch));
- currentBatch = nullptr;
- currentBatchSplitting = currentBatchSplitting->Slice(maxRecordsInResult - currentRecordsCount);
- }
- }
- }
- if (currentBatch && currentBatch->num_rows()) {
- result.emplace_back(TPartialReadResult(nullptr, currentBatch));
- }
- return result;
- }
+ static std::vector<TPartialReadResult> SplitResults(const std::vector<TPartialReadResult>& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax);
void Slice(const ui32 offset, const ui32 length) {
ResultBatch = ResultBatch->Slice(offset, length);
@@ -151,6 +118,9 @@ public:
}
virtual bool Finished() const = 0;
virtual std::optional<NOlap::TPartialReadResult> GetBatch() = 0;
+ virtual void PrepareResults() {
+
+ }
virtual std::shared_ptr<NOlap::NBlobOperations::NRead::ITask> GetNextTaskToRead() { return nullptr; }
virtual TString DebugString() const {
return "NO_DATA";
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
index 02c13c2c07..501c714751 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
@@ -66,7 +66,7 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPlainReadData::DoExtractReadyRe
}
ReadyResultsCount = 0;
- auto result = TPartialReadResult::SplitResults(PartialResults, maxRowsInBatch);
+ auto result = TPartialReadResult::SplitResults(PartialResults, maxRowsInBatch, GetContext().GetIsInternalRead());
PartialResults.clear();
ui32 count = 0;
for (auto&& r: result) {
@@ -113,25 +113,21 @@ void TPlainReadData::OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch)
}
NKikimr::NOlap::NPlainReader::TFetchingPlan TPlainReadData::GetColumnsFetchingPlan(const bool exclusiveSource) const {
+ if (GetContext().GetIsInternalRead()) {
+ return TFetchingPlan(PKFFColumns, EmptyColumns, exclusiveSource);
+ }
+
if (exclusiveSource) {
- if (Context.GetIsInternalRead()) {
+ if (TrivialEFFlag) {
return TFetchingPlan(FFColumns, EmptyColumns, true);
} else {
- if (TrivialEFFlag) {
- return TFetchingPlan(FFColumns, EmptyColumns, true);
- } else {
- return TFetchingPlan(EFColumns, FFMinusEFColumns, true);
- }
+ return TFetchingPlan(EFColumns, FFMinusEFColumns, true);
}
} else {
- if (GetContext().GetIsInternalRead()) {
+ if (TrivialEFFlag) {
return TFetchingPlan(PKFFColumns, EmptyColumns, false);
} else {
- if (TrivialEFFlag) {
- return TFetchingPlan(PKFFColumns, EmptyColumns, false);
- } else {
- return TFetchingPlan(EFPKColumns, FFMinusEFPKColumns, false);
- }
+ return TFetchingPlan(EFPKColumns, FFMinusEFPKColumns, false);
}
}
}