diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-30 18:17:59 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-30 18:17:59 +0300 |
commit | b454398de8bd6fda1251382513891ede660e6755 (patch) | |
tree | f1681f40e18deb1466e5ddaa3c42ec31ef8a3b57 | |
parent | 2ff48f930125529fff0923dcbfc98ceb0e688c69 (diff) | |
download | ydb-b454398de8bd6fda1251382513891ede660e6755.tar.gz |
incapsulate result objects for memory control
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__index_scan.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__index_scan.h | 54 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.h | 44 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__stats_scan.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__stats_scan.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/indexed_read_data.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/read_actor.cpp | 2 |
8 files changed, 115 insertions, 62 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index ccbb1217c75..cb5fd9b6df9 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -6,6 +6,7 @@ namespace NKikimr::NColumnShard { TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, const NOlap::TReadContext& context) : Context(context) + , ReadyResults(context.GetCounters()) , ReadMetadata(readMetadata) , IndexedData(ReadMetadata, false, context) { @@ -23,15 +24,7 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() { FillReadyResults(); - - if (ReadyResults.empty()) { - return {}; - } - - auto result(std::move(ReadyResults.front())); - ReadyResults.pop_front(); - - return result; + return ReadyResults.pop_front(); } NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() { @@ -42,19 +35,17 @@ void TColumnShardScanIterator::FillReadyResults() { auto ready = IndexedData.GetReadyResults(MaxRowsInBatch); i64 limitLeft = ReadMetadata->Limit == 0 ? INT64_MAX : ReadMetadata->Limit - ItemsRead; for (size_t i = 0; i < ready.size() && limitLeft; ++i) { - if (ready[i].ResultBatch->num_rows() == 0 && !ready[i].LastReadKey) { + if (ready[i].GetResultBatch()->num_rows() == 0 && !ready[i].GetLastReadKey()) { Y_VERIFY(i + 1 == ready.size(), "Only last batch can be empty!"); break; } - ReadyResults.emplace_back(std::move(ready[i])); - auto& batch = ReadyResults.back(); - if (batch.ResultBatch->num_rows() > limitLeft) { - // Trim the last batch if total row count exceeds the requested limit - batch.ResultBatch = batch.ResultBatch->Slice(0, limitLeft); + auto& batch = ReadyResults.emplace_back(std::move(ready[i])); + if (batch.GetResultBatch()->num_rows() > limitLeft) { + batch.Slice(0, limitLeft); } - limitLeft -= batch.ResultBatch->num_rows(); - ItemsRead += batch.ResultBatch->num_rows(); + limitLeft -= batch.GetResultBatch()->num_rows(); + ItemsRead += batch.GetResultBatch()->num_rows(); } if (limitLeft == 0) { diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 4484ee85744..9e295cc3d79 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -27,13 +27,61 @@ public: using NOlap::TUnifiedBlobId; using NOlap::TBlobRange; +class TReadyResults { +private: + const NColumnShard::TConcreteScanCounters Counters; + std::deque<NOlap::TPartialReadResult> Data; + i64 SumSize = 0; + i64 RecordsCount = 0; +public: + TString DebugString() const { + TStringBuilder sb; + sb + << "count:" << Data.size() << ";" + << "records_count:" << RecordsCount << ";" + << "sum_size:" << SumSize << ";" + ; + if (Data.size()) { + sb << "schema=" << Data.front().GetResultBatch()->schema()->ToString() << ";"; + } + return sb; + } + TReadyResults(const NColumnShard::TConcreteScanCounters& counters) + : Counters(counters) + { + + } + NOlap::TPartialReadResult& emplace_back(NOlap::TPartialReadResult&& v) { + SumSize += v.GetSize(); + RecordsCount += v.GetResultBatch()->num_rows(); + Data.emplace_back(std::move(v)); + return Data.back(); + } + NOlap::TPartialReadResult pop_front() { + if (Data.empty()) { + return NOlap::TPartialReadResult(); + } + auto result = std::move(Data.front()); + SumSize -= result.GetSize(); + RecordsCount -= result.GetResultBatch()->num_rows(); + Data.pop_front(); + return result; + } + bool empty() const { + return Data.empty(); + } + size_t size() const { + return Data.size(); + } +}; + class TColumnShardScanIterator: public TScanIteratorBase { private: NOlap::TReadContext Context; + TReadyResults ReadyResults; NOlap::TReadMetadata::TConstPtr ReadMetadata; NOlap::TIndexedReadData IndexedData; std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted; - TDeque<NOlap::TPartialReadResult> ReadyResults; ui64 ItemsRead = 0; const i64 MaxRowsInBatch = 5000; public: @@ -64,10 +112,6 @@ public: TBlobRange GetNextBlobToRead() override; - size_t ReadyResultsCount() const override { - return ReadyResults.size(); - } - private: void FillReadyResults(); }; diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index cbb1c75b5bd..c726d849ca5 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -268,12 +268,12 @@ private: ResultYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetResultYqlSchema(); } - auto& batch = result.ResultBatch; - if (!batch) { + if (!result.GetResultBatch()) { ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString()); return false; } + auto& batch = result.GetResultBatch(); int numRows = batch->num_rows(); int numColumns = batch->num_columns(); if (!numRows) { @@ -302,8 +302,8 @@ private: break; } } // switch DataFormat - if (result.LastReadKey) { - Result->LastKey = ConvertLastKey(result.LastReadKey); + if (result.GetLastReadKey()) { + Result->LastKey = ConvertLastKey(result.GetLastReadKey()); } else { Y_VERIFY(numRows == 0, "Got non-empty result batch without last key"); } diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index efd721b093a..eee62328ab9 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -2,18 +2,59 @@ #include "blob_cache.h" #include "engines/reader/conveyor_task.h" +#include <ydb/core/formats/arrow/size_calcer.h> +#include <ydb/core/tx/program/program.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> namespace NKikimr::NOlap { // Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation -struct TPartialReadResult { +class TPartialReadResult { +private: std::shared_ptr<arrow::RecordBatch> ResultBatch; // This 1-row batch contains the last key that was read while producing the ResultBatch. // NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit std::shared_ptr<arrow::RecordBatch> LastReadKey; +public: + void Slice(const ui32 offset, const ui32 length) { + ResultBatch = ResultBatch->Slice(offset, length); + } + + void ApplyProgram(const NOlap::TProgramContainer& program) { + auto status = program.ApplyProgram(ResultBatch); + if (!status.ok()) { + ErrorString = status.message(); + } + } + + ui64 GetSize() const { + return NArrow::GetBatchDataSize(ResultBatch); + } + + const std::shared_ptr<arrow::RecordBatch>& GetResultBatch() const { + return ResultBatch; + } + + const std::shared_ptr<arrow::RecordBatch>& GetLastReadKey() const { + return LastReadKey; + } + std::string ErrorString; + + TPartialReadResult() = default; + + explicit TPartialReadResult(std::shared_ptr<arrow::RecordBatch> batch) + : ResultBatch(batch) + { + } + + explicit TPartialReadResult( + std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey) + : ResultBatch(batch) + , LastReadKey(lastKey) + { + } }; } @@ -34,7 +75,6 @@ public: virtual bool Finished() const = 0; virtual NOlap::TPartialReadResult GetBatch() = 0; virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); } - virtual size_t ReadyResultsCount() const = 0; virtual TString DebugString() const { return "NO_DATA"; } diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp index ffe33890fc0..b7ae2bf23bf 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp @@ -15,16 +15,10 @@ NKikimr::NOlap::TPartialReadResult TStatsIterator::GetBatch() { // Leave only requested columns auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema); - NOlap::TPartialReadResult out{ - .ResultBatch = std::move(resultBatch), - .LastReadKey = std::move(lastKey) - }; - - auto status = ReadMetadata->GetProgram().ApplyProgram(out.ResultBatch); - if (!status.ok()) { - out.ErrorString = status.message(); - } - return out; + NOlap::TPartialReadResult out(resultBatch, lastKey); + + out.ApplyProgram(ReadMetadata->GetProgram()); + return std::move(out); } std::shared_ptr<arrow::RecordBatch> TStatsIterator::FillStatsBatch() { diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h index c6c36fd3e2e..825846f308f 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.h +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h @@ -55,10 +55,6 @@ public: NOlap::TPartialReadResult GetBatch() override; - size_t ReadyResultsCount() const override { - return IndexStats.empty() ? 0 : 1; - } - private: NOlap::TReadStatsMetadata::TConstPtr ReadMetadata; bool Reverse{false}; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index ab1f584a0e3..cdd06250f34 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -253,9 +253,7 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t auto out = MakeResult(ReadyToOut(), maxRowsInBatch); const bool requireResult = GranulesContext->IsFinished(); // not indexed or the last indexed read (even if it's empty) if (requireResult && out.empty()) { - out.push_back(TPartialReadResult{ - .ResultBatch = NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema()) - }); + out.push_back(TPartialReadResult(NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema()))); } return out; } @@ -338,7 +336,7 @@ static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) { i64 sumRows = 0; for (auto& result : out) { - sumRows += result.ResultBatch->num_rows(); + sumRows += result.GetResultBatch()->num_rows(); } if (sumRows / out.size() > 100) { return; @@ -347,7 +345,7 @@ static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) { std::vector<std::shared_ptr<arrow::RecordBatch>> batches; batches.reserve(out.size()); for (auto& batch : out) { - batches.push_back(batch.ResultBatch); + batches.push_back(batch.GetResultBatch()); } auto res = arrow::Table::FromRecordBatches(batches); @@ -364,15 +362,11 @@ static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) { auto batch = NArrow::ToBatch(*res); std::vector<TPartialReadResult> merged; - merged.emplace_back(TPartialReadResult{ - .ResultBatch = std::move(batch), - .LastReadKey = std::move(out.back().LastReadKey) - }); + merged.emplace_back(TPartialReadResult(batch, out.back().GetLastReadKey())); out.swap(merged); } -std::vector<TPartialReadResult> -TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, +std::vector<TPartialReadResult> TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, int64_t maxRowsInBatch) const { Y_VERIFY(ReadMetadata->IsSorted()); Y_VERIFY(SortReplaceDescription); @@ -423,20 +417,14 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco // Leave only requested columns auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->GetResultSchema()); - out.emplace_back(TPartialReadResult{ - .ResultBatch = std::move(resultBatch), - .LastReadKey = std::move(lastKey) - }); + out.emplace_back(TPartialReadResult(resultBatch, lastKey)); } } if (ReadMetadata->GetProgram().HasProgram()) { MergeTooSmallBatches(out); for (auto& result : out) { - auto status = ReadMetadata->GetProgram().ApplyProgram(result.ResultBatch); - if (!status.ok()) { - result.ErrorString = status.message(); - } + result.ApplyProgram(ReadMetadata->GetProgram()); } } return out; diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 5c90910b8b4..93e1049a1e8 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -15,7 +15,7 @@ private: size_t next = 1; for (auto it = ready.begin(); it != ready.end(); ++it, ++next) { const bool lastOne = IndexedData.IsFinished() && (next == ready.size()); - SendResult(ctx, it->ResultBatch, lastOne); + SendResult(ctx, it->GetResultBatch(), lastOne); } } public: |