| author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-30 16:54:08 +0300 |
| --- | --- | --- |
| committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-30 16:54:08 +0300 |
| commit | 0f76e6d63e6c15b24f4127cd3e3196e67c79ff09 (patch) | |
| tree | a465bc2c5ca805f593d123467f69385ba9163f1c | |
| parent | 0bce9a145edc59b530712f3d84f057d9a1f8f1c3 (diff) | |
| download | ydb-0f76e6d63e6c15b24f4127cd3e3196e67c79ff09.tar.gz | |
sorting as a separate module for controlling the fetching process
35 files changed, 1056 insertions, 512 deletions
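The patch moves granule-ordering decisions out of `TIndexedReadData` into a dedicated policy module (`reader/order_controller.*`) that the new `TGranulesFillingContext` consults when deciding which ready granules may be emitted. Below is a minimal, self-contained sketch of that control flow. The class and method names (`DetachReadyGranules`, `ReadyForAddNotIndexedToEnd`, `TNonSorting`, `TAnySorting`, `GranulesOutOrder`) mirror identifiers visible in this diff, but the simplified `TGranule` stand-in, the exact signatures, and the behavior of methods whose bodies are not shown in these hunks are illustrative assumptions, not code from the repository.

```cpp
// Illustrative model of the extracted ordering policy (not repository code).
#include <cstdint>
#include <deque>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for NIndexedReader::TGranule: only id and readiness.
struct TGranule {
    uint64_t GranuleId = 0;
    bool Ready = false;
};

// The policy alone decides which ready granules may leave the reader, and in
// what order; the filling context just forwards to it.
class IOrderPolicy {
public:
    virtual ~IOrderPolicy() = default;
    virtual std::vector<TGranule*> DetachReadyGranules(
        std::unordered_map<uint64_t, TGranule*>& granulesToOut) = 0;
    virtual bool ReadyForAddNotIndexedToEnd() const = 0;
};

// Unsorted (internal) reads: emit every ready granule immediately, any order.
class TNonSorting final : public IOrderPolicy {
public:
    std::vector<TGranule*> DetachReadyGranules(
        std::unordered_map<uint64_t, TGranule*>& granulesToOut) override {
        std::vector<TGranule*> out;
        out.reserve(granulesToOut.size());
        for (auto& kv : granulesToOut) {
            out.push_back(kv.second);
        }
        granulesToOut.clear();
        return out;
    }
    // Assumed behavior: with no ordering constraint, the tail batch can go anytime.
    bool ReadyForAddNotIndexedToEnd() const override { return true; }
};

// Sorted reads: emit granules strictly in the precomputed PK order and stop at
// the first granule that is not ready yet (it blocks everything behind it).
class TAnySorting final : public IOrderPolicy {
    // In the patch this is filled from ReadMetadata->SelectInfo->GranulesOrder().
    std::deque<TGranule*> GranulesOutOrder;
public:
    explicit TAnySorting(std::deque<TGranule*> order)
        : GranulesOutOrder(std::move(order)) {}

    std::vector<TGranule*> DetachReadyGranules(
        std::unordered_map<uint64_t, TGranule*>& granulesToOut) override {
        std::vector<TGranule*> out;
        while (!GranulesOutOrder.empty()) {
            TGranule* granule = GranulesOutOrder.front();
            if (!granule->Ready) {
                break;  // preserve global order: an unready granule blocks the tail
            }
            out.push_back(granule);
            granulesToOut.erase(granule->GranuleId);
            GranulesOutOrder.pop_front();
        }
        return out;
    }
    // Assumed behavior (body not shown in this diff): the not-indexed tail may
    // only be appended once every ordered granule has been emitted.
    bool ReadyForAddNotIndexedToEnd() const override { return GranulesOutOrder.empty(); }
};
```

In the patch itself, `TGranulesFillingContext::DetachReadyInOrder()` forwards to `SortingPolicy->DetachReadyGranules(GranulesToOut)` and `TIndexedReadData::ReadyToOut()` consumes the result, so the ordering logic that previously lived inline in `indexed_read_data.cpp` now sits entirely behind the policy interface.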
diff --git a/ydb/core/formats/arrow_filter.cpp b/ydb/core/formats/arrow_filter.cpp index 28fe9892eaf..014c032ec74 100644 --- a/ydb/core/formats/arrow_filter.cpp +++ b/ydb/core/formats/arrow_filter.cpp @@ -132,7 +132,7 @@ void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatc } } -std::shared_ptr<arrow::BooleanArray> TColumnFilter::MakeFilter() const { +std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter() const { arrow::BooleanBuilder builder; auto res = builder.Reserve(Count); Y_VERIFY_OK(res); @@ -320,7 +320,7 @@ bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) { if (IsTotalAllowFilter()) { return true; } - auto res = arrow::compute::Filter(batch, MakeFilter()); + auto res = arrow::compute::Filter(batch, BuildArrowFilter()); Y_VERIFY_S(res.ok(), res.status().message()); Y_VERIFY((*res).kind() == arrow::Datum::RECORD_BATCH); batch = (*res).record_batch(); @@ -438,7 +438,7 @@ ui32 TColumnFilter::GetInactiveHeadSize() const { } } -std::vector<bool> TColumnFilter::BuildFilter() const { +std::vector<bool> TColumnFilter::BuildSimpleFilter() const { std::vector<bool> result; result.reserve(Count); bool currentValue = GetStartValue(); diff --git a/ydb/core/formats/arrow_filter.h b/ydb/core/formats/arrow_filter.h index 43aaed58d09..a119ed59b1a 100644 --- a/ydb/core/formats/arrow_filter.h +++ b/ydb/core/formats/arrow_filter.h @@ -116,11 +116,11 @@ public: void CutInactiveHead(); - std::vector<bool> BuildFilter() const; + std::vector<bool> BuildSimpleFilter() const; TColumnFilter() = default; - std::shared_ptr<arrow::BooleanArray> MakeFilter() const; + std::shared_ptr<arrow::BooleanArray> BuildArrowFilter() const; bool IsTotalAllowFilter() const; diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index c1f2165b1d7..64f1f0f6c7b 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -708,9 +708,13 @@ bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<a } bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) { + return ScalarCompare(x, y) < 0; +} + +int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y) { Y_VERIFY_S(x.type->Equals(y.type), x.type->ToString() + " vs " + y.type->ToString()); - return SwitchType(x.type->id(), [&](const auto& type) { + return SwitchTypeImpl<int, 0>(x.type->id(), [&](const auto& type) { using TWrap = std::decay_t<decltype(type)>; using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType; using TValue = std::decay_t<decltype(static_cast<const TScalar&>(x).value)>; @@ -722,18 +726,36 @@ bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) { Y_VERIFY(yval); TStringBuf xBuf(reinterpret_cast<const char*>(xval->data()), xval->size()); TStringBuf yBuf(reinterpret_cast<const char*>(yval->data()), yval->size()); - return xBuf < yBuf; + if (xBuf < yBuf) { + return -1; + } else if (yBuf < xBuf) { + return 1; + } else { + return 0; + } } if constexpr (std::is_arithmetic_v<TValue>) { const auto& xval = static_cast<const TScalar&>(x).value; const auto& yval = static_cast<const TScalar&>(y).value; - return xval < yval; + if (xval < yval) { + return -1; + } else if (yval < xval) { + return 1; + } else { + return 0; + } } Y_VERIFY(false); // TODO: non primitive types - return false; + return 0; }); } +int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y) { + Y_VERIFY(x); + Y_VERIFY(y); + return ScalarCompare(*x, *y); +} + 
std::shared_ptr<arrow::UInt64Array> MakePermutation(int size, bool reverse) { if (size < 1) { return {}; diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index d96fd11b0a9..97b381e98eb 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -130,6 +130,8 @@ std::pair<int, int> FindMinMaxPosition(const std::shared_ptr<arrow::Array>& colu std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type); std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position); bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x); +int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y); +int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y); bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y); bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y); diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp index 35760b7ab9b..e82fdbf8899 100644 --- a/ydb/core/formats/program.cpp +++ b/ydb/core/formats/program.cpp @@ -659,7 +659,7 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const { } } - auto filter = bits.MakeFilter(); + auto filter = bits.BuildArrowFilter(); for (int64_t i = 0; i < batch.Schema->num_fields(); ++i) { bool needed = (allColumns || neededColumns.contains(batch.Schema->field(i)->name())); if (batch.Datums[i].is_array() && needed) { diff --git a/ydb/core/formats/switch_type.h b/ydb/core/formats/switch_type.h index 5c78dc2014f..558589b71ad 100644 --- a/ydb/core/formats/switch_type.h +++ b/ydb/core/formats/switch_type.h @@ -10,8 +10,8 @@ struct TTypeWrapper using T = TType; }; -template <typename TFunc, bool EnableNull = false> -bool SwitchType(arrow::Type::type typeId, TFunc&& f) { +template <class TResult, TResult defaultValue, typename TFunc, bool EnableNull = false> +TResult SwitchTypeImpl(arrow::Type::type typeId, TFunc&& f) { switch (typeId) { case arrow::Type::NA: { if constexpr (EnableNull) { @@ -84,7 +84,12 @@ bool SwitchType(arrow::Type::type typeId, TFunc&& f) { break; } - return false; + return defaultValue; +} + +template <typename TFunc, bool EnableNull = false> +bool SwitchType(arrow::Type::type typeId, TFunc&& f) { + return SwitchTypeImpl<bool, false, TFunc, EnableNull>(typeId, std::move(f)); } template <typename TFunc> diff --git a/ydb/core/formats/ut_arrow.cpp b/ydb/core/formats/ut_arrow.cpp index 020c0e5a2dd..5efa2fd9e11 100644 --- a/ydb/core/formats/ut_arrow.cpp +++ b/ydb/core/formats/ut_arrow.cpp @@ -632,10 +632,10 @@ Y_UNIT_TEST_SUITE(ArrowTest) { const NArrow::TColumnFilter gt = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER); const NArrow::TColumnFilter ge = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER_OR_EQUAL); - UNIT_ASSERT(CheckFilter(lt.BuildFilter(), 234, true)); - UNIT_ASSERT(CheckFilter(le.BuildFilter(), 235, true)); - UNIT_ASSERT(CheckFilter(gt.BuildFilter(), 235, false)); - UNIT_ASSERT(CheckFilter(ge.BuildFilter(), 234, false)); + UNIT_ASSERT(CheckFilter(lt.BuildSimpleFilter(), 234, true)); + UNIT_ASSERT(CheckFilter(le.BuildSimpleFilter(), 235, true)); + UNIT_ASSERT(CheckFilter(gt.BuildSimpleFilter(), 235, false)); + UNIT_ASSERT(CheckFilter(ge.BuildSimpleFilter(), 234, false)); } Y_UNIT_TEST(SortWithCompositeKey) { diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp 
b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 6e562d290b3..45edb94bd0d 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -7,7 +7,7 @@ namespace NKikimr::NColumnShard { TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters) : ReadMetadata(readMetadata) - , IndexedData(ReadMetadata, FetchBlobsQueue, false, scanCounters) + , IndexedData(ReadMetadata, FetchBlobsQueue, false, scanCounters, processor) , DataTasksProcessor(processor) , ScanCounters(scanCounters) { @@ -21,7 +21,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP auto& blobId = cmtBlob.BlobId; FetchBlobsQueue.emplace_back(TBlobRange(blobId, 0, blobId.BlobSize())); } - IndexedData.InitRead(batchNo, true); + IndexedData.InitRead(batchNo); // Add cached batches without read for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 }); @@ -42,7 +42,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data) { const auto& blobId = blobRange.BlobId; if (IndexedData.IsIndexedBlob(blobRange)) { - IndexedData.AddIndexed(blobRange, data, DataTasksProcessor); + IndexedData.AddIndexed(blobRange, data); } else { auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 }); if (cmt.empty()) { @@ -115,7 +115,7 @@ void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) { if (!task->IsDataProcessed() || DataTasksProcessor.IsStopped()) { return; } - Y_VERIFY(task->Apply(IndexedData)); + Y_VERIFY(task->Apply(IndexedData.GetGranulesContext())); } } diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index 54fe14916f2..3353ad31262 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -2,7 +2,20 @@ #include "blob_cache.h" #include "engines/reader/conveyor_task.h" -#include "engines/indexed_read_data.h" +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +namespace NKikimr::NOlap { +// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation +struct TPartialReadResult { + std::shared_ptr<arrow::RecordBatch> ResultBatch; + + // This 1-row batch contains the last key that was read while producing the ResultBatch. 
+ // NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit + std::shared_ptr<arrow::RecordBatch> LastReadKey; + + std::string ErrorString; +}; +} namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h index bce22aa077f..81504523507 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.h +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h @@ -2,6 +2,8 @@ #include "columnshard__scan.h" #include "columnshard_common.h" +#include "engines/reader/read_metadata.h" + #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/sys_view/common/schema.h> #include <ydb/core/formats/custom_registry.h> diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h index 8ad4b38abda..dabf291aca7 100644 --- a/ydb/core/tx/columnshard/engines/columns_table.h +++ b/ydb/core/tx/columnshard/engines/columns_table.h @@ -19,8 +19,8 @@ struct TColumnRecord { TBlobRange BlobRange; TString Metadata; - ui32 GetRowsCount() const { - return 0; + std::optional<ui32> GetChunkRowsCount() const { + return {}; } bool operator == (const TColumnRecord& rec) const { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 2fa4c309227..4f4b18f5045 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -113,71 +113,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v } -std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const { - return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters); -} - -std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const { - std::set<ui32> result; - if (LessPredicate) { - for (auto&& i : LessPredicate->ColumnNames()) { - result.emplace(IndexInfo.GetColumnId(i)); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); - } - } - if (GreaterPredicate) { - for (auto&& i : GreaterPredicate->ColumnNames()) { - result.emplace(IndexInfo.GetColumnId(i)); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); - } - } - if (Program) { - for (auto&& i : Program->GetEarlyFilterColumns()) { - auto id = IndexInfo.GetColumnIdOptional(i); - if (id) { - result.emplace(*id); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); - } - } - } - if (PlanStep) { - auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); - for (auto&& i : snapSchema->fields()) { - result.emplace(IndexInfo.GetColumnId(i->name())); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name()); - } - } - return result; -} - -std::set<ui32> TReadMetadata::GetUsedColumnIds() const { - std::set<ui32> result; - if (PlanStep) { - auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); - for (auto&& i : snapSchema->fields()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", i->name()); - result.emplace(IndexInfo.GetColumnId(i->name())); - } - } - for (auto&& f : LoadSchema->fields()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", f->name()); - result.emplace(IndexInfo.GetColumnId(f->name())); - } - return result; -} - -TVector<std::pair<TString, 
NScheme::TTypeInfo>> TReadStatsMetadata::GetResultYqlSchema() const { - return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, ResultColumnIds); -} - -TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYqlSchema() const { - return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, NColumnShard::PrimaryIndexStatsSchema.KeyColumns); -} - -std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TScanCounters& /*scanCounters*/) const { - return std::make_unique<NColumnShard::TStatsIterator>(this->shared_from_this()); -} - void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) { Y_VERIFY(IndexedBlobs.emplace(range).second); Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second); @@ -192,20 +127,7 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader:: } } -bool TIndexedReadData::PredictManyResultsAfterFilter(const TPortionInfo& portionInfo) const { - if (!portionInfo.AllowEarlyFilter()) { - return true; - } - if (EarlyFilterColumns.empty()) { - return true; - } - if (TIndexInfo::IsSpecialColumns(EarlyFilterColumns)) { - return true; - } - return false; -} - -void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) { +void TIndexedReadData::InitRead(ui32 inputBatch) { Y_VERIFY(ReadMetadata->BlobSchema); Y_VERIFY(ReadMetadata->LoadSchema); Y_VERIFY(ReadMetadata->ResultSchema); @@ -217,38 +139,18 @@ void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) { NotIndexed.resize(inputBatch); ui32 batchNo = inputBatch; - Batches.resize(inputBatch + ReadMetadata->SelectInfo->Portions.size(), nullptr); - + Y_VERIFY(!GranulesContext); + GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode, inputBatch + ReadMetadata->SelectInfo->Portions.size()); ui64 portionsBytes = 0; for (auto& portionInfo : ReadMetadata->SelectInfo->Portions) { portionsBytes += portionInfo.BlobsBytes(); - Y_VERIFY_S(portionInfo.Records.size() > 0, "ReadMeatadata: " << *ReadMetadata); - - ui64 granule = portionInfo.Records[0].Granule; + Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata); - auto itGranule = Granules.find(granule); - if (itGranule == Granules.end()) { - itGranule = Granules.emplace(granule, NIndexedReader::TGranule(granule, *this)).first; - } - - NIndexedReader::TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo); - if (!OnePhaseReadMode && !PredictManyResultsAfterFilter(portionInfo)) { - currentBatch.ResetNoFilter(&EarlyFilterColumns); - } else { - currentBatch.ResetNoFilter(&UsedColumns); - } - Batches[batchNo] = ¤tBatch; - ++batchNo; + NIndexedReader::TGranule& granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule); + granule.AddBatch(batchNo++, portionInfo); } + GranulesContext->PrepareForStart(); - auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); - for (ui64 granule : granulesOrder) { - auto it = Granules.find(granule); - Y_VERIFY(it != Granules.end()); - if (inGranulesOrder) { - GranulesOutOrder.emplace_back(&it->second); - } - } Counters.GetPortionBytes()->Add(portionsBytes); auto& stats = ReadMetadata->ReadStats; stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size(); @@ -256,12 +158,13 @@ void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) { stats->IndexBatches = 
ReadMetadata->NumIndexedBlobs(); stats->CommittedBatches = ReadMetadata->CommittedBlobs.size(); stats->SchemaColumns = ReadMetadata->LoadSchema->num_fields(); - stats->FilterColumns = EarlyFilterColumns.size(); - stats->AdditionalColumns = PostFilterColumns.size(); + stats->FilterColumns = GranulesContext->GetEarlyFilterColumns().size(); + stats->AdditionalColumns = GranulesContext->GetPostFilterColumns().size(); stats->PortionsBytes = portionsBytes; } -void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data, NColumnShard::TDataTasksProcessorContainer processor) { +void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data) { + Y_VERIFY(GranulesContext); NIndexedReader::TBatch* portionBatch = nullptr; { auto it = IndexedBlobSubscriber.find(blobRange); @@ -276,8 +179,8 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da return; } if (portionBatch->IsFetchingReady()) { - if (auto batch = portionBatch->AssembleTask(processor.GetObject(), ReadMetadata)) { - processor.Add(*this, batch); + if (auto batch = portionBatch->AssembleTask(TasksProcessor.GetObject(), ReadMetadata)) { + TasksProcessor.Add(*GranulesContext, batch); } } } @@ -336,7 +239,8 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR } // Extract ready to out granules: ready granules that are not blocked by other (not ready) granules - const bool requireResult = !IsInProgress(); // not indexed or the last indexed read (even if it's empty) + Y_VERIFY(GranulesContext); + const bool requireResult = !GranulesContext->IsInProgress(); // not indexed or the last indexed read (even if it's empty) auto out = MakeResult(ReadyToOut(), maxRowsInBatch); if (requireResult && out.empty()) { out.push_back(TPartialReadResult{ @@ -346,36 +250,14 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR return out; } -std::vector<NIndexedReader::TGranule*> TIndexedReadData::DetachReadyInOrder() { - std::vector<NIndexedReader::TGranule*> out; - out.reserve(GranulesToOut.size()); - - if (GranulesOutOrder.empty()) { - for (auto& [_, granule] : GranulesToOut) { - out.emplace_back(granule); - } - GranulesToOut.clear(); - } else { - while (GranulesOutOrder.size()) { - NIndexedReader::TGranule* granule = GranulesOutOrder.front(); - if (!granule->IsReady()) { - break; - } - out.emplace_back(granule); - Y_VERIFY(GranulesToOut.erase(granule->GetGranuleId())); - GranulesOutOrder.pop_front(); - } - } - - return out; -} - /// @return batches that are not blocked by others std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::ReadyToOut() { Y_VERIFY(SortReplaceDescription); + Y_VERIFY(GranulesContext); + std::vector<NIndexedReader::TGranule*> ready = GranulesContext->DetachReadyInOrder(); std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out; - out.reserve(GranulesToOut.size() + 1); + out.reserve(ready.size() + 1); // Prepend not indexed data (less then first granule) before granules for ASC sorting if (ReadMetadata->IsAscSorted() && OutNotIndexed.count(0)) { @@ -384,13 +266,13 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: OutNotIndexed.erase(0); } - std::vector<NIndexedReader::TGranule*> ready = DetachReadyInOrder(); for (auto&& granule : ready) { bool canHaveDups = granule->IsDuplicationsAvailable(); std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = granule->GetReadyBatches(); // Append not indexed data to granules - if 
(OutNotIndexed.count(granule->GetGranuleId())) { - auto batch = OutNotIndexed[granule->GetGranuleId()]; + auto itNotIndexed = OutNotIndexed.find(granule->GetGranuleId()); + if (itNotIndexed != OutNotIndexed.end()) { + auto batch = itNotIndexed->second; if (batch && batch->num_rows()) { // TODO: check why it could be empty inGranule.push_back(batch); canHaveDups = true; @@ -420,7 +302,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: } // Append not indexed data (less then first granule) after granules for DESC sorting - if (ReadMetadata->IsDescSorted() && GranulesOutOrder.empty() && OutNotIndexed.count(0)) { + if (GranulesContext->GetSortingPolicy()->ReadyForAddNotIndexedToEnd() && OutNotIndexed.count(0)) { out.push_back({}); out.back().push_back(OutNotIndexed[0]); OutNotIndexed.erase(0); @@ -571,26 +453,14 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco return out; } -NIndexedReader::TBatch& TIndexedReadData::GetBatchInfo(const ui32 batchNo) { - Y_VERIFY(batchNo < Batches.size()); - auto ptr = Batches[batchNo]; - Y_VERIFY(ptr); - return *ptr; -} - TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, - const bool internalRead, const NColumnShard::TScanCounters& counters) + const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor) : Counters(counters) + , TasksProcessor(tasksProcessor) , FetchBlobsQueue(fetchBlobsQueue) , ReadMetadata(readMetadata) , OnePhaseReadMode(internalRead) { - UsedColumns = ReadMetadata->GetUsedColumnIds(); - PostFilterColumns = ReadMetadata->GetUsedColumnIds(); - EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(); - for (auto&& i : EarlyFilterColumns) { - PostFilterColumns.erase(i); - } Y_VERIFY(ReadMetadata->SelectInfo); } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index b33355b1296..e8dba569740 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -5,6 +5,8 @@ #include "reader/queue.h" #include "reader/granule.h" #include "reader/batch.h" +#include "reader/filling_context.h" +#include "reader/read_metadata.h" #include <ydb/library/accessor/accessor.h> #include <ydb/core/tx/columnshard/counters.h> @@ -15,231 +17,43 @@ class TScanIteratorBase; namespace NKikimr::NOlap { -struct TReadStats { - TInstant BeginTimestamp; - ui32 SelectedIndex{0}; - ui64 IndexGranules{0}; - ui64 IndexPortions{0}; - ui64 IndexBatches{0}; - ui64 CommittedBatches{0}; - ui64 PortionsBytes{ 0 }; - ui64 DataFilterBytes{ 0 }; - ui64 DataAdditionalBytes{ 0 }; - - ui32 SchemaColumns = 0; - ui32 FilterColumns = 0; - ui32 AdditionalColumns = 0; - - ui32 SelectedRows = 0; - - TReadStats(ui32 indexNo) - : BeginTimestamp(TInstant::Now()) - , SelectedIndex(indexNo) - {} - - void PrintToLog() { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN) - ("event", "statistic") - ("begin", BeginTimestamp) - ("selected", SelectedIndex) - ("index_granules", IndexGranules) - ("index_portions", IndexPortions) - ("index_batches", IndexBatches) - ("committed_batches", CommittedBatches) - ("schema_columns", SchemaColumns) - ("filter_columns", FilterColumns) - ("additional_columns", AdditionalColumns) - ("portions_bytes", PortionsBytes) - ("data_filter_bytes", DataFilterBytes) - ("data_additional_bytes", DataAdditionalBytes) - ("delta_bytes", PortionsBytes - 
DataFilterBytes - DataAdditionalBytes) - ("selected_rows", SelectedRows) - ; - } - - TDuration Duration() { - return TInstant::Now() - BeginTimestamp; - } -}; - -// Holds all metadata that is needed to perform read/scan -struct TReadMetadataBase { - using TConstPtr = std::shared_ptr<const TReadMetadataBase>; - - enum class ESorting { - NONE = 0, - ASC, - DESC, - }; - - virtual ~TReadMetadataBase() = default; - - std::shared_ptr<NOlap::TPredicate> LessPredicate; - std::shared_ptr<NOlap::TPredicate> GreaterPredicate; - std::shared_ptr<arrow::Schema> BlobSchema; - std::shared_ptr<arrow::Schema> LoadSchema; // ResultSchema + required for intermediate operations - std::shared_ptr<arrow::Schema> ResultSchema; // TODO: add Program modifications - std::shared_ptr<NSsa::TProgram> Program; - std::shared_ptr<const THashSet<TUnifiedBlobId>> ExternBlobs; - ESorting Sorting{ESorting::ASC}; // Sorting inside returned batches - ui64 Limit{0}; // TODO - - bool IsAscSorted() const { return Sorting == ESorting::ASC; } - bool IsDescSorted() const { return Sorting == ESorting::DESC; } - bool IsSorted() const { return IsAscSorted() || IsDescSorted(); } - void SetDescSorting() { Sorting = ESorting::DESC; } - - virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const = 0; - virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const = 0; - virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const = 0; - virtual void Dump(IOutputStream& out) const { Y_UNUSED(out); }; - - // TODO: can this only be done for base class? - friend IOutputStream& operator << (IOutputStream& out, const TReadMetadataBase& meta) { - meta.Dump(out); - return out; - } -}; - -// Holds all metadata that is needed to perform read/scan -struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadMetadata> { - using TConstPtr = std::shared_ptr<const TReadMetadata>; - - TIndexInfo IndexInfo; - ui64 PlanStep = 0; - ui64 TxId = 0; - std::shared_ptr<TSelectInfo> SelectInfo; - std::vector<TCommittedBlob> CommittedBlobs; - THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches; - std::shared_ptr<TReadStats> ReadStats; - - TReadMetadata(const TIndexInfo& info) - : IndexInfo(info) - , ReadStats(std::make_shared<TReadStats>(info.GetId())) - {} - - std::vector<std::string> GetColumnsOrder() const { - std::vector<std::string> result; - for (auto&& i : LoadSchema->fields()) { - result.emplace_back(i->name()); - } - return result; - } - - std::set<ui32> GetEarlyFilterColumnIds() const; - std::set<ui32> GetUsedColumnIds() const; - - bool Empty() const { - Y_VERIFY(SelectInfo); - return SelectInfo->Portions.empty() && CommittedBlobs.empty(); - } - - std::shared_ptr<arrow::Schema> GetSortingKey() const { - return IndexInfo.GetSortingKey(); - } - - std::shared_ptr<arrow::Schema> GetReplaceKey() const { - return IndexInfo.GetReplaceKey(); - } - - TVector<TNameTypeInfo> GetResultYqlSchema() const override { - TVector<NTable::TTag> columnIds; - columnIds.reserve(ResultSchema->num_fields()); - for (const auto& field: ResultSchema->fields()) { - TString name = TStringBuilder() << field->name(); - columnIds.emplace_back(IndexInfo.GetColumnId(name)); - } - return IndexInfo.GetColumns(columnIds); - } - - TVector<TNameTypeInfo> GetKeyYqlSchema() const override { - return IndexInfo.GetPrimaryKey(); - } - - size_t NumIndexedRecords() const { - 
Y_VERIFY(SelectInfo); - return SelectInfo->NumRecords(); - } - - size_t NumIndexedBlobs() const { - Y_VERIFY(SelectInfo); - return SelectInfo->Stats().Blobs; - } - - std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override; - - void Dump(IOutputStream& out) const override { - out << "columns: " << (LoadSchema ? LoadSchema->num_fields() : 0) - << " index records: " << NumIndexedRecords() - << " index blobs: " << NumIndexedBlobs() - << " committed blobs: " << CommittedBlobs.size() - << " with program steps: " << (Program ? Program->Steps.size() : 0) - << (Sorting == ESorting::NONE ? " not" : (Sorting == ESorting::ASC ? " asc" : " desc")) - << " sorted, at snapshot: " << PlanStep << ":" << TxId; - if (GreaterPredicate) { - out << " from{" << *GreaterPredicate << "}"; - } - if (LessPredicate) { - out << " to{" << *LessPredicate << "}"; - } - if (SelectInfo) { - out << ", " << *SelectInfo; - } - } - - friend IOutputStream& operator << (IOutputStream& out, const TReadMetadata& meta) { - meta.Dump(out); - return out; - } -}; - -struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadStatsMetadata> { - using TConstPtr = std::shared_ptr<const TReadStatsMetadata>; - - const ui64 TabletId; - TVector<ui32> ReadColumnIds; - TVector<ui32> ResultColumnIds; - THashMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats; - - explicit TReadStatsMetadata(ui64 tabletId) - : TabletId(tabletId) - {} - - TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const override; - - TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override; - - std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override; -}; - -// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation -struct TPartialReadResult { - std::shared_ptr<arrow::RecordBatch> ResultBatch; - - // This 1-row batch contains the last key that was read while producing the ResultBatch. 
- // NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit - std::shared_ptr<arrow::RecordBatch> LastReadKey; - - std::string ErrorString; -}; - class TIndexedReadData { private: - YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns); - YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns); - std::set<ui32> UsedColumns; - bool AbortedFlag = false; + std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext; + YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters); - std::vector<NIndexedReader::TBatch*> Batches; + YDB_READONLY_DEF(NColumnShard::TDataTasksProcessorContainer, TasksProcessor); TFetchBlobsQueue& FetchBlobsQueue; - bool PredictManyResultsAfterFilter(const TPortionInfo& portionInfo) const; + NOlap::TReadMetadata::TConstPtr ReadMetadata; + bool OnePhaseReadMode = false; + std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; + + THashSet<const void*> BatchesToDedup; + THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch + THashSet<TBlobRange> IndexedBlobs; + ui32 ReadyNotIndexed{ 0 }; + THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append + std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription; + public: - TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, const bool internalRead, const NColumnShard::TScanCounters& counters); + TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, + const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor); - NIndexedReader::TBatch& GetBatchInfo(const ui32 batchNo); + NIndexedReader::TGranulesFillingContext& GetGranulesContext() { + Y_VERIFY(GranulesContext); + return *GranulesContext; + } /// Initial FetchBlobsQueue filling (queue from external scan iterator). Granules could be read independently - void InitRead(ui32 numNotIndexed, bool inGranulesOrder = false); + void InitRead(ui32 numNotIndexed); + void Abort() { + Y_VERIFY(GranulesContext); + return GranulesContext->Abort(); + } + bool IsInProgress() const { + Y_VERIFY(GranulesContext); + return GranulesContext->IsInProgress(); + } /// @returns batches and corresponding last keys in correct order (i.e. 
sorted by by PK) TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch); @@ -257,20 +71,10 @@ public: NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId); } - void AddIndexed(const TBlobRange& blobRange, const TString& column, NColumnShard::TDataTasksProcessorContainer processor); - bool IsInProgress() const { return Granules.size() > ReadyGranulesAccumulator.size(); } + void AddIndexed(const TBlobRange& blobRange, const TString& column); bool IsIndexedBlob(const TBlobRange& blobRange) const { return IndexedBlobs.contains(blobRange); } - void Abort() { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort"); - for (auto&& i : Granules) { - ReadyGranulesAccumulator.emplace(i.first); - } - AbortedFlag = true; - Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size()); - Y_VERIFY(!IsInProgress()); - } NOlap::TReadMetadata::TConstPtr GetReadMetadata() const { return ReadMetadata; } @@ -288,30 +92,7 @@ public: } } - void OnGranuleReady(NIndexedReader::TGranule& granule) { - Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second); - Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag); - } - private: - NOlap::TReadMetadata::TConstPtr ReadMetadata; - bool OnePhaseReadMode = false; - - std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; - - THashSet<const void*> BatchesToDedup; - THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch - THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut; - std::set<ui64> ReadyGranulesAccumulator; - THashSet<TBlobRange> IndexedBlobs; - ui32 ReadyNotIndexed{0}; - THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append - THashMap<ui64, NIndexedReader::TGranule> Granules; - TDeque<NIndexedReader::TGranule*> GranulesOutOrder; - std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription; - - std::vector<NIndexedReader::TGranule*> DetachReadyInOrder(); - const TIndexInfo& IndexInfo() const { return ReadMetadata->IndexInfo; } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index ed6a1880552..e3cd2e4244d 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -62,6 +62,11 @@ struct TPortionInfo { bool CanIntersectOthers() const { return !Valid() || IsInserted(); } size_t NumRecords() const { return Records.size(); } + bool IsSortableInGranule() const { + return Meta.Produced == TPortionMeta::COMPACTED + || Meta.Produced == TPortionMeta::SPLIT_COMPACTED; + } + bool AllowEarlyFilter() const { return Meta.Produced == TPortionMeta::COMPACTED || Meta.Produced == TPortionMeta::SPLIT_COMPACTED; @@ -202,6 +207,51 @@ struct TPortionInfo { } return Meta.ColumnMeta.find(columnId)->second.HasMinMax(); } +private: + class TMinGetter { + public: + static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) { + return portionInfo.MinValue(columnId); + } + }; + + class TMaxGetter { + public: + static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) { + return portionInfo.MaxValue(columnId); + } + }; + + template <class TSelfGetter, class TItemGetter = TSelfGetter> + int CompareByColumnIdsImpl(const TPortionInfo& item, const TVector<ui32>& columnIds) const { + for (auto&& i : columnIds) { + std::shared_ptr<arrow::Scalar> valueSelf = TSelfGetter::Get(*this, i); + std::shared_ptr<arrow::Scalar> 
valueItem = TItemGetter::Get(item, i); + if (!!valueSelf && !!valueItem) { + const int cmpResult = NArrow::ScalarCompare(valueSelf, valueItem); + if (cmpResult) { + return cmpResult; + } + } else if (!!valueSelf) { + return 1; + } else if (!!valueItem) { + return -1; + } + } + return 0; + } +public: + int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { + return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns); + } + + int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { + return CompareMinByColumnIds(item, info.KeyColumns); + } + + int CompareMinByColumnIds(const TPortionInfo& item, const TVector<ui32>& columnIds) const { + return CompareByColumnIdsImpl<TMinGetter>(item, columnIds); + } class TAssembleBlobInfo { private: diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index 44908d898bf..ab853275e1b 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -25,4 +25,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index ba709a0cff0..e0b9dfc7be9 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -26,4 +26,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index ba709a0cff0..e0b9dfc7be9 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -26,4 +26,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt 
index 44908d898bf..ab853275e1b 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -25,4 +25,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 3ac9a61cfe2..976dd31dac5 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -51,10 +51,10 @@ bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const { return true; } -ui64 TBatch::GetFetchBytes(const std::set<ui32>* columnIds) { +ui64 TBatch::GetFetchBytes(const std::set<ui32>& columnIds) { ui64 result = 0; for (const NOlap::TColumnRecord& rec : PortionInfo->Records) { - if (columnIds && !columnIds->contains(rec.ColumnId)) { + if (!columnIds.contains(rec.ColumnId)) { continue; } Y_VERIFY(rec.Portion == Portion); @@ -65,15 +65,11 @@ ui64 TBatch::GetFetchBytes(const std::set<ui32>* columnIds) { return result; } -void TBatch::ResetCommon(const std::set<ui32>* columnIds) { - if (!columnIds) { - CurrentColumnIds.reset(); - } else { - CurrentColumnIds = *columnIds; - Y_VERIFY(CurrentColumnIds->size()); - for (auto&& i : *CurrentColumnIds) { - AskedColumnIds.emplace(i); - } +void TBatch::ResetCommon(const std::set<ui32>& columnIds) { + CurrentColumnIds = columnIds; + Y_VERIFY(CurrentColumnIds->size()); + for (auto&& i : *CurrentColumnIds) { + Y_VERIFY(AskedColumnIds.emplace(i).second); } Y_VERIFY(WaitIndexed.empty()); @@ -82,14 +78,13 @@ void TBatch::ResetCommon(const std::set<ui32>* columnIds) { FetchedBytes = 0; } -void TBatch::ResetNoFilter(const std::set<ui32>* columnIds) { +void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) { Y_VERIFY(!Filter); ResetCommon(columnIds); for (const NOlap::TColumnRecord& rec : PortionInfo->Records) { if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) { continue; } - AskedColumnIds.emplace(rec.ColumnId); Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second); Owner->AddBlobForFetch(rec.BlobRange, *this); Y_VERIFY(rec.Portion == Portion); @@ -99,7 +94,7 @@ void TBatch::ResetNoFilter(const std::set<ui32>* columnIds) { } } -void TBatch::ResetWithFilter(const std::set<ui32>* columnIds) { +void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) { Y_VERIFY(Filter); ResetCommon(columnIds); std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects; @@ -107,7 +102,6 @@ void TBatch::ResetWithFilter(const std::set<ui32>* columnIds) { if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) { continue; } - AskedColumnIds.emplace(rec.ColumnId); orderedObjects[rec.ColumnId][rec.Chunk] = &rec; Y_VERIFY(rec.Valid()); Y_VERIFY(Portion == rec.Portion); @@ -122,29 +116,31 @@ void TBatch::ResetWithFilter(const std::set<ui32>* columnIds) { for (auto&& [chunk, rec] : columnInfo.second) { Y_VERIFY(!itFinished); Y_VERIFY(expected++ == chunk); - if (!rec->GetRowsCount()) { + if (!rec->GetChunkRowsCount()) { undefinedShift = true; } - if 
(!undefinedShift && it.IsBatchForSkip(rec->GetRowsCount())) { - Data.emplace(rec->BlobRange, TPortionInfo::TAssembleBlobInfo(rec->GetRowsCount())); + if (!undefinedShift && it.IsBatchForSkip(*rec->GetChunkRowsCount())) { + Data.emplace(rec->BlobRange, TPortionInfo::TAssembleBlobInfo(*rec->GetChunkRowsCount())); } else { Y_VERIFY(WaitIndexed.emplace(rec->BlobRange).second); Owner->AddBlobForFetch(rec->BlobRange, *this); WaitingBytes += rec->BlobRange.Size; } if (!undefinedShift) { - itFinished = !it.Next(rec->GetRowsCount()); + itFinished = !it.Next(*rec->GetChunkRowsCount()); } } } } -void TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch) { +bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, const ui32 originalRecordsCount) { Y_VERIFY(filter); Y_VERIFY(!Filter); Y_VERIFY(!FilterBatch); Filter = filter; FilterBatch = filterBatch; + OriginalRecordsCount = originalRecordsCount; + return Owner->OnFilterReady(*this); } void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) { @@ -164,4 +160,28 @@ bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) return true; } +bool TBatch::NeedAdditionalData() const { + if (!Filter) { + return true; + } + if (!FilteredBatch || !FilteredBatch->num_rows()) { + return false; + } + if (AskedColumnsAlready(Owner->GetOwner().GetPostFilterColumns())) { + return false; + } + return true; +} + +ui64 TBatch::GetUsefulBytes(const ui64 bytes) const { + if (!FilteredBatch || !FilteredBatch->num_rows()) { + return 0; + } + Y_VERIFY_DEBUG(OriginalRecordsCount); + if (!OriginalRecordsCount) { + return 0; + } + return bytes * FilteredBatch->num_rows() / OriginalRecordsCount; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index 55f3417d265..4198c864543 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -29,6 +29,8 @@ private: YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch); YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch); YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter); + ui32 OriginalRecordsCount = 0; + YDB_FLAG_ACCESSOR(DuplicationsAvailable, false); THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data; TGranule* Owner; @@ -36,17 +38,42 @@ private: YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds); std::set<ui32> AskedColumnIds; - void ResetCommon(const std::set<ui32>* columnIds); + void ResetCommon(const std::set<ui32>& columnIds); + ui64 GetUsefulBytes(const ui64 bytes) const; + public: + ui64 GetUsefulWaitingBytes() const { + return GetUsefulBytes(WaitingBytes); + } + + ui64 GetUsefulFetchedBytes() const { + return GetUsefulBytes(FetchedBytes); + } + + bool NeedAdditionalData() const; + bool IsSortableInGranule() const { + return PortionInfo->IsSortableInGranule(); + } TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo); bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData); bool AskedColumnsAlready(const std::set<ui32>& columnIds) const; - void ResetNoFilter(const std::set<ui32>* columnIds); - void ResetWithFilter(const std::set<ui32>* columnIds); - ui64 GetFetchBytes(const std::set<ui32>* columnIds); + void ResetNoFilter(const std::set<ui32>& columnIds); + void ResetWithFilter(const std::set<ui32>& columnIds); + ui64 GetFetchBytes(const 
std::set<ui32>& columnIds); - void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch); + bool IsFiltered() const { + return !!Filter; + } + ui32 GetFilteredRecordsCount() const { + Y_VERIFY(IsFiltered()); + if (!FilterBatch) { + return 0; + } else { + return FilterBatch->num_rows(); + } + } + bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, const ui32 originalRecordsCount); void InitBatch(std::shared_ptr<arrow::RecordBatch> batch); NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata); diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp index c3738a9c663..a26e4daf1b9 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp @@ -12,7 +12,7 @@ bool IDataTasksProcessor::ITask::DoExecute() { } } -bool IDataTasksProcessor::ITask::Apply(NOlap::TIndexedReadData& indexedDataRead) const { +bool IDataTasksProcessor::ITask::Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const { if (OwnerOperator) { OwnerOperator->ReplyReceived(); if (OwnerOperator->IsStopped()) { @@ -38,12 +38,12 @@ bool IDataTasksProcessor::Add(ITask::TPtr task) { } -void TDataTasksProcessorContainer::Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task) { +void TDataTasksProcessorContainer::Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task) { if (Object) { Object->Add(task); } else { task->Execute(); - task->Apply(indexedDataRead); + task->Apply(context); } } diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h index 90a8f98a10d..fa0a8b1ce2c 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h @@ -2,8 +2,8 @@ #include <ydb/core/tx/conveyor/usage/abstract.h> #include <ydb/library/accessor/accessor.h> -namespace NKikimr::NOlap { -class TIndexedReadData; +namespace NKikimr::NOlap::NIndexedReader { +class TGranulesFillingContext; } namespace NKikimr::NColumnShard { @@ -23,7 +23,7 @@ public: YDB_READONLY_FLAG(DataProcessed, false); protected: TDataTasksProcessorContainer GetTasksProcessorContainer() const; - virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0; + virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const = 0; virtual bool DoExecuteImpl() = 0; virtual bool DoExecute() override final; @@ -34,7 +34,7 @@ public: } using TPtr = std::shared_ptr<ITask>; virtual ~ITask() = default; - bool Apply(NOlap::TIndexedReadData& indexedDataRead) const; + bool Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const; }; protected: virtual bool DoAdd(ITask::TPtr task) = 0; @@ -84,7 +84,7 @@ public: return Object && Object->IsStopped(); } - void Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task); + void Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task); }; } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp new file mode 100644 index 00000000000..83293a384d1 --- /dev/null +++ 
b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -0,0 +1,56 @@ +#include "filling_context.h" +#include "filling_context.h" +#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> + +namespace NKikimr::NOlap::NIndexedReader { + +TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount) + : ReadMetadata(readMetadata) + , InternalReading(internalReading) + , Owner(owner) + , Counters(owner.GetCounters()) +{ + Batches.resize(batchesCount, nullptr); + SortingPolicy = InternalReading ? std::make_shared<TNonSorting>(ReadMetadata) : ReadMetadata->BuildSortingPolicy(); + + UsedColumns = ReadMetadata->GetUsedColumnIds(); + PostFilterColumns = ReadMetadata->GetUsedColumnIds(); + EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(); + for (auto&& i : EarlyFilterColumns) { + PostFilterColumns.erase(i); + } +} + +bool TGranulesFillingContext::PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const { + if (!portionInfo.AllowEarlyFilter()) { + return false; + } + if (EarlyFilterColumns.empty()) { + return false; + } + if (TIndexInfo::IsSpecialColumns(EarlyFilterColumns)) { + return false; + } + return true; +} + +void TGranulesFillingContext::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) { + return Owner.AddBlobForFetch(range, batch); +} + +void TGranulesFillingContext::OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) { + return Owner.OnBatchReady(batchInfo, batch); +} + +NKikimr::NOlap::NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfo(const ui32 batchNo) { + Y_VERIFY(batchNo < Batches.size()); + auto ptr = Batches[batchNo]; + Y_VERIFY(ptr); + return *ptr; +} + +NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const { + return Owner.GetTasksProcessor(); +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h new file mode 100644 index 00000000000..2b71adcb028 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -0,0 +1,91 @@ +#pragma once +#include "conveyor_task.h" +#include "granule.h" +#include "order_controller.h" +#include <util/generic/hash.h> + +namespace NKikimr::NOlap { +class TIndexedReadData; +} + +namespace NKikimr::NOlap::NIndexedReader { + +class TGranulesFillingContext { +private: + bool AbortedFlag = false; + YDB_READONLY_DEF(TReadMetadata::TConstPtr, ReadMetadata); + const bool InternalReading = false; + TIndexedReadData& Owner; + THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut; + std::set<ui64> ReadyGranulesAccumulator; + THashMap<ui64, NIndexedReader::TGranule> Granules; + YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns); + YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns); + std::set<ui32> UsedColumns; + YDB_READONLY_DEF(IOrderPolicy::TPtr, SortingPolicy); + YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters); + std::vector<NIndexedReader::TBatch*> Batches; + + bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const; + +public: + TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount); + + NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const; + + TBatch& GetBatchInfo(const ui32 batchNo); + + void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); + void 
OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); + + NIndexedReader::TGranule& GetGranuleVerified(const ui64 granuleId) { + auto it = Granules.find(granuleId); + Y_VERIFY(it != Granules.end()); + return it->second; + } + + bool IsInProgress() const { return Granules.size() > ReadyGranulesAccumulator.size(); } + + void OnNewBatch(TBatch& batch) { + if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) { + batch.ResetNoFilter(EarlyFilterColumns); + } else { + batch.ResetNoFilter(UsedColumns); + } + Batches[batch.GetBatchNo()] = &batch; + } + + std::vector<TGranule*> DetachReadyInOrder() { + Y_VERIFY(SortingPolicy); + return SortingPolicy->DetachReadyGranules(GranulesToOut); + } + + void Abort() { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort"); + for (auto&& i : Granules) { + ReadyGranulesAccumulator.emplace(i.first); + } + AbortedFlag = true; + Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size()); + Y_VERIFY(!IsInProgress()); + } + + TGranule& UpsertGranule(const ui64 granuleId) { + auto itGranule = Granules.find(granuleId); + if (itGranule == Granules.end()) { + itGranule = Granules.emplace(granuleId, NIndexedReader::TGranule(granuleId, *this)).first; + } + return itGranule->second; + } + + void OnGranuleReady(TGranule& granule) { + Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second); + Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag); + } + + void PrepareForStart() { + SortingPolicy->Fill(*this); + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index 1ae809aa491..bff7044c5b7 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -46,49 +46,12 @@ bool TAssembleFilter::DoExecuteImpl() { return true; } -bool TAssembleFilter::DoApply(TIndexedReadData& owner) const { +bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const { TBatch& batch = owner.GetBatchInfo(BatchNo); Y_VERIFY(OriginalCount); owner.GetCounters().GetOriginalRowsCount()->Add(OriginalCount); - batch.InitFilter(Filter, FilteredBatch); owner.GetCounters().GetAssembleFilterCount()->Add(1); - if (!FilteredBatch || FilteredBatch->num_rows() == 0) { - owner.GetCounters().GetEmptyFilterCount()->Add(1); - owner.GetCounters().GetEmptyFilterFetchedBytes()->Add(batch.GetFetchedBytes()); - owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns())); - batch.InitBatch(FilteredBatch); - } else { - owner.GetCounters().GetFilteredRowsCount()->Add(FilteredBatch->num_rows()); - if (batch.AskedColumnsAlready(owner.GetPostFilterColumns())) { - owner.GetCounters().GetFilterOnlyCount()->Add(1); - owner.GetCounters().GetFilterOnlyFetchedBytes()->Add(batch.GetFetchedBytes()); - owner.GetCounters().GetFilterOnlyUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount); - owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns())); - - batch.InitBatch(FilteredBatch); - } else { - owner.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes()); - owner.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount); - - batch.ResetWithFilter(&owner.GetPostFilterColumns()); - if (batch.IsFetchingReady()) { - auto processor = 
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index 1ae809aa491..bff7044c5b7 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -46,49 +46,12 @@ bool TAssembleFilter::DoExecuteImpl() {
     return true;
 }
 
-bool TAssembleFilter::DoApply(TIndexedReadData& owner) const {
+bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const {
     TBatch& batch = owner.GetBatchInfo(BatchNo);
     Y_VERIFY(OriginalCount);
     owner.GetCounters().GetOriginalRowsCount()->Add(OriginalCount);
-    batch.InitFilter(Filter, FilteredBatch);
     owner.GetCounters().GetAssembleFilterCount()->Add(1);
-    if (!FilteredBatch || FilteredBatch->num_rows() == 0) {
-        owner.GetCounters().GetEmptyFilterCount()->Add(1);
-        owner.GetCounters().GetEmptyFilterFetchedBytes()->Add(batch.GetFetchedBytes());
-        owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns()));
-        batch.InitBatch(FilteredBatch);
-    } else {
-        owner.GetCounters().GetFilteredRowsCount()->Add(FilteredBatch->num_rows());
-        if (batch.AskedColumnsAlready(owner.GetPostFilterColumns())) {
-            owner.GetCounters().GetFilterOnlyCount()->Add(1);
-            owner.GetCounters().GetFilterOnlyFetchedBytes()->Add(batch.GetFetchedBytes());
-            owner.GetCounters().GetFilterOnlyUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount);
-            owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns()));
-
-            batch.InitBatch(FilteredBatch);
-        } else {
-            owner.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes());
-            owner.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount);
-
-            batch.ResetWithFilter(&owner.GetPostFilterColumns());
-            if (batch.IsFetchingReady()) {
-                auto processor = GetTasksProcessorContainer();
-                if (auto assembleBatchTask = batch.AssembleTask(processor.GetObject(), owner.GetReadMetadata())) {
-                    processor.Add(owner, assembleBatchTask);
-                }
-            }
-
-            owner.GetCounters().GetTwoPhasesCount()->Add(1);
-            owner.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes());
-            owner.GetCounters().GetTwoPhasesPostFilterUsefulBytes()->Add(batch.GetWaitingBytes() * FilteredBatch->num_rows() / OriginalCount);
-            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
-                ("filtered_count", FilteredBatch->num_rows())
-                ("blobs_count", batch.GetWaitingBlobs().size())
-                ("columns_count", batch.GetCurrentColumnIds()->size())
-                ("fetch_size", batch.GetWaitingBytes())
-                ;
-        }
-    }
+    batch.InitFilter(Filter, FilteredBatch, OriginalCount);
     return true;
 }
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
index 7ec1b9af0cc..c2f65c6d6dc 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -21,7 +21,7 @@ namespace NKikimr::NOlap::NIndexedReader {
         bool AllowEarlyFilter = false;
         std::set<ui32> FilterColumnIds;
     protected:
-        virtual bool DoApply(TIndexedReadData& owner) const override;
+        virtual bool DoApply(TGranulesFillingContext& owner) const override;
         virtual bool DoExecuteImpl() override;
     public:
         TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index e7c4d80f82e..7302e6add52 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -1,10 +1,16 @@
 #include "granule.h"
+#include "filling_context.h"
 #include <ydb/core/tx/columnshard/engines/portion_info.h>
 #include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
 
 namespace NKikimr::NOlap::NIndexedReader {
 
 void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
+    if (Owner->GetSortingPolicy()->CanInterrupt()) {
+        if (ReadyFlag) {
+            return;
+        }
+    }
     Y_VERIFY(!ReadyFlag);
     Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo()));
     if (batch && batch->num_rows()) {
@@ -20,9 +26,10 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
 NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo) {
     Y_VERIFY(!ReadyFlag);
     WaitBatches.emplace(batchNo);
-    auto infoEmplace = Batches.emplace(batchNo, TBatch(batchNo, *this, portionInfo));
-    Y_VERIFY(infoEmplace.second);
-    return infoEmplace.first->second;
+    Batches.emplace_back(TBatch(batchNo, *this, portionInfo));
+    Y_VERIFY(GranuleBatchNumbers.emplace(batchNo).second);
+    Owner->OnNewBatch(Batches.back());
+    return Batches.back();
 }
 
 void TGranule::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const {
@@ -33,4 +40,44 @@ const std::set<ui32>& TGranule::GetEarlyFilterColumns() const {
     return Owner->GetEarlyFilterColumns();
 }
 
+bool TGranule::OnFilterReady(TBatch& batchInfo) {
+    if (ReadyFlag) {
+        return false;
+    }
+    return Owner->GetSortingPolicy()->OnFilterReady(batchInfo, *this, *Owner);
+}
+
+std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata) {
+    std::deque<TBatch*> batches;
+    for (auto&& i : Batches) {
+        batches.emplace_back(&i);
+    }
+    const int reverseKff = reverse ? -1 : 1;
+    const auto pred = [reverseKff, readMetadata](const TBatch* l, const TBatch* r) {
+        if (l->IsSortableInGranule() && r->IsSortableInGranule()) {
+            return l->GetPortionInfo().CompareMinByPk(r->GetPortionInfo(), readMetadata->IndexInfo) * reverseKff < 0;
+        } else if (l->IsSortableInGranule()) {
+            return false;
+        } else if (r->IsSortableInGranule()) {
+            return true;
+        } else {
+            return false;
+        }
+    };
+    std::sort(batches.begin(), batches.end(), pred);
+    bool nonCompactedSerial = true;
+    for (ui32 i = 0; i + 1 < batches.size(); ++i) {
+        if (batches[i]->IsSortableInGranule()) {
+            auto& l = *batches[i];
+            auto& r = *batches[i + 1];
+            Y_VERIFY(r.IsSortableInGranule());
+            Y_VERIFY(l.GetPortionInfo().CompareSelfMaxItemMinByPk(r.GetPortionInfo(), readMetadata->IndexInfo) * reverseKff <= 0);
+            nonCompactedSerial = false;
+        } else {
+            Y_VERIFY(nonCompactedSerial);
+        }
+    }
+    return batches;
+}
+
 }
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index 6c32ea426e0..b7400339d30 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -1,5 +1,6 @@
 #pragma once
 #include "batch.h"
+#include "read_metadata.h"
 #include <ydb/library/accessor/accessor.h>
 #include <ydb/core/tx/columnshard/engines/portion_info.h>
 
@@ -8,24 +9,32 @@
 
 namespace NKikimr::NOlap::NIndexedReader {
 
+class TGranulesFillingContext;
+
 class TGranule {
 private:
     YDB_READONLY(ui64, GranuleId, 0);
     YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, ReadyBatches);
     YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
     YDB_READONLY_FLAG(Ready, false);
-    THashMap<ui32, TBatch> Batches;
+    std::deque<TBatch> Batches;
     std::set<ui32> WaitBatches;
-    TIndexedReadData* Owner = nullptr;
+    std::set<ui32> GranuleBatchNumbers;
+    TGranulesFillingContext* Owner = nullptr;
 public:
-    TGranule(const ui64 granuleId, TIndexedReadData& owner)
+    TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
         : GranuleId(granuleId)
         , Owner(&owner) {
+    }
+
+    const TGranulesFillingContext& GetOwner() const {
+        return *Owner;
     }
 
+    std::deque<TBatch*> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata);
     const std::set<ui32>& GetEarlyFilterColumns() const;
     void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
+    bool OnFilterReady(TBatch& batchInfo);
     TBatch& AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo);
     void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const;
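SortBatchesByPK above keeps non-sortable (non-compacted) batches in front and orders the sortable ones by the minimum PK of their portions, flipping the direction for descending scans by multiplying a three-way comparison by a coefficient. Note that the ascending coefficient has to be 1, not 0; with 0 both the comparator and the subsequent Y_VERIFY degenerate into no-ops. A reduced, self-contained sketch of the sign-flip idea on plain integers (names are illustrative, not part of the patch):

#include <algorithm>
#include <vector>

// Sort by the sign of a three-way comparison; reverse scans flip the sign
// instead of swapping the comparator.
void SortWithDirection(std::vector<int>& values, const bool reverse) {
    const int reverseKff = reverse ? -1 : 1;
    std::sort(values.begin(), values.end(), [reverseKff](const int l, const int r) {
        const int cmp = (l < r) ? -1 : (r < l ? 1 : 0);
        return cmp * reverseKff < 0;
    });
}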
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
new file mode 100644
index 00000000000..1c667356728
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
@@ -0,0 +1,146 @@
+#include "order_controller.h"
+#include "filling_context.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+void TAnySorting::DoFill(TGranulesFillingContext& context) {
+    auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
+    for (ui64 granule : granulesOrder) {
+        TGranule& g = context.GetGranuleVerified(granule);
+        GranulesOutOrder.emplace_back(&g);
+    }
+}
+
+std::vector<TGranule*> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+    std::vector<TGranule*> result;
+    while (GranulesOutOrder.size()) {
+        NIndexedReader::TGranule* granule = GranulesOutOrder.front();
+        if (!granule->IsReady()) {
+            break;
+        }
+        result.emplace_back(granule);
+        Y_VERIFY(granulesToOut.erase(granule->GetGranuleId()));
+        GranulesOutOrder.pop_front();
+    }
+    return result;
+}
+
+std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+    std::vector<TGranule*> result;
+    result.reserve(granulesToOut.size());
+    for (auto&& i : granulesToOut) {
+        result.emplace_back(i.second);
+    }
+    granulesToOut.clear();
+    return result;
+}
+
+bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) {
+    Y_VERIFY(ReadMetadata->Limit);
+    if (!CurrentItemsLimit) {
+        return false;
+    }
+    Y_VERIFY(GranulesOutOrderForPortions.size());
+    if (granule.GetGranuleId() != GranulesOutOrderForPortions.front()->GetGranuleId()) {
+        return false;
+    }
+    while (GranulesOutOrderForPortions.size()) {
+        auto it = OrderedBatches.find(GranulesOutOrderForPortions.front()->GetGranuleId());
+        Y_VERIFY(it != OrderedBatches.end());
+        while (it->second.size() && it->second.front()->IsFiltered() && CurrentItemsLimit) {
+            auto b = it->second.front();
+            if (b->IsSortableInGranule()) {
+                if (CurrentItemsLimit <= b->GetFilteredRecordsCount()) {
+                    CurrentItemsLimit = 0;
+                } else {
+                    CurrentItemsLimit -= b->GetFilteredRecordsCount();
+                }
+            } else {
+                CurrentItemsLimit += b->GetFilteredRecordsCount();
+            }
+            OnBatchFilterInitialized(*b, context);
+
+            it->second.pop_front();
+        }
+        if (!CurrentItemsLimit || it->second.empty()) {
+            while (it->second.size()) {
+                auto b = it->second.front();
+                context.GetCounters().GetSkippedBytes()->Add(b->GetFetchBytes(context.GetPostFilterColumns()));
+                b->InitBatch(nullptr);
+                it->second.pop_front();
+            }
+            OrderedBatches.erase(it);
+            GranulesOutOrderForPortions.pop_front();
+        } else {
+            break;
+        }
+    }
+    return false;
+}
+
+void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) {
+    auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
+    for (ui64 granule : granulesOrder) {
+        TGranule& g = context.GetGranuleVerified(granule);
+        GranulesOutOrder.emplace_back(&g);
+        Y_VERIFY(OrderedBatches.emplace(granule, g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata)).second);
+    }
+    GranulesOutOrderForPortions = GranulesOutOrder;
+}
+
+std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+    std::vector<TGranule*> result;
+    while (GranulesOutOrder.size()) {
+        NIndexedReader::TGranule* granule = GranulesOutOrder.front();
+        if (!granule->IsReady()) {
+            break;
+        }
+        result.emplace_back(granule);
+        Y_VERIFY(granulesToOut.erase(granule->GetGranuleId()));
+        GranulesOutOrder.pop_front();
+    }
+    return result;
+}
+
+void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context) {
+    Y_VERIFY(!!batch.GetFilter());
+    if (!batch.GetFilteredRecordsCount()) {
+        context.GetCounters().GetEmptyFilterCount()->Add(1);
+        context.GetCounters().GetEmptyFilterFetchedBytes()->Add(batch.GetFetchedBytes());
+        context.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
+        batch.InitBatch(nullptr);
+    } else {
+        context.GetCounters().GetFilteredRowsCount()->Add(batch.GetFilterBatch()->num_rows());
+        if (batch.AskedColumnsAlready(context.GetPostFilterColumns())) {
+            context.GetCounters().GetFilterOnlyCount()->Add(1);
+            context.GetCounters().GetFilterOnlyFetchedBytes()->Add(batch.GetFetchedBytes());
+            context.GetCounters().GetFilterOnlyUsefulBytes()->Add(batch.GetUsefulFetchedBytes());
+            context.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
+
+            batch.InitBatch(batch.GetFilterBatch());
+        } else {
+            context.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes());
+            context.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetUsefulFetchedBytes());
+
+            batch.ResetWithFilter(context.GetPostFilterColumns());
+            if (batch.IsFetchingReady()) {
+                auto processor = context.GetTasksProcessor();
+                if (auto assembleBatchTask = batch.AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
+                    processor.Add(context, assembleBatchTask);
+                }
+            }
+
+            context.GetCounters().GetTwoPhasesCount()->Add(1);
+            context.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes());
+            context.GetCounters().GetTwoPhasesPostFilterUsefulBytes()->Add(batch.GetUsefulWaitingBytes());
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
+                ("filtered_count", batch.GetFilterBatch()->num_rows())
+                ("blobs_count", batch.GetWaitingBlobs().size())
+                ("columns_count", batch.GetCurrentColumnIds()->size())
+                ("fetch_size", batch.GetWaitingBytes())
+                ;
+        }
+    }
+}
+
+}
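TPKSortingWithLimit above walks granules strictly in PK order and stops requesting post-filter columns once the remaining LIMIT budget is exhausted; the batches left over only account their skipped bytes. A condensed, standalone sketch of the budget accounting for sortable batches (TFilteredBatch and ConsumeUntilLimit are hypothetical stand-ins; the patch additionally grows the budget for non-sortable batches, which may still contain duplicates, and that detail is omitted here):

#include <cstdint>
#include <deque>

struct TFilteredBatch {
    uint32_t FilteredRecordsCount = 0;
    bool SortableInGranule = true;
};

// Consume filtered batches in PK order until the LIMIT budget is spent;
// everything still queued afterwards can be skipped without fetching extra columns.
uint32_t ConsumeUntilLimit(std::deque<TFilteredBatch>& batches, uint32_t limit) {
    uint32_t consumed = 0;
    while (!batches.empty() && limit) {
        const TFilteredBatch b = batches.front();
        batches.pop_front();
        ++consumed;
        if (b.SortableInGranule) {
            limit = (limit <= b.FilteredRecordsCount) ? 0 : (limit - b.FilteredRecordsCount);
        }
    }
    return consumed;
}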
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h
new file mode 100644
index 00000000000..ab14fef95ca
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h
@@ -0,0 +1,112 @@
+#pragma once
+#include "granule.h"
+#include "read_metadata.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TGranulesFillingContext;
+
+class IOrderPolicy {
+protected:
+    TReadMetadata::TConstPtr ReadMetadata;
+    virtual void DoFill(TGranulesFillingContext& context) = 0;
+    virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0;
+    virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) {
+        OnBatchFilterInitialized(batchInfo, context);
+        return true;
+    }
+
+    void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context);
+public:
+    using TPtr = std::shared_ptr<IOrderPolicy>;
+    virtual ~IOrderPolicy() = default;
+
+    IOrderPolicy(TReadMetadata::TConstPtr readMetadata)
+        : ReadMetadata(readMetadata)
+    {
+
+    }
+
+    virtual bool CanInterrupt() const {
+        return false;
+    }
+
+    bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) {
+        return DoOnFilterReady(batchInfo, granule, context);
+    }
+
+
+    virtual bool ReadyForAddNotIndexedToEnd() const = 0;
+
+    std::vector<TGranule*> DetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+        return DoDetachReadyGranules(granulesToOut);
+    }
+
+    void Fill(TGranulesFillingContext& context) {
+        DoFill(context);
+    }
+};
+
+class TNonSorting: public IOrderPolicy {
+private:
+    using TBase = IOrderPolicy;
+protected:
+    virtual void DoFill(TGranulesFillingContext& /*context*/) override {
+    }
+
+    virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+public:
+    TNonSorting(TReadMetadata::TConstPtr readMetadata)
+        :TBase(readMetadata)
+    {
+
+    }
+
+    virtual bool ReadyForAddNotIndexedToEnd() const override {
+        return true;
+    }
+};
+
+class TAnySorting: public IOrderPolicy {
+private:
+    using TBase = IOrderPolicy;
+    std::deque<TGranule*> GranulesOutOrder;
+protected:
+    virtual void DoFill(TGranulesFillingContext& context) override;
+    virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+public:
+    TAnySorting(TReadMetadata::TConstPtr readMetadata)
+        :TBase(readMetadata) {
+
+    }
+    virtual bool ReadyForAddNotIndexedToEnd() const override {
+        return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
+    }
+};
+
+class TPKSortingWithLimit: public IOrderPolicy {
+private:
+    using TBase = IOrderPolicy;
+    std::deque<TGranule*> GranulesOutOrder;
+    std::deque<TGranule*> GranulesOutOrderForPortions;
+    THashMap<ui64, std::deque<TBatch*>> OrderedBatches;
+    ui32 CurrentItemsLimit = 0;
+protected:
+    virtual void DoFill(TGranulesFillingContext& context) override;
+    virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+    virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;
+public:
+    virtual bool CanInterrupt() const override {
+        return true;
+    }
+
+    TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
+        :TBase(readMetadata) {
+        CurrentItemsLimit = ReadMetadata->Limit;
+    }
+    virtual bool ReadyForAddNotIndexedToEnd() const override {
+        return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
+    }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
index 45310d0d228..460dc1297d8 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
@@ -31,7 +31,7 @@ bool TAssembleBatch::DoExecuteImpl() {
     return true;
 }
 
-bool TAssembleBatch::DoApply(TIndexedReadData& owner) const {
+bool TAssembleBatch::DoApply(TGranulesFillingContext& owner) const {
     TBatch& batch = owner.GetBatchInfo(BatchNo);
     batch.InitBatch(FullBatch);
     return true;
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
index 616b079e016..494b55e10f5 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
@@ -20,7 +20,7 @@ private:
     const ui32 BatchNo;
 
 protected:
-    virtual bool DoApply(TIndexedReadData& owner) const override;
+    virtual bool DoApply(TGranulesFillingContext& owner) const override;
     virtual bool DoExecuteImpl() override;
 public:
     TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor,
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
new file mode 100644
index 00000000000..eff5afd970a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -0,0 +1,112 @@
+#include "read_metadata.h"
+#include "order_controller.h"
+#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
+#include <ydb/core/tx/columnshard/columnshard__stats_scan.h>
+
+namespace NKikimr::NOlap {
+
+std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const {
+    return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters);
+}
+
+std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
+    std::set<ui32> result;
+    if (LessPredicate) {
+        for (auto&& i : LessPredicate->ColumnNames()) {
+            result.emplace(IndexInfo.GetColumnId(i));
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
+        }
+    }
+    if (GreaterPredicate) {
+        for (auto&& i : GreaterPredicate->ColumnNames()) {
+            result.emplace(IndexInfo.GetColumnId(i));
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
+        }
+    }
+    if (Program) {
+        for (auto&& i : Program->GetEarlyFilterColumns()) {
+            auto id = IndexInfo.GetColumnIdOptional(i);
+            if (id) {
+                result.emplace(*id);
+                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
+            }
+        }
+    }
+    if (PlanStep) {
+        auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
+        for (auto&& i : snapSchema->fields()) {
+            result.emplace(IndexInfo.GetColumnId(i->name()));
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name());
+        }
+    }
+    return result;
+}
+
+std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
+    std::set<ui32> result;
+    if (PlanStep) {
+        auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
+        for (auto&& i : snapSchema->fields()) {
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", i->name());
+            result.emplace(IndexInfo.GetColumnId(i->name()));
+        }
+    }
+    for (auto&& f : LoadSchema->fields()) {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", f->name());
+        result.emplace(IndexInfo.GetColumnId(f->name()));
+    }
+    return result;
+}
+
+TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetResultYqlSchema() const {
+    return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, ResultColumnIds);
+}
+
+TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYqlSchema() const {
+    return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, NColumnShard::PrimaryIndexStatsSchema.KeyColumns);
+}
+
+std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TScanCounters& /*scanCounters*/) const {
+    return std::make_unique<NColumnShard::TStatsIterator>(this->shared_from_this());
+}
+
+void TReadStats::PrintToLog() {
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
+        ("event", "statistic")
+        ("begin", BeginTimestamp)
+        ("selected", SelectedIndex)
+        ("index_granules", IndexGranules)
+        ("index_portions", IndexPortions)
+        ("index_batches", IndexBatches)
+        ("committed_batches", CommittedBatches)
+        ("schema_columns", SchemaColumns)
+        ("filter_columns", FilterColumns)
+        ("additional_columns", AdditionalColumns)
+        ("portions_bytes", PortionsBytes)
+        ("data_filter_bytes", DataFilterBytes)
+        ("data_additional_bytes", DataAdditionalBytes)
+        ("delta_bytes", PortionsBytes - DataFilterBytes - DataAdditionalBytes)
+        ("selected_rows", SelectedRows)
+        ;
+}
+
+NIndexedReader::IOrderPolicy::TPtr TReadMetadata::BuildSortingPolicy() const {
+    if (Limit && Sorting != ESorting::NONE && IndexInfo.IsSorted() && IndexInfo.GetSortingKey()->num_fields()) {
+        ui32 idx = 0;
+        for (auto&& i : IndexInfo.GetPrimaryKey()) {
+            if (idx >= IndexInfo.GetSortingKey()->fields().size()) {
+                break;
+            }
+            if (IndexInfo.GetSortingKey()->fields()[idx]->name() != i.first) {
+                return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
+            }
+            ++idx;
+        }
+
+        return std::make_shared<NIndexedReader::TPKSortingWithLimit>(this->shared_from_this());
+    } else {
+        return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
+    }
+}
+
+}
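BuildSortingPolicy above selects TPKSortingWithLimit only when the query has a LIMIT, an explicit sort direction, and a sorting key whose leading fields follow the primary key column order; otherwise it falls back to TAnySorting. A compact restatement of that prefix check as a free function (an illustrative helper, not part of the patch):

#include <string>
#include <vector>

// True when every leading sorting-key field matches the primary key order,
// which is the precondition for the limit-aware PK sorting policy.
bool SortingKeyIsPkPrefix(const std::vector<std::string>& primaryKey,
                          const std::vector<std::string>& sortingKey) {
    size_t idx = 0;
    for (const auto& pkColumn : primaryKey) {
        if (idx >= sortingKey.size()) {
            break;
        }
        if (sortingKey[idx] != pkColumn) {
            return false;
        }
        ++idx;
    }
    return true;
}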
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
new file mode 100644
index 00000000000..7ea2898dadc
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -0,0 +1,204 @@
+#pragma once
+#include "conveyor_task.h"
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/counters.h>
+#include <ydb/core/tx/columnshard/columnshard__scan.h>
+#include <ydb/core/tx/columnshard/engines/predicate.h>
+#include <ydb/core/tx/columnshard/engines/column_engine.h>
+#include <ydb/core/scheme_types/scheme_type_info.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+
+namespace NKikimr::NColumnShard {
+class TScanIteratorBase;
+}
+
+namespace NKikimr::NOlap {
+
+namespace NIndexedReader {
+class IOrderPolicy;
+}
+
+struct TReadStats {
+    TInstant BeginTimestamp;
+    ui32 SelectedIndex{0};
+    ui64 IndexGranules{0};
+    ui64 IndexPortions{0};
+    ui64 IndexBatches{0};
+    ui64 CommittedBatches{0};
+    ui64 PortionsBytes{ 0 };
+    ui64 DataFilterBytes{ 0 };
+    ui64 DataAdditionalBytes{ 0 };
+
+    ui32 SchemaColumns = 0;
+    ui32 FilterColumns = 0;
+    ui32 AdditionalColumns = 0;
+
+    ui32 SelectedRows = 0;
+
+    TReadStats(ui32 indexNo)
+        : BeginTimestamp(TInstant::Now())
+        , SelectedIndex(indexNo)
+    {}
+
+    void PrintToLog();
+
+    TDuration Duration() {
+        return TInstant::Now() - BeginTimestamp;
+    }
+};
+
+// Holds all metadata that is needed to perform read/scan
+struct TReadMetadataBase {
+    using TConstPtr = std::shared_ptr<const TReadMetadataBase>;
+
+    enum class ESorting {
+        NONE = 0,
+        ASC,
+        DESC,
+    };
+
+    virtual ~TReadMetadataBase() = default;
+
+    std::shared_ptr<NOlap::TPredicate> LessPredicate;
+    std::shared_ptr<NOlap::TPredicate> GreaterPredicate;
+    std::shared_ptr<arrow::Schema> BlobSchema;
+    std::shared_ptr<arrow::Schema> LoadSchema; // ResultSchema + required for intermediate operations
+    std::shared_ptr<arrow::Schema> ResultSchema; // TODO: add Program modifications
+    std::shared_ptr<NSsa::TProgram> Program;
+    std::shared_ptr<const THashSet<TUnifiedBlobId>> ExternBlobs;
+    ESorting Sorting{ESorting::ASC}; // Sorting inside returned batches
+    ui64 Limit{0}; // TODO
+
+    bool IsAscSorted() const { return Sorting == ESorting::ASC; }
+    bool IsDescSorted() const { return Sorting == ESorting::DESC; }
+    bool IsSorted() const { return IsAscSorted() || IsDescSorted(); }
+    void SetDescSorting() { Sorting = ESorting::DESC; }
+
+    virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const = 0;
+    virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const = 0;
+    virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const = 0;
+    virtual void Dump(IOutputStream& out) const { Y_UNUSED(out); };
+
+    // TODO: can this only be done for base class?
+    friend IOutputStream& operator << (IOutputStream& out, const TReadMetadataBase& meta) {
+        meta.Dump(out);
+        return out;
+    }
+};
+
+// Holds all metadata that is needed to perform read/scan
+struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadMetadata> {
+    using TConstPtr = std::shared_ptr<const TReadMetadata>;
+
+    TIndexInfo IndexInfo;
+    ui64 PlanStep = 0;
+    ui64 TxId = 0;
+    std::shared_ptr<TSelectInfo> SelectInfo;
+    std::vector<TCommittedBlob> CommittedBlobs;
+    THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches;
+    std::shared_ptr<TReadStats> ReadStats;
+
+    std::shared_ptr<NIndexedReader::IOrderPolicy> BuildSortingPolicy() const;
+
+    TReadMetadata(const TIndexInfo& info)
+        : IndexInfo(info)
+        , ReadStats(std::make_shared<TReadStats>(info.GetId()))
+    {}
+
+    std::vector<std::string> GetColumnsOrder() const {
+        std::vector<std::string> result;
+        for (auto&& i : LoadSchema->fields()) {
+            result.emplace_back(i->name());
+        }
+        return result;
+    }
+
+    std::set<ui32> GetEarlyFilterColumnIds() const;
+    std::set<ui32> GetUsedColumnIds() const;
+
+    bool Empty() const {
+        Y_VERIFY(SelectInfo);
+        return SelectInfo->Portions.empty() && CommittedBlobs.empty();
+    }
+
+    std::shared_ptr<arrow::Schema> GetSortingKey() const {
+        return IndexInfo.GetSortingKey();
+    }
+
+    std::shared_ptr<arrow::Schema> GetReplaceKey() const {
+        return IndexInfo.GetReplaceKey();
+    }
+
+    TVector<TNameTypeInfo> GetResultYqlSchema() const override {
+        TVector<NTable::TTag> columnIds;
+        columnIds.reserve(ResultSchema->num_fields());
+        for (const auto& field: ResultSchema->fields()) {
+            TString name = TStringBuilder() << field->name();
+            columnIds.emplace_back(IndexInfo.GetColumnId(name));
+        }
+        return IndexInfo.GetColumns(columnIds);
+    }
+
+    TVector<TNameTypeInfo> GetKeyYqlSchema() const override {
+        return IndexInfo.GetPrimaryKey();
+    }
+
+    size_t NumIndexedRecords() const {
+        Y_VERIFY(SelectInfo);
+        return SelectInfo->NumRecords();
+    }
+
+    size_t NumIndexedBlobs() const {
+        Y_VERIFY(SelectInfo);
+        return SelectInfo->Stats().Blobs;
+    }
+
+    std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
+
+    void Dump(IOutputStream& out) const override {
+        out << "columns: " << (LoadSchema ? LoadSchema->num_fields() : 0)
+            << " index records: " << NumIndexedRecords()
+            << " index blobs: " << NumIndexedBlobs()
+            << " committed blobs: " << CommittedBlobs.size()
+            << " with program steps: " << (Program ? Program->Steps.size() : 0)
+            << (Sorting == ESorting::NONE ? " not" : (Sorting == ESorting::ASC ? " asc" : " desc"))
+            << " sorted, at snapshot: " << PlanStep << ":" << TxId;
+        if (GreaterPredicate) {
+            out << " from{" << *GreaterPredicate << "}";
+        }
+        if (LessPredicate) {
+            out << " to{" << *LessPredicate << "}";
+        }
+        if (SelectInfo) {
+            out << ", " << *SelectInfo;
+        }
+    }
+
+    friend IOutputStream& operator << (IOutputStream& out, const TReadMetadata& meta) {
+        meta.Dump(out);
+        return out;
+    }
+};
+
+struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadStatsMetadata> {
+    using TConstPtr = std::shared_ptr<const TReadStatsMetadata>;
+
+    const ui64 TabletId;
+    TVector<ui32> ReadColumnIds;
+    TVector<ui32> ResultColumnIds;
+    THashMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats;
+
+    explicit TReadStatsMetadata(ui64 tabletId)
+        : TabletId(tabletId)
+    {}
+
+    TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const override;
+
+    TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override;
+
+    std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 358a52ac016..0d2b7e728f1 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -23,7 +23,7 @@ public:
         , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
         , Result(std::move(event))
        , ReadMetadata(readMetadata)
-        , IndexedData(ReadMetadata, IndexedBlobsForFetch, true, counters)
+        , IndexedData(ReadMetadata, IndexedBlobsForFetch, true, counters, TDataTasksProcessorContainer())
        , Deadline(deadline)
        , ColumnShardActorId(columnShardActorId)
        , RequestCookie(requestCookie)
@@ -51,7 +51,7 @@ public:
            return; // ignore duplicate parts
        }
        WaitIndexed.erase(event.BlobRange);
-        IndexedData.AddIndexed(event.BlobRange, event.Data, NColumnShard::TDataTasksProcessorContainer());
+        IndexedData.AddIndexed(event.BlobRange, event.Data);
    } else if (CommittedBlobs.contains(blobId)) {
        auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
        if (cmt.empty()) {