diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-19 23:41:42 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-19 23:54:21 +0300 |
commit | 0bffcf36ed900c47b3abe5b0aeac86dcfb00a16f (patch) | |
tree | c716c1b61ec379ffcebd7e85dfa7c2701f252ab1 | |
parent | c0e09264adc48044d0fd225374245e9b06a4d78c (diff) | |
download | ydb-0bffcf36ed900c47b3abe5b0aeac86dcfb00a16f.tar.gz |
KIKIMR-19880: move interval data construction into separated thread. dont merge results into one pack
17 files changed, 221 insertions, 178 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 3e3b9a30fe..6dbcc045ee 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -188,7 +188,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr auto srcSchema = srcBatch->schema(); for (auto& name : columnNames) { const int pos = srcSchema->GetFieldIndex(name); - AFL_VERIFY(pos >= 0)("field_name", name)("fields", srcBatch->schema()->ToString()); + AFL_VERIFY(pos >= 0)("field_name", name)("names", JoinSeq(",", columnNames))("fields", JoinSeq(",", srcBatch->schema()->field_names())); fields.push_back(srcSchema->field(pos)); columns.push_back(srcBatch->column(pos)); } diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp index 270a6c505b..ce92ca26c7 100644 --- a/ydb/core/formats/arrow/permutations.cpp +++ b/ydb/core/formats/arrow/permutations.cpp @@ -4,6 +4,7 @@ #include "size_calcer.h" #include <ydb/core/formats/arrow/common/validation.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h> +#include <ydb/library/services/services.pb.h> #include <library/cpp/actors/core/log.h> namespace NKikimr::NArrow { @@ -181,6 +182,7 @@ bool THashConstructor::BuildHashUI64(std::shared_ptr<arrow::RecordBatch>& batch, if (fieldNames.size() == 1) { auto column = batch->GetColumnByName(fieldNames.front()); if (!column) { + AFL_WARN(NKikimrServices::ARROW_HELPER)("event", "cannot_build_hash")("reason", "field_not_found")("field_name", fieldNames.front()); return false; } Y_ABORT_UNLESS(column); @@ -202,6 +204,7 @@ bool THashConstructor::BuildHashUI64(std::shared_ptr<arrow::RecordBatch>& batch, } } if (columns.empty()) { + AFL_WARN(NKikimrServices::ARROW_HELPER)("event", "cannot_build_hash")("reason", "fields_not_found")("field_names", JoinSeq(",", fieldNames)); return false; } for (i64 i = 0; i < batch->num_rows(); ++i) { @@ -298,25 +301,37 @@ ui64 TShardedRecordBatch::GetMemorySize() const { return NArrow::GetBatchMemorySize(RecordBatch); } +TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch): RecordBatch(batch) { + AFL_VERIFY(RecordBatch); + SplittedByShards = {RecordBatch}; +} + +TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards) + : RecordBatch(batch) + , SplittedByShards(std::move(splittedByShards)) +{ + AFL_VERIFY(RecordBatch); + AFL_VERIFY(SplittedByShards.size()); +} + std::vector<std::shared_ptr<arrow::RecordBatch>> TShardingSplitIndex::Apply(const std::shared_ptr<arrow::RecordBatch>& input) { - Y_ABORT_UNLESS(input); - Y_ABORT_UNLESS(input->num_rows() == RecordsCount); + AFL_VERIFY(input); + AFL_VERIFY(input->num_rows() == RecordsCount); auto permutation = BuildPermutation(); auto resultBatch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(input, *permutation)).record_batch(); - Y_ABORT_UNLESS(resultBatch->num_rows() == RecordsCount); + AFL_VERIFY(resultBatch->num_rows() == RecordsCount); std::vector<std::shared_ptr<arrow::RecordBatch>> result; ui64 startIndex = 0; for (auto&& i : Remapping) { result.emplace_back(resultBatch->Slice(startIndex, i.size())); startIndex += i.size(); } + AFL_VERIFY(startIndex == RecordsCount); return result; } NKikimr::NArrow::TShardedRecordBatch TShardingSplitIndex::Apply(const ui32 shardsCount, const std::shared_ptr<arrow::RecordBatch>& input, const std::string& hashColumnName) { - if (!input) { - return TShardedRecordBatch(); - } + AFL_VERIFY(input); if (shardsCount == 1) { return TShardedRecordBatch(input); } @@ -336,7 +351,8 @@ NKikimr::NArrow::TShardedRecordBatch TShardingSplitIndex::Apply(const ui32 shard } else { Y_ABORT_UNLESS(false); } - return TShardedRecordBatch(input, splitter->Apply(input)); + auto resultBatch = NArrow::TStatusValidator::GetValid(input->RemoveColumn(input->schema()->GetFieldIndex(hashColumnName))); + return TShardedRecordBatch(resultBatch, splitter->Apply(resultBatch)); } std::shared_ptr<arrow::UInt64Array> TShardingSplitIndex::BuildPermutation() const { diff --git a/ydb/core/formats/arrow/permutations.h b/ydb/core/formats/arrow/permutations.h index 6cba30c2bb..8acf5240fe 100644 --- a/ydb/core/formats/arrow/permutations.h +++ b/ydb/core/formats/arrow/permutations.h @@ -18,16 +18,30 @@ private: YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, RecordBatch); YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, SplittedByShards); public: - TShardedRecordBatch() = default; - TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch) - : RecordBatch(batch) { - SplittedByShards = {RecordBatch}; + TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch); + + void Cut(const ui32 limit) { + RecordBatch = RecordBatch->Slice(0, limit); + for (auto&& i : SplittedByShards) { + if (i->num_rows() > limit) { + i = i->Slice(0, limit); + } + } + } + + bool IsSharded() const { + return SplittedByShards.size() > 1; } - TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards) - : RecordBatch(batch) - , SplittedByShards(std::move(splittedByShards)) - { + TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards); + + void StripColumns(const std::shared_ptr<arrow::Schema>& schema) { + if (RecordBatch) { + RecordBatch = NArrow::ExtractColumns(RecordBatch, schema); + } + for (auto&& i : SplittedByShards) { + i = NArrow::ExtractColumns(i, schema); + } } ui64 GetMemorySize() const; diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index 1ca0ee34e0..66906851f8 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -895,12 +895,13 @@ std::shared_ptr<NArrow::TColumnFilter> TProgram::ApplyEarlyFilter(std::shared_pt NArrow::TStatusValidator::Validate(step->ApplyAssignes(*datumBatch, NArrow::GetCustomExecContext())); NArrow::TColumnFilter local = NArrow::TColumnFilter::BuildAllowFilter(); NArrow::TStatusValidator::Validate(step->MakeCombinedFilter(*datumBatch, local)); - *filter = filter->CombineSequentialAnd(local); if (!useFilter) { - break; - } - if (!local.Apply(batch)) { - break; + *filter = filter->And(local); + } else { + *filter = filter->CombineSequentialAnd(local); + if (!local.Apply(batch)) { + break; + } } } catch (const std::exception& ex) { AFL_VERIFY(false); diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index dd52217086..79349592d1 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -34,14 +34,9 @@ void TColumnShardScanIterator::FillReadyResults() { auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch); i64 limitLeft = Context->GetReadMetadata()->Limit == 0 ? INT64_MAX : Context->GetReadMetadata()->Limit - ItemsRead; for (size_t i = 0; i < ready.size() && limitLeft; ++i) { - if (ready[i].GetResultBatch().num_rows() == 0 && !ready[i].GetLastReadKey()) { - Y_ABORT_UNLESS(i + 1 == ready.size(), "Only last batch can be empty!"); - break; - } - auto& batch = ReadyResults.emplace_back(std::move(ready[i])); if (batch.GetResultBatch().num_rows() > limitLeft) { - batch.Slice(0, limitLeft); + batch.Cut(limitLeft); } limitLeft -= batch.GetResultBatch().num_rows(); ItemsRead += batch.GetResultBatch().num_rows(); diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 22c9e4f06f..0a54eb28b4 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -97,6 +97,7 @@ public: , Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/)) , ComputeShardingPolicy(computeShardingPolicy) { + KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema(); } void Bootstrap(const TActorContext& ctx) { @@ -219,40 +220,42 @@ private: return false; } - auto batch = result.GetResultBatchPtrVerified(); - int numRows = batch->num_rows(); - int numColumns = batch->num_columns(); - if (!numRows) { + if (!result.GetRecordsCount()) { ACFL_DEBUG("stage", "got empty batch")("iterator", ScanIterator->DebugString()); return true; } - ACFL_DEBUG("stage", "ready result")("iterator", ScanIterator->DebugString())("format", NKikimrDataEvents::EDataFormat_Name(DataFormat)) - ("columns", numColumns)("rows", numRows); + auto& shardedBatch = result.GetShardedBatch(); + auto batch = shardedBatch.GetRecordBatch(); + int numRows = batch->num_rows(); + int numColumns = batch->num_columns(); + ACFL_DEBUG("stage", "ready result")("iterator", ScanIterator->DebugString())("columns", numColumns)("rows", result.GetRecordsCount()); - std::optional<NArrow::TShardedRecordBatch> shardedBatch; - if (NArrow::THashConstructor::BuildHashUI64(batch, ComputeShardingPolicy.GetColumnNames(), "__compute_sharding_hash")) { - shardedBatch = NArrow::TShardingSplitIndex::Apply(ComputeShardingPolicy.GetShardsCount(), batch, "__compute_sharding_hash"); - } AFL_VERIFY(DataFormat == NKikimrDataEvents::FORMAT_ARROW); { MakeResult(0); - if (ComputeShardingPolicy.IsEnabled() && shardedBatch) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "compute_sharding_success")("count", shardedBatch->GetSplittedByShards().size()); - Result->SplittedBatches = shardedBatch->GetSplittedByShards(); + if (shardedBatch.IsSharded()) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "compute_sharding_success")("count", shardedBatch.GetSplittedByShards().size())("info", ComputeShardingPolicy.DebugString()); + Result->SplittedBatches = shardedBatch.GetSplittedByShards(); + Result->ArrowBatch = shardedBatch.GetRecordBatch(); } else { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "compute_sharding_problems")("sb", !!shardedBatch)("policy", ComputeShardingPolicy.IsEnabled()); - Result->ArrowBatch = batch; + if (ComputeShardingPolicy.IsEnabled()) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "compute_sharding_problems")("info", ComputeShardingPolicy.DebugString()); + } + Result->ArrowBatch = shardedBatch.GetRecordBatch(); } Rows += batch->num_rows(); Bytes += NArrow::GetBatchDataSize(batch); ACFL_DEBUG("stage", "data_format")("batch_size", NArrow::GetBatchDataSize(batch))("num_rows", numRows)("batch_columns", JoinSeq(",", batch->schema()->field_names())); } - if (result.GetLastReadKey()) { - Result->LastKey = ConvertLastKey(result.GetLastReadKey()); - } else { - Y_ABORT_UNLESS(numRows == 0, "Got non-empty result batch without last key"); + if (CurrentLastReadKey) { + NOlap::NIndexedReader::TSortableBatchPosition pNew(result.GetLastReadKey(), 0, result.GetLastReadKey()->schema()->field_names(), {}, false); + NOlap::NIndexedReader::TSortableBatchPosition pOld(CurrentLastReadKey, 0, CurrentLastReadKey->schema()->field_names(), {}, false); + AFL_VERIFY(pOld < pNew); } + CurrentLastReadKey = result.GetLastReadKey(); + + Result->LastKey = ConvertLastKey(result.GetLastReadKey()); SendResult(false, false); ACFL_DEBUG("stage", "finished")("iterator", ScanIterator->DebugString()); return true; @@ -533,6 +536,7 @@ private: TChunksLimiter ChunksLimiter; THolder<TEvKqpCompute::TEvScanData> Result; + std::shared_ptr<arrow::RecordBatch> CurrentLastReadKey; i64 InFlightReads = 0; bool Finished = false; @@ -874,50 +878,48 @@ namespace NKikimr::NOlap { class TCurrentBatch { private: std::vector<TPartialReadResult> Results; + ui64 RecordsCount = 0; public: + ui64 GetRecordsCount() const { + return RecordsCount; + } + void AddChunk(TPartialReadResult&& res) { + RecordsCount += res.GetRecordsCount(); Results.emplace_back(std::move(res)); } - void FillResult(std::vector<TPartialReadResult>& result, const bool /*mergePartsToMax*/) const { + void FillResult(std::vector<TPartialReadResult>& result, const bool mergePartsToMax) const { if (Results.empty()) { return; } - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>> guards; - for (auto&& i : Results) { - batches.emplace_back(i.GetResultBatchPtrVerified()); - guards.insert(guards.end(), i.GetResourcesGuards().begin(), i.GetResourcesGuards().end()); + if (mergePartsToMax) { + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>> guards; + for (auto&& i : Results) { + batches.emplace_back(i.GetResultBatchPtrVerified()); + guards.insert(guards.end(), i.GetResourcesGuards().begin(), i.GetResourcesGuards().end()); + } + auto res = NArrow::CombineBatches(batches); + AFL_VERIFY(res); + result.emplace_back(TPartialReadResult(guards, NArrow::TShardedRecordBatch(res), Results.back().GetLastReadKey())); + } else { + for (auto&& i : Results) { + result.emplace_back(std::move(i)); + } } - auto res = NArrow::CombineBatches(batches); - AFL_VERIFY(res); - result.emplace_back(TPartialReadResult(guards, res, Results.back().GetLastReadKey())); } }; -std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 /*maxRecordsInResult*/, const bool /*mergePartsToMax*/) { +std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax) { + std::vector<TCurrentBatch> resultBatches; TCurrentBatch currentBatch; for (auto&& i : resultsExt) { + AFL_VERIFY(i.GetRecordsCount()); currentBatch.AddChunk(std::move(i)); - } - std::vector<TPartialReadResult> result; - currentBatch.FillResult(result, true); - /* - std::vector<TCurrentBatch> resultBatches; - for (auto&& i : resultsExt) { - std::shared_ptr<arrow::RecordBatch> currentBatchSplitting = i.ResultBatch; - while (currentBatchSplitting && currentBatchSplitting->num_rows()) { - const ui32 currentRecordsCount = currentBatch.GetRecordsCount(); - if (currentRecordsCount + currentBatchSplitting->num_rows() < maxRecordsInResult) { - currentBatch.AddChunk(currentBatchSplitting, i.GetResourcesGuards()); - currentBatchSplitting = nullptr; - } else { - auto currentSlice = currentBatchSplitting->Slice(0, maxRecordsInResult - currentRecordsCount); - currentBatch.AddChunk(currentSlice, i.GetResourcesGuards()); - resultBatches.emplace_back(std::move(currentBatch)); - currentBatch = TCurrentBatch(); - currentBatchSplitting = currentBatchSplitting->Slice(maxRecordsInResult - currentRecordsCount); - } + if (currentBatch.GetRecordsCount() >= maxRecordsInResult) { + resultBatches.emplace_back(std::move(currentBatch)); + currentBatch = TCurrentBatch(); } } if (currentBatch.GetRecordsCount()) { @@ -926,10 +928,8 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults std::vector<TPartialReadResult> result; for (auto&& i : resultBatches) { - Y_UNUSED(mergePartsToMax); - i.FillResult(result, true); + i.FillResult(result, mergePartsToMax); } - */ return result; } diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index d8269927b1..7576cb3247 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -13,13 +13,25 @@ namespace NKikimr::NOlap { class TPartialReadResult { private: YDB_READONLY_DEF(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>, ResourcesGuards); - std::shared_ptr<arrow::RecordBatch> ResultBatch; + NArrow::TShardedRecordBatch ResultBatch; // This 1-row batch contains the last key that was read while producing the ResultBatch. // NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit std::shared_ptr<arrow::RecordBatch> LastReadKey; public: + void Cut(const ui32 limit) { + ResultBatch.Cut(limit); + } + + const arrow::RecordBatch& GetResultBatch() const { + return *ResultBatch.GetRecordBatch(); + } + + const std::shared_ptr<arrow::RecordBatch>& GetResultBatchPtrVerified() const { + return ResultBatch.GetRecordBatch(); + } + const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& GetResourcesGuardOnly() const { AFL_VERIFY(ResourcesGuards.size() == 1); AFL_VERIFY(!!ResourcesGuards.front()); @@ -27,45 +39,19 @@ public: } ui64 GetMemorySize() const { - return NArrow::GetBatchMemorySize(ResultBatch); + return ResultBatch.GetMemorySize(); } ui64 GetRecordsCount() const { - return ResultBatch ? ResultBatch->num_rows() : 0; - } - - void StripColumns(const std::shared_ptr<arrow::Schema>& schema) { - if (ResultBatch) { - ResultBatch = NArrow::ExtractColumns(ResultBatch, schema); - } + return ResultBatch.GetRecordsCount(); } static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax); - void Slice(const ui32 offset, const ui32 length) { - ResultBatch = ResultBatch->Slice(offset, length); - } - - void ApplyProgram(const NOlap::TProgramContainer& program) { - if (!program.HasProgram()) { - return; - } - auto status = program.ApplyProgram(ResultBatch); - if (!status.ok()) { - ErrorString = status.message(); - } - } - - const std::shared_ptr<arrow::RecordBatch>& GetResultBatchPtrVerified() const { - Y_ABORT_UNLESS(ResultBatch); + const NArrow::TShardedRecordBatch& GetShardedBatch() const { return ResultBatch; } - const arrow::RecordBatch& GetResultBatch() const { - Y_ABORT_UNLESS(ResultBatch); - return *ResultBatch; - } - const std::shared_ptr<arrow::RecordBatch>& GetLastReadKey() const { return LastReadKey; } @@ -74,26 +60,26 @@ public: explicit TPartialReadResult( const std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>& resourcesGuards, - std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey) + const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey) : ResourcesGuards(resourcesGuards) , ResultBatch(batch) , LastReadKey(lastKey) { for (auto&& i : ResourcesGuards) { AFL_VERIFY(i); } - Y_ABORT_UNLESS(ResultBatch); - Y_ABORT_UNLESS(ResultBatch->num_rows()); + Y_ABORT_UNLESS(ResultBatch.GetRecordsCount()); Y_ABORT_UNLESS(LastReadKey); + Y_ABORT_UNLESS(LastReadKey->num_rows() == 1); } explicit TPartialReadResult( const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuards, - std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey) + const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey) : TPartialReadResult(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>({resourcesGuards}), batch, lastKey) { AFL_VERIFY(resourcesGuards); } - explicit TPartialReadResult(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey) + explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey) : TPartialReadResult(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>(), batch, lastKey) { } }; diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp index aaad1e70c8..05fe129905 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp @@ -16,10 +16,12 @@ std::optional<NOlap::TPartialReadResult> TStatsIterator::GetBatch() { } // Leave only requested columns auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema); - + NArrow::TStatusValidator::Validate(ReadMetadata->GetProgram().ApplyProgram(resultBatch)); + if (!resultBatch->num_rows()) { + return {}; + } NOlap::TPartialReadResult out(resultBatch, lastKey); - out.ApplyProgram(ReadMetadata->GetProgram()); return std::move(out); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp index 5b65afa0d0..1a94bbb315 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp @@ -15,9 +15,6 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsS for (auto&& i : external.ColumnIds) { result.ColumnIds.erase(i); } - for (auto&& i : external.ColumnNames) { - result.ColumnNames.erase(i); - } arrow::FieldVector fields; for (auto&& i : Schema->fields()) { if (!external.Schema->GetFieldByName(i->name())) { @@ -25,13 +22,13 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsS } } result.Schema = std::make_shared<arrow::Schema>(fields); + result.Rebuild(); return result; } NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsSet& external) const { TColumnsSet result = *this; result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end()); - result.ColumnNames.insert(external.ColumnNames.begin(), external.ColumnNames.end()); auto fields = result.Schema->fields(); for (auto&& i : external.Schema->fields()) { if (!result.Schema->GetFieldByName(i->name())) { @@ -39,6 +36,7 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsS } } result.Schema = std::make_shared<arrow::Schema>(fields); + result.Rebuild(); return result; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h index 9da046b6e7..58e44c3d09 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h @@ -8,20 +8,23 @@ class TColumnsSet { private: YDB_READONLY_DEF(std::set<ui32>, ColumnIds); YDB_READONLY_DEF(std::set<TString>, ColumnNames); - mutable std::optional<std::vector<TString>> ColumnNamesVector; + std::vector<TString> ColumnNamesVector; YDB_READONLY_DEF(std::shared_ptr<arrow::Schema>, Schema); + + void Rebuild() { + ColumnNamesVector.clear(); + ColumnNames.clear(); + for (auto&& i : Schema->field_names()) { + ColumnNamesVector.emplace_back(i); + ColumnNames.emplace(i); + } + } + public: TColumnsSet() = default; const std::vector<TString>& GetColumnNamesVector() const { - if (!ColumnNamesVector) { - std::vector<TString> result; - for (auto&& i : Schema->field_names()) { - result.emplace_back(i); - } - ColumnNamesVector = std::move(result); - } - return *ColumnNamesVector; + return ColumnNamesVector; } ui32 GetSize() const { @@ -30,20 +33,18 @@ public: bool ColumnsOnly(const std::vector<std::string>& fieldNames) const; - TColumnsSet(const std::set<ui32>& columnIds, const TIndexInfo& indexInfo) { - ColumnIds = columnIds; + TColumnsSet(const std::set<ui32>& columnIds, const TIndexInfo& indexInfo) + : ColumnIds(columnIds) + { Schema = indexInfo.GetColumnsSchema(ColumnIds); - for (auto&& i : ColumnIds) { - ColumnNames.emplace(indexInfo.GetColumnName(i)); - } + Rebuild(); } - TColumnsSet(const std::vector<ui32>& columnIds, const TIndexInfo& indexInfo) { - for (auto&& i : columnIds) { - Y_ABORT_UNLESS(ColumnIds.emplace(i).second); - ColumnNames.emplace(indexInfo.GetColumnName(i)); - } + TColumnsSet(const std::vector<ui32>& columnIds, const TIndexInfo& indexInfo) + : ColumnIds(columnIds.begin(), columnIds.end()) + { Schema = indexInfo.GetColumnsSchema(ColumnIds); + Rebuild(); } bool Contains(const std::shared_ptr<TColumnsSet>& columnsSet) const { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp index febfa79e71..f6e7c92632 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp @@ -13,7 +13,7 @@ bool TAssembleFilter::DoExecute() { TPortionInfo::TPreparedBatchData::TAssembleOptions options; options.IncludedColumnIds = FilterColumnIds; ui32 needSnapshotColumnsRestore = 0; - const bool needSnapshotsFilter = ReadMetadata->GetSnapshot() < RecordsMaxSnapshot; + const bool needSnapshotsFilter = true;// ReadMetadata->GetSnapshot() <= RecordsMaxSnapshot; if (!needSnapshotsFilter && UseFilter) { for (auto&& i : TIndexInfo::GetSpecialColumnIds()) { needSnapshotColumnsRestore += options.IncludedColumnIds->erase(i) ? 1 : 0; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 7f0056d49d..c115c8b1f1 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -15,29 +15,54 @@ private: std::map<ui32, std::shared_ptr<IDataSource>> Sources; std::shared_ptr<TMergingContext> MergingContext; const ui32 IntervalIdx; + std::optional<NArrow::TShardedRecordBatch> ShardedBatch; + + void PrepareResultBatch() { + if (!ResultBatch || ResultBatch->num_rows() == 0) { + ResultBatch = nullptr; + LastPK = nullptr; + return; + } + { + ResultBatch = NArrow::ExtractColumns(ResultBatch, Context->GetReadMetadata()->GetResultSchema()); + AFL_VERIFY(ResultBatch->num_columns() == Context->GetReadMetadata()->GetResultSchema()->num_fields()); + NArrow::TStatusValidator::Validate(Context->GetReadMetadata()->GetProgram().ApplyProgram(ResultBatch)); + } + if (ResultBatch->num_rows()) { + const auto& shardingPolicy = Context->GetCommonContext()->GetComputeShardingPolicy(); + if (NArrow::THashConstructor::BuildHashUI64(ResultBatch, shardingPolicy.GetColumnNames(), "__compute_sharding_hash")) { + ShardedBatch = NArrow::TShardingSplitIndex::Apply(shardingPolicy.GetShardsCount(), ResultBatch, "__compute_sharding_hash"); + } else { + ShardedBatch = NArrow::TShardedRecordBatch(ResultBatch); + } + AFL_VERIFY(!!LastPK == !!ShardedBatch->GetRecordsCount())("lpk", !!LastPK)("sb", ShardedBatch->GetRecordsCount()); + } else { + ResultBatch = nullptr; + LastPK = nullptr; + } + } protected: virtual bool DoApply(NOlap::IDataReader& indexedDataRead) const override { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoApply")("interval_idx", MergingContext->GetIntervalIdx()); auto& reader = static_cast<TPlainReadData&>(indexedDataRead); - reader.MutableScanner().OnIntervalResult(ResultBatch, LastPK, IntervalIdx, reader); + reader.MutableScanner().OnIntervalResult(ShardedBatch, LastPK, IntervalIdx, reader); return true; } virtual bool DoExecute() override { - if (MergingContext->IsExclusiveInterval(Sources.size())) { + if (MergingContext->IsExclusiveInterval()) { ResultBatch = Sources.begin()->second->GetBatch(); - LastPK = Sources.begin()->second->GetLastPK(); - Sources.clear(); if (ResultBatch && ResultBatch->num_rows()) { + LastPK = Sources.begin()->second->GetLastPK(); ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetResultColumns()->GetColumnNamesVector()); AFL_VERIFY(ResultBatch)("info", Context->GetResultColumns()->GetSchema()->ToString()); Context->GetCommonContext()->GetCounters().OnNoScanInterval(ResultBatch->num_rows()); if (Context->GetCommonContext()->IsReverse()) { ResultBatch = NArrow::ReverseRecords(ResultBatch); } - } else { - ResultBatch = nullptr; - LastPK = nullptr; + PrepareResultBatch(); } + Sources.clear(); + AFL_VERIFY(!!LastPK == (!!ResultBatch && ResultBatch->num_rows())); return true; } std::shared_ptr<NIndexedReader::TMergePartialStream> merger = Context->BuildMerger(); @@ -73,10 +98,11 @@ protected: Context->GetCommonContext()->GetCounters().OnLinearScanInterval(rbBuilder->GetRecordsCount()); ResultBatch = rbBuilder->Finalize(); } - AFL_VERIFY(!!lastResultPosition == (!!ResultBatch && ResultBatch->num_rows())); if (lastResultPosition) { LastPK = lastResultPosition->ExtractSortingPosition(); } + AFL_VERIFY(!!LastPK == (!!ResultBatch && ResultBatch->num_rows())); + PrepareResultBatch(); return true; } public: @@ -129,9 +155,9 @@ void TFetchingInterval::OnSourceFilterStageReady(const ui32 /*sourceIdx*/) { TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const std::shared_ptr<TSpecialReadContext>& context, - const bool includeFinish, const bool includeStart) - : TTaskBase(0, context->GetMemoryForSources(sources, TMergingContext::IsExclusiveInterval(sources.size(), includeStart, includeFinish)), "", context->GetCommonContext()->GetResourcesTaskContext()) - , MergingContext(std::make_shared<TMergingContext>(start, finish, intervalIdx, includeFinish, includeStart)) + const bool includeFinish, const bool includeStart, const bool isExclusiveInterval) + : TTaskBase(0, context->GetMemoryForSources(sources, isExclusiveInterval), "", context->GetCommonContext()->GetResourcesTaskContext()) + , MergingContext(std::make_shared<TMergingContext>(start, finish, intervalIdx, includeFinish, includeStart, isExclusiveInterval)) , Context(context) , TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) , Sources(sources) @@ -148,7 +174,7 @@ void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptr<NResourceBro ("resources", guard->DebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size()); OnInitResourcesGuard(guard); for (auto&& [_, i] : Sources) { - i->InitFetchingPlan(Context->GetColumnsFetchingPlan(MergingContext->IsExclusiveInterval(Sources.size())), i); + i->InitFetchingPlan(Context->GetColumnsFetchingPlan(MergingContext->IsExclusiveInterval()), i); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h index b8be9a1243..4c72476158 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h @@ -14,23 +14,22 @@ protected: YDB_READONLY(bool, IncludeFinish, true); YDB_READONLY(bool, IncludeStart, false); YDB_READONLY(ui32, IntervalIdx, 0); + bool IsExclusiveIntervalFlag = false; public: - bool IsExclusiveInterval(const ui32 sourcesCount) const { - return IsExclusiveInterval(sourcesCount, IncludeStart, IncludeFinish); - } - - static bool IsExclusiveInterval(const ui32 sourcesCount, const bool includeStart, const bool includeFinish) { - return includeFinish && includeStart && sourcesCount == 1; - } - TMergingContext(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, - const ui32 intervalIdx, const bool includeFinish, const bool includeStart) + const ui32 intervalIdx, const bool includeFinish, const bool includeStart, const bool isExclusiveInterval) : Start(start) , Finish(finish) , IncludeFinish(includeFinish) , IncludeStart(includeStart) - , IntervalIdx(intervalIdx) { + , IntervalIdx(intervalIdx) + , IsExclusiveIntervalFlag(isExclusiveInterval) + { + + } + bool IsExclusiveInterval() const { + return IsExclusiveIntervalFlag; } NJson::TJsonValue DebugJson() const { @@ -39,6 +38,7 @@ public: result.InsertValue("idx", IntervalIdx); result.InsertValue("finish", Finish.DebugJson()); result.InsertValue("include_finish", IncludeFinish); + result.InsertValue("exclusive", IsExclusiveIntervalFlag); return result; } @@ -116,7 +116,7 @@ public: TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const std::shared_ptr<TSpecialReadContext>& context, - const bool includeFinish, const bool includeStart); + const bool includeFinish, const bool includeStart, const bool isExclusiveInterval); }; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp index 54b006f660..e818904dc4 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp @@ -51,19 +51,19 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte } std::vector<NKikimr::NOlap::TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) { - if ((ReadyResultsCount < maxRowsInBatch || (GetContext().GetIsInternalRead() && ReadyResultsCount < maxRowsInBatch)) && !Scanner->IsFinished()) { + if ((GetContext().GetIsInternalRead() && ReadyResultsCount < maxRowsInBatch) && !Scanner->IsFinished()) { return {}; } - ReadyResultsCount = 0; - auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch, GetContext().GetIsInternalRead()); - PartialResults.clear(); ui32 count = 0; for (auto&& r: result) { - r.StripColumns(GetReadMetadata()->GetResultSchema()); count += r.GetRecordsCount(); - r.ApplyProgram(GetReadMetadata()->GetProgram()); } + AFL_VERIFY(count == ReadyResultsCount); + + ReadyResultsCount = 0; + PartialResults.clear(); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)("finished", Scanner->IsFinished()); return result; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp index a468e978ff..0fd0c4d799 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp @@ -5,18 +5,18 @@ namespace NKikimr::NOlap::NPlainReader { -void TScanHead::OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& newBatch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader) { +void TScanHead::OnIntervalResult(const std::optional<NArrow::TShardedRecordBatch>& newBatch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader) { auto itInterval = FetchingIntervals.find(intervalIdx); AFL_VERIFY(itInterval != FetchingIntervals.end()); if (!Context->GetCommonContext()->GetReadMetadata()->IsSorted()) { - if (newBatch && newBatch->num_rows()) { - reader.OnIntervalResult(std::make_shared<TPartialReadResult>(itInterval->second->GetResourcesGuard(), newBatch, lastPK)); + if (newBatch && newBatch->GetRecordsCount()) { + reader.OnIntervalResult(std::make_shared<TPartialReadResult>(itInterval->second->GetResourcesGuard(), *newBatch, lastPK)); } AFL_VERIFY(FetchingIntervals.erase(intervalIdx)); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_result")("interval_idx", intervalIdx)("count", newBatch ? newBatch->num_rows() : 0); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_result")("interval_idx", intervalIdx)("count", newBatch ? newBatch->GetRecordsCount() : 0); } else { - if (newBatch && newBatch->num_rows()) { - AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(itInterval->second->GetResourcesGuard(), newBatch, lastPK)).second); + if (newBatch && newBatch->GetRecordsCount()) { + AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(itInterval->second->GetResourcesGuard(), *newBatch, lastPK)).second); } else { AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second); } @@ -72,7 +72,6 @@ bool TScanHead::BuildNextInterval() { while (BorderPoints.size()) { auto firstBorderPointInfo = std::move(BorderPoints.begin()->second); bool includeStart = firstBorderPointInfo.GetStartSources().size(); - for (auto&& i : firstBorderPointInfo.GetStartSources()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("add_source", i->GetSourceIdx()); AFL_VERIFY(CurrentSegments.emplace(i->GetSourceIdx(), i).second)("idx", i->GetSourceIdx()); @@ -83,7 +82,7 @@ bool TScanHead::BuildNextInterval() { const ui32 intervalIdx = SegmentIdxCounter++; auto it = FetchingIntervals.emplace(intervalIdx, std::make_shared<TFetchingInterval>( BorderPoints.begin()->first, BorderPoints.begin()->first, intervalIdx, CurrentSegments, - Context, true, true)).first; + Context, true, true, false)).first; IntervalStats.emplace_back(CurrentSegments.size(), true); NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), it->second); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", it->second->DebugJson()); @@ -100,9 +99,10 @@ bool TScanHead::BuildNextInterval() { Y_ABORT_UNLESS(BorderPoints.size()); const bool includeFinish = BorderPoints.begin()->second.GetStartSources().empty(); const ui32 intervalIdx = SegmentIdxCounter++; + const bool isExclusiveInterval = (CurrentSegments.size() == 1) && includeStart && includeFinish; auto it = FetchingIntervals.emplace(intervalIdx, std::make_shared<TFetchingInterval>( *CurrentStart, BorderPoints.begin()->first, intervalIdx, CurrentSegments, - Context, includeFinish, includeStart)).first; + Context, includeFinish, includeStart, isExclusiveInterval)).first; IntervalStats.emplace_back(CurrentSegments.size(), false); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", it->second->DebugJson()); NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), it->second); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h index bc505f416c..bd646f68d0 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h @@ -74,7 +74,7 @@ public: return sb; } - void OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader); + void OnIntervalResult(const std::optional<NArrow::TShardedRecordBatch>& batch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader); TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context); diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h index 2583693146..1e5b27f8a5 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/read_context.h @@ -17,6 +17,10 @@ private: YDB_READONLY(ui32, ShardsCount, 0); YDB_READONLY_DEF(std::vector<std::string>, ColumnNames); public: + TString DebugString() const { + return TStringBuilder() << "shards_count:" << ShardsCount << ";columns=" << JoinSeq(",", ColumnNames) << ";"; + } + TComputeShardingPolicy() = default; bool DeserializeFromProto(const NKikimrTxDataShard::TComputeShardingPolicy& policy) { ShardsCount = policy.GetShardsCount(); |