about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-04-30 16:54:08 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-04-30 16:54:08 +0300
commit 0f76e6d63e6c15b24f4127cd3e3196e67c79ff09 (patch)
tree a465bc2c5ca805f593d123467f69385ba9163f1c
parent 0bce9a145edc59b530712f3d84f057d9a1f8f1c3 (diff)
download ydb-0f76e6d63e6c15b24f4127cd3e3196e67c79ff09.tar.gz
sorting as separated module for control fetching process
-rw-r--r-- ydb/core/formats/arrow_filter.cpp 6
-rw-r--r-- ydb/core/formats/arrow_filter.h 4
-rw-r--r-- ydb/core/formats/arrow_helpers.cpp 30
-rw-r--r-- ydb/core/formats/arrow_helpers.h 2
-rw-r--r-- ydb/core/formats/program.cpp 2
-rw-r--r-- ydb/core/formats/switch_type.h 11
-rw-r--r-- ydb/core/formats/ut_arrow.cpp 8
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.cpp 8
-rw-r--r-- ydb/core/tx/columnshard/columnshard__scan.h 15
-rw-r--r-- ydb/core/tx/columnshard/columnshard__stats_scan.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/columns_table.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.cpp 178
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.h 283
-rw-r--r-- ydb/core/tx/columnshard/engines/portion_info.h 50
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt 3
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt 3
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt 3
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt 3
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/batch.cpp 60
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/batch.h 37
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/conveyor_task.h 10
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.cpp 56
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.h 91
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp 41
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filter_assembler.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.cpp 53
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.h 15
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/order_controller.cpp 146
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/order_controller.h 112
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/read_metadata.cpp 112
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/read_metadata.h 204
-rw-r--r-- ydb/core/tx/columnshard/read_actor.cpp 4
35 files changed, 1056 insertions, 512 deletions
diff --git a/ydb/core/formats/arrow_filter.cpp b/ydb/core/formats/arrow_filter.cpp
index 28fe9892eaf..014c032ec74 100644
--- a/ydb/core/formats/arrow_filter.cpp
+++ b/ydb/core/formats/arrow_filter.cpp
@@ -132,7 +132,7 @@ void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatc
}
}
-std::shared_ptr<arrow::BooleanArray> TColumnFilter::MakeFilter() const {
+std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter() const {
arrow::BooleanBuilder builder;
auto res = builder.Reserve(Count);
Y_VERIFY_OK(res);
@@ -320,7 +320,7 @@ bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) {
if (IsTotalAllowFilter()) {
return true;
}
- auto res = arrow::compute::Filter(batch, MakeFilter());
+ auto res = arrow::compute::Filter(batch, BuildArrowFilter());
Y_VERIFY_S(res.ok(), res.status().message());
Y_VERIFY((*res).kind() == arrow::Datum::RECORD_BATCH);
batch = (*res).record_batch();
@@ -438,7 +438,7 @@ ui32 TColumnFilter::GetInactiveHeadSize() const {
}
}
-std::vector<bool> TColumnFilter::BuildFilter() const {
+std::vector<bool> TColumnFilter::BuildSimpleFilter() const {
std::vector<bool> result;
result.reserve(Count);
bool currentValue = GetStartValue();
diff --git a/ydb/core/formats/arrow_filter.h b/ydb/core/formats/arrow_filter.h
index 43aaed58d09..a119ed59b1a 100644
--- a/ydb/core/formats/arrow_filter.h
+++ b/ydb/core/formats/arrow_filter.h
@@ -116,11 +116,11 @@ public:
void CutInactiveHead();
- std::vector<bool> BuildFilter() const;
+ std::vector<bool> BuildSimpleFilter() const;
TColumnFilter() = default;
- std::shared_ptr<arrow::BooleanArray> MakeFilter() const;
+ std::shared_ptr<arrow::BooleanArray> BuildArrowFilter() const;
bool IsTotalAllowFilter() const;
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index c1f2165b1d7..64f1f0f6c7b 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -708,9 +708,13 @@ bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<a
}
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) {
+ return ScalarCompare(x, y) < 0;
+}
+
+int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y) {
Y_VERIFY_S(x.type->Equals(y.type), x.type->ToString() + " vs " + y.type->ToString());
- return SwitchType(x.type->id(), [&](const auto& type) {
+ return SwitchTypeImpl<int, 0>(x.type->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
using TValue = std::decay_t<decltype(static_cast<const TScalar&>(x).value)>;
@@ -722,18 +726,36 @@ bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) {
Y_VERIFY(yval);
TStringBuf xBuf(reinterpret_cast<const char*>(xval->data()), xval->size());
TStringBuf yBuf(reinterpret_cast<const char*>(yval->data()), yval->size());
- return xBuf < yBuf;
+ if (xBuf < yBuf) {
+ return -1;
+ } else if (yBuf < xBuf) {
+ return 1;
+ } else {
+ return 0;
+ }
}
if constexpr (std::is_arithmetic_v<TValue>) {
const auto& xval = static_cast<const TScalar&>(x).value;
const auto& yval = static_cast<const TScalar&>(y).value;
- return xval < yval;
+ if (xval < yval) {
+ return -1;
+ } else if (yval < xval) {
+ return 1;
+ } else {
+ return 0;
+ }
}
Y_VERIFY(false); // TODO: non primitive types
- return false;
+ return 0;
});
}
+int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y) {
+ Y_VERIFY(x);
+ Y_VERIFY(y);
+ return ScalarCompare(*x, *y);
+}
+
std::shared_ptr<arrow::UInt64Array> MakePermutation(int size, bool reverse) {
if (size < 1) {
return {};
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index d96fd11b0a9..97b381e98eb 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -130,6 +130,8 @@ std::pair<int, int> FindMinMaxPosition(const std::shared_ptr<arrow::Array>& colu
std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type);
std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position);
bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
+int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y);
+int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp
index 35760b7ab9b..e82fdbf8899 100644
--- a/ydb/core/formats/program.cpp
+++ b/ydb/core/formats/program.cpp
@@ -659,7 +659,7 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const {
}
}
- auto filter = bits.MakeFilter();
+ auto filter = bits.BuildArrowFilter();
for (int64_t i = 0; i < batch.Schema->num_fields(); ++i) {
bool needed = (allColumns || neededColumns.contains(batch.Schema->field(i)->name()));
if (batch.Datums[i].is_array() && needed) {
diff --git a/ydb/core/formats/switch_type.h b/ydb/core/formats/switch_type.h
index 5c78dc2014f..558589b71ad 100644
--- a/ydb/core/formats/switch_type.h
+++ b/ydb/core/formats/switch_type.h
@@ -10,8 +10,8 @@ struct TTypeWrapper
using T = TType;
};
-template <typename TFunc, bool EnableNull = false>
-bool SwitchType(arrow::Type::type typeId, TFunc&& f) {
+template <class TResult, TResult defaultValue, typename TFunc, bool EnableNull = false>
+TResult SwitchTypeImpl(arrow::Type::type typeId, TFunc&& f) {
switch (typeId) {
case arrow::Type::NA: {
if constexpr (EnableNull) {
@@ -84,7 +84,12 @@ bool SwitchType(arrow::Type::type typeId, TFunc&& f) {
break;
}
- return false;
+ return defaultValue;
+}
+
+template <typename TFunc, bool EnableNull = false>
+bool SwitchType(arrow::Type::type typeId, TFunc&& f) {
+ return SwitchTypeImpl<bool, false, TFunc, EnableNull>(typeId, std::move(f));
}
template <typename TFunc>
diff --git a/ydb/core/formats/ut_arrow.cpp b/ydb/core/formats/ut_arrow.cpp
index 020c0e5a2dd..5efa2fd9e11 100644
--- a/ydb/core/formats/ut_arrow.cpp
+++ b/ydb/core/formats/ut_arrow.cpp
@@ -632,10 +632,10 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
const NArrow::TColumnFilter gt = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER);
const NArrow::TColumnFilter ge = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER_OR_EQUAL);
- UNIT_ASSERT(CheckFilter(lt.BuildFilter(), 234, true));
- UNIT_ASSERT(CheckFilter(le.BuildFilter(), 235, true));
- UNIT_ASSERT(CheckFilter(gt.BuildFilter(), 235, false));
- UNIT_ASSERT(CheckFilter(ge.BuildFilter(), 234, false));
+ UNIT_ASSERT(CheckFilter(lt.BuildSimpleFilter(), 234, true));
+ UNIT_ASSERT(CheckFilter(le.BuildSimpleFilter(), 235, true));
+ UNIT_ASSERT(CheckFilter(gt.BuildSimpleFilter(), 235, false));
+ UNIT_ASSERT(CheckFilter(ge.BuildSimpleFilter(), 234, false));
}
Y_UNIT_TEST(SortWithCompositeKey) {
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 6e562d290b3..45edb94bd0d 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -7,7 +7,7 @@ namespace NKikimr::NColumnShard {
TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata,
NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters)
: ReadMetadata(readMetadata)
- , IndexedData(ReadMetadata, FetchBlobsQueue, false, scanCounters)
+ , IndexedData(ReadMetadata, FetchBlobsQueue, false, scanCounters, processor)
, DataTasksProcessor(processor)
, ScanCounters(scanCounters)
{
@@ -21,7 +21,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP
auto& blobId = cmtBlob.BlobId;
FetchBlobsQueue.emplace_back(TBlobRange(blobId, 0, blobId.BlobSize()));
}
- IndexedData.InitRead(batchNo, true);
+ IndexedData.InitRead(batchNo);
// Add cached batches without read
for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 });
@@ -42,7 +42,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP
void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data) {
const auto& blobId = blobRange.BlobId;
if (IndexedData.IsIndexedBlob(blobRange)) {
- IndexedData.AddIndexed(blobRange, data, DataTasksProcessor);
+ IndexedData.AddIndexed(blobRange, data);
} else {
auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 });
if (cmt.empty()) {
@@ -115,7 +115,7 @@ void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) {
if (!task->IsDataProcessed() || DataTasksProcessor.IsStopped()) {
return;
}
- Y_VERIFY(task->Apply(IndexedData));
+ Y_VERIFY(task->Apply(IndexedData.GetGranulesContext()));
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index 54fe14916f2..3353ad31262 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -2,7 +2,20 @@
#include "blob_cache.h"
#include "engines/reader/conveyor_task.h"
-#include "engines/indexed_read_data.h"
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap {
+// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
+struct TPartialReadResult {
+ std::shared_ptr<arrow::RecordBatch> ResultBatch;
+
+ // This 1-row batch contains the last key that was read while producing the ResultBatch.
+ // NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
+ std::shared_ptr<arrow::RecordBatch> LastReadKey;
+
+ std::string ErrorString;
+};
+}
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index bce22aa077f..81504523507 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -2,6 +2,8 @@
#include "columnshard__scan.h"
#include "columnshard_common.h"
+#include "engines/reader/read_metadata.h"
+
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/sys_view/common/schema.h>
#include <ydb/core/formats/custom_registry.h>
diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h
index 8ad4b38abda..dabf291aca7 100644
--- a/ydb/core/tx/columnshard/engines/columns_table.h
+++ b/ydb/core/tx/columnshard/engines/columns_table.h
@@ -19,8 +19,8 @@ struct TColumnRecord {
TBlobRange BlobRange;
TString Metadata;
- ui32 GetRowsCount() const {
- return 0;
+ std::optional<ui32> GetChunkRowsCount() const {
+ return {};
}
bool operator == (const TColumnRecord& rec) const {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 2fa4c309227..4f4b18f5045 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -113,71 +113,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
}
-std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const {
- return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters);
-}
-
-std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
- std::set<ui32> result;
- if (LessPredicate) {
- for (auto&& i : LessPredicate->ColumnNames()) {
- result.emplace(IndexInfo.GetColumnId(i));
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
- }
- }
- if (GreaterPredicate) {
- for (auto&& i : GreaterPredicate->ColumnNames()) {
- result.emplace(IndexInfo.GetColumnId(i));
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
- }
- }
- if (Program) {
- for (auto&& i : Program->GetEarlyFilterColumns()) {
- auto id = IndexInfo.GetColumnIdOptional(i);
- if (id) {
- result.emplace(*id);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
- }
- }
- }
- if (PlanStep) {
- auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
- for (auto&& i : snapSchema->fields()) {
- result.emplace(IndexInfo.GetColumnId(i->name()));
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name());
- }
- }
- return result;
-}
-
-std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
- std::set<ui32> result;
- if (PlanStep) {
- auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
- for (auto&& i : snapSchema->fields()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", i->name());
- result.emplace(IndexInfo.GetColumnId(i->name()));
- }
- }
- for (auto&& f : LoadSchema->fields()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", f->name());
- result.emplace(IndexInfo.GetColumnId(f->name()));
- }
- return result;
-}
-
-TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetResultYqlSchema() const {
- return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, ResultColumnIds);
-}
-
-TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYqlSchema() const {
- return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, NColumnShard::PrimaryIndexStatsSchema.KeyColumns);
-}
-
-std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TScanCounters& /*scanCounters*/) const {
- return std::make_unique<NColumnShard::TStatsIterator>(this->shared_from_this());
-}
-
void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) {
Y_VERIFY(IndexedBlobs.emplace(range).second);
Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second);
@@ -192,20 +127,7 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::
}
}
-bool TIndexedReadData::PredictManyResultsAfterFilter(const TPortionInfo& portionInfo) const {
- if (!portionInfo.AllowEarlyFilter()) {
- return true;
- }
- if (EarlyFilterColumns.empty()) {
- return true;
- }
- if (TIndexInfo::IsSpecialColumns(EarlyFilterColumns)) {
- return true;
- }
- return false;
-}
-
-void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
+void TIndexedReadData::InitRead(ui32 inputBatch) {
Y_VERIFY(ReadMetadata->BlobSchema);
Y_VERIFY(ReadMetadata->LoadSchema);
Y_VERIFY(ReadMetadata->ResultSchema);
@@ -217,38 +139,18 @@ void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
NotIndexed.resize(inputBatch);
ui32 batchNo = inputBatch;
- Batches.resize(inputBatch + ReadMetadata->SelectInfo->Portions.size(), nullptr);
-
+ Y_VERIFY(!GranulesContext);
+ GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode, inputBatch + ReadMetadata->SelectInfo->Portions.size());
ui64 portionsBytes = 0;
for (auto& portionInfo : ReadMetadata->SelectInfo->Portions) {
portionsBytes += portionInfo.BlobsBytes();
- Y_VERIFY_S(portionInfo.Records.size() > 0, "ReadMeatadata: " << *ReadMetadata);
-
- ui64 granule = portionInfo.Records[0].Granule;
+ Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata);
- auto itGranule = Granules.find(granule);
- if (itGranule == Granules.end()) {
- itGranule = Granules.emplace(granule, NIndexedReader::TGranule(granule, *this)).first;
- }
-
- NIndexedReader::TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo);
- if (!OnePhaseReadMode && !PredictManyResultsAfterFilter(portionInfo)) {
- currentBatch.ResetNoFilter(&EarlyFilterColumns);
- } else {
- currentBatch.ResetNoFilter(&UsedColumns);
- }
- Batches[batchNo] = &currentBatch;
- ++batchNo;
+ NIndexedReader::TGranule& granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule);
+ granule.AddBatch(batchNo++, portionInfo);
}
+ GranulesContext->PrepareForStart();
- auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
- for (ui64 granule : granulesOrder) {
- auto it = Granules.find(granule);
- Y_VERIFY(it != Granules.end());
- if (inGranulesOrder) {
- GranulesOutOrder.emplace_back(&it->second);
- }
- }
Counters.GetPortionBytes()->Add(portionsBytes);
auto& stats = ReadMetadata->ReadStats;
stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size();
@@ -256,12 +158,13 @@ void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
stats->IndexBatches = ReadMetadata->NumIndexedBlobs();
stats->CommittedBatches = ReadMetadata->CommittedBlobs.size();
stats->SchemaColumns = ReadMetadata->LoadSchema->num_fields();
- stats->FilterColumns = EarlyFilterColumns.size();
- stats->AdditionalColumns = PostFilterColumns.size();
+ stats->FilterColumns = GranulesContext->GetEarlyFilterColumns().size();
+ stats->AdditionalColumns = GranulesContext->GetPostFilterColumns().size();
stats->PortionsBytes = portionsBytes;
}
-void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data, NColumnShard::TDataTasksProcessorContainer processor) {
+void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data) {
+ Y_VERIFY(GranulesContext);
NIndexedReader::TBatch* portionBatch = nullptr;
{
auto it = IndexedBlobSubscriber.find(blobRange);
@@ -276,8 +179,8 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da
return;
}
if (portionBatch->IsFetchingReady()) {
- if (auto batch = portionBatch->AssembleTask(processor.GetObject(), ReadMetadata)) {
- processor.Add(*this, batch);
+ if (auto batch = portionBatch->AssembleTask(TasksProcessor.GetObject(), ReadMetadata)) {
+ TasksProcessor.Add(*GranulesContext, batch);
}
}
}
@@ -336,7 +239,8 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
}
// Extract ready to out granules: ready granules that are not blocked by other (not ready) granules
- const bool requireResult = !IsInProgress(); // not indexed or the last indexed read (even if it's empty)
+ Y_VERIFY(GranulesContext);
+ const bool requireResult = !GranulesContext->IsInProgress(); // not indexed or the last indexed read (even if it's empty)
auto out = MakeResult(ReadyToOut(), maxRowsInBatch);
if (requireResult && out.empty()) {
out.push_back(TPartialReadResult{
@@ -346,36 +250,14 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
return out;
}
-std::vector<NIndexedReader::TGranule*> TIndexedReadData::DetachReadyInOrder() {
- std::vector<NIndexedReader::TGranule*> out;
- out.reserve(GranulesToOut.size());
-
- if (GranulesOutOrder.empty()) {
- for (auto& [_, granule] : GranulesToOut) {
- out.emplace_back(granule);
- }
- GranulesToOut.clear();
- } else {
- while (GranulesOutOrder.size()) {
- NIndexedReader::TGranule* granule = GranulesOutOrder.front();
- if (!granule->IsReady()) {
- break;
- }
- out.emplace_back(granule);
- Y_VERIFY(GranulesToOut.erase(granule->GetGranuleId()));
- GranulesOutOrder.pop_front();
- }
- }
-
- return out;
-}
-
/// @return batches that are not blocked by others
std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::ReadyToOut() {
Y_VERIFY(SortReplaceDescription);
+ Y_VERIFY(GranulesContext);
+ std::vector<NIndexedReader::TGranule*> ready = GranulesContext->DetachReadyInOrder();
std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> out;
- out.reserve(GranulesToOut.size() + 1);
+ out.reserve(ready.size() + 1);
// Prepend not indexed data (less then first granule) before granules for ASC sorting
if (ReadMetadata->IsAscSorted() && OutNotIndexed.count(0)) {
@@ -384,13 +266,13 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
OutNotIndexed.erase(0);
}
- std::vector<NIndexedReader::TGranule*> ready = DetachReadyInOrder();
for (auto&& granule : ready) {
bool canHaveDups = granule->IsDuplicationsAvailable();
std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = granule->GetReadyBatches();
// Append not indexed data to granules
- if (OutNotIndexed.count(granule->GetGranuleId())) {
- auto batch = OutNotIndexed[granule->GetGranuleId()];
+ auto itNotIndexed = OutNotIndexed.find(granule->GetGranuleId());
+ if (itNotIndexed != OutNotIndexed.end()) {
+ auto batch = itNotIndexed->second;
if (batch && batch->num_rows()) { // TODO: check why it could be empty
inGranule.push_back(batch);
canHaveDups = true;
@@ -420,7 +302,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
}
// Append not indexed data (less then first granule) after granules for DESC sorting
- if (ReadMetadata->IsDescSorted() && GranulesOutOrder.empty() && OutNotIndexed.count(0)) {
+ if (GranulesContext->GetSortingPolicy()->ReadyForAddNotIndexedToEnd() && OutNotIndexed.count(0)) {
out.push_back({});
out.back().push_back(OutNotIndexed[0]);
OutNotIndexed.erase(0);
@@ -571,26 +453,14 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco
return out;
}
-NIndexedReader::TBatch& TIndexedReadData::GetBatchInfo(const ui32 batchNo) {
- Y_VERIFY(batchNo < Batches.size());
- auto ptr = Batches[batchNo];
- Y_VERIFY(ptr);
- return *ptr;
-}
-
TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue,
- const bool internalRead, const NColumnShard::TScanCounters& counters)
+ const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor)
: Counters(counters)
+ , TasksProcessor(tasksProcessor)
, FetchBlobsQueue(fetchBlobsQueue)
, ReadMetadata(readMetadata)
, OnePhaseReadMode(internalRead)
{
- UsedColumns = ReadMetadata->GetUsedColumnIds();
- PostFilterColumns = ReadMetadata->GetUsedColumnIds();
- EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds();
- for (auto&& i : EarlyFilterColumns) {
- PostFilterColumns.erase(i);
- }
Y_VERIFY(ReadMetadata->SelectInfo);
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index b33355b1296..e8dba569740 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -5,6 +5,8 @@
#include "reader/queue.h"
#include "reader/granule.h"
#include "reader/batch.h"
+#include "reader/filling_context.h"
+#include "reader/read_metadata.h"
#include <ydb/library/accessor/accessor.h>
#include <ydb/core/tx/columnshard/counters.h>
@@ -15,231 +17,43 @@ class TScanIteratorBase;
namespace NKikimr::NOlap {
-struct TReadStats {
- TInstant BeginTimestamp;
- ui32 SelectedIndex{0};
- ui64 IndexGranules{0};
- ui64 IndexPortions{0};
- ui64 IndexBatches{0};
- ui64 CommittedBatches{0};
- ui64 PortionsBytes{ 0 };
- ui64 DataFilterBytes{ 0 };
- ui64 DataAdditionalBytes{ 0 };
-
- ui32 SchemaColumns = 0;
- ui32 FilterColumns = 0;
- ui32 AdditionalColumns = 0;
-
- ui32 SelectedRows = 0;
-
- TReadStats(ui32 indexNo)
- : BeginTimestamp(TInstant::Now())
- , SelectedIndex(indexNo)
- {}
-
- void PrintToLog() {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
- ("event", "statistic")
- ("begin", BeginTimestamp)
- ("selected", SelectedIndex)
- ("index_granules", IndexGranules)
- ("index_portions", IndexPortions)
- ("index_batches", IndexBatches)
- ("committed_batches", CommittedBatches)
- ("schema_columns", SchemaColumns)
- ("filter_columns", FilterColumns)
- ("additional_columns", AdditionalColumns)
- ("portions_bytes", PortionsBytes)
- ("data_filter_bytes", DataFilterBytes)
- ("data_additional_bytes", DataAdditionalBytes)
- ("delta_bytes", PortionsBytes - DataFilterBytes - DataAdditionalBytes)
- ("selected_rows", SelectedRows)
- ;
- }
-
- TDuration Duration() {
- return TInstant::Now() - BeginTimestamp;
- }
-};
-
-// Holds all metadata that is needed to perform read/scan
-struct TReadMetadataBase {
- using TConstPtr = std::shared_ptr<const TReadMetadataBase>;
-
- enum class ESorting {
- NONE = 0,
- ASC,
- DESC,
- };
-
- virtual ~TReadMetadataBase() = default;
-
- std::shared_ptr<NOlap::TPredicate> LessPredicate;
- std::shared_ptr<NOlap::TPredicate> GreaterPredicate;
- std::shared_ptr<arrow::Schema> BlobSchema;
- std::shared_ptr<arrow::Schema> LoadSchema; // ResultSchema + required for intermediate operations
- std::shared_ptr<arrow::Schema> ResultSchema; // TODO: add Program modifications
- std::shared_ptr<NSsa::TProgram> Program;
- std::shared_ptr<const THashSet<TUnifiedBlobId>> ExternBlobs;
- ESorting Sorting{ESorting::ASC}; // Sorting inside returned batches
- ui64 Limit{0}; // TODO
-
- bool IsAscSorted() const { return Sorting == ESorting::ASC; }
- bool IsDescSorted() const { return Sorting == ESorting::DESC; }
- bool IsSorted() const { return IsAscSorted() || IsDescSorted(); }
- void SetDescSorting() { Sorting = ESorting::DESC; }
-
- virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const = 0;
- virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const = 0;
- virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const = 0;
- virtual void Dump(IOutputStream& out) const { Y_UNUSED(out); };
-
- // TODO: can this only be done for base class?
- friend IOutputStream& operator << (IOutputStream& out, const TReadMetadataBase& meta) {
- meta.Dump(out);
- return out;
- }
-};
-
-// Holds all metadata that is needed to perform read/scan
-struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadMetadata> {
- using TConstPtr = std::shared_ptr<const TReadMetadata>;
-
- TIndexInfo IndexInfo;
- ui64 PlanStep = 0;
- ui64 TxId = 0;
- std::shared_ptr<TSelectInfo> SelectInfo;
- std::vector<TCommittedBlob> CommittedBlobs;
- THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches;
- std::shared_ptr<TReadStats> ReadStats;
-
- TReadMetadata(const TIndexInfo& info)
- : IndexInfo(info)
- , ReadStats(std::make_shared<TReadStats>(info.GetId()))
- {}
-
- std::vector<std::string> GetColumnsOrder() const {
- std::vector<std::string> result;
- for (auto&& i : LoadSchema->fields()) {
- result.emplace_back(i->name());
- }
- return result;
- }
-
- std::set<ui32> GetEarlyFilterColumnIds() const;
- std::set<ui32> GetUsedColumnIds() const;
-
- bool Empty() const {
- Y_VERIFY(SelectInfo);
- return SelectInfo->Portions.empty() && CommittedBlobs.empty();
- }
-
- std::shared_ptr<arrow::Schema> GetSortingKey() const {
- return IndexInfo.GetSortingKey();
- }
-
- std::shared_ptr<arrow::Schema> GetReplaceKey() const {
- return IndexInfo.GetReplaceKey();
- }
-
- TVector<TNameTypeInfo> GetResultYqlSchema() const override {
- TVector<NTable::TTag> columnIds;
- columnIds.reserve(ResultSchema->num_fields());
- for (const auto& field: ResultSchema->fields()) {
- TString name = TStringBuilder() << field->name();
- columnIds.emplace_back(IndexInfo.GetColumnId(name));
- }
- return IndexInfo.GetColumns(columnIds);
- }
-
- TVector<TNameTypeInfo> GetKeyYqlSchema() const override {
- return IndexInfo.GetPrimaryKey();
- }
-
- size_t NumIndexedRecords() const {
- Y_VERIFY(SelectInfo);
- return SelectInfo->NumRecords();
- }
-
- size_t NumIndexedBlobs() const {
- Y_VERIFY(SelectInfo);
- return SelectInfo->Stats().Blobs;
- }
-
- std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
-
- void Dump(IOutputStream& out) const override {
- out << "columns: " << (LoadSchema ? LoadSchema->num_fields() : 0)
- << " index records: " << NumIndexedRecords()
- << " index blobs: " << NumIndexedBlobs()
- << " committed blobs: " << CommittedBlobs.size()
- << " with program steps: " << (Program ? Program->Steps.size() : 0)
- << (Sorting == ESorting::NONE ? " not" : (Sorting == ESorting::ASC ? " asc" : " desc"))
- << " sorted, at snapshot: " << PlanStep << ":" << TxId;
- if (GreaterPredicate) {
- out << " from{" << *GreaterPredicate << "}";
- }
- if (LessPredicate) {
- out << " to{" << *LessPredicate << "}";
- }
- if (SelectInfo) {
- out << ", " << *SelectInfo;
- }
- }
-
- friend IOutputStream& operator << (IOutputStream& out, const TReadMetadata& meta) {
- meta.Dump(out);
- return out;
- }
-};
-
-struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadStatsMetadata> {
- using TConstPtr = std::shared_ptr<const TReadStatsMetadata>;
-
- const ui64 TabletId;
- TVector<ui32> ReadColumnIds;
- TVector<ui32> ResultColumnIds;
- THashMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats;
-
- explicit TReadStatsMetadata(ui64 tabletId)
- : TabletId(tabletId)
- {}
-
- TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const override;
-
- TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override;
-
- std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
-};
-
-// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
-struct TPartialReadResult {
- std::shared_ptr<arrow::RecordBatch> ResultBatch;
-
- // This 1-row batch contains the last key that was read while producing the ResultBatch.
- // NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
- std::shared_ptr<arrow::RecordBatch> LastReadKey;
-
- std::string ErrorString;
-};
-
class TIndexedReadData {
private:
- YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns);
- YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns);
- std::set<ui32> UsedColumns;
- bool AbortedFlag = false;
+ std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext;
+
YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters);
- std::vector<NIndexedReader::TBatch*> Batches;
+ YDB_READONLY_DEF(NColumnShard::TDataTasksProcessorContainer, TasksProcessor);
TFetchBlobsQueue& FetchBlobsQueue;
- bool PredictManyResultsAfterFilter(const TPortionInfo& portionInfo) const;
+ NOlap::TReadMetadata::TConstPtr ReadMetadata;
+ bool OnePhaseReadMode = false;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
+
+ THashSet<const void*> BatchesToDedup;
+ THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch
+ THashSet<TBlobRange> IndexedBlobs;
+ ui32 ReadyNotIndexed{ 0 };
+ THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append
+ std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
+
public:
- TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, const bool internalRead, const NColumnShard::TScanCounters& counters);
+ TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue,
+ const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor);
- NIndexedReader::TBatch& GetBatchInfo(const ui32 batchNo);
+ /// Gives mutable access to the granules filling context; Y_VERIFY-checks that it has been created.
+ NIndexedReader::TGranulesFillingContext& GetGranulesContext() {
+     auto* context = GranulesContext.get();
+     Y_VERIFY(context);
+     return *context;
+ }
/// Initial FetchBlobsQueue filling (queue from external scan iterator). Granules could be read independently
- void InitRead(ui32 numNotIndexed, bool inGranulesOrder = false);
+ void InitRead(ui32 numNotIndexed);
+ /// Forwards the abort request to the granules filling context, which must exist.
+ void Abort() {
+     Y_VERIFY(GranulesContext);
+     GranulesContext->Abort();
+ }
+ /// Reports the in-progress state of the granules filling context, which must exist.
+ bool IsInProgress() const {
+     Y_VERIFY(GranulesContext);
+     const bool inProgress = GranulesContext->IsInProgress();
+     return inProgress;
+ }
/// @returns batches and corresponding last keys in correct order (i.e. sorted by PK)
TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
@@ -257,20 +71,10 @@ public:
NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId);
}
- void AddIndexed(const TBlobRange& blobRange, const TString& column, NColumnShard::TDataTasksProcessorContainer processor);
- bool IsInProgress() const { return Granules.size() > ReadyGranulesAccumulator.size(); }
+ void AddIndexed(const TBlobRange& blobRange, const TString& column);
bool IsIndexedBlob(const TBlobRange& blobRange) const {
return IndexedBlobs.contains(blobRange);
}
- void Abort() {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort");
- for (auto&& i : Granules) {
- ReadyGranulesAccumulator.emplace(i.first);
- }
- AbortedFlag = true;
- Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size());
- Y_VERIFY(!IsInProgress());
- }
NOlap::TReadMetadata::TConstPtr GetReadMetadata() const {
return ReadMetadata;
}
@@ -288,30 +92,7 @@ public:
}
}
- void OnGranuleReady(NIndexedReader::TGranule& granule) {
- Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second);
- Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
- }
-
private:
- NOlap::TReadMetadata::TConstPtr ReadMetadata;
- bool OnePhaseReadMode = false;
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
-
- THashSet<const void*> BatchesToDedup;
- THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch
- THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut;
- std::set<ui64> ReadyGranulesAccumulator;
- THashSet<TBlobRange> IndexedBlobs;
- ui32 ReadyNotIndexed{0};
- THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append
- THashMap<ui64, NIndexedReader::TGranule> Granules;
- TDeque<NIndexedReader::TGranule*> GranulesOutOrder;
- std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
-
- std::vector<NIndexedReader::TGranule*> DetachReadyInOrder();
-
const TIndexInfo& IndexInfo() const {
return ReadMetadata->IndexInfo;
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index ed6a1880552..e3cd2e4244d 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -62,6 +62,11 @@ struct TPortionInfo {
bool CanIntersectOthers() const { return !Valid() || IsInserted(); }
size_t NumRecords() const { return Records.size(); }
+ /// True when the portion was produced as COMPACTED or SPLIT_COMPACTED.
+ bool IsSortableInGranule() const {
+     const auto produced = Meta.Produced;
+     return produced == TPortionMeta::COMPACTED || produced == TPortionMeta::SPLIT_COMPACTED;
+ }
+
bool AllowEarlyFilter() const {
return Meta.Produced == TPortionMeta::COMPACTED
|| Meta.Produced == TPortionMeta::SPLIT_COMPACTED;
@@ -202,6 +207,51 @@ struct TPortionInfo {
}
return Meta.ColumnMeta.find(columnId)->second.HasMinMax();
}
+private:
+ class TMinGetter {
+ public:
+ static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
+ return portionInfo.MinValue(columnId);
+ }
+ };
+
+ class TMaxGetter {
+ public:
+ static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
+ return portionInfo.MaxValue(columnId);
+ }
+ };
+
+ /// Compares this portion with `item` column by column, in the order given by `columnIds`.
+ /// TSelfGetter / TItemGetter supply the per-column scalar for this portion and for `item`.
+ /// Returns the first non-zero NArrow::ScalarCompare result. When only one side has a value,
+ /// returns 1 if it is this portion and -1 if it is `item`. Returns 0 when no column differs.
+ template <class TSelfGetter, class TItemGetter = TSelfGetter>
+ int CompareByColumnIdsImpl(const TPortionInfo& item, const TVector<ui32>& columnIds) const {
+     for (const ui32 columnId : columnIds) {
+         const std::shared_ptr<arrow::Scalar> selfValue = TSelfGetter::Get(*this, columnId);
+         const std::shared_ptr<arrow::Scalar> itemValue = TItemGetter::Get(item, columnId);
+         if (!selfValue && !itemValue) {
+             // Neither side has a value for this column: it does not affect the result.
+             continue;
+         }
+         if (!itemValue) {
+             return 1;
+         }
+         if (!selfValue) {
+             return -1;
+         }
+         const int columnResult = NArrow::ScalarCompare(selfValue, itemValue);
+         if (columnResult != 0) {
+             return columnResult;
+         }
+     }
+     return 0;
+ }
+public:
+ int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
+ return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns);
+ }
+
+ int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
+ return CompareMinByColumnIds(item, info.KeyColumns);
+ }
+
+ int CompareMinByColumnIds(const TPortionInfo& item, const TVector<ui32>& columnIds) const {
+ return CompareByColumnIdsImpl<TMinGetter>(item, columnIds);
+ }
class TAssembleBlobInfo {
private:
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index 44908d898bf..ab853275e1b 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -25,4 +25,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index ba709a0cff0..e0b9dfc7be9 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -26,4 +26,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index ba709a0cff0..e0b9dfc7be9 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -26,4 +26,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index 44908d898bf..ab853275e1b 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -25,4 +25,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 3ac9a61cfe2..976dd31dac5 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -51,10 +51,10 @@ bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const {
return true;
}
-ui64 TBatch::GetFetchBytes(const std::set<ui32>* columnIds) {
+ui64 TBatch::GetFetchBytes(const std::set<ui32>& columnIds) {
ui64 result = 0;
for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
- if (columnIds && !columnIds->contains(rec.ColumnId)) {
+ if (!columnIds.contains(rec.ColumnId)) {
continue;
}
Y_VERIFY(rec.Portion == Portion);
@@ -65,15 +65,11 @@ ui64 TBatch::GetFetchBytes(const std::set<ui32>* columnIds) {
return result;
}
-void TBatch::ResetCommon(const std::set<ui32>* columnIds) {
- if (!columnIds) {
- CurrentColumnIds.reset();
- } else {
- CurrentColumnIds = *columnIds;
- Y_VERIFY(CurrentColumnIds->size());
- for (auto&& i : *CurrentColumnIds) {
- AskedColumnIds.emplace(i);
- }
+void TBatch::ResetCommon(const std::set<ui32>& columnIds) {
+ CurrentColumnIds = columnIds;
+ Y_VERIFY(CurrentColumnIds->size());
+ for (auto&& i : *CurrentColumnIds) {
+ Y_VERIFY(AskedColumnIds.emplace(i).second);
}
Y_VERIFY(WaitIndexed.empty());
@@ -82,14 +78,13 @@ void TBatch::ResetCommon(const std::set<ui32>* columnIds) {
FetchedBytes = 0;
}
-void TBatch::ResetNoFilter(const std::set<ui32>* columnIds) {
+void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) {
Y_VERIFY(!Filter);
ResetCommon(columnIds);
for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
continue;
}
- AskedColumnIds.emplace(rec.ColumnId);
Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second);
Owner->AddBlobForFetch(rec.BlobRange, *this);
Y_VERIFY(rec.Portion == Portion);
@@ -99,7 +94,7 @@ void TBatch::ResetNoFilter(const std::set<ui32>* columnIds) {
}
}
-void TBatch::ResetWithFilter(const std::set<ui32>* columnIds) {
+void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) {
Y_VERIFY(Filter);
ResetCommon(columnIds);
std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects;
@@ -107,7 +102,6 @@ void TBatch::ResetWithFilter(const std::set<ui32>* columnIds) {
if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
continue;
}
- AskedColumnIds.emplace(rec.ColumnId);
orderedObjects[rec.ColumnId][rec.Chunk] = &rec;
Y_VERIFY(rec.Valid());
Y_VERIFY(Portion == rec.Portion);
@@ -122,29 +116,31 @@ void TBatch::ResetWithFilter(const std::set<ui32>* columnIds) {
for (auto&& [chunk, rec] : columnInfo.second) {
Y_VERIFY(!itFinished);
Y_VERIFY(expected++ == chunk);
- if (!rec->GetRowsCount()) {
+ if (!rec->GetChunkRowsCount()) {
undefinedShift = true;
}
- if (!undefinedShift && it.IsBatchForSkip(rec->GetRowsCount())) {
- Data.emplace(rec->BlobRange, TPortionInfo::TAssembleBlobInfo(rec->GetRowsCount()));
+ if (!undefinedShift && it.IsBatchForSkip(*rec->GetChunkRowsCount())) {
+ Data.emplace(rec->BlobRange, TPortionInfo::TAssembleBlobInfo(*rec->GetChunkRowsCount()));
} else {
Y_VERIFY(WaitIndexed.emplace(rec->BlobRange).second);
Owner->AddBlobForFetch(rec->BlobRange, *this);
WaitingBytes += rec->BlobRange.Size;
}
if (!undefinedShift) {
- itFinished = !it.Next(rec->GetRowsCount());
+ itFinished = !it.Next(*rec->GetChunkRowsCount());
}
}
}
}
-void TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch) {
+bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, const ui32 originalRecordsCount) {
Y_VERIFY(filter);
Y_VERIFY(!Filter);
Y_VERIFY(!FilterBatch);
Filter = filter;
FilterBatch = filterBatch;
+ OriginalRecordsCount = originalRecordsCount;
+ return Owner->OnFilterReady(*this);
}
void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) {
@@ -164,4 +160,28 @@ bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData)
return true;
}
+/// Tells whether more column data has to be requested for this batch.
+/// Without a filter the answer is always yes. With a filter it is yes only when the filtered
+/// batch has rows and the owner's post-filter columns have not all been asked for already.
+bool TBatch::NeedAdditionalData() const {
+    if (!Filter) {
+        return true;
+    }
+    const bool hasFilteredRows = FilteredBatch && FilteredBatch->num_rows();
+    return hasFilteredRows && !AskedColumnsAlready(Owner->GetOwner().GetPostFilterColumns());
+}
+
+/// Scales `bytes` by the ratio of filtered rows to original rows.
+/// Returns 0 when there is no filtered batch, it has no rows, or the original row count is zero.
+ui64 TBatch::GetUsefulBytes(const ui64 bytes) const {
+    const bool hasFilteredRows = FilteredBatch && FilteredBatch->num_rows();
+    if (!hasFilteredRows) {
+        return 0;
+    }
+    Y_VERIFY_DEBUG(OriginalRecordsCount);
+    // Still guard the division explicitly in addition to the debug check above.
+    return OriginalRecordsCount ? bytes * FilteredBatch->num_rows() / OriginalRecordsCount : 0;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index 55f3417d265..4198c864543 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -29,6 +29,8 @@ private:
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch);
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch);
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
+ ui32 OriginalRecordsCount = 0;
+
YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
TGranule* Owner;
@@ -36,17 +38,42 @@ private:
YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds);
std::set<ui32> AskedColumnIds;
- void ResetCommon(const std::set<ui32>* columnIds);
+ void ResetCommon(const std::set<ui32>& columnIds);
+ ui64 GetUsefulBytes(const ui64 bytes) const;
+
public:
+ ui64 GetUsefulWaitingBytes() const {
+ return GetUsefulBytes(WaitingBytes);
+ }
+
+ ui64 GetUsefulFetchedBytes() const {
+ return GetUsefulBytes(FetchedBytes);
+ }
+
+ bool NeedAdditionalData() const;
+ bool IsSortableInGranule() const {
+ return PortionInfo->IsSortableInGranule();
+ }
TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo);
bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData);
bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
- void ResetNoFilter(const std::set<ui32>* columnIds);
- void ResetWithFilter(const std::set<ui32>* columnIds);
- ui64 GetFetchBytes(const std::set<ui32>* columnIds);
+ void ResetNoFilter(const std::set<ui32>& columnIds);
+ void ResetWithFilter(const std::set<ui32>& columnIds);
+ ui64 GetFetchBytes(const std::set<ui32>& columnIds);
- void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch);
+ bool IsFiltered() const {
+ return !!Filter;
+ }
+ ui32 GetFilteredRecordsCount() const {
+ Y_VERIFY(IsFiltered());
+ if (!FilterBatch) {
+ return 0;
+ } else {
+ return FilterBatch->num_rows();
+ }
+ }
+ bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, const ui32 originalRecordsCount);
void InitBatch(std::shared_ptr<arrow::RecordBatch> batch);
NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata);
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
index c3738a9c663..a26e4daf1b9 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -12,7 +12,7 @@ bool IDataTasksProcessor::ITask::DoExecute() {
}
}
-bool IDataTasksProcessor::ITask::Apply(NOlap::TIndexedReadData& indexedDataRead) const {
+bool IDataTasksProcessor::ITask::Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const {
if (OwnerOperator) {
OwnerOperator->ReplyReceived();
if (OwnerOperator->IsStopped()) {
@@ -38,12 +38,12 @@ bool IDataTasksProcessor::Add(ITask::TPtr task) {
}
-void TDataTasksProcessorContainer::Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task) {
+void TDataTasksProcessorContainer::Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task) {
if (Object) {
Object->Add(task);
} else {
task->Execute();
- task->Apply(indexedDataRead);
+ task->Apply(context);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
index 90a8f98a10d..fa0a8b1ce2c 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
@@ -2,8 +2,8 @@
#include <ydb/core/tx/conveyor/usage/abstract.h>
#include <ydb/library/accessor/accessor.h>
-namespace NKikimr::NOlap {
-class TIndexedReadData;
+namespace NKikimr::NOlap::NIndexedReader {
+class TGranulesFillingContext;
}
namespace NKikimr::NColumnShard {
@@ -23,7 +23,7 @@ public:
YDB_READONLY_FLAG(DataProcessed, false);
protected:
TDataTasksProcessorContainer GetTasksProcessorContainer() const;
- virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0;
+ virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const = 0;
virtual bool DoExecuteImpl() = 0;
virtual bool DoExecute() override final;
@@ -34,7 +34,7 @@ public:
}
using TPtr = std::shared_ptr<ITask>;
virtual ~ITask() = default;
- bool Apply(NOlap::TIndexedReadData& indexedDataRead) const;
+ bool Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const;
};
protected:
virtual bool DoAdd(ITask::TPtr task) = 0;
@@ -84,7 +84,7 @@ public:
return Object && Object->IsStopped();
}
- void Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task);
+ void Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task);
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
new file mode 100644
index 00000000000..83293a384d1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -0,0 +1,56 @@
+#include "filling_context.h"
+#include "filling_context.h"
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+/// Prepares the context for `batchesCount` batches.
+/// Internal reads get a TNonSorting policy; other reads use the policy built by the read metadata.
+/// Post-filter columns are the used columns minus the early-filter columns.
+TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount)
+    : ReadMetadata(readMetadata)
+    , InternalReading(internalReading)
+    , Owner(owner)
+    , Counters(owner.GetCounters())
+{
+    Batches.resize(batchesCount, nullptr);
+    if (InternalReading) {
+        SortingPolicy = std::make_shared<TNonSorting>(ReadMetadata);
+    } else {
+        SortingPolicy = ReadMetadata->BuildSortingPolicy();
+    }
+
+    UsedColumns = ReadMetadata->GetUsedColumnIds();
+    PostFilterColumns = ReadMetadata->GetUsedColumnIds();
+    EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds();
+    for (const ui32 columnId : EarlyFilterColumns) {
+        PostFilterColumns.erase(columnId);
+    }
+}
+
+/// True only when the portion allows early filtering, there are early-filter columns,
+/// and those columns are not just the special columns.
+bool TGranulesFillingContext::PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const {
+    return portionInfo.AllowEarlyFilter()
+        && !EarlyFilterColumns.empty()
+        && !TIndexInfo::IsSpecialColumns(EarlyFilterColumns);
+}
+
+/// Passes the blob fetch request on to the owning TIndexedReadData.
+void TGranulesFillingContext::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) {
+    Owner.AddBlobForFetch(range, batch);
+}
+
+/// Passes the ready-batch notification on to the owning TIndexedReadData.
+void TGranulesFillingContext::OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
+    Owner.OnBatchReady(batchInfo, batch);
+}
+
+/// Looks up a registered batch by its number; both the index and the slot are Y_VERIFY-checked.
+TBatch& TGranulesFillingContext::GetBatchInfo(const ui32 batchNo) {
+    Y_VERIFY(batchNo < Batches.size());
+    TBatch* const batch = Batches[batchNo];
+    Y_VERIFY(batch);
+    return *batch;
+}
+
+/// Returns the tasks processor container held by the owning TIndexedReadData.
+NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const {
+    return Owner.GetTasksProcessor();
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
new file mode 100644
index 00000000000..2b71adcb028
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -0,0 +1,91 @@
+#pragma once
+#include "conveyor_task.h"
+#include "granule.h"
+#include "order_controller.h"
+#include <util/generic/hash.h>
+
+namespace NKikimr::NOlap {
+class TIndexedReadData;
+}
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+/// Tracks granules and batches while a read is being filled. It owns the granules, remembers
+/// which of them are ready, keeps the column sets used for early and post filtering, and
+/// delegates output ordering to the sorting policy.
+class TGranulesFillingContext {
+private:
+    // Set by Abort(); relaxes the uniqueness check in OnGranuleReady().
+    bool AbortedFlag = false;
+    YDB_READONLY_DEF(TReadMetadata::TConstPtr, ReadMetadata);
+    const bool InternalReading = false;
+    TIndexedReadData& Owner;
+    // Granules reported through OnGranuleReady(); handed to the sorting policy in DetachReadyInOrder().
+    THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut;
+    // Ids of granules counted as finished; compared with Granules.size() in IsInProgress().
+    std::set<ui64> ReadyGranulesAccumulator;
+    THashMap<ui64, NIndexedReader::TGranule> Granules;
+    YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns);
+    YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns);
+    std::set<ui32> UsedColumns;
+    YDB_READONLY_DEF(IOrderPolicy::TPtr, SortingPolicy);
+    YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters);
+    // Batch lookup by batch number; slots are filled in OnNewBatch().
+    std::vector<NIndexedReader::TBatch*> Batches;
+
+    bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const;
+
+public:
+    TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount);
+
+    NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const;
+
+    TBatch& GetBatchInfo(const ui32 batchNo);
+
+    void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
+    void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
+
+    /// Returns the granule with the given id; the granule must already exist.
+    NIndexedReader::TGranule& GetGranuleVerified(const ui64 granuleId) {
+        auto it = Granules.find(granuleId);
+        Y_VERIFY(it != Granules.end());
+        return it->second;
+    }
+
+    /// True while fewer granules have been counted as ready than are known to the context.
+    bool IsInProgress() const { return Granules.size() > ReadyGranulesAccumulator.size(); }
+
+    /// Registers a new batch and requests its first set of columns.
+    /// For non-internal reads where PredictEmptyAfterFilter() holds, only the early-filter
+    /// columns are requested; otherwise all used columns are requested.
+    void OnNewBatch(TBatch& batch) {
+        // Same bounds check as GetBatchInfo(): never write past the pre-sized lookup table.
+        Y_VERIFY(batch.GetBatchNo() < Batches.size());
+        if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) {
+            batch.ResetNoFilter(EarlyFilterColumns);
+        } else {
+            batch.ResetNoFilter(UsedColumns);
+        }
+        Batches[batch.GetBatchNo()] = &batch;
+    }
+
+    /// Asks the sorting policy which of the ready granules can be output now.
+    std::vector<TGranule*> DetachReadyInOrder() {
+        Y_VERIFY(SortingPolicy);
+        return SortingPolicy->DetachReadyGranules(GranulesToOut);
+    }
+
+    /// Marks every known granule as ready so that IsInProgress() becomes false.
+    void Abort() {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort");
+        for (auto&& i : Granules) {
+            ReadyGranulesAccumulator.emplace(i.first);
+        }
+        AbortedFlag = true;
+        Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size());
+        Y_VERIFY(!IsInProgress());
+    }
+
+    /// Returns the granule with the given id, creating it first if it does not exist.
+    TGranule& UpsertGranule(const ui64 granuleId) {
+        auto itGranule = Granules.find(granuleId);
+        if (itGranule == Granules.end()) {
+            itGranule = Granules.emplace(granuleId, NIndexedReader::TGranule(granuleId, *this)).first;
+        }
+        return itGranule->second;
+    }
+
+    /// Records a granule as ready for output. A granule may be reported only once;
+    /// the accumulator may already contain it only if Abort() was called before.
+    void OnGranuleReady(TGranule& granule) {
+        Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second);
+        Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
+    }
+
+    /// Lets the sorting policy initialise itself from this context.
+    void PrepareForStart() {
+        // Same check as DetachReadyInOrder(): never dereference a missing policy.
+        Y_VERIFY(SortingPolicy);
+        SortingPolicy->Fill(*this);
+    }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index 1ae809aa491..bff7044c5b7 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -46,49 +46,12 @@ bool TAssembleFilter::DoExecuteImpl() {
return true;
}
-bool TAssembleFilter::DoApply(TIndexedReadData& owner) const {
+bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const {
TBatch& batch = owner.GetBatchInfo(BatchNo);
Y_VERIFY(OriginalCount);
owner.GetCounters().GetOriginalRowsCount()->Add(OriginalCount);
- batch.InitFilter(Filter, FilteredBatch);
owner.GetCounters().GetAssembleFilterCount()->Add(1);
- if (!FilteredBatch || FilteredBatch->num_rows() == 0) {
- owner.GetCounters().GetEmptyFilterCount()->Add(1);
- owner.GetCounters().GetEmptyFilterFetchedBytes()->Add(batch.GetFetchedBytes());
- owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns()));
- batch.InitBatch(FilteredBatch);
- } else {
- owner.GetCounters().GetFilteredRowsCount()->Add(FilteredBatch->num_rows());
- if (batch.AskedColumnsAlready(owner.GetPostFilterColumns())) {
- owner.GetCounters().GetFilterOnlyCount()->Add(1);
- owner.GetCounters().GetFilterOnlyFetchedBytes()->Add(batch.GetFetchedBytes());
- owner.GetCounters().GetFilterOnlyUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount);
- owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns()));
-
- batch.InitBatch(FilteredBatch);
- } else {
- owner.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes());
- owner.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount);
-
- batch.ResetWithFilter(&owner.GetPostFilterColumns());
- if (batch.IsFetchingReady()) {
- auto processor = GetTasksProcessorContainer();
- if (auto assembleBatchTask = batch.AssembleTask(processor.GetObject(), owner.GetReadMetadata())) {
- processor.Add(owner, assembleBatchTask);
- }
- }
-
- owner.GetCounters().GetTwoPhasesCount()->Add(1);
- owner.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes());
- owner.GetCounters().GetTwoPhasesPostFilterUsefulBytes()->Add(batch.GetWaitingBytes() * FilteredBatch->num_rows() / OriginalCount);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
- ("filtered_count", FilteredBatch->num_rows())
- ("blobs_count", batch.GetWaitingBlobs().size())
- ("columns_count", batch.GetCurrentColumnIds()->size())
- ("fetch_size", batch.GetWaitingBytes())
- ;
- }
- }
+ batch.InitFilter(Filter, FilteredBatch, OriginalCount);
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
index 7ec1b9af0cc..c2f65c6d6dc 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -21,7 +21,7 @@ namespace NKikimr::NOlap::NIndexedReader {
bool AllowEarlyFilter = false;
std::set<ui32> FilterColumnIds;
protected:
- virtual bool DoApply(TIndexedReadData& owner) const override;
+ virtual bool DoApply(TGranulesFillingContext& owner) const override;
virtual bool DoExecuteImpl() override;
public:
TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index e7c4d80f82e..7302e6add52 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -1,10 +1,16 @@
#include "granule.h"
+#include "filling_context.h"
#include <ydb/core/tx/columnshard/engines/portion_info.h>
#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
namespace NKikimr::NOlap::NIndexedReader {
void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
+ if (Owner->GetSortingPolicy()->CanInterrupt()) {
+ if (ReadyFlag) {
+ return;
+ }
+ }
Y_VERIFY(!ReadyFlag);
Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo()));
if (batch && batch->num_rows()) {
@@ -20,9 +26,10 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo) {
Y_VERIFY(!ReadyFlag);
WaitBatches.emplace(batchNo);
- auto infoEmplace = Batches.emplace(batchNo, TBatch(batchNo, *this, portionInfo));
- Y_VERIFY(infoEmplace.second);
- return infoEmplace.first->second;
+ Batches.emplace_back(TBatch(batchNo, *this, portionInfo));
+ Y_VERIFY(GranuleBatchNumbers.emplace(batchNo).second);
+ Owner->OnNewBatch(Batches.back());
+ return Batches.back();
}
void TGranule::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const {
@@ -33,4 +40,44 @@ const std::set<ui32>& TGranule::GetEarlyFilterColumns() const {
return Owner->GetEarlyFilterColumns();
}
+/// Hands a batch whose filter is ready to the owner's sorting policy.
+/// Returns false without consulting the policy when the granule is already marked ready.
+bool TGranule::OnFilterReady(TBatch& batchInfo) {
+    return !ReadyFlag && Owner->GetSortingPolicy()->OnFilterReady(batchInfo, *this, *Owner);
+}
+
+/// Returns pointers to the granule's batches ordered for output by primary key.
+/// Batches that are not sortable in granule come first; std::sort is not stable, so their
+/// relative order is unspecified. Sortable batches follow, ordered by the minimal PK of their
+/// portion: ascending by default, descending when `reverse` is set.
+/// After sorting, Y_VERIFY checks that non-sortable batches form a prefix and that the max PK of
+/// each sortable batch does not contradict the min PK of its successor in the requested direction.
+std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata) {
+    std::deque<TBatch*> batches;
+    for (auto&& i : Batches) {
+        batches.emplace_back(&i);
+    }
+    // Direction multiplier. It has to be +1 (not 0) for ascending order: a zero multiplier wipes out
+    // every comparison result, so the sort would not order anything and the check below would be vacuous.
+    const int reverseKff = reverse ? -1 : 1;
+    const auto pred = [reverseKff, readMetadata](const TBatch* l, const TBatch* r) {
+        if (l->IsSortableInGranule() && r->IsSortableInGranule()) {
+            return l->GetPortionInfo().CompareMinByPk(r->GetPortionInfo(), readMetadata->IndexInfo) * reverseKff < 0;
+        } else if (l->IsSortableInGranule()) {
+            return false;
+        } else if (r->IsSortableInGranule()) {
+            // Non-sortable batches are placed before sortable ones.
+            return true;
+        } else {
+            return false;
+        }
+    };
+    std::sort(batches.begin(), batches.end(), pred);
+    bool nonCompactedSerial = true;
+    for (ui32 i = 0; i + 1 < batches.size(); ++i) {
+        if (batches[i]->IsSortableInGranule()) {
+            auto& l = *batches[i];
+            auto& r = *batches[i + 1];
+            Y_VERIFY(r.IsSortableInGranule());
+            Y_VERIFY(l.GetPortionInfo().CompareSelfMaxItemMinByPk(r.GetPortionInfo(), readMetadata->IndexInfo) * reverseKff <= 0);
+            nonCompactedSerial = false;
+        } else {
+            // A non-sortable batch must never appear after a sortable one.
+            Y_VERIFY(nonCompactedSerial);
+        }
+    }
+    return batches;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index 6c32ea426e0..b7400339d30 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -1,5 +1,6 @@
#pragma once
#include "batch.h"
+#include "read_metadata.h"
#include <ydb/library/accessor/accessor.h>
#include <ydb/core/tx/columnshard/engines/portion_info.h>
@@ -8,24 +9,32 @@
namespace NKikimr::NOlap::NIndexedReader {
+class TGranulesFillingContext;
+
class TGranule {
private:
YDB_READONLY(ui64, GranuleId, 0);
YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, ReadyBatches);
YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
YDB_READONLY_FLAG(Ready, false);
- THashMap<ui32, TBatch> Batches;
+ std::deque<TBatch> Batches;
std::set<ui32> WaitBatches;
- TIndexedReadData* Owner = nullptr;
+ std::set<ui32> GranuleBatchNumbers;
+ TGranulesFillingContext* Owner = nullptr;
public:
- TGranule(const ui64 granuleId, TIndexedReadData& owner)
+ TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
: GranuleId(granuleId)
, Owner(&owner) {
+ }
+ const TGranulesFillingContext& GetOwner() const {
+ return *Owner;
}
+ std::deque<TBatch*> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata);
const std::set<ui32>& GetEarlyFilterColumns() const;
void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
+ bool OnFilterReady(TBatch& batchInfo);
TBatch& AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo);
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const;
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
new file mode 100644
index 00000000000..1c667356728
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
@@ -0,0 +1,146 @@
+#include "order_controller.h"
+#include "filling_context.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+// Stores pointers to the granules in the order reported by SelectInfo->GranulesOrder()
+// for the requested sorting direction.
+void TAnySorting::DoFill(TGranulesFillingContext& context) {
+    const bool descending = ReadMetadata->IsDescSorted();
+    auto orderedIds = ReadMetadata->SelectInfo->GranulesOrder(descending);
+    for (ui64 granuleId : orderedIds) {
+        GranulesOutOrder.emplace_back(&context.GetGranuleVerified(granuleId));
+    }
+}
+
+// Detaches the longest ready prefix of GranulesOutOrder: stops at the first granule that is
+// not ready. Every detached granule must be present in granulesToOut and is erased from it.
+std::vector<TGranule*> TAnySorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+    std::vector<TGranule*> detached;
+    for (; !GranulesOutOrder.empty(); GranulesOutOrder.pop_front()) {
+        NIndexedReader::TGranule* head = GranulesOutOrder.front();
+        if (!head->IsReady()) {
+            break;
+        }
+        detached.emplace_back(head);
+        Y_VERIFY(granulesToOut.erase(head->GetGranuleId()));
+    }
+    return detached;
+}
+
+// Moves every granule pointer from granulesToOut into the result (in map iteration order)
+// and leaves the map empty.
+std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+    std::vector<TGranule*> detached;
+    detached.reserve(granulesToOut.size());
+    for (auto it = granulesToOut.begin(); it != granulesToOut.end(); ++it) {
+        detached.emplace_back(it->second);
+    }
+    granulesToOut.clear();
+    return detached;
+}
+
+// Reacts to a ready filter when reading in PK order with a row limit.
+// Batches are processed strictly in the order prepared by DoFill(): granule by granule
+// (GranulesOutOrderForPortions) and, inside a granule, in the order stored in OrderedBatches.
+// The notification is ignored when the limit is already exhausted or when `granule` is not
+// the current head granule. The function always returns false.
+bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) {
+ Y_VERIFY(ReadMetadata->Limit);
+ if (!CurrentItemsLimit) {
+ return false;
+ }
+ Y_VERIFY(GranulesOutOrderForPortions.size());
+ if (granule.GetGranuleId() != GranulesOutOrderForPortions.front()->GetGranuleId()) {
+ return false;
+ }
+ while (GranulesOutOrderForPortions.size()) {
+ auto it = OrderedBatches.find(GranulesOutOrderForPortions.front()->GetGranuleId());
+ Y_VERIFY(it != OrderedBatches.end());
+ // Consume already filtered batches from the head of the queue while the limit is not exhausted.
+ while (it->second.size() && it->second.front()->IsFiltered() && CurrentItemsLimit) {
+ auto b = it->second.front();
+ if (b->IsSortableInGranule()) {
+ // Saturating subtraction: the remaining limit never goes below zero.
+ if (CurrentItemsLimit <= b->GetFilteredRecordsCount()) {
+ CurrentItemsLimit = 0;
+ } else {
+ CurrentItemsLimit -= b->GetFilteredRecordsCount();
+ }
+ } else {
+ // NOTE(review): for batches that are not sortable in granule the remaining limit is
+ // increased by their filtered rows count - presumably because such rows may interleave
+ // with rows of other batches; confirm the intent.
+ CurrentItemsLimit += b->GetFilteredRecordsCount();
+ }
+ OnBatchFilterInitialized(*b, context);
+
+ it->second.pop_front();
+ }
+ if (!CurrentItemsLimit || it->second.empty()) {
+ // The granule is finished: the remaining batches (if any) are counted as skipped bytes
+ // and initialized with nullptr, then the granule is removed from the processing order.
+ while (it->second.size()) {
+ auto b = it->second.front();
+ context.GetCounters().GetSkippedBytes()->Add(b->GetFetchBytes(context.GetPostFilterColumns()));
+ b->InitBatch(nullptr);
+ it->second.pop_front();
+ }
+ OrderedBatches.erase(it);
+ GranulesOutOrderForPortions.pop_front();
+ } else {
+ // The head batch of the current granule is not filtered yet: wait for the next notification.
+ break;
+ }
+ }
+ return false;
+}
+
+// Prepares the processing order: granules in SelectInfo->GranulesOrder() order and, for every
+// granule, its batches ordered by TGranule::SortBatchesByPK(). A granule id must not repeat.
+void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) {
+    auto orderedIds = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
+    for (ui64 granuleId : orderedIds) {
+        TGranule& current = context.GetGranuleVerified(granuleId);
+        GranulesOutOrder.emplace_back(&current);
+        const bool inserted = OrderedBatches.emplace(granuleId, current.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata)).second;
+        Y_VERIFY(inserted);
+    }
+    GranulesOutOrderForPortions = GranulesOutOrder;
+}
+
+// Detaches ready granules from the head of GranulesOutOrder, stopping at the first granule
+// that is not ready. Each detached granule must exist in granulesToOut and is erased from it.
+std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+    std::vector<TGranule*> detached;
+    for (; !GranulesOutOrder.empty(); GranulesOutOrder.pop_front()) {
+        NIndexedReader::TGranule* head = GranulesOutOrder.front();
+        if (!head->IsReady()) {
+            break;
+        }
+        detached.emplace_back(head);
+        Y_VERIFY(granulesToOut.erase(head->GetGranuleId()));
+    }
+    return detached;
+}
+
+// Processes a batch whose filter is available (verified) and updates the scan counters.
+// Three outcomes are possible:
+//  * no rows passed the filter - the batch is initialized with nullptr;
+//  * all post-filter columns were already requested - the batch is initialized with the filter batch;
+//  * otherwise the batch is reset to work with the post-filter columns and, if its fetching
+//    is already complete, an assemble task is passed to the tasks processor.
+void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context) {
+ Y_VERIFY(!!batch.GetFilter());
+ if (!batch.GetFilteredRecordsCount()) {
+ // Empty filter result: the post-filter columns are accounted as skipped bytes.
+ context.GetCounters().GetEmptyFilterCount()->Add(1);
+ context.GetCounters().GetEmptyFilterFetchedBytes()->Add(batch.GetFetchedBytes());
+ context.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
+ batch.InitBatch(nullptr);
+ } else {
+ context.GetCounters().GetFilteredRowsCount()->Add(batch.GetFilterBatch()->num_rows());
+ if (batch.AskedColumnsAlready(context.GetPostFilterColumns())) {
+ // "Filter only" case: the filter batch itself is used as the batch result.
+ context.GetCounters().GetFilterOnlyCount()->Add(1);
+ context.GetCounters().GetFilterOnlyFetchedBytes()->Add(batch.GetFetchedBytes());
+ context.GetCounters().GetFilterOnlyUsefulBytes()->Add(batch.GetUsefulFetchedBytes());
+ context.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
+
+ batch.InitBatch(batch.GetFilterBatch());
+ } else {
+ // "Two phases" case: these counters are read before ResetWithFilter() is called on the batch.
+ context.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes());
+ context.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetUsefulFetchedBytes());
+
+ batch.ResetWithFilter(context.GetPostFilterColumns());
+ if (batch.IsFetchingReady()) {
+ // AssembleTask() may return an empty task; it is added to the processor only when non-empty.
+ auto processor = context.GetTasksProcessor();
+ if (auto assembleBatchTask = batch.AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
+ processor.Add(context, assembleBatchTask);
+ }
+ }
+
+ // These counters are read after ResetWithFilter(), i.e. for the second fetching phase.
+ context.GetCounters().GetTwoPhasesCount()->Add(1);
+ context.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes());
+ context.GetCounters().GetTwoPhasesPostFilterUsefulBytes()->Add(batch.GetUsefulWaitingBytes());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
+ ("filtered_count", batch.GetFilterBatch()->num_rows())
+ ("blobs_count", batch.GetWaitingBlobs().size())
+ ("columns_count", batch.GetCurrentColumnIds()->size())
+ ("fetch_size", batch.GetWaitingBytes())
+ ;
+ }
+ }
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h
new file mode 100644
index 00000000000..ab14fef95ca
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h
@@ -0,0 +1,112 @@
+#pragma once
+#include "granule.h"
+#include "read_metadata.h"
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TGranulesFillingContext;
+
+// Strategy interface: defines in which order granules are released to the reader and how a
+// batch is handled once its filter is ready. Public methods delegate to the Do* hooks.
+class IOrderPolicy {
+protected:
+    TReadMetadata::TConstPtr ReadMetadata;
+
+    // Called once to let the policy prepare its internal order.
+    virtual void DoFill(TGranulesFillingContext& context) = 0;
+    // Returns granules that can be released now; the implementation removes them from granulesToOut.
+    virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0;
+    // Default reaction: process the batch immediately and report true.
+    virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) {
+        OnBatchFilterInitialized(batchInfo, context);
+        return true;
+    }
+
+    void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context);
+
+public:
+    using TPtr = std::shared_ptr<IOrderPolicy>;
+
+    virtual ~IOrderPolicy() = default;
+
+    IOrderPolicy(TReadMetadata::TConstPtr readMetadata)
+        : ReadMetadata(readMetadata)
+    {
+    }
+
+    virtual bool CanInterrupt() const {
+        return false;
+    }
+
+    bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) {
+        return DoOnFilterReady(batchInfo, granule, context);
+    }
+
+    virtual bool ReadyForAddNotIndexedToEnd() const = 0;
+
+    std::vector<TGranule*> DetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
+        return DoDetachReadyGranules(granulesToOut);
+    }
+
+    void Fill(TGranulesFillingContext& context) {
+        DoFill(context);
+    }
+};
+
+// Policy without ordering requirements: DoFill() does nothing and not indexed data
+// may always be appended.
+class TNonSorting: public IOrderPolicy {
+private:
+    using TBase = IOrderPolicy;
+
+protected:
+    void DoFill(TGranulesFillingContext& /*context*/) override {
+    }
+
+    std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+
+public:
+    TNonSorting(TReadMetadata::TConstPtr readMetadata)
+        : TBase(readMetadata)
+    {
+    }
+
+    bool ReadyForAddNotIndexedToEnd() const override {
+        return true;
+    }
+};
+
+// Policy that releases granules strictly in the order stored in GranulesOutOrder.
+class TAnySorting: public IOrderPolicy {
+private:
+    using TBase = IOrderPolicy;
+    // Granules that are still waiting to be released, head first.
+    std::deque<TGranule*> GranulesOutOrder;
+
+protected:
+    void DoFill(TGranulesFillingContext& context) override;
+    std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+
+public:
+    TAnySorting(TReadMetadata::TConstPtr readMetadata)
+        : TBase(readMetadata)
+    {
+    }
+
+    // True only for descending reads after all granules have been released.
+    bool ReadyForAddNotIndexedToEnd() const override {
+        return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
+    }
+};
+
+// Policy for PK-ordered reads with a row limit: tracks the remaining limit while batches are
+// processed in PK order, so the rest of the data can be skipped once the limit is reached.
+class TPKSortingWithLimit: public IOrderPolicy {
+private:
+    using TBase = IOrderPolicy;
+    // Granules waiting to be released to the reader, head first.
+    std::deque<TGranule*> GranulesOutOrder;
+    // Granules whose batches are still being processed against the limit, head first.
+    std::deque<TGranule*> GranulesOutOrderForPortions;
+    // Per-granule queue of batches in processing order, keyed by granule id.
+    THashMap<ui64, std::deque<TBatch*>> OrderedBatches;
+    // Remaining rows limit. Kept as ui64 to match ReadMetadata->Limit: with a 32-bit field
+    // a limit that is a multiple of 2^32 was truncated to 0, i.e. "limit already exhausted".
+    ui64 CurrentItemsLimit = 0;
+protected:
+    virtual void DoFill(TGranulesFillingContext& context) override;
+    virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
+    virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;
+public:
+    virtual bool CanInterrupt() const override {
+        return true;
+    }
+
+    TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
+        :TBase(readMetadata) {
+        CurrentItemsLimit = ReadMetadata->Limit;
+    }
+    // True only for descending reads after all granules have been released.
+    virtual bool ReadyForAddNotIndexedToEnd() const override {
+        return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
+    }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
index 45310d0d228..460dc1297d8 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
@@ -31,7 +31,7 @@ bool TAssembleBatch::DoExecuteImpl() {
return true;
}
-bool TAssembleBatch::DoApply(TIndexedReadData& owner) const {
+bool TAssembleBatch::DoApply(TGranulesFillingContext& owner) const {
TBatch& batch = owner.GetBatchInfo(BatchNo);
batch.InitBatch(FullBatch);
return true;
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
index 616b079e016..494b55e10f5 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
@@ -20,7 +20,7 @@ private:
const ui32 BatchNo;
protected:
- virtual bool DoApply(TIndexedReadData& owner) const override;
+ virtual bool DoApply(TGranulesFillingContext& owner) const override;
virtual bool DoExecuteImpl() override;
public:
TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor,
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
new file mode 100644
index 00000000000..eff5afd970a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -0,0 +1,112 @@
+#include "read_metadata.h"
+#include "order_controller.h"
+#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
+#include <ydb/core/tx/columnshard/columnshard__stats_scan.h>
+
+namespace NKikimr::NOlap {
+
+// Creates the scan iterator over column shard data for this read metadata.
+std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const {
+    using TIterator = NColumnShard::TColumnShardScanIterator;
+    return std::make_unique<TIterator>(this->shared_from_this(), tasksProcessor, scanCounters);
+}
+
+// Collects ids of the columns needed to build the early filter: columns of both range
+// predicates, early filter columns of the program that are known to the index and, when
+// PlanStep is set, the columns of TIndexInfo::ArrowSchemaSnapshot().
+std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
+    std::set<ui32> columnIds;
+    const auto addPredicateColumns = [&](const std::shared_ptr<NOlap::TPredicate>& predicate) {
+        if (!predicate) {
+            return;
+        }
+        for (auto&& name : predicate->ColumnNames()) {
+            columnIds.emplace(IndexInfo.GetColumnId(name));
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", name);
+        }
+    };
+    addPredicateColumns(LessPredicate);
+    addPredicateColumns(GreaterPredicate);
+    if (Program) {
+        for (auto&& name : Program->GetEarlyFilterColumns()) {
+            // program columns that have no id in the index are silently skipped
+            if (auto columnId = IndexInfo.GetColumnIdOptional(name)) {
+                columnIds.emplace(*columnId);
+                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", name);
+            }
+        }
+    }
+    if (PlanStep) {
+        auto snapshotSchema = TIndexInfo::ArrowSchemaSnapshot();
+        for (auto&& field : snapshotSchema->fields()) {
+            columnIds.emplace(IndexInfo.GetColumnId(field->name()));
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", field->name());
+        }
+    }
+    return columnIds;
+}
+
+// Collects ids of all columns used by the read: every LoadSchema field plus, when PlanStep
+// is set, the columns of TIndexInfo::ArrowSchemaSnapshot().
+std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
+    std::set<ui32> columnIds;
+    if (PlanStep) {
+        auto snapshotSchema = TIndexInfo::ArrowSchemaSnapshot();
+        for (auto&& field : snapshotSchema->fields()) {
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", field->name());
+            columnIds.emplace(IndexInfo.GetColumnId(field->name()));
+        }
+    }
+    for (auto&& field : LoadSchema->fields()) {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", field->name());
+        columnIds.emplace(IndexInfo.GetColumnId(field->name()));
+    }
+    return columnIds;
+}
+
+// Result columns are described by the primary index stats schema restricted to ResultColumnIds.
+TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetResultYqlSchema() const {
+    auto& statsSchema = NColumnShard::PrimaryIndexStatsSchema;
+    return NOlap::GetColumns(statsSchema, ResultColumnIds);
+}
+
+// Key columns are the KeyColumns of the primary index stats schema.
+TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYqlSchema() const {
+    auto& statsSchema = NColumnShard::PrimaryIndexStatsSchema;
+    return NOlap::GetColumns(statsSchema, statsSchema.KeyColumns);
+}
+
+// Creates the iterator over index statistics; the tasks processor and scan counters are not used.
+std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TScanCounters& /*scanCounters*/) const {
+    using TIterator = NColumnShard::TStatsIterator;
+    return std::make_unique<TIterator>(this->shared_from_this());
+}
+
+// Writes all collected read statistics as one debug record of the TX_COLUMNSHARD_SCAN service.
+void TReadStats::PrintToLog() {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
+ ("event", "statistic")
+ ("begin", BeginTimestamp)
+ ("selected", SelectedIndex)
+ ("index_granules", IndexGranules)
+ ("index_portions", IndexPortions)
+ ("index_batches", IndexBatches)
+ ("committed_batches", CommittedBatches)
+ ("schema_columns", SchemaColumns)
+ ("filter_columns", FilterColumns)
+ ("additional_columns", AdditionalColumns)
+ ("portions_bytes", PortionsBytes)
+ ("data_filter_bytes", DataFilterBytes)
+ ("data_additional_bytes", DataAdditionalBytes)
+ // NOTE(review): all three operands are ui64, so the difference wraps around if
+ // DataFilterBytes + DataAdditionalBytes exceeds PortionsBytes - confirm this cannot happen.
+ ("delta_bytes", PortionsBytes - DataFilterBytes - DataAdditionalBytes)
+ ("selected_rows", SelectedRows)
+ ;
+}
+
+// Selects the ordering policy for the read. TPKSortingWithLimit is used only for a sorted read
+// with a limit over a sorted index whose sorting key fields match the leading primary key
+// columns by name; every other case gets TAnySorting.
+NIndexedReader::IOrderPolicy::TPtr TReadMetadata::BuildSortingPolicy() const {
+    const bool limitedSortedRead = Limit && Sorting != ESorting::NONE && IndexInfo.IsSorted() && IndexInfo.GetSortingKey()->num_fields();
+    if (!limitedSortedRead) {
+        return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
+    }
+
+    ui32 position = 0;
+    for (auto&& pkColumn : IndexInfo.GetPrimaryKey()) {
+        // only as many primary key columns as there are sorting key fields are compared
+        if (position >= IndexInfo.GetSortingKey()->fields().size()) {
+            break;
+        }
+        if (IndexInfo.GetSortingKey()->fields()[position]->name() != pkColumn.first) {
+            return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
+        }
+        ++position;
+    }
+    return std::make_shared<NIndexedReader::TPKSortingWithLimit>(this->shared_from_this());
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
new file mode 100644
index 00000000000..7ea2898dadc
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -0,0 +1,204 @@
+#pragma once
+#include "conveyor_task.h"
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/counters.h>
+#include <ydb/core/tx/columnshard/columnshard__scan.h>
+#include <ydb/core/tx/columnshard/engines/predicate.h>
+#include <ydb/core/tx/columnshard/engines/column_engine.h>
+#include <ydb/core/scheme_types/scheme_type_info.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+
+namespace NKikimr::NColumnShard {
+class TScanIteratorBase;
+}
+
+namespace NKikimr::NOlap {
+
+namespace NIndexedReader {
+class IOrderPolicy;
+}
+
+// Statistics of a single read; the values are written to the log by PrintToLog().
+struct TReadStats {
+    // Set to the current time when the object is constructed.
+    TInstant BeginTimestamp;
+    ui32 SelectedIndex = 0;
+    ui64 IndexGranules = 0;
+    ui64 IndexPortions = 0;
+    ui64 IndexBatches = 0;
+    ui64 CommittedBatches = 0;
+    ui64 PortionsBytes = 0;
+    ui64 DataFilterBytes = 0;
+    ui64 DataAdditionalBytes = 0;
+
+    ui32 SchemaColumns = 0;
+    ui32 FilterColumns = 0;
+    ui32 AdditionalColumns = 0;
+
+    ui32 SelectedRows = 0;
+
+    TReadStats(ui32 indexNo)
+        : BeginTimestamp(TInstant::Now())
+        , SelectedIndex(indexNo)
+    {
+    }
+
+    void PrintToLog();
+
+    // Time elapsed since construction.
+    TDuration Duration() {
+        const TInstant now = TInstant::Now();
+        return now - BeginTimestamp;
+    }
+};
+
+// Holds all metadata that is needed to perform read/scan
+// Base part: predicates, schemas, program, sorting direction and limit; the result/key
+// schemas and the scan iterator are provided by derived classes.
+struct TReadMetadataBase {
+ using TConstPtr = std::shared_ptr<const TReadMetadataBase>;
+
+ // Requested order of rows inside returned batches.
+ enum class ESorting {
+ NONE = 0,
+ ASC,
+ DESC,
+ };
+
+ virtual ~TReadMetadataBase() = default;
+
+ std::shared_ptr<NOlap::TPredicate> LessPredicate;
+ std::shared_ptr<NOlap::TPredicate> GreaterPredicate;
+ std::shared_ptr<arrow::Schema> BlobSchema;
+ std::shared_ptr<arrow::Schema> LoadSchema; // ResultSchema + required for intermediate operations
+ std::shared_ptr<arrow::Schema> ResultSchema; // TODO: add Program modifications
+ std::shared_ptr<NSsa::TProgram> Program;
+ std::shared_ptr<const THashSet<TUnifiedBlobId>> ExternBlobs;
+ ESorting Sorting{ESorting::ASC}; // Sorting inside returned batches
+ ui64 Limit{0}; // TODO
+
+ bool IsAscSorted() const { return Sorting == ESorting::ASC; }
+ bool IsDescSorted() const { return Sorting == ESorting::DESC; }
+ bool IsSorted() const { return IsAscSorted() || IsDescSorted(); }
+ void SetDescSorting() { Sorting = ESorting::DESC; }
+
+ virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const = 0;
+ virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const = 0;
+ virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const = 0;
+ // Writes a description of the metadata to `out`; the base implementation writes nothing.
+ virtual void Dump(IOutputStream& out) const { Y_UNUSED(out); };
+
+ // TODO: can this only be done for base class?
+ friend IOutputStream& operator << (IOutputStream& out, const TReadMetadataBase& meta) {
+ meta.Dump(out);
+ return out;
+ }
+};
+
+// Holds all metadata that is needed to perform read/scan
+// Index read variant: adds the index description, the snapshot (PlanStep:TxId), the selected
+// portions (SelectInfo), committed blobs and read statistics.
+struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadMetadata> {
+ using TConstPtr = std::shared_ptr<const TReadMetadata>;
+
+ TIndexInfo IndexInfo;
+ ui64 PlanStep = 0;
+ ui64 TxId = 0;
+ std::shared_ptr<TSelectInfo> SelectInfo;
+ std::vector<TCommittedBlob> CommittedBlobs;
+ THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches;
+ std::shared_ptr<TReadStats> ReadStats;
+
+ // Uses shared_from_this(), so the object has to be owned by a std::shared_ptr.
+ std::shared_ptr<NIndexedReader::IOrderPolicy> BuildSortingPolicy() const;
+
+ TReadMetadata(const TIndexInfo& info)
+ : IndexInfo(info)
+ , ReadStats(std::make_shared<TReadStats>(info.GetId()))
+ {}
+
+ // Names of LoadSchema fields in schema order.
+ std::vector<std::string> GetColumnsOrder() const {
+ std::vector<std::string> result;
+ for (auto&& i : LoadSchema->fields()) {
+ result.emplace_back(i->name());
+ }
+ return result;
+ }
+
+ std::set<ui32> GetEarlyFilterColumnIds() const;
+ std::set<ui32> GetUsedColumnIds() const;
+
+ // True when there are neither selected portions nor committed blobs. Requires SelectInfo.
+ bool Empty() const {
+ Y_VERIFY(SelectInfo);
+ return SelectInfo->Portions.empty() && CommittedBlobs.empty();
+ }
+
+ std::shared_ptr<arrow::Schema> GetSortingKey() const {
+ return IndexInfo.GetSortingKey();
+ }
+
+ std::shared_ptr<arrow::Schema> GetReplaceKey() const {
+ return IndexInfo.GetReplaceKey();
+ }
+
+ // Maps ResultSchema field names to column ids and returns the corresponding index columns.
+ TVector<TNameTypeInfo> GetResultYqlSchema() const override {
+ TVector<NTable::TTag> columnIds;
+ columnIds.reserve(ResultSchema->num_fields());
+ for (const auto& field: ResultSchema->fields()) {
+ TString name = TStringBuilder() << field->name();
+ columnIds.emplace_back(IndexInfo.GetColumnId(name));
+ }
+ return IndexInfo.GetColumns(columnIds);
+ }
+
+ TVector<TNameTypeInfo> GetKeyYqlSchema() const override {
+ return IndexInfo.GetPrimaryKey();
+ }
+
+ size_t NumIndexedRecords() const {
+ Y_VERIFY(SelectInfo);
+ return SelectInfo->NumRecords();
+ }
+
+ size_t NumIndexedBlobs() const {
+ Y_VERIFY(SelectInfo);
+ return SelectInfo->Stats().Blobs;
+ }
+
+ std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
+
+ // Writes a one-line description of the read.
+ // NOTE(review): NumIndexedRecords()/NumIndexedBlobs() verify that SelectInfo is set, so the
+ // `if (SelectInfo)` check below can never be false when it is reached.
+ void Dump(IOutputStream& out) const override {
+ out << "columns: " << (LoadSchema ? LoadSchema->num_fields() : 0)
+ << " index records: " << NumIndexedRecords()
+ << " index blobs: " << NumIndexedBlobs()
+ << " committed blobs: " << CommittedBlobs.size()
+ << " with program steps: " << (Program ? Program->Steps.size() : 0)
+ << (Sorting == ESorting::NONE ? " not" : (Sorting == ESorting::ASC ? " asc" : " desc"))
+ << " sorted, at snapshot: " << PlanStep << ":" << TxId;
+ if (GreaterPredicate) {
+ out << " from{" << *GreaterPredicate << "}";
+ }
+ if (LessPredicate) {
+ out << " to{" << *LessPredicate << "}";
+ }
+ if (SelectInfo) {
+ out << ", " << *SelectInfo;
+ }
+ }
+
+ friend IOutputStream& operator << (IOutputStream& out, const TReadMetadata& meta) {
+ meta.Dump(out);
+ return out;
+ }
+};
+
+// Metadata of a read over index statistics: the scan is served by the stats iterator
+// created in StartScan() instead of the column data iterator.
+struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadStatsMetadata> {
+ using TConstPtr = std::shared_ptr<const TReadStatsMetadata>;
+
+ const ui64 TabletId;
+ TVector<ui32> ReadColumnIds;
+ TVector<ui32> ResultColumnIds;
+ THashMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats;
+
+ explicit TReadStatsMetadata(ui64 tabletId)
+ : TabletId(tabletId)
+ {}
+
+ TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const override;
+
+ TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override;
+
+ // Uses shared_from_this(), so the object has to be owned by a std::shared_ptr.
+ std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 358a52ac016..0d2b7e728f1 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -23,7 +23,7 @@ public:
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
, Result(std::move(event))
, ReadMetadata(readMetadata)
- , IndexedData(ReadMetadata, IndexedBlobsForFetch, true, counters)
+ , IndexedData(ReadMetadata, IndexedBlobsForFetch, true, counters, TDataTasksProcessorContainer())
, Deadline(deadline)
, ColumnShardActorId(columnShardActorId)
, RequestCookie(requestCookie)
@@ -51,7 +51,7 @@ public:
return; // ignore duplicate parts
}
WaitIndexed.erase(event.BlobRange);
- IndexedData.AddIndexed(event.BlobRange, event.Data, NColumnShard::TDataTasksProcessorContainer());
+ IndexedData.AddIndexed(event.BlobRange, event.Data);
} else if (CommittedBlobs.contains(blobId)) {
auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
if (cmt.empty()) {