author     ivanmorozov <ivanmorozov@yandex-team.com>  2023-04-28 15:34:40 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>  2023-04-28 15:34:40 +0300
commit     252edae5dce1381309d136306c615141a18407e3 (patch)
tree       2985b31c9718562990cfdc4d9718b5a43d47db81
parent     feb79a8842a81474d42b9c460ffb7d93ef44184c (diff)
skip blobs not filtered
-rw-r--r--  ydb/core/formats/arrow_filter.h                              |  40
-rw-r--r--  ydb/core/formats/arrow_helpers.cpp                           |   4
-rw-r--r--  ydb/core/formats/arrow_helpers.h                             |   2
-rw-r--r--  ydb/core/tx/columnshard/columnshard__scan.cpp                |   2
-rw-r--r--  ydb/core/tx/columnshard/engines/columns_table.h              |   4
-rw-r--r--  ydb/core/tx/columnshard/engines/index_info.cpp               |  30
-rw-r--r--  ydb/core/tx/columnshard/engines/index_info.h                 |  12
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.cpp        |  36
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.h          |  32
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.cpp             |  67
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.h               | 109
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/batch.cpp             |  54
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/batch.h               |  12
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp     |   4
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/conveyor_task.h       |   3
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp  |  25
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filter_assembler.h    |   4
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule.cpp           |   8
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule.h             |   6
19 files changed, 327 insertions(+), 127 deletions(-)
diff --git a/ydb/core/formats/arrow_filter.h b/ydb/core/formats/arrow_filter.h
index 8e1e60c57b..43aaed58d0 100644
--- a/ydb/core/formats/arrow_filter.h
+++ b/ydb/core/formats/arrow_filter.h
@@ -40,6 +40,46 @@ private:
public:
+ class TIterator {
+ private:
+ ui32 InternalPosition = 0;
+ std::deque<ui32>::const_iterator It;
+ std::deque<ui32>::const_iterator ItEnd;
+ bool CurrentValue;
+ public:
+ TIterator(const std::deque<ui32>& filter, const bool startValue)
+ : It(filter.begin())
+ , ItEnd(filter.end())
+ , CurrentValue(startValue)
+ {
+
+ }
+
+ bool IsBatchForSkip(const ui32 size) const {
+ return !CurrentValue && (*It - InternalPosition) >= size;
+ }
+
+ bool Next(const ui32 size) {
+ ui32 sizeRemain = size;
+ while (It != ItEnd) {
+ if (*It - InternalPosition > sizeRemain) {
+ InternalPosition += sizeRemain;
+ return true;
+ } else {
+ sizeRemain -= *It - InternalPosition;
+ InternalPosition = 0;
+ CurrentValue = !CurrentValue;
+ ++It;
+ }
+ }
+ return false;
+ }
+ };
+
+ TIterator GetIterator() const {
+ return TIterator(Filter, GetStartValue());
+ }
+
TColumnFilter(std::vector<bool>&& values) {
const ui32 count = values.size();
Reset(count, std::move(values));
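Example (not part of the patch): how the new TColumnFilter::TIterator is intended to be consumed. The filter is stored as run lengths of alternating accept/reject values starting from GetStartValue(); IsBatchForSkip(size) reports whether the next `size` rows fall entirely inside a rejected run, and Next(size) advances the cursor by `size` rows. A minimal standalone sketch of that contract (the deque contents, start value and chunk sizes below are illustrative):

    // Walk per-chunk row counts against a run-length filter and decide which
    // chunks can be skipped entirely (the same contract as TIterator above).
    #include <cstdint>
    #include <deque>
    #include <iostream>
    #include <vector>

    int main() {
        // Run lengths of alternating values, first run is rejected (false):
        // 100 rejected rows, 20 accepted rows, 80 rejected rows.
        std::deque<uint32_t> runs = {100, 20, 80};
        bool currentValue = false;     // value of the current run
        uint32_t posInRun = 0;         // InternalPosition
        auto it = runs.begin();

        const std::vector<uint32_t> chunkSizes = {100, 50, 50};  // rows per blob
        for (uint32_t size : chunkSizes) {
            // IsBatchForSkip: the whole chunk lies inside a rejected run.
            const bool skip = !currentValue && (*it - posInRun) >= size;
            std::cout << (skip ? "skip " : "fetch ") << size << " rows\n";

            // Next: consume `size` rows from the run-length encoding.
            uint32_t remain = size;
            while (it != runs.end()) {
                if (*it - posInRun > remain) {
                    posInRun += remain;
                    break;
                }
                remain -= *it - posInRun;
                posInRun = 0;
                currentValue = !currentValue;
                ++it;
            }
        }
        return 0;   // prints: skip 100 / fetch 50 / skip 50
    }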
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index db47cb93bc..c1f2165b1d 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -225,12 +225,12 @@ std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const
return *batch;
}
-std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema) {
+std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema, const ui32 rowsCount) {
std::vector<std::shared_ptr<arrow::Array>> columns;
columns.reserve(schema->num_fields());
for (auto& field : schema->fields()) {
- auto result = arrow::MakeArrayOfNull(field->type(), 0);
+ auto result = arrow::MakeArrayOfNull(field->type(), rowsCount);
Y_VERIFY_OK(result.status());
columns.emplace_back(*result);
Y_VERIFY(columns.back());
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 3d6f060171..d96fd11b0a 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -70,7 +70,7 @@ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& b
std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob,
const std::shared_ptr<arrow::Schema>& schema);
-std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema);
+std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema, const ui32 rowsCount = 0);
std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const std::vector<TString>& columnNames);
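Example (not part of the patch): the extra rowsCount parameter turns MakeEmptyBatch into a factory for null-filled batches, which is what lets a skipped blob be replaced by a placeholder later in this commit (TAssembleBlobInfo::BuildRecordBatch). A hedged sketch of the same behaviour in plain Arrow; MakeNullBatch is an illustrative name, not a YDB helper:

    #include <arrow/api.h>
    #include <memory>
    #include <vector>

    // Builds a batch of `rowsCount` rows where every column is all-null,
    // mirroring the patched MakeEmptyBatch(schema, rowsCount).
    std::shared_ptr<arrow::RecordBatch> MakeNullBatch(
            const std::shared_ptr<arrow::Schema>& schema, int64_t rowsCount) {
        std::vector<std::shared_ptr<arrow::Array>> columns;
        columns.reserve(schema->num_fields());
        for (const auto& field : schema->fields()) {
            auto result = arrow::MakeArrayOfNull(field->type(), rowsCount);
            // The helper verifies result.status() with Y_VERIFY_OK at this point.
            columns.emplace_back(result.ValueOrDie());
        }
        return arrow::RecordBatch::Make(schema, rowsCount, std::move(columns));
    }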
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index b408adb99a..56497ad8b0 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -346,7 +346,7 @@ private:
}
// Switch to the next range if the current one is finished
- if (ScanIterator && ScanIterator->Finished() && !InFlightReads) {
+ if (ScanIterator && ScanIterator->Finished()) {
NextReadMetadata();
}
diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h
index 99021dcde3..8ad4b38abd 100644
--- a/ydb/core/tx/columnshard/engines/columns_table.h
+++ b/ydb/core/tx/columnshard/engines/columns_table.h
@@ -19,6 +19,10 @@ struct TColumnRecord {
TBlobRange BlobRange;
TString Metadata;
+ ui32 GetRowsCount() const {
+ return 0;
+ }
+
bool operator == (const TColumnRecord& rec) const {
return (Granule == rec.Granule) && (ColumnId == rec.ColumnId) &&
(PlanStep == rec.PlanStep) && (TxId == rec.TxId) && (Portion == rec.Portion) && (Chunk == rec.Chunk);
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index e2a041af03..441089d748 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -54,11 +54,18 @@ std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaSnapshot() {
}
bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) {
- const auto& name = field.name();
- return (name == SPEC_COL_PLAN_STEP)
- || (name == SPEC_COL_TX_ID);
+ return IsSpecialColumn(field.name());
}
+bool TIndexInfo::IsSpecialColumn(const std::string& fieldName) {
+ return fieldName == SPEC_COL_PLAN_STEP
+ || fieldName == SPEC_COL_TX_ID;
+}
+
+bool TIndexInfo::IsSpecialColumn(const ui32 fieldId) {
+ return fieldId == (ui32)ESpecialColumn::PLAN_STEP
+ || fieldId == (ui32)ESpecialColumn::TX_ID;
+}
ui32 TIndexInfo::GetColumnId(const std::string& name) const {
auto id = GetColumnIdOptional(name);
@@ -69,16 +76,15 @@ ui32 TIndexInfo::GetColumnId(const std::string& name) const {
std::optional<ui32> TIndexInfo::GetColumnIdOptional(const std::string& name) const {
const auto ni = ColumnNames.find(name);
- if (ni == ColumnNames.end()) {
- if (name == SPEC_COL_PLAN_STEP) {
- return ui32(ESpecialColumn::PLAN_STEP);
- } else if (name == SPEC_COL_TX_ID) {
- return ui32(ESpecialColumn::TX_ID);
- }
- return {};
+ if (ni != ColumnNames.end()) {
+ return ni->second;
}
-
- return ni->second;
+ if (name == SPEC_COL_PLAN_STEP) {
+ return ui32(ESpecialColumn::PLAN_STEP);
+ } else if (name == SPEC_COL_TX_ID) {
+ return ui32(ESpecialColumn::TX_ID);
+ }
+ return {};
}
TString TIndexInfo::GetColumnName(ui32 id, bool required) const {
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index d9b96bc902..c6af3b0d9b 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -48,7 +48,17 @@ public:
/// Matches name of the field with names of the special columns.
static bool IsSpecialColumn(const arrow::Field& field);
-
+ static bool IsSpecialColumn(const ui32 field);
+ static bool IsSpecialColumn(const std::string& fieldName);
+ template <class TContainer>
+ static bool IsSpecialColumns(const TContainer& c) {
+ for (auto&& i : c) {
+ if (!IsSpecialColumn(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
public:
TIndexInfo(const TString& name, ui32 id);
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 91e6d4d010..2fa4c30922 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -117,7 +117,7 @@ std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColum
return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters);
}
-std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) const {
+std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
std::set<ui32> result;
if (LessPredicate) {
for (auto&& i : LessPredicate->ColumnNames()) {
@@ -140,9 +140,6 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) cons
}
}
}
- if (noTrivial && result.empty()) {
- return result;
- }
if (PlanStep) {
auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
for (auto&& i : snapSchema->fields()) {
@@ -195,6 +192,19 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::
}
}
+bool TIndexedReadData::PredictManyResultsAfterFilter(const TPortionInfo& portionInfo) const {
+ if (!portionInfo.AllowEarlyFilter()) {
+ return true;
+ }
+ if (EarlyFilterColumns.empty()) {
+ return true;
+ }
+ if (TIndexInfo::IsSpecialColumns(EarlyFilterColumns)) {
+ return true;
+ }
+ return false;
+}
+
void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
Y_VERIFY(ReadMetadata->BlobSchema);
Y_VERIFY(ReadMetadata->LoadSchema);
@@ -222,10 +232,10 @@ void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
}
NIndexedReader::TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo);
- if (portionInfo.AllowEarlyFilter()) {
- currentBatch.Reset(&EarlyFilterColumns);
+ if (!OnePhaseReadMode && !PredictManyResultsAfterFilter(portionInfo)) {
+ currentBatch.ResetNoFilter(&EarlyFilterColumns);
} else {
- currentBatch.Reset(&UsedColumns);
+ currentBatch.ResetNoFilter(&UsedColumns);
}
Batches[batchNo] = &currentBatch;
++batchNo;
@@ -573,17 +583,13 @@ TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata,
: Counters(counters)
, FetchBlobsQueue(fetchBlobsQueue)
, ReadMetadata(readMetadata)
+ , OnePhaseReadMode(internalRead)
{
UsedColumns = ReadMetadata->GetUsedColumnIds();
PostFilterColumns = ReadMetadata->GetUsedColumnIds();
- EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(true);
- if (internalRead || EarlyFilterColumns.empty()) {
- EarlyFilterColumns = PostFilterColumns;
- PostFilterColumns.clear();
- } else {
- for (auto&& i : EarlyFilterColumns) {
- PostFilterColumns.erase(i);
- }
+ EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds();
+ for (auto&& i : EarlyFilterColumns) {
+ PostFilterColumns.erase(i);
}
Y_VERIFY(ReadMetadata->SelectInfo);
}
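Example (not part of the patch): PredictManyResultsAfterFilter() decides whether a two-phase read (assemble the filter columns, filter, then fetch the rest) is worth it for a portion. A hedged restatement of that decision with illustrative parameters:

    #include <cstdint>
    #include <set>

    // True means "the filter is unlikely to drop much, read everything at once";
    // false means "do the early-filter pass first" (the two-phase path).
    bool PredictManyResultsAfterFilter(bool allowEarlyFilter,
                                       const std::set<uint32_t>& earlyFilterColumns,
                                       bool onlySpecialColumns /* PLAN_STEP/TX_ID only */) {
        if (!allowEarlyFilter) {
            return true;   // the portion cannot be filtered early at all
        }
        if (earlyFilterColumns.empty()) {
            return true;   // no predicate columns to filter on
        }
        if (onlySpecialColumns) {
            return true;   // snapshot columns alone are rarely selective
        }
        return false;
    }

In InitRead() the result, together with OnePhaseReadMode, picks between ResetNoFilter(&EarlyFilterColumns) for the two-phase path and ResetNoFilter(&UsedColumns) for a single-phase read.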
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 3a4c3c430d..b33355b129 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -126,7 +126,7 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
return result;
}
- std::set<ui32> GetEarlyFilterColumnIds(const bool noTrivial) const;
+ std::set<ui32> GetEarlyFilterColumnIds() const;
std::set<ui32> GetUsedColumnIds() const;
bool Empty() const {
@@ -225,15 +225,14 @@ struct TPartialReadResult {
class TIndexedReadData {
private:
- std::set<ui32> EarlyFilterColumns;
- std::set<ui32> UsedColumns;
+ YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns);
YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns);
+ std::set<ui32> UsedColumns;
bool AbortedFlag = false;
YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters);
std::vector<NIndexedReader::TBatch*> Batches;
TFetchBlobsQueue& FetchBlobsQueue;
- friend class NIndexedReader::TBatch;
- friend class NIndexedReader::TGranule;
+ bool PredictManyResultsAfterFilter(const TPortionInfo& portionInfo) const;
public:
TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, const bool internalRead, const NColumnShard::TScanCounters& counters);
@@ -272,17 +271,11 @@ public:
Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size());
Y_VERIFY(!IsInProgress());
}
-private:
- NOlap::TReadMetadata::TConstPtr ReadMetadata;
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
-
- void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
- void OnGranuleReady(NIndexedReader::TGranule& granule) {
- Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second);
- Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
+ NOlap::TReadMetadata::TConstPtr GetReadMetadata() const {
+ return ReadMetadata;
}
+ void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
if (batch && batch->num_rows()) {
ReadMetadata->ReadStats->SelectedRows += batch->num_rows();
@@ -295,6 +288,17 @@ private:
}
}
+ void OnGranuleReady(NIndexedReader::TGranule& granule) {
+ Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second);
+ Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
+ }
+
+private:
+ NOlap::TReadMetadata::TConstPtr ReadMetadata;
+ bool OnePhaseReadMode = false;
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
+
THashSet<const void*> BatchesToDedup;
THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch
THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut;
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 73035b6c24..c0d34a36c5 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -29,62 +29,6 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr
return blob;
}
-TPortionInfo::TPreparedBatchData TPortionInfo::PrepareForAssemble(const TIndexInfo& indexInfo,
- const std::shared_ptr<arrow::Schema>& schema,
- const THashMap<TBlobRange, TString>& blobsData, const std::optional<std::set<ui32>>& columnIds) const {
- // Correct records order
- TMap<int, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
-
- std::vector<std::shared_ptr<arrow::Field>> schemaFields;
-
- for (auto&& i : schema->fields()) {
- if (columnIds && !columnIds->contains(indexInfo.GetColumnId(i->name()))) {
- continue;
- }
- schemaFields.emplace_back(i);
- }
-
- for (auto& rec : Records) {
- if (columnIds && !columnIds->contains(rec.ColumnId)) {
- continue;
- }
- ui32 columnId = rec.ColumnId;
- TString columnName = indexInfo.GetColumnName(columnId);
- std::string name(columnName.data(), columnName.size());
- int pos = schema->GetFieldIndex(name);
- if (pos < 0) {
- continue; // no such column in schema - do not need it
- }
-
- columnChunks[pos][rec.Chunk] = rec.BlobRange;
- }
-
- // Make chunked arrays for columns
- std::vector<TPreparedColumn> columns;
- columns.reserve(columnChunks.size());
-
- for (auto& [pos, orderedChunks] : columnChunks) {
- auto field = schema->field(pos);
-
- TVector<TString> blobs;
- blobs.reserve(orderedChunks.size());
- ui32 expected = 0;
- for (auto& [chunk, blobId] : orderedChunks) {
- Y_VERIFY(chunk == expected);
- ++expected;
-
- auto it = blobsData.find(blobId);
- Y_VERIFY(it != blobsData.end());
- TString data = it->second;
- blobs.push_back(data);
- }
-
- columns.emplace_back(TPreparedColumn(field, std::move(blobs)));
- }
-
- return TPreparedBatchData(std::move(columns), std::make_shared<arrow::Schema>(schemaFields));
-}
-
void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) {
Y_VERIFY(column->length());
@@ -252,7 +196,7 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
ui32 count = 0;
if (!reverse) {
for (auto& blob : Blobs) {
- batches.push_back(NArrow::DeserializeBatch(blob, schema));
+ batches.push_back(blob.BuildRecordBatch(schema));
Y_VERIFY(batches.back());
if (count + batches.back()->num_rows() >= needCount) {
Y_VERIFY(count <= needCount);
@@ -266,7 +210,7 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
}
} else {
for (auto it = Blobs.rbegin(); it != Blobs.rend(); ++it) {
- batches.push_back(NArrow::DeserializeBatch(*it, schema));
+ batches.push_back(it->BuildRecordBatch(schema));
Y_VERIFY(batches.back());
if (count + batches.back()->num_rows() >= needCount) {
Y_VERIFY(count <= needCount);
@@ -288,11 +232,16 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
+ std::vector< std::shared_ptr<arrow::Field>> fields;
for (auto&& i : Columns) {
+ if (!options.IsAcceptedColumn(i.GetColumnId())) {
+ continue;
+ }
columns.emplace_back(i.Assemble(options.GetRecordsCountLimitDef(Max<ui32>()), !options.IsForwardAssemble()));
+ fields.emplace_back(i.GetField());
}
- auto table = arrow::Table::Make(Schema, columns);
+ auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns);
auto res = table->CombineChunks();
Y_VERIFY(res.ok());
return NArrow::ToBatch(*res);
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index db2123bc94..ed6a188055 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -203,17 +203,49 @@ struct TPortionInfo {
return Meta.ColumnMeta.find(columnId)->second.HasMinMax();
}
+ class TAssembleBlobInfo {
+ private:
+ YDB_READONLY(ui32, NullRowsCount, 0);
+ YDB_READONLY_DEF(TString, Data);
+ public:
+ TAssembleBlobInfo(const ui32 rowsCount)
+ : NullRowsCount(rowsCount) {
+
+ }
+
+ TAssembleBlobInfo(const TString& data)
+ : Data(data) {
+
+ }
+
+ std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(std::shared_ptr<arrow::Schema> schema) const {
+ if (NullRowsCount) {
+ Y_VERIFY(!Data);
+ return NArrow::MakeEmptyBatch(schema, NullRowsCount);
+ } else {
+ Y_VERIFY(Data);
+ return NArrow::DeserializeBatch(Data, schema);
+ }
+ }
+ };
+
class TPreparedColumn {
private:
+ YDB_READONLY(ui32, ColumnId, 0);
std::shared_ptr<arrow::Field> Field;
- std::vector<TString> Blobs;
+ std::vector<TAssembleBlobInfo> Blobs;
public:
const std::string& GetName() const {
return Field->name();
}
- TPreparedColumn(const std::shared_ptr<arrow::Field>& field, std::vector<TString>&& blobs)
- : Field(field)
+ std::shared_ptr<arrow::Field> GetField() const {
+ return Field;
+ }
+
+ TPreparedColumn(const std::shared_ptr<arrow::Field>& field, std::vector<TAssembleBlobInfo>&& blobs, const ui32 columnId)
+ : ColumnId(columnId)
+ , Field(field)
, Blobs(std::move(blobs))
{
@@ -229,11 +261,26 @@ struct TPortionInfo {
public:
+ std::vector<std::string> GetSchemaColumnNames() const {
+ return Schema->field_names();
+ }
+
class TAssembleOptions {
private:
YDB_OPT(ui32, RecordsCountLimit);
YDB_FLAG_ACCESSOR(ForwardAssemble, true);
+ YDB_OPT(std::set<ui32>, IncludedColumnIds);
+ YDB_OPT(std::set<ui32>, ExcludedColumnIds);
public:
+ bool IsAcceptedColumn(const ui32 columnId) const {
+ if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) {
+ return false;
+ }
+ if (ExcludedColumnIds && ExcludedColumnIds->contains(columnId)) {
+ return false;
+ }
+ return true;
+ }
};
size_t GetColumnsCount() const {
@@ -250,9 +297,61 @@ struct TPortionInfo {
std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = Default<TAssembleOptions>()) const;
};
+ template <class TExternalBlobInfo>
TPreparedBatchData PrepareForAssemble(const TIndexInfo& indexInfo,
- const std::shared_ptr<arrow::Schema>& schema,
- const THashMap<TBlobRange, TString>& data, const std::optional<std::set<ui32>>& columnIds) const;
+ const std::shared_ptr<arrow::Schema>& schema,
+ const THashMap<TBlobRange, TExternalBlobInfo>& blobsData, const std::optional<std::set<ui32>>& columnIds) const {
+ // Correct records order
+ TMap<int, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
+
+ std::vector<std::shared_ptr<arrow::Field>> schemaFields;
+
+ for (auto&& i : schema->fields()) {
+ if (columnIds && !columnIds->contains(indexInfo.GetColumnId(i->name()))) {
+ continue;
+ }
+ schemaFields.emplace_back(i);
+ }
+
+ for (auto& rec : Records) {
+ if (columnIds && !columnIds->contains(rec.ColumnId)) {
+ continue;
+ }
+ ui32 columnId = rec.ColumnId;
+ TString columnName = indexInfo.GetColumnName(columnId);
+ std::string name(columnName.data(), columnName.size());
+ int pos = schema->GetFieldIndex(name);
+ if (pos < 0) {
+ continue; // no such column in schema - do not need it
+ }
+
+ columnChunks[pos][rec.Chunk] = rec.BlobRange;
+ }
+
+ // Make chunked arrays for columns
+ std::vector<TPreparedColumn> columns;
+ columns.reserve(columnChunks.size());
+
+ for (auto& [pos, orderedChunks] : columnChunks) {
+ auto field = schema->field(pos);
+ TVector<TAssembleBlobInfo> blobs;
+ blobs.reserve(orderedChunks.size());
+ ui32 expected = 0;
+ for (auto& [chunk, blobRange] : orderedChunks) {
+ Y_VERIFY(chunk == expected);
+ ++expected;
+
+ auto it = blobsData.find(blobRange);
+ Y_VERIFY(it != blobsData.end());
+ blobs.emplace_back(it->second);
+ }
+
+ columns.emplace_back(TPreparedColumn(field, std::move(blobs), indexInfo.GetColumnId(field->name())));
+ }
+
+ return TPreparedBatchData(std::move(columns), std::make_shared<arrow::Schema>(schemaFields));
+ }
+
std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const TIndexInfo& indexInfo,
const std::shared_ptr<arrow::Schema>& schema,
const THashMap<TBlobRange, TString>& data) const {
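Example (not part of the patch): the column-selection rule added to TAssembleOptions, written as a free function. A column id is assembled only if it passes the optional include set and is absent from the optional exclude set; the two sets are what the new filter assembler uses for its two passes.

    #include <cstdint>
    #include <optional>
    #include <set>

    bool IsAcceptedColumn(uint32_t columnId,
                          const std::optional<std::set<uint32_t>>& includedColumnIds,
                          const std::optional<std::set<uint32_t>>& excludedColumnIds) {
        if (includedColumnIds && !includedColumnIds->contains(columnId)) {
            return false;   // include list present, column not in it
        }
        if (excludedColumnIds && excludedColumnIds->contains(columnId)) {
            return false;   // column explicitly excluded
        }
        return true;
    }

    // Phase 1 in filter_assembler.cpp: SetIncludedColumnIds(filterColumnIds).
    // Phase 2: SetExcludedColumnIds(filterColumnIds) assembles the remaining
    // columns, which are then merged with the already filtered batch.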
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index c6baead8e9..3ac9a61cfe 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -29,7 +29,7 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard
auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data, CurrentColumnIds);
Data.clear();
if (!Filter) {
- return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, *this, PortionInfo->AllowEarlyFilter(), processor);
+ return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, *this, PortionInfo->AllowEarlyFilter(), Owner->GetEarlyFilterColumns(), processor);
} else {
Y_VERIFY(FilterBatch);
return std::make_shared<TAssembleBatch>(std::move(batchConstructor), *this, readMetadata->GetColumnsOrder(), processor);
@@ -65,7 +65,7 @@ ui64 TBatch::GetFetchBytes(const std::set<ui32>* columnIds) {
return result;
}
-void TBatch::Reset(const std::set<ui32>* columnIds) {
+void TBatch::ResetCommon(const std::set<ui32>* columnIds) {
if (!columnIds) {
CurrentColumnIds.reset();
} else {
@@ -80,13 +80,18 @@ void TBatch::Reset(const std::set<ui32>* columnIds) {
Y_VERIFY(Data.empty());
WaitingBytes = 0;
FetchedBytes = 0;
+}
+
+void TBatch::ResetNoFilter(const std::set<ui32>* columnIds) {
+ Y_VERIFY(!Filter);
+ ResetCommon(columnIds);
for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
continue;
}
AskedColumnIds.emplace(rec.ColumnId);
Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second);
- Owner->Owner->AddBlobForFetch(rec.BlobRange, *this);
+ Owner->AddBlobForFetch(rec.BlobRange, *this);
Y_VERIFY(rec.Portion == Portion);
Y_VERIFY(rec.Valid());
Y_VERIFY(Granule == rec.Granule);
@@ -94,6 +99,46 @@ void TBatch::Reset(const std::set<ui32>* columnIds) {
}
}
+void TBatch::ResetWithFilter(const std::set<ui32>* columnIds) {
+ Y_VERIFY(Filter);
+ ResetCommon(columnIds);
+ std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects;
+ for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
+ if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
+ continue;
+ }
+ AskedColumnIds.emplace(rec.ColumnId);
+ orderedObjects[rec.ColumnId][rec.Chunk] = &rec;
+ Y_VERIFY(rec.Valid());
+ Y_VERIFY(Portion == rec.Portion);
+ Y_VERIFY(Granule == rec.Granule);
+ }
+
+ for (auto&& columnInfo : orderedObjects) {
+ ui32 expected = 0;
+ auto it = Filter->GetIterator();
+ bool undefinedShift = false;
+ bool itFinished = false;
+ for (auto&& [chunk, rec] : columnInfo.second) {
+ Y_VERIFY(!itFinished);
+ Y_VERIFY(expected++ == chunk);
+ if (!rec->GetRowsCount()) {
+ undefinedShift = true;
+ }
+ if (!undefinedShift && it.IsBatchForSkip(rec->GetRowsCount())) {
+ Data.emplace(rec->BlobRange, TPortionInfo::TAssembleBlobInfo(rec->GetRowsCount()));
+ } else {
+ Y_VERIFY(WaitIndexed.emplace(rec->BlobRange).second);
+ Owner->AddBlobForFetch(rec->BlobRange, *this);
+ WaitingBytes += rec->BlobRange.Size;
+ }
+ if (!undefinedShift) {
+ itFinished = !it.Next(rec->GetRowsCount());
+ }
+ }
+ }
+}
+
void TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch) {
Y_VERIFY(filter);
Y_VERIFY(!Filter);
@@ -110,11 +155,12 @@ void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) {
bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) {
if (!WaitIndexed.erase(bRange)) {
+ Y_ASSERT(false);
return false;
}
WaitingBytes -= bRange.Size;
FetchedBytes += bRange.Size;
- Data.emplace(bRange, blobData);
+ Data.emplace(bRange, TPortionInfo::TAssembleBlobInfo(blobData));
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index d6dd76809c..55f3417d26 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -30,20 +30,20 @@ private:
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch);
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
- THashMap<TBlobRange, TString> Data;
- TGranule* Owner = nullptr;
+ THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
+ TGranule* Owner;
const TPortionInfo* PortionInfo = nullptr;
- friend class TGranule;
- TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo);
-
YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds);
std::set<ui32> AskedColumnIds;
+ void ResetCommon(const std::set<ui32>* columnIds);
public:
+ TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo);
bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData);
bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
- void Reset(const std::set<ui32>* columnIds);
+ void ResetNoFilter(const std::set<ui32>* columnIds);
+ void ResetWithFilter(const std::set<ui32>* columnIds);
ui64 GetFetchBytes(const std::set<ui32>* columnIds);
void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch);
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
index c013266d6f..c3738a9c66 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -22,6 +22,10 @@ bool IDataTasksProcessor::ITask::Apply(NOlap::TIndexedReadData& indexedDataRead)
return DoApply(indexedDataRead);
}
+TDataTasksProcessorContainer IDataTasksProcessor::ITask::GetTasksProcessorContainer() const {
+ return TDataTasksProcessorContainer(OwnerOperator);
+}
+
bool IDataTasksProcessor::Add(ITask::TPtr task) {
if (IsStopped()) {
return false;
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
index efb808cdac..90a8f98a10 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
@@ -8,7 +8,7 @@ class TIndexedReadData;
namespace NKikimr::NColumnShard {
-class IDataTasksProcessor;
+class TDataTasksProcessorContainer;
class IDataTasksProcessor {
private:
@@ -22,6 +22,7 @@ public:
std::shared_ptr<IDataTasksProcessor> OwnerOperator;
YDB_READONLY_FLAG(DataProcessed, false);
protected:
+ TDataTasksProcessorContainer GetTasksProcessorContainer() const;
virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0;
virtual bool DoExecuteImpl() = 0;
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index 5039b0405d..1ae809aa49 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -7,7 +7,10 @@ bool TAssembleFilter::DoExecuteImpl() {
/// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
/// It's not OK to apply predicate before replacing key duplicates otherwise.
/// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
- auto batch = BatchConstructor.Assemble();
+
+ TPortionInfo::TPreparedBatchData::TAssembleOptions options;
+ options.SetIncludedColumnIds(FilterColumnIds);
+ auto batch = BatchConstructor.Assemble(options);
Y_VERIFY(batch);
Y_VERIFY(batch->num_rows());
OriginalCount = batch->num_rows();
@@ -26,8 +29,18 @@ bool TAssembleFilter::DoExecuteImpl() {
return true;
}
}
+
+ if ((size_t)batch->schema()->num_fields() < BatchConstructor.GetColumnsCount()) {
+ TPortionInfo::TPreparedBatchData::TAssembleOptions options;
+ options.SetExcludedColumnIds(FilterColumnIds);
+ auto addBatch = BatchConstructor.Assemble(options);
+ Y_VERIFY(addBatch);
+ Y_VERIFY(Filter->Apply(addBatch));
+ Y_VERIFY(NArrow::MergeBatchColumns({ batch, addBatch }, batch, BatchConstructor.GetSchemaColumnNames(), true));
+ }
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")
- ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter);
+ ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter)
+ ("filter_columns", FilterColumnIds.size());
FilteredBatch = batch;
return true;
@@ -57,7 +70,13 @@ bool TAssembleFilter::DoApply(TIndexedReadData& owner) const {
owner.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes());
owner.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount);
- batch.Reset(&owner.GetPostFilterColumns());
+ batch.ResetWithFilter(&owner.GetPostFilterColumns());
+ if (batch.IsFetchingReady()) {
+ auto processor = GetTasksProcessorContainer();
+ if (auto assembleBatchTask = batch.AssembleTask(processor.GetObject(), owner.GetReadMetadata())) {
+ processor.Add(owner, assembleBatchTask);
+ }
+ }
owner.GetCounters().GetTwoPhasesCount()->Add(1);
owner.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes());
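Example (not part of the patch): the two-phase assembly idea in TAssembleFilter::DoExecuteImpl(), sketched against plain Arrow rather than the YDB helpers. The predicate columns are materialized first and reduced to a row mask; only then are the remaining columns materialized and filtered with the same mask, and the two column sets are merged (the patch does the merge with NArrow::MergeBatchColumns, preserving the schema column order). All names below are illustrative.

    #include <arrow/api.h>
    #include <arrow/compute/api.h>
    #include <memory>
    #include <vector>

    std::shared_ptr<arrow::RecordBatch> TwoPhaseAssemble(
            const std::shared_ptr<arrow::RecordBatch>& filterColumns,  // phase 1: predicate columns
            const std::shared_ptr<arrow::RecordBatch>& otherColumns,   // phase 2: everything else
            const std::shared_ptr<arrow::Array>& mask) {               // boolean row mask
        auto filtered1 =
            arrow::compute::Filter(filterColumns, mask).ValueOrDie().record_batch();
        auto filtered2 =
            arrow::compute::Filter(otherColumns, mask).ValueOrDie().record_batch();

        // Glue the two filtered column sets back into a single batch.
        std::vector<std::shared_ptr<arrow::Field>> fields;
        std::vector<std::shared_ptr<arrow::Array>> columns;
        for (const auto& part : {filtered1, filtered2}) {
            for (int i = 0; i < part->num_columns(); ++i) {
                fields.push_back(part->schema()->field(i));
                columns.push_back(part->column(i));
            }
        }
        return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields),
                                        filtered1->num_rows(), columns);
    }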
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
index 902276896f..7ec1b9af0c 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -19,17 +19,19 @@ namespace NKikimr::NOlap::NIndexedReader {
const ui32 BatchNo;
ui32 OriginalCount = 0;
bool AllowEarlyFilter = false;
+ std::set<ui32> FilterColumnIds;
protected:
virtual bool DoApply(TIndexedReadData& owner) const override;
virtual bool DoExecuteImpl() override;
public:
TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
- TBatch& batch, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor)
+ TBatch& batch, const bool allowEarlyFilter, const std::set<ui32>& filterColumnIds, NColumnShard::IDataTasksProcessor::TPtr processor)
: TBase(processor)
, BatchConstructor(batchConstructor)
, ReadMetadata(readMetadata)
, BatchNo(batch.GetBatchNo())
, AllowEarlyFilter(allowEarlyFilter)
+ , FilterColumnIds(filterColumnIds)
{
TBase::SetPriority(TBase::EPriority::Normal);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index f92bf1b482..e7c4d80f82 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -25,4 +25,12 @@ NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const ui32 batchNo, c
return infoEmplace.first->second;
}
+void TGranule::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const {
+ Owner->AddBlobForFetch(range, batch);
+}
+
+const std::set<ui32>& TGranule::GetEarlyFilterColumns() const {
+ return Owner->GetEarlyFilterColumns();
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index cad0a9318e..6c32ea426e 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -17,8 +17,6 @@ private:
THashMap<ui32, TBatch> Batches;
std::set<ui32> WaitBatches;
TIndexedReadData* Owner = nullptr;
- void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
- friend class NIndexedReader::TBatch;
public:
TGranule(const ui64 granuleId, TIndexedReadData& owner)
: GranuleId(granuleId)
@@ -26,7 +24,11 @@ public:
}
+ const std::set<ui32>& GetEarlyFilterColumns() const;
+ void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
TBatch& AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo);
+ void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const;
+
};
}