aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-11-04 15:59:39 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-11-04 16:14:22 +0300
commitb5c60b12845080cf7ca566be672cc94b51e4cba7 (patch)
tree076f894b2a95f78d5828c9c82f72050352175b45
parent5b0a5ace977f66c6e1b72587aebf7a7f7d960412 (diff)
downloadydb-b5c60b12845080cf7ca566be672cc94b51e4cba7.tar.gz
KIKIMR-19988: improve chunks merging. table using in some cases with early filter
-rw-r--r--ydb/core/formats/arrow/arrow_filter.cpp36
-rw-r--r--ydb/core/formats/arrow/arrow_filter.h12
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp43
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.h3
-rw-r--r--ydb/core/formats/arrow/program.cpp131
-rw-r--r--ydb/core/formats/arrow/program.h9
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h4
-rw-r--r--ydb/core/tx/columnshard/engines/filter.cpp109
-rw-r--r--ydb/core/tx/columnshard/engines/filter.h10
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/container.h2
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/filter.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/filter.h2
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/range.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/range.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp14
-rw-r--r--ydb/core/tx/program/program.cpp9
-rw-r--r--ydb/core/tx/program/program.h3
18 files changed, 283 insertions, 114 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp
index 978d55f4bff..559b33aab8a 100644
--- a/ydb/core/formats/arrow/arrow_filter.cpp
+++ b/ydb/core/formats/arrow/arrow_filter.cpp
@@ -195,8 +195,7 @@ std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter(const ui32
auto res = builder.Reserve(expectedSize);
Y_VERIFY_OK(builder.AppendValues(BuildSimpleFilter(expectedSize)));
std::shared_ptr<arrow::BooleanArray> out;
- res = builder.Finish(&out);
- Y_VERIFY_OK(res);
+ TStatusValidator::Validate(builder.Finish(&out));
return out;
}
@@ -301,23 +300,40 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(const arrow::D
return NArrow::TColumnFilter(std::move(bits));
}
-bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) {
+template <arrow::Datum::Kind kindExpected, class TData>
+bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch) {
if (!batch || !batch->num_rows()) {
return false;
}
- Y_VERIFY_S(Filter.empty() || Count == (size_t)batch->num_rows(), Count << " != " << batch->num_rows());
- if (IsTotalDenyFilter()) {
+ AFL_VERIFY(filter.IsEmpty() || filter.Size() == (size_t)batch->num_rows())("filter_size", filter.Size())("batch_size", batch->num_rows());
+ if (filter.IsTotalDenyFilter()) {
batch = batch->Slice(0, 0);
return false;
}
- if (IsTotalAllowFilter()) {
+ if (filter.IsTotalAllowFilter()) {
return true;
}
- auto res = arrow::compute::Filter(batch, BuildArrowFilter(batch->num_rows()));
+ auto res = arrow::compute::Filter(batch, filter.BuildArrowFilter(batch->num_rows()));
Y_VERIFY_S(res.ok(), res.status().message());
- Y_ABORT_UNLESS((*res).kind() == arrow::Datum::RECORD_BATCH);
- batch = (*res).record_batch();
- return batch->num_rows();
+ Y_ABORT_UNLESS((*res).kind() == kindExpected);
+ if constexpr (kindExpected == arrow::Datum::TABLE) {
+ batch = (*res).table();
+ return batch->num_rows();
+ }
+ if constexpr (kindExpected == arrow::Datum::RECORD_BATCH) {
+ batch = (*res).record_batch();
+ return batch->num_rows();
+ }
+ AFL_VERIFY(false);
+ return false;
+}
+
+bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch) {
+ return ApplyImpl<arrow::Datum::TABLE>(*this, batch);
+}
+
+bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) {
+ return ApplyImpl<arrow::Datum::RECORD_BATCH>(*this, batch);
}
const std::vector<bool>& TColumnFilter::BuildSimpleFilter(const ui32 expectedSize) const {
diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h
index fa467648a09..6770d76e802 100644
--- a/ydb/core/formats/arrow/arrow_filter.h
+++ b/ydb/core/formats/arrow/arrow_filter.h
@@ -127,8 +127,8 @@ public:
return;
}
bool currentValue = getter[0];
- ui32 sameValueCount = 0;
- for (ui32 i = 0; i < count; ++i) {
+ ui32 sameValueCount = 1;
+ for (ui32 i = 1; i < count; ++i) {
if (getter[i] != currentValue) {
Add(currentValue, sameValueCount);
sameValueCount = 0;
@@ -148,8 +148,10 @@ public:
std::shared_ptr<arrow::BooleanArray> BuildArrowFilter(const ui32 expectedSize) const;
bool IsTotalAllowFilter() const;
-
bool IsTotalDenyFilter() const;
+ bool IsEmpty() const {
+ return Filter.empty();
+ }
static TColumnFilter BuildStopFilter() {
return TColumnFilter(false);
@@ -163,9 +165,9 @@ public:
TColumnFilter Or(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
// It makes a filter using composite predicate
- static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border,
- ECompareType compareType);
+ static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);
+ bool Apply(std::shared_ptr<arrow::Table>& batch);
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch);
// Combines filters by 'and' operator (extFilter count is true positions count in self, thought extFitler patch exactly that positions)
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index cc554adee8e..b671d263668 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -281,8 +281,16 @@ std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared
return table ? ToBatch(table) : nullptr;
}
-std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& table) {
- Y_ABORT_UNLESS(table);
+std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& tableExt, const bool combine) {
+ Y_ABORT_UNLESS(tableExt);
+ std::shared_ptr<arrow::Table> table;
+ if (combine) {
+ auto res = tableExt->CombineChunks();
+ Y_ABORT_UNLESS(res.ok());
+ table = *res;
+ } else {
+ table = tableExt;
+ }
std::vector<std::shared_ptr<arrow::Array>> columns;
columns.reserve(table->num_columns());
for (auto& col : table->columns()) {
@@ -755,8 +763,9 @@ bool ReserveData(arrow::ArrayBuilder& builder, const size_t size) {
return result.ok();
}
-bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result,
- const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary) {
+template <class TData, class TColumn, class TBuilder>
+bool MergeBatchColumnsImpl(const std::vector<std::shared_ptr<TData>>& batches, std::shared_ptr<TData>& result,
+ const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary, const TBuilder& builder) {
if (batches.empty()) {
result = nullptr;
return true;
@@ -766,7 +775,7 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b
return true;
}
std::vector<std::shared_ptr<arrow::Field>> fields;
- std::vector<std::shared_ptr<arrow::Array>> columns;
+ std::vector<std::shared_ptr<TColumn>> columns;
std::map<std::string, ui32> fieldNames;
for (auto&& i : batches) {
Y_ABORT_UNLESS(i);
@@ -785,11 +794,9 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b
}
Y_ABORT_UNLESS(fields.size() == columns.size());
- if (columnsOrder.empty()) {
- result = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), batches.front()->num_rows(), columns);
- } else {
+ if (columnsOrder.size()) {
std::vector<std::shared_ptr<arrow::Field>> fieldsOrdered;
- std::vector<std::shared_ptr<arrow::Array>> columnsOrdered;
+ std::vector<std::shared_ptr<TColumn>> columnsOrdered;
for (auto&& i : columnsOrder) {
auto it = fieldNames.find(i);
if (orderFieldsAreNecessary) {
@@ -803,10 +810,26 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b
std::swap(fieldsOrdered, fields);
std::swap(columnsOrdered, columns);
}
- result = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), batches.front()->num_rows(), columns);
+ result = builder(std::make_shared<arrow::Schema>(fields), batches.front()->num_rows(), std::move(columns));
return true;
}
+bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::Table>>& batches, std::shared_ptr<arrow::Table>& result, const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary) {
+ const auto builder = [](const std::shared_ptr<arrow::Schema>& schema, const ui32 recordsCount, std::vector<std::shared_ptr<arrow::ChunkedArray>>&& columns) {
+ return arrow::Table::Make(schema, columns, recordsCount);
+ };
+
+ return MergeBatchColumnsImpl<arrow::Table, arrow::ChunkedArray>(batches, result, columnsOrder, orderFieldsAreNecessary, builder);
+}
+
+bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary) {
+ const auto builder = [](const std::shared_ptr<arrow::Schema>& schema, const ui32 recordsCount, std::vector<std::shared_ptr<arrow::Array>>&& columns) {
+ return arrow::RecordBatch::Make(schema, recordsCount, columns);
+ };
+
+ return MergeBatchColumnsImpl<arrow::RecordBatch, arrow::Array>(batches, result, columnsOrder, orderFieldsAreNecessary, builder);
+}
+
std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow) {
return TRawReplaceKey(&x, xRow).CompareNotNull(TRawReplaceKey(&y, yRow));
}
diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h
index fe4bd4c3c02..84aba485fa7 100644
--- a/ydb/core/formats/arrow/arrow_helpers.h
+++ b/ydb/core/formats/arrow/arrow_helpers.h
@@ -73,7 +73,7 @@ inline std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shar
}
std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
-std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable);
+std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable, const bool combine = false);
std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
const std::shared_ptr<TSortDescription>& description);
@@ -98,6 +98,7 @@ std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size);
std::vector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema);
bool ReserveData(arrow::ArrayBuilder& builder, const size_t size);
bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder = {}, const bool orderFieldsAreNecessary = true);
+bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::Table>>& batches, std::shared_ptr<arrow::Table>& result, const std::vector<std::string>& columnsOrder = {}, const bool orderFieldsAreNecessary = true);
std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp
index e0593d0b13d..a9b9864f81d 100644
--- a/ydb/core/formats/arrow/program.cpp
+++ b/ydb/core/formats/arrow/program.cpp
@@ -40,6 +40,7 @@ struct GroupByOptions : public arrow::compute::ScalarAggregateOptions {
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/datum.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
+#include <library/cpp/actors/core/log.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
namespace NKikimr::NSsa {
@@ -459,18 +460,15 @@ CH::GroupByOptions::Assign GetGroupByAssign(const TAggregateAssign& assign) {
class TFilterVisitor : public arrow::ArrayVisitor {
std::vector<bool> FiltersMerged;
+ ui32 CursorIdx = 0;
+ bool Started = false;
public:
void BuildColumnFilter(NArrow::TColumnFilter& result) {
result = NArrow::TColumnFilter(std::move(FiltersMerged));
}
arrow::Status Visit(const arrow::BooleanArray& array) override {
- InitAndCheck(array);
- for (ui32 i = 0; i < FiltersMerged.size(); ++i) {
- bool columnValue = array.Value(i);
- FiltersMerged[i] = FiltersMerged[i] && columnValue;
- }
- return arrow::Status::OK();
+ return VisitImpl(array);
}
arrow::Status Visit(const arrow::Int8Array& array) override {
@@ -481,24 +479,44 @@ public:
return VisitImpl(array);
}
+ TFilterVisitor(const ui32 rowsCount) {
+ FiltersMerged.resize(rowsCount, true);
+ }
+
+ class TModificationGuard: public TNonCopyable {
+ private:
+ TFilterVisitor& Owner;
+ public:
+ TModificationGuard(TFilterVisitor& owner)
+ : Owner(owner)
+ {
+ Owner.CursorIdx = 0;
+ AFL_VERIFY(!Owner.Started);
+ Owner.Started = true;
+ }
+
+ ~TModificationGuard() {
+ AFL_VERIFY(Owner.CursorIdx == Owner.FiltersMerged.size());
+ Owner.Started = false;
+ }
+ };
+
+ TModificationGuard StartVisit() {
+ return TModificationGuard(*this);
+ }
+
private:
template <class TArray>
arrow::Status VisitImpl(const TArray& array) {
- InitAndCheck(array);
+ AFL_VERIFY(Started);
for (ui32 i = 0; i < FiltersMerged.size(); ++i) {
- bool columnValue = (bool)array.Value(i);
- FiltersMerged[i] = FiltersMerged[i] && columnValue;
+ const bool columnValue = (bool)array.Value(i);
+ const ui32 currentIdx = CursorIdx++;
+ FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && columnValue;
}
+ AFL_VERIFY(CursorIdx <= FiltersMerged.size());
return arrow::Status::OK();
}
-
- void InitAndCheck(const arrow::Array& array) {
- if (FiltersMerged.empty()) {
- FiltersMerged.resize(array.length(), true);
- } else {
- Y_ABORT_UNLESS(FiltersMerged.size() == (size_t)array.length());
- }
- }
};
}
@@ -536,18 +554,33 @@ std::shared_ptr<arrow::RecordBatch> TDatumBatch::ToRecordBatch() const {
for (auto col : Datums) {
if (col.is_scalar()) {
columns.push_back(*arrow::MakeArrayFromScalar(*col.scalar(), Rows));
- }
- else if (col.is_array()){
+ } else if (col.is_array()){
if (col.length() == -1) {
return {};
}
columns.push_back(col.make_array());
+ } else {
+ AFL_VERIFY(false);
}
}
return arrow::RecordBatch::Make(Schema, Rows, columns);
}
-std::shared_ptr<TDatumBatch> TDatumBatch::FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch) {
+std::shared_ptr<TDatumBatch> TDatumBatch::FromRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch) {
+ std::vector<arrow::Datum> datums;
+ datums.reserve(batch->num_columns());
+ for (int64_t i = 0; i < batch->num_columns(); ++i) {
+ datums.push_back(arrow::Datum(batch->column(i)));
+ }
+ return std::make_shared<TProgramStep::TDatumBatch>(
+ TProgramStep::TDatumBatch{
+ .Schema = std::make_shared<arrow::Schema>(*batch->schema()),
+ .Datums = std::move(datums),
+ .Rows = batch->num_rows()
+ });
+}
+
+std::shared_ptr<TDatumBatch> TDatumBatch::FromTable(const std::shared_ptr<arrow::Table>& batch) {
std::vector<arrow::Datum> datums;
datums.reserve(batch->num_columns());
for (int64_t i = 0; i < batch->num_columns(); ++i) {
@@ -677,19 +710,24 @@ arrow::Status TProgramStep::ApplyAggregates(TDatumBatch& batch, arrow::compute::
}
arrow::Status TProgramStep::MakeCombinedFilter(TDatumBatch& batch, NArrow::TColumnFilter& result) const {
- TFilterVisitor filterVisitor;
+ TFilterVisitor filterVisitor(batch.Rows);
for (auto& colName : Filters) {
auto column = batch.GetColumnByName(colName);
if (!column.ok()) {
return column.status();
}
- if (!column->is_array()) {
- return arrow::Status::Invalid("Column '" + colName + "' is not an array.");
- }
- auto columnArray = column->make_array();
- auto status = columnArray->Accept(&filterVisitor);
- if (!status.ok()) {
- return status;
+ if (column->is_array()) {
+ auto g = filterVisitor.StartVisit();
+ auto columnArray = column->make_array();
+ NArrow::TStatusValidator::Validate(columnArray->Accept(&filterVisitor));
+ } else if (column->is_arraylike()) {
+ auto columnArray = column->chunked_array();
+ auto g = filterVisitor.StartVisit();
+ for (auto&& i : columnArray->chunks()) {
+ NArrow::TStatusValidator::Validate(i->Accept(&filterVisitor));
+ }
+ } else {
+ AFL_VERIFY(false)("column", colName);
}
}
filterVisitor.BuildColumnFilter(result);
@@ -702,10 +740,7 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const {
}
NArrow::TColumnFilter bits = NArrow::TColumnFilter::BuildAllowFilter();
- auto status = MakeCombinedFilter(batch, bits);
- if (!status.ok()) {
- return status;
- }
+ NArrow::TStatusValidator::Validate(MakeCombinedFilter(batch, bits));
if (!bits.IsTotalAllowFilter()) {
std::unordered_set<std::string_view> neededColumns;
bool allColumns = Projection.empty() && GroupBy.empty();
@@ -725,17 +760,10 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const {
auto filter = bits.BuildArrowFilter(batch.Rows);
for (int64_t i = 0; i < batch.Schema->num_fields(); ++i) {
- bool needed = (allColumns || neededColumns.contains(batch.Schema->field(i)->name()));
- if (batch.Datums[i].is_array() && needed) {
- auto res = arrow::compute::Filter(batch.Datums[i].make_array(), filter);
- if (!res.ok()) {
- return res.status();
- }
- if ((*res).kind() != batch.Datums[i].kind()) {
- return arrow::Status::Invalid("Unexpected filter result.");
- }
-
- batch.Datums[i] = *res;
+ if (batch.Datums[i].is_arraylike() && (allColumns || neededColumns.contains(batch.Schema->field(i)->name()))) {
+ auto datum = NArrow::TStatusValidator::GetValid(arrow::compute::Filter(batch.Datums[i], filter));
+ AFL_VERIFY(datum.is_arraylike())("datum", datum.ToString())("batch", batch.Datums[i].ToString());
+ batch.Datums[i] = std::move(datum);
}
}
@@ -830,9 +858,7 @@ std::set<std::string> TProgramStep::GetColumnsInUsage() const {
return result;
}
-NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
- arrow::compute::ExecContext* ctx) const
-{
+NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<TProgramStep::TDatumBatch>& rb, arrow::compute::ExecContext* ctx) const {
NArrow::TColumnFilter result = NArrow::TColumnFilter::BuildAllowFilter();
try {
if (Steps.empty()) {
@@ -843,9 +869,6 @@ NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::Rec
return result;
}
- auto batch = srcBatch;
- auto rb = TProgramStep::TDatumBatch::FromRecordBatch(batch);
-
if (!step->ApplyAssignes(*rb, ctx).ok()) {
return result;
}
@@ -860,6 +883,16 @@ NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::Rec
return result;
}
+NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ arrow::compute::ExecContext* ctx) const {
+ return MakeEarlyFilter(TProgramStep::TDatumBatch::FromRecordBatch(srcBatch), ctx);
+}
+
+NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::Table>& srcBatch,
+ arrow::compute::ExecContext* ctx) const {
+ return MakeEarlyFilter(TProgramStep::TDatumBatch::FromTable(srcBatch), ctx);
+}
+
std::set<std::string> TProgram::GetEarlyFilterColumns() const {
std::set<std::string> result;
if (Steps.empty()) {
diff --git a/ydb/core/formats/arrow/program.h b/ydb/core/formats/arrow/program.h
index 329d243fc6e..6bf94b6b614 100644
--- a/ydb/core/formats/arrow/program.h
+++ b/ydb/core/formats/arrow/program.h
@@ -45,7 +45,8 @@ struct TDatumBatch {
arrow::Status AddColumn(const std::string& name, arrow::Datum&& column);
arrow::Result<arrow::Datum> GetColumnByName(const std::string& name) const;
std::shared_ptr<arrow::RecordBatch> ToRecordBatch() const;
- static std::shared_ptr<TDatumBatch> FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch);
+ static std::shared_ptr<TDatumBatch> FromRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch);
+ static std::shared_ptr<TDatumBatch> FromTable(const std::shared_ptr<arrow::Table>& batch);
};
template <class TAssignObject>
@@ -299,7 +300,11 @@ struct TProgram {
std::set<std::string> GetEarlyFilterColumns() const;
NArrow::TColumnFilter MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch,
- arrow::compute::ExecContext* ctx) const;
+ arrow::compute::ExecContext* ctx) const;
+ NArrow::TColumnFilter MakeEarlyFilter(const std::shared_ptr<arrow::Table>& batch,
+ arrow::compute::ExecContext* ctx) const;
+
+ NArrow::TColumnFilter MakeEarlyFilter(const std::shared_ptr<TProgramStep::TDatumBatch>& batch, arrow::compute::ExecContext* ctx) const;
};
inline arrow::Status ApplyProgram(
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
index 04b58eae0bb..22e479c46e1 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
@@ -67,7 +67,7 @@ public:
STATEFN(StateFunc) {
auto gTimeFull = KqpComputeActorSpan.StartStackTimeGuard("processing");
- auto gTime = KqpComputeActorSpan.StartStackTimeGuard("event_" + ::ToString(ev->GetTypeRewrite()));
+ auto gTime = KqpComputeActorSpan.StartStackTimeGuard("event_" + ev->GetTypeName());
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
@@ -83,7 +83,7 @@ public:
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
default:
- StopOnError("unexpected message on data fetching: " + ::ToString(ev->GetTypeRewrite()));
+ StopOnError("unexpected message on data fetching: " + ev->GetTypeName());
}
} catch (...) {
StopOnError("unexpected exception: " + CurrentExceptionMessage());
diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp
index 307ee53692f..86e04a8f68d 100644
--- a/ydb/core/tx/columnshard/engines/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/filter.cpp
@@ -8,6 +8,93 @@
namespace NKikimr::NOlap {
+template <class TArrayView>
+class TChunkedArrayIterator {
+private:
+ const arrow::ArrayVector* Chunks;
+ const typename TArrayView::value_type* RawView;
+ arrow::ArrayVector::const_iterator CurrentChunkIt;
+ ui32 CurrentChunkPosition = 0;
+public:
+ TChunkedArrayIterator(const std::shared_ptr<arrow::ChunkedArray>& chunks)
+ {
+ AFL_VERIFY(!!chunks);
+ Chunks = &chunks->chunks();
+ AFL_VERIFY(Chunks->size());
+ CurrentChunkIt = Chunks->begin();
+ Y_ABORT_UNLESS(chunks->type()->id() == arrow::TypeTraits<typename TArrayView::TypeClass>::type_singleton()->id());
+ if (IsValid()) {
+ RawView = std::static_pointer_cast<TArrayView>(*CurrentChunkIt)->raw_values();
+ }
+ }
+
+ bool IsValid() const {
+ return CurrentChunkIt != Chunks->end() && CurrentChunkPosition < (*CurrentChunkIt)->length();
+ }
+
+ typename TArrayView::value_type GetValue() const {
+ AFL_VERIFY_DEBUG(IsValid());
+ return RawView[CurrentChunkPosition];
+ }
+
+ bool Next() {
+ AFL_VERIFY_DEBUG(IsValid());
+ ++CurrentChunkPosition;
+ while (CurrentChunkIt != Chunks->end() && (*CurrentChunkIt)->length() == CurrentChunkPosition) {
+ if (++CurrentChunkIt != Chunks->end()) {
+ CurrentChunkPosition = 0;
+ RawView = std::static_pointer_cast<TArrayView>(*CurrentChunkIt)->raw_values();
+ }
+ }
+ return CurrentChunkIt != Chunks->end();
+ }
+};
+
+class TTableSnapshotGetter {
+private:
+ const TSnapshot Snapshot;
+ mutable TChunkedArrayIterator<arrow::UInt64Array> Steps;
+ mutable TChunkedArrayIterator<arrow::UInt64Array> Ids;
+ mutable i64 CurrentIdx = -1;
+public:
+ TTableSnapshotGetter(const std::shared_ptr<arrow::ChunkedArray>& steps, const std::shared_ptr<arrow::ChunkedArray>& ids, const TSnapshot& snapshot)
+ : Snapshot(snapshot)
+ , Steps(steps)
+ , Ids(ids)
+ {
+ Y_ABORT_UNLESS(steps->length() == ids->length());
+ }
+
+ bool operator[](const ui32 idx) const {
+ AFL_VERIFY(CurrentIdx + 1 == idx)("current_idx", CurrentIdx)("idx", idx);
+ CurrentIdx = idx;
+ const bool result = std::less_equal<TSnapshot>()(TSnapshot(Steps.GetValue(), Ids.GetValue()), Snapshot);
+ const bool sNext = Steps.Next();
+ const bool idNext = Ids.Next();
+ AFL_VERIFY(sNext == idNext);
+ if (!idNext) {
+ CurrentIdx = -2;
+ }
+ return result;
+ }
+};
+
+template <class TGetter, class TData>
+NArrow::TColumnFilter MakeSnapshotFilterImpl(const std::shared_ptr<TData>& batch, const TSnapshot& snapshot) {
+ Y_ABORT_UNLESS(batch);
+ auto steps = batch->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
+ auto ids = batch->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
+ NArrow::TColumnFilter result = NArrow::TColumnFilter::BuildAllowFilter();
+ TGetter getter(steps, ids, snapshot);
+ result.Reset(steps->length(), std::move(getter));
+ return result;
+}
+
+NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::Table>& batch,
+ const TSnapshot& snapshot) {
+ return MakeSnapshotFilterImpl<TTableSnapshotGetter>(batch, snapshot);
+}
+
class TSnapshotGetter {
private:
const arrow::UInt64Array::value_type* RawSteps;
@@ -32,34 +119,28 @@ public:
};
NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::shared_ptr<arrow::Schema>& snapSchema,
const TSnapshot& snapshot) {
- Y_ABORT_UNLESS(batch);
- Y_ABORT_UNLESS(snapSchema);
- Y_ABORT_UNLESS(snapSchema->num_fields() == 2);
- auto steps = batch->GetColumnByName(snapSchema->fields()[0]->name());
- auto ids = batch->GetColumnByName(snapSchema->fields()[1]->name());
- NArrow::TColumnFilter result = NArrow::TColumnFilter::BuildAllowFilter();
- TSnapshotGetter getter(steps, ids, snapshot);
- result.Reset(steps->length(), std::move(getter));
- return result;
+ return MakeSnapshotFilterImpl<TSnapshotGetter>(batch, snapshot);
}
-NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata, const bool useSnapshotFilter) {
+NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::Table>& portion, const TReadMetadata& readMetadata, const bool useSnapshotFilter) {
Y_ABORT_UNLESS(portion);
NArrow::TColumnFilter result = readMetadata.GetPKRangesFilter().BuildFilter(portion);
if (readMetadata.GetSnapshot().GetPlanStep() && useSnapshotFilter) {
- auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
- result = result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot()));
+ result = result.And(MakeSnapshotFilter(portion, readMetadata.GetSnapshot()));
}
return result;
}
-NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata) {
+NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::Table>& batch, const TReadMetadata& readMetadata) {
return readMetadata.GetPKRangesFilter().BuildFilter(batch);
}
+NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::Table>& batch, std::shared_ptr<NSsa::TProgram> ssa) {
+ return ssa->MakeEarlyFilter(batch, NArrow::GetCustomExecContext());
+}
+
NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa) {
return ssa->MakeEarlyFilter(batch, NArrow::GetCustomExecContext());
}
diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h
index 7eaab200ff0..34f3913de45 100644
--- a/ydb/core/tx/columnshard/engines/filter.h
+++ b/ydb/core/tx/columnshard/engines/filter.h
@@ -6,13 +6,13 @@
namespace NKikimr::NOlap {
-NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::shared_ptr<arrow::Schema>& snapSchema,
- const TSnapshot& snapshot);
+NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot);
+NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::Table>& batch, const TSnapshot& snapshot);
struct TReadMetadata;
-NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata, const bool useSnapshotFilter);
-NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
+NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::Table>& batch, const TReadMetadata& readMetadata, const bool useSnapshotFilter);
+NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::Table>& batch, const TReadMetadata& readMetadata);
+NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::Table>& batch, std::shared_ptr<NSsa::TProgram> ssa);
NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa);
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/predicate/container.h b/ydb/core/tx/columnshard/engines/predicate/container.h
index 0606335cc9e..e485cc2ddae 100644
--- a/ydb/core/tx/columnshard/engines/predicate/container.h
+++ b/ydb/core/tx/columnshard/engines/predicate/container.h
@@ -63,7 +63,7 @@ public:
static std::optional<TPredicateContainer> BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo);
- NKikimr::NArrow::TColumnFilter BuildFilter(const std::shared_ptr<arrow::RecordBatch>& data) const {
+ NKikimr::NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const {
if (!Object) {
return NArrow::TColumnFilter::BuildAllowFilter();
}
diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.cpp b/ydb/core/tx/columnshard/engines/predicate/filter.cpp
index cdbff1d15f4..0a048fada45 100644
--- a/ydb/core/tx/columnshard/engines/predicate/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/predicate/filter.cpp
@@ -3,7 +3,7 @@
namespace NKikimr::NOlap {
-NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const {
+NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(const arrow::Datum& data) const {
if (SortedRanges.empty()) {
return NArrow::TColumnFilter::BuildAllowFilter();
}
diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.h b/ydb/core/tx/columnshard/engines/predicate/filter.h
index 45b64a91495..307891fec3b 100644
--- a/ydb/core/tx/columnshard/engines/predicate/filter.h
+++ b/ydb/core/tx/columnshard/engines/predicate/filter.h
@@ -35,7 +35,7 @@ public:
bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
- NArrow::TColumnFilter BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const;
+ NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;
bool Add(std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const TIndexInfo* indexInfo) Y_WARN_UNUSED_RESULT;
diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp
index a645c1444c0..bc526f4bbfe 100644
--- a/ydb/core/tx/columnshard/engines/predicate/range.cpp
+++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp
@@ -34,7 +34,7 @@ std::set<std::string> TPKRangeFilter::GetColumnNames() const {
return result;
}
-NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const {
+NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(const arrow::Datum& data) const {
NArrow::TColumnFilter result = PredicateTo.BuildFilter(data);
return result.And(PredicateFrom.BuildFilter(data));
}
diff --git a/ydb/core/tx/columnshard/engines/predicate/range.h b/ydb/core/tx/columnshard/engines/predicate/range.h
index 6c154bb5736..c124567066b 100644
--- a/ydb/core/tx/columnshard/engines/predicate/range.h
+++ b/ydb/core/tx/columnshard/engines/predicate/range.h
@@ -34,7 +34,7 @@ public:
static std::optional<TPKRangeFilter> Build(TPredicateContainer&& from, TPredicateContainer&& to);
- NArrow::TColumnFilter BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const;
+ NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;
bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
index daf0dfaf5a8..c5a24699809 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
@@ -13,12 +13,12 @@ bool TAssembleBatch::DoExecute() {
Y_ABORT_UNLESS(batchConstructor.GetColumnsCount());
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- auto addBatch = batchConstructor.Assemble(options);
+ auto addBatch = batchConstructor.AssembleTable(options);
Y_ABORT_UNLESS(addBatch);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
("columns_count", addBatch->num_columns())("num_rows", addBatch->num_rows());
Filter->Apply(addBatch);
- Result = addBatch;
+ Result = NArrow::ToBatch(addBatch, true);
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
index 1c88f99aa96..6bb0ccac738 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
@@ -12,7 +12,7 @@ bool TAssembleFilter::DoExecute() {
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
options.IncludedColumnIds = FilterColumnIds;
- if (RecordsMaxSnapshot < ReadMetadata->GetSnapshot() && UseFilter) {
+ if (RecordsMaxSnapshot <= ReadMetadata->GetSnapshot() && UseFilter) {
for (auto&& i : TIndexInfo::GetSpecialColumnIds()) {
options.IncludedColumnIds->erase(i);
}
@@ -21,19 +21,19 @@ bool TAssembleFilter::DoExecute() {
auto batchConstructor = BuildBatchConstructor(FilterColumnIds);
- auto batch = batchConstructor.Assemble(options);
+ auto batch = batchConstructor.AssembleTable(options);
Y_ABORT_UNLESS(batch);
Y_ABORT_UNLESS(batch->num_rows());
- if (RecordsMaxSnapshot < ReadMetadata->GetSnapshot() && UseFilter) {
+ if (RecordsMaxSnapshot <= ReadMetadata->GetSnapshot() && UseFilter) {
for (auto&& f : TIndexInfo::ArrowSchemaSnapshot()->fields()) {
auto c = NArrow::TStatusValidator::GetValid(arrow::MakeArrayOfNull(f->type(), batch->num_rows()));
- batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), f, c));
+ batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), f, std::make_shared<arrow::ChunkedArray>(c)));
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "restore_fake_special_columns");
}
OriginalCount = batch->num_rows();
- AppliedFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata, ReadMetadata->GetSnapshot() <= RecordsMaxSnapshot));
+ AppliedFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata, ReadMetadata->GetSnapshot() < RecordsMaxSnapshot));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "first_filter_using");
if (!AppliedFilter->Apply(batch)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount)("columns_count", FilterColumnIds.size());
@@ -55,7 +55,7 @@ bool TAssembleFilter::DoExecute() {
if ((size_t)batch->schema()->num_fields() < batchConstructor.GetColumnsCount()) {
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
options.ExcludedColumnIds = FilterColumnIds;
- auto addBatch = batchConstructor.Assemble(options);
+ auto addBatch = batchConstructor.AssembleTable(options);
Y_ABORT_UNLESS(addBatch);
Y_ABORT_UNLESS(AppliedFilter->Apply(addBatch));
Y_ABORT_UNLESS(NArrow::MergeBatchColumns({ batch, addBatch }, batch, batchConstructor.GetSchemaColumnNames(), true));
@@ -65,7 +65,7 @@ bool TAssembleFilter::DoExecute() {
("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", batchConstructor.GetColumnsCount())("use_filter", UseFilter)
("filter_columns", FilterColumnIds.size())("af_count", AppliedFilter->Size())("ef_count", earlyFilter ? earlyFilter->Size() : 0);
- FilteredBatch = batch;
+ FilteredBatch = NArrow::ToBatch(batch, true);
return true;
}
diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp
index 1092468712c..2c76e652002 100644
--- a/ydb/core/tx/program/program.cpp
+++ b/ydb/core/tx/program/program.cpp
@@ -419,7 +419,14 @@ bool TProgramContainer::HasProgram() const {
return !!Program;
}
-std::shared_ptr<NArrow::TColumnFilter> TProgramContainer::BuildEarlyFilter(std::shared_ptr<arrow::RecordBatch> batch) const {
+std::shared_ptr<NArrow::TColumnFilter> TProgramContainer::BuildEarlyFilter(const std::shared_ptr<arrow::Table>& batch) const {
+ if (Program) {
+ return std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Program));
+ }
+ return nullptr;
+}
+
+std::shared_ptr<NArrow::TColumnFilter> TProgramContainer::BuildEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch) const {
if (Program) {
return std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Program));
}
diff --git a/ydb/core/tx/program/program.h b/ydb/core/tx/program/program.h
index d3d562be5a1..245aebed195 100644
--- a/ydb/core/tx/program/program.h
+++ b/ydb/core/tx/program/program.h
@@ -35,7 +35,8 @@ public:
const THashMap<ui32, TString>& GetSourceColumns() const;
bool HasProgram() const;
- std::shared_ptr<NArrow::TColumnFilter> BuildEarlyFilter(std::shared_ptr<arrow::RecordBatch> batch) const;
+ std::shared_ptr<NArrow::TColumnFilter> BuildEarlyFilter(const std::shared_ptr<arrow::Table>& batch) const;
+ std::shared_ptr<NArrow::TColumnFilter> BuildEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch) const;
std::set<std::string> GetEarlyFilterColumns() const;
bool HasEarlyFilterOnly() const;