diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-21 09:49:05 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-21 10:07:17 +0300 |
commit | ae9bb2cf946979d0a6a8b3e4a2440cdea6622699 (patch) | |
tree | 6fbc8d64e347267677069071068a55c92805fff4 | |
parent | 583e213795443eeca98ba562a63112bcae9a5555 (diff) | |
download | ydb-ae9bb2cf946979d0a6a8b3e4a2440cdea6622699.tar.gz |
KIKIMR-19213: remove memory copy on chunks merging. Use separate chunks.
5 files changed, 87 insertions, 51 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 77e85fa97b..14a6887295 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -22,6 +22,10 @@ std::optional<NOlap::TPartialReadResult> TColumnShardScanIterator::GetBatch() { return ReadyResults.pop_front(); } +void TColumnShardScanIterator::PrepareResults() { + FillReadyResults(); +} + std::shared_ptr<NOlap::NBlobOperations::NRead::ITask> TColumnShardScanIterator::GetNextTaskToRead() { return IndexedData->ExtractNextReadTask(ReadyResults.size()); } diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 905d31fb7e..2664631c23 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -101,6 +101,7 @@ public: } std::optional<NOlap::TPartialReadResult> GetBatch() override; + virtual void PrepareResults() override; virtual std::shared_ptr<NOlap::NBlobOperations::NRead::ITask> GetNextTaskToRead() override; diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index ea1aad6abe..59b6d22c16 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -186,13 +186,14 @@ private: Y_ABORT_UNLESS(!Finished); Y_ABORT_UNLESS(ScanIterator); - if (!ChunksLimiter.HasMore()) { - ACFL_DEBUG("stage", "bytes limit exhausted")("limit", ChunksLimiter.DebugString()); + if (ScanIterator->Finished()) { + ACFL_DEBUG("stage", "scan iterator is finished")("iterator", ScanIterator->DebugString()); return false; } - if (ScanIterator->Finished()) { - ACFL_DEBUG("stage", "scan iterator is finished")("iterator", ScanIterator->DebugString()); + if (!ChunksLimiter.HasMore()) { + ScanIterator->PrepareResults(); + ACFL_DEBUG("stage", "bytes limit exhausted")("limit", 
ChunksLimiter.DebugString()); return false; } @@ -1007,3 +1008,67 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext } } + +namespace NKikimr::NOlap { + +class TCurrentBatch { +private: + std::vector<std::shared_ptr<arrow::RecordBatch>> Batches; + ui32 RecordsCount = 0; +public: + void AddChunk(const std::shared_ptr<arrow::RecordBatch>& chunk) { + AFL_VERIFY(chunk); + AFL_VERIFY(chunk->num_rows()); + Batches.emplace_back(chunk); + RecordsCount += chunk->num_rows(); + } + + ui32 GetRecordsCount() const { + return RecordsCount; + } + + void FillResult(std::vector<TPartialReadResult>& result, const bool mergePartsToMax) const { + AFL_VERIFY(Batches.size()); + if (mergePartsToMax) { + auto res = NArrow::CombineBatches(Batches); + AFL_VERIFY(res); + result.emplace_back(TPartialReadResult(nullptr, res)); + } else { + for (auto&& i : Batches) { + result.emplace_back(TPartialReadResult(nullptr, i)); + } + } + } +}; + +std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(const std::vector<TPartialReadResult>& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax) { + TCurrentBatch currentBatch; + std::vector<TCurrentBatch> resultBatches; + for (auto&& i : resultsExt) { + std::shared_ptr<arrow::RecordBatch> currentBatchSplitting = i.ResultBatch; + while (currentBatchSplitting && currentBatchSplitting->num_rows()) { + const ui32 currentRecordsCount = currentBatch.GetRecordsCount(); + if (currentRecordsCount + currentBatchSplitting->num_rows() < maxRecordsInResult) { + currentBatch.AddChunk(currentBatchSplitting); + currentBatchSplitting = nullptr; + } else { + auto currentSlice = currentBatchSplitting->Slice(0, maxRecordsInResult - currentRecordsCount); + currentBatch.AddChunk(currentSlice); + resultBatches.emplace_back(std::move(currentBatch)); + currentBatch = TCurrentBatch(); + currentBatchSplitting = currentBatchSplitting->Slice(maxRecordsInResult - currentRecordsCount); + } + } + } + if 
(currentBatch.GetRecordsCount()) { + resultBatches.emplace_back(std::move(currentBatch)); + } + + std::vector<TPartialReadResult> result; + for (auto&& i : resultBatches) { + i.FillResult(result, mergePartsToMax); + } + return result; +} + +} diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index 22109c1796..5ec177b6ae 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -47,40 +47,7 @@ public: } } - static std::vector<TPartialReadResult> SplitResults(const std::vector<TPartialReadResult>& resultsExt, const ui32 maxRecordsInResult) { - std::vector<TPartialReadResult> result; - std::shared_ptr<arrow::RecordBatch> currentBatch; - for (auto&& i : resultsExt) { - std::shared_ptr<arrow::RecordBatch> currentBatchSplitting = i.ResultBatch; - while (currentBatchSplitting && currentBatchSplitting->num_rows()) { - const ui32 currentRecordsCount = currentBatch ? currentBatch->num_rows() : 0; - if (currentRecordsCount + currentBatchSplitting->num_rows() < maxRecordsInResult) { - if (!currentBatch) { - currentBatch = currentBatchSplitting; - } else { - currentBatch = NArrow::CombineBatches({currentBatch, currentBatchSplitting}); - } - currentBatchSplitting = nullptr; - } else { - auto currentSlice = currentBatchSplitting->Slice(0, maxRecordsInResult - currentRecordsCount); - AFL_VERIFY(currentSlice); - if (!currentBatch) { - currentBatch = currentSlice; - } else { - currentBatch = NArrow::CombineBatches({currentBatch, currentSlice}); - AFL_VERIFY(currentBatch); - } - result.emplace_back(TPartialReadResult(nullptr, currentBatch)); - currentBatch = nullptr; - currentBatchSplitting = currentBatchSplitting->Slice(maxRecordsInResult - currentRecordsCount); - } - } - } - if (currentBatch && currentBatch->num_rows()) { - result.emplace_back(TPartialReadResult(nullptr, currentBatch)); - } - return result; - } + static std::vector<TPartialReadResult> SplitResults(const 
std::vector<TPartialReadResult>& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax); void Slice(const ui32 offset, const ui32 length) { ResultBatch = ResultBatch->Slice(offset, length); @@ -151,6 +118,9 @@ public: } virtual bool Finished() const = 0; virtual std::optional<NOlap::TPartialReadResult> GetBatch() = 0; + virtual void PrepareResults() { + + } virtual std::shared_ptr<NOlap::NBlobOperations::NRead::ITask> GetNextTaskToRead() { return nullptr; } virtual TString DebugString() const { return "NO_DATA"; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp index 02c13c2c07..501c714751 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp @@ -66,7 +66,7 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPlainReadData::DoExtractReadyRe } ReadyResultsCount = 0; - auto result = TPartialReadResult::SplitResults(PartialResults, maxRowsInBatch); + auto result = TPartialReadResult::SplitResults(PartialResults, maxRowsInBatch, GetContext().GetIsInternalRead()); PartialResults.clear(); ui32 count = 0; for (auto&& r: result) { @@ -113,25 +113,21 @@ void TPlainReadData::OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch) } NKikimr::NOlap::NPlainReader::TFetchingPlan TPlainReadData::GetColumnsFetchingPlan(const bool exclusiveSource) const { + if (GetContext().GetIsInternalRead()) { + return TFetchingPlan(PKFFColumns, EmptyColumns, exclusiveSource); + } + if (exclusiveSource) { - if (Context.GetIsInternalRead()) { + if (TrivialEFFlag) { return TFetchingPlan(FFColumns, EmptyColumns, true); } else { - if (TrivialEFFlag) { - return TFetchingPlan(FFColumns, EmptyColumns, true); - } else { - return TFetchingPlan(EFColumns, FFMinusEFColumns, true); - } + return TFetchingPlan(EFColumns, FFMinusEFColumns, true); } } else { - if 
(GetContext().GetIsInternalRead()) { + if (TrivialEFFlag) { return TFetchingPlan(PKFFColumns, EmptyColumns, false); } else { - if (TrivialEFFlag) { - return TFetchingPlan(PKFFColumns, EmptyColumns, false); - } else { - return TFetchingPlan(EFPKColumns, FFMinusEFPKColumns, false); - } + return TFetchingPlan(EFPKColumns, FFMinusEFPKColumns, false); } } } |