aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-30 18:17:59 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-30 18:17:59 +0300
commitb454398de8bd6fda1251382513891ede660e6755 (patch)
treef1681f40e18deb1466e5ddaa3c42ec31ef8a3b57
parent2ff48f930125529fff0923dcbfc98ceb0e688c69 (diff)
downloadydb-b454398de8bd6fda1251382513891ede660e6755.tar.gz
incapsulate result objects for memory control
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp25
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.h54
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.h44
-rw-r--r--ydb/core/tx/columnshard/columnshard__stats_scan.cpp14
-rw-r--r--ydb/core/tx/columnshard/columnshard__stats_scan.h4
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp26
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp2
8 files changed, 115 insertions, 62 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index ccbb1217c75..cb5fd9b6df9 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -6,6 +6,7 @@ namespace NKikimr::NColumnShard {
TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, const NOlap::TReadContext& context)
: Context(context)
+ , ReadyResults(context.GetCounters())
, ReadMetadata(readMetadata)
, IndexedData(ReadMetadata, false, context)
{
@@ -23,15 +24,7 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data
NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() {
FillReadyResults();
-
- if (ReadyResults.empty()) {
- return {};
- }
-
- auto result(std::move(ReadyResults.front()));
- ReadyResults.pop_front();
-
- return result;
+ return ReadyResults.pop_front();
}
NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() {
@@ -42,19 +35,17 @@ void TColumnShardScanIterator::FillReadyResults() {
auto ready = IndexedData.GetReadyResults(MaxRowsInBatch);
i64 limitLeft = ReadMetadata->Limit == 0 ? INT64_MAX : ReadMetadata->Limit - ItemsRead;
for (size_t i = 0; i < ready.size() && limitLeft; ++i) {
- if (ready[i].ResultBatch->num_rows() == 0 && !ready[i].LastReadKey) {
+ if (ready[i].GetResultBatch()->num_rows() == 0 && !ready[i].GetLastReadKey()) {
Y_VERIFY(i + 1 == ready.size(), "Only last batch can be empty!");
break;
}
- ReadyResults.emplace_back(std::move(ready[i]));
- auto& batch = ReadyResults.back();
- if (batch.ResultBatch->num_rows() > limitLeft) {
- // Trim the last batch if total row count exceeds the requested limit
- batch.ResultBatch = batch.ResultBatch->Slice(0, limitLeft);
+ auto& batch = ReadyResults.emplace_back(std::move(ready[i]));
+ if (batch.GetResultBatch()->num_rows() > limitLeft) {
+ batch.Slice(0, limitLeft);
}
- limitLeft -= batch.ResultBatch->num_rows();
- ItemsRead += batch.ResultBatch->num_rows();
+ limitLeft -= batch.GetResultBatch()->num_rows();
+ ItemsRead += batch.GetResultBatch()->num_rows();
}
if (limitLeft == 0) {
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 4484ee85744..9e295cc3d79 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -27,13 +27,61 @@ public:
using NOlap::TUnifiedBlobId;
using NOlap::TBlobRange;
+class TReadyResults {
+private:
+ const NColumnShard::TConcreteScanCounters Counters;
+ std::deque<NOlap::TPartialReadResult> Data;
+ i64 SumSize = 0;
+ i64 RecordsCount = 0;
+public:
+ TString DebugString() const {
+ TStringBuilder sb;
+ sb
+ << "count:" << Data.size() << ";"
+ << "records_count:" << RecordsCount << ";"
+ << "sum_size:" << SumSize << ";"
+ ;
+ if (Data.size()) {
+ sb << "schema=" << Data.front().GetResultBatch()->schema()->ToString() << ";";
+ }
+ return sb;
+ }
+ TReadyResults(const NColumnShard::TConcreteScanCounters& counters)
+ : Counters(counters)
+ {
+
+ }
+ NOlap::TPartialReadResult& emplace_back(NOlap::TPartialReadResult&& v) {
+ SumSize += v.GetSize();
+ RecordsCount += v.GetResultBatch()->num_rows();
+ Data.emplace_back(std::move(v));
+ return Data.back();
+ }
+ NOlap::TPartialReadResult pop_front() {
+ if (Data.empty()) {
+ return NOlap::TPartialReadResult();
+ }
+ auto result = std::move(Data.front());
+ SumSize -= result.GetSize();
+ RecordsCount -= result.GetResultBatch()->num_rows();
+ Data.pop_front();
+ return result;
+ }
+ bool empty() const {
+ return Data.empty();
+ }
+ size_t size() const {
+ return Data.size();
+ }
+};
+
class TColumnShardScanIterator: public TScanIteratorBase {
private:
NOlap::TReadContext Context;
+ TReadyResults ReadyResults;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
NOlap::TIndexedReadData IndexedData;
std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted;
- TDeque<NOlap::TPartialReadResult> ReadyResults;
ui64 ItemsRead = 0;
const i64 MaxRowsInBatch = 5000;
public:
@@ -64,10 +112,6 @@ public:
TBlobRange GetNextBlobToRead() override;
- size_t ReadyResultsCount() const override {
- return ReadyResults.size();
- }
-
private:
void FillReadyResults();
};
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index cbb1c75b5bd..c726d849ca5 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -268,12 +268,12 @@ private:
ResultYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetResultYqlSchema();
}
- auto& batch = result.ResultBatch;
- if (!batch) {
+ if (!result.GetResultBatch()) {
ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
return false;
}
+ auto& batch = result.GetResultBatch();
int numRows = batch->num_rows();
int numColumns = batch->num_columns();
if (!numRows) {
@@ -302,8 +302,8 @@ private:
break;
}
} // switch DataFormat
- if (result.LastReadKey) {
- Result->LastKey = ConvertLastKey(result.LastReadKey);
+ if (result.GetLastReadKey()) {
+ Result->LastKey = ConvertLastKey(result.GetLastReadKey());
} else {
Y_VERIFY(numRows == 0, "Got non-empty result batch without last key");
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index efd721b093a..eee62328ab9 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -2,18 +2,59 @@
#include "blob_cache.h"
#include "engines/reader/conveyor_task.h"
+#include <ydb/core/formats/arrow/size_calcer.h>
+#include <ydb/core/tx/program/program.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
namespace NKikimr::NOlap {
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
-struct TPartialReadResult {
+class TPartialReadResult {
+private:
std::shared_ptr<arrow::RecordBatch> ResultBatch;
// This 1-row batch contains the last key that was read while producing the ResultBatch.
// NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
std::shared_ptr<arrow::RecordBatch> LastReadKey;
+public:
+ void Slice(const ui32 offset, const ui32 length) {
+ ResultBatch = ResultBatch->Slice(offset, length);
+ }
+
+ void ApplyProgram(const NOlap::TProgramContainer& program) {
+ auto status = program.ApplyProgram(ResultBatch);
+ if (!status.ok()) {
+ ErrorString = status.message();
+ }
+ }
+
+ ui64 GetSize() const {
+ return NArrow::GetBatchDataSize(ResultBatch);
+ }
+
+ const std::shared_ptr<arrow::RecordBatch>& GetResultBatch() const {
+ return ResultBatch;
+ }
+
+ const std::shared_ptr<arrow::RecordBatch>& GetLastReadKey() const {
+ return LastReadKey;
+ }
+
std::string ErrorString;
+
+ TPartialReadResult() = default;
+
+ explicit TPartialReadResult(std::shared_ptr<arrow::RecordBatch> batch)
+ : ResultBatch(batch)
+ {
+ }
+
+ explicit TPartialReadResult(
+ std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey)
+ : ResultBatch(batch)
+ , LastReadKey(lastKey)
+ {
+ }
};
}
@@ -34,7 +75,6 @@ public:
virtual bool Finished() const = 0;
virtual NOlap::TPartialReadResult GetBatch() = 0;
virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); }
- virtual size_t ReadyResultsCount() const = 0;
virtual TString DebugString() const {
return "NO_DATA";
}
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
index ffe33890fc0..b7ae2bf23bf 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
@@ -15,16 +15,10 @@ NKikimr::NOlap::TPartialReadResult TStatsIterator::GetBatch() {
// Leave only requested columns
auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema);
- NOlap::TPartialReadResult out{
- .ResultBatch = std::move(resultBatch),
- .LastReadKey = std::move(lastKey)
- };
-
- auto status = ReadMetadata->GetProgram().ApplyProgram(out.ResultBatch);
- if (!status.ok()) {
- out.ErrorString = status.message();
- }
- return out;
+ NOlap::TPartialReadResult out(resultBatch, lastKey);
+
+ out.ApplyProgram(ReadMetadata->GetProgram());
+ return std::move(out);
}
std::shared_ptr<arrow::RecordBatch> TStatsIterator::FillStatsBatch() {
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index c6c36fd3e2e..825846f308f 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -55,10 +55,6 @@ public:
NOlap::TPartialReadResult GetBatch() override;
- size_t ReadyResultsCount() const override {
- return IndexStats.empty() ? 0 : 1;
- }
-
private:
NOlap::TReadStatsMetadata::TConstPtr ReadMetadata;
bool Reverse{false};
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index ab1f584a0e3..cdd06250f34 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -253,9 +253,7 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t
auto out = MakeResult(ReadyToOut(), maxRowsInBatch);
const bool requireResult = GranulesContext->IsFinished(); // not indexed or the last indexed read (even if it's empty)
if (requireResult && out.empty()) {
- out.push_back(TPartialReadResult{
- .ResultBatch = NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema())
- });
+ out.push_back(TPartialReadResult(NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema())));
}
return out;
}
@@ -338,7 +336,7 @@ static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) {
i64 sumRows = 0;
for (auto& result : out) {
- sumRows += result.ResultBatch->num_rows();
+ sumRows += result.GetResultBatch()->num_rows();
}
if (sumRows / out.size() > 100) {
return;
@@ -347,7 +345,7 @@ static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) {
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
batches.reserve(out.size());
for (auto& batch : out) {
- batches.push_back(batch.ResultBatch);
+ batches.push_back(batch.GetResultBatch());
}
auto res = arrow::Table::FromRecordBatches(batches);
@@ -364,15 +362,11 @@ static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) {
auto batch = NArrow::ToBatch(*res);
std::vector<TPartialReadResult> merged;
- merged.emplace_back(TPartialReadResult{
- .ResultBatch = std::move(batch),
- .LastReadKey = std::move(out.back().LastReadKey)
- });
+ merged.emplace_back(TPartialReadResult(batch, out.back().GetLastReadKey()));
out.swap(merged);
}
-std::vector<TPartialReadResult>
-TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules,
+std::vector<TPartialReadResult> TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules,
int64_t maxRowsInBatch) const {
Y_VERIFY(ReadMetadata->IsSorted());
Y_VERIFY(SortReplaceDescription);
@@ -423,20 +417,14 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco
// Leave only requested columns
auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->GetResultSchema());
- out.emplace_back(TPartialReadResult{
- .ResultBatch = std::move(resultBatch),
- .LastReadKey = std::move(lastKey)
- });
+ out.emplace_back(TPartialReadResult(resultBatch, lastKey));
}
}
if (ReadMetadata->GetProgram().HasProgram()) {
MergeTooSmallBatches(out);
for (auto& result : out) {
- auto status = ReadMetadata->GetProgram().ApplyProgram(result.ResultBatch);
- if (!status.ok()) {
- result.ErrorString = status.message();
- }
+ result.ApplyProgram(ReadMetadata->GetProgram());
}
}
return out;
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 5c90910b8b4..93e1049a1e8 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -15,7 +15,7 @@ private:
size_t next = 1;
for (auto it = ready.begin(); it != ready.end(); ++it, ++next) {
const bool lastOne = IndexedData.IsFinished() && (next == ready.size());
- SendResult(ctx, it->ResultBatch, lastOne);
+ SendResult(ctx, it->GetResultBatch(), lastOne);
}
}
public: