diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-04 15:59:39 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-04 16:14:22 +0300 |
commit | b5c60b12845080cf7ca566be672cc94b51e4cba7 (patch) | |
tree | 076f894b2a95f78d5828c9c82f72050352175b45 | |
parent | 5b0a5ace977f66c6e1b72587aebf7a7f7d960412 (diff) | |
download | ydb-b5c60b12845080cf7ca566be672cc94b51e4cba7.tar.gz |
KIKIMR-19988: improve chunks merging. table using in some cases with early filter
18 files changed, 283 insertions, 114 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp index 978d55f4bff..559b33aab8a 100644 --- a/ydb/core/formats/arrow/arrow_filter.cpp +++ b/ydb/core/formats/arrow/arrow_filter.cpp @@ -195,8 +195,7 @@ std::shared_ptr<arrow::BooleanArray> TColumnFilter::BuildArrowFilter(const ui32 auto res = builder.Reserve(expectedSize); Y_VERIFY_OK(builder.AppendValues(BuildSimpleFilter(expectedSize))); std::shared_ptr<arrow::BooleanArray> out; - res = builder.Finish(&out); - Y_VERIFY_OK(res); + TStatusValidator::Validate(builder.Finish(&out)); return out; } @@ -301,23 +300,40 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(const arrow::D return NArrow::TColumnFilter(std::move(bits)); } -bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) { +template <arrow::Datum::Kind kindExpected, class TData> +bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch) { if (!batch || !batch->num_rows()) { return false; } - Y_VERIFY_S(Filter.empty() || Count == (size_t)batch->num_rows(), Count << " != " << batch->num_rows()); - if (IsTotalDenyFilter()) { + AFL_VERIFY(filter.IsEmpty() || filter.Size() == (size_t)batch->num_rows())("filter_size", filter.Size())("batch_size", batch->num_rows()); + if (filter.IsTotalDenyFilter()) { batch = batch->Slice(0, 0); return false; } - if (IsTotalAllowFilter()) { + if (filter.IsTotalAllowFilter()) { return true; } - auto res = arrow::compute::Filter(batch, BuildArrowFilter(batch->num_rows())); + auto res = arrow::compute::Filter(batch, filter.BuildArrowFilter(batch->num_rows())); Y_VERIFY_S(res.ok(), res.status().message()); - Y_ABORT_UNLESS((*res).kind() == arrow::Datum::RECORD_BATCH); - batch = (*res).record_batch(); - return batch->num_rows(); + Y_ABORT_UNLESS((*res).kind() == kindExpected); + if constexpr (kindExpected == arrow::Datum::TABLE) { + batch = (*res).table(); + return batch->num_rows(); + } + if constexpr (kindExpected == arrow::Datum::RECORD_BATCH) { + batch = (*res).record_batch(); + return batch->num_rows(); + } + AFL_VERIFY(false); + return false; +} + +bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch) { + return ApplyImpl<arrow::Datum::TABLE>(*this, batch); +} + +bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) { + return ApplyImpl<arrow::Datum::RECORD_BATCH>(*this, batch); } const std::vector<bool>& TColumnFilter::BuildSimpleFilter(const ui32 expectedSize) const { diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h index fa467648a09..6770d76e802 100644 --- a/ydb/core/formats/arrow/arrow_filter.h +++ b/ydb/core/formats/arrow/arrow_filter.h @@ -127,8 +127,8 @@ public: return; } bool currentValue = getter[0]; - ui32 sameValueCount = 0; - for (ui32 i = 0; i < count; ++i) { + ui32 sameValueCount = 1; + for (ui32 i = 1; i < count; ++i) { if (getter[i] != currentValue) { Add(currentValue, sameValueCount); sameValueCount = 0; @@ -148,8 +148,10 @@ public: std::shared_ptr<arrow::BooleanArray> BuildArrowFilter(const ui32 expectedSize) const; bool IsTotalAllowFilter() const; - bool IsTotalDenyFilter() const; + bool IsEmpty() const { + return Filter.empty(); + } static TColumnFilter BuildStopFilter() { return TColumnFilter(false); @@ -163,9 +165,9 @@ public: TColumnFilter Or(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT; // It makes a filter using composite predicate - static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, - ECompareType compareType); + static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType); + bool Apply(std::shared_ptr<arrow::Table>& batch); bool Apply(std::shared_ptr<arrow::RecordBatch>& batch); // Combines filters by 'and' operator (extFilter count is true positions count in self, thought extFitler patch exactly that positions) diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index cc554adee8e..b671d263668 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -281,8 +281,16 @@ std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared return table ? ToBatch(table) : nullptr; } -std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& table) { - Y_ABORT_UNLESS(table); +std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& tableExt, const bool combine) { + Y_ABORT_UNLESS(tableExt); + std::shared_ptr<arrow::Table> table; + if (combine) { + auto res = tableExt->CombineChunks(); + Y_ABORT_UNLESS(res.ok()); + table = *res; + } else { + table = tableExt; + } std::vector<std::shared_ptr<arrow::Array>> columns; columns.reserve(table->num_columns()); for (auto& col : table->columns()) { @@ -755,8 +763,9 @@ bool ReserveData(arrow::ArrayBuilder& builder, const size_t size) { return result.ok(); } -bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, - const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary) { +template <class TData, class TColumn, class TBuilder> +bool MergeBatchColumnsImpl(const std::vector<std::shared_ptr<TData>>& batches, std::shared_ptr<TData>& result, + const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary, const TBuilder& builder) { if (batches.empty()) { result = nullptr; return true; @@ -766,7 +775,7 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b return true; } std::vector<std::shared_ptr<arrow::Field>> fields; - std::vector<std::shared_ptr<arrow::Array>> columns; + std::vector<std::shared_ptr<TColumn>> columns; std::map<std::string, ui32> fieldNames; for (auto&& i : batches) { Y_ABORT_UNLESS(i); @@ -785,11 +794,9 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b } Y_ABORT_UNLESS(fields.size() == columns.size()); - if (columnsOrder.empty()) { - result = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), batches.front()->num_rows(), columns); - } else { + if (columnsOrder.size()) { std::vector<std::shared_ptr<arrow::Field>> fieldsOrdered; - std::vector<std::shared_ptr<arrow::Array>> columnsOrdered; + std::vector<std::shared_ptr<TColumn>> columnsOrdered; for (auto&& i : columnsOrder) { auto it = fieldNames.find(i); if (orderFieldsAreNecessary) { @@ -803,10 +810,26 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b std::swap(fieldsOrdered, fields); std::swap(columnsOrdered, columns); } - result = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), batches.front()->num_rows(), columns); + result = builder(std::make_shared<arrow::Schema>(fields), batches.front()->num_rows(), std::move(columns)); return true; } +bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::Table>>& batches, std::shared_ptr<arrow::Table>& result, const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary) { + const auto builder = [](const std::shared_ptr<arrow::Schema>& schema, const ui32 recordsCount, std::vector<std::shared_ptr<arrow::ChunkedArray>>&& columns) { + return arrow::Table::Make(schema, columns, recordsCount); + }; + + return MergeBatchColumnsImpl<arrow::Table, arrow::ChunkedArray>(batches, result, columnsOrder, orderFieldsAreNecessary, builder); +} + +bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary) { + const auto builder = [](const std::shared_ptr<arrow::Schema>& schema, const ui32 recordsCount, std::vector<std::shared_ptr<arrow::Array>>&& columns) { + return arrow::RecordBatch::Make(schema, recordsCount, columns); + }; + + return MergeBatchColumnsImpl<arrow::RecordBatch, arrow::Array>(batches, result, columnsOrder, orderFieldsAreNecessary, builder); +} + std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow) { return TRawReplaceKey(&x, xRow).CompareNotNull(TRawReplaceKey(&y, yRow)); } diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index fe4bd4c3c02..84aba485fa7 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -73,7 +73,7 @@ inline std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shar } std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches); -std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable); +std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable, const bool combine = false); std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches); std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const std::shared_ptr<TSortDescription>& description); @@ -98,6 +98,7 @@ std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size); std::vector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema); bool ReserveData(arrow::ArrayBuilder& builder, const size_t size); bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder = {}, const bool orderFieldsAreNecessary = true); +bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::Table>>& batches, std::shared_ptr<arrow::Table>& result, const std::vector<std::string>& columnsOrder = {}, const bool orderFieldsAreNecessary = true); std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique); diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index e0593d0b13d..a9b9864f81d 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -40,6 +40,7 @@ struct GroupByOptions : public arrow::compute::ScalarAggregateOptions { #include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/datum.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/result.h> +#include <library/cpp/actors/core/log.h> #include <ydb/library/yverify_stream/yverify_stream.h> namespace NKikimr::NSsa { @@ -459,18 +460,15 @@ CH::GroupByOptions::Assign GetGroupByAssign(const TAggregateAssign& assign) { class TFilterVisitor : public arrow::ArrayVisitor { std::vector<bool> FiltersMerged; + ui32 CursorIdx = 0; + bool Started = false; public: void BuildColumnFilter(NArrow::TColumnFilter& result) { result = NArrow::TColumnFilter(std::move(FiltersMerged)); } arrow::Status Visit(const arrow::BooleanArray& array) override { - InitAndCheck(array); - for (ui32 i = 0; i < FiltersMerged.size(); ++i) { - bool columnValue = array.Value(i); - FiltersMerged[i] = FiltersMerged[i] && columnValue; - } - return arrow::Status::OK(); + return VisitImpl(array); } arrow::Status Visit(const arrow::Int8Array& array) override { @@ -481,24 +479,44 @@ public: return VisitImpl(array); } + TFilterVisitor(const ui32 rowsCount) { + FiltersMerged.resize(rowsCount, true); + } + + class TModificationGuard: public TNonCopyable { + private: + TFilterVisitor& Owner; + public: + TModificationGuard(TFilterVisitor& owner) + : Owner(owner) + { + Owner.CursorIdx = 0; + AFL_VERIFY(!Owner.Started); + Owner.Started = true; + } + + ~TModificationGuard() { + AFL_VERIFY(Owner.CursorIdx == Owner.FiltersMerged.size()); + Owner.Started = false; + } + }; + + TModificationGuard StartVisit() { + return TModificationGuard(*this); + } + private: template <class TArray> arrow::Status VisitImpl(const TArray& array) { - InitAndCheck(array); + AFL_VERIFY(Started); for (ui32 i = 0; i < FiltersMerged.size(); ++i) { - bool columnValue = (bool)array.Value(i); - FiltersMerged[i] = FiltersMerged[i] && columnValue; + const bool columnValue = (bool)array.Value(i); + const ui32 currentIdx = CursorIdx++; + FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && columnValue; } + AFL_VERIFY(CursorIdx <= FiltersMerged.size()); return arrow::Status::OK(); } - - void InitAndCheck(const arrow::Array& array) { - if (FiltersMerged.empty()) { - FiltersMerged.resize(array.length(), true); - } else { - Y_ABORT_UNLESS(FiltersMerged.size() == (size_t)array.length()); - } - } }; } @@ -536,18 +554,33 @@ std::shared_ptr<arrow::RecordBatch> TDatumBatch::ToRecordBatch() const { for (auto col : Datums) { if (col.is_scalar()) { columns.push_back(*arrow::MakeArrayFromScalar(*col.scalar(), Rows)); - } - else if (col.is_array()){ + } else if (col.is_array()){ if (col.length() == -1) { return {}; } columns.push_back(col.make_array()); + } else { + AFL_VERIFY(false); } } return arrow::RecordBatch::Make(Schema, Rows, columns); } -std::shared_ptr<TDatumBatch> TDatumBatch::FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch) { +std::shared_ptr<TDatumBatch> TDatumBatch::FromRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch) { + std::vector<arrow::Datum> datums; + datums.reserve(batch->num_columns()); + for (int64_t i = 0; i < batch->num_columns(); ++i) { + datums.push_back(arrow::Datum(batch->column(i))); + } + return std::make_shared<TProgramStep::TDatumBatch>( + TProgramStep::TDatumBatch{ + .Schema = std::make_shared<arrow::Schema>(*batch->schema()), + .Datums = std::move(datums), + .Rows = batch->num_rows() + }); +} + +std::shared_ptr<TDatumBatch> TDatumBatch::FromTable(const std::shared_ptr<arrow::Table>& batch) { std::vector<arrow::Datum> datums; datums.reserve(batch->num_columns()); for (int64_t i = 0; i < batch->num_columns(); ++i) { @@ -677,19 +710,24 @@ arrow::Status TProgramStep::ApplyAggregates(TDatumBatch& batch, arrow::compute:: } arrow::Status TProgramStep::MakeCombinedFilter(TDatumBatch& batch, NArrow::TColumnFilter& result) const { - TFilterVisitor filterVisitor; + TFilterVisitor filterVisitor(batch.Rows); for (auto& colName : Filters) { auto column = batch.GetColumnByName(colName); if (!column.ok()) { return column.status(); } - if (!column->is_array()) { - return arrow::Status::Invalid("Column '" + colName + "' is not an array."); - } - auto columnArray = column->make_array(); - auto status = columnArray->Accept(&filterVisitor); - if (!status.ok()) { - return status; + if (column->is_array()) { + auto g = filterVisitor.StartVisit(); + auto columnArray = column->make_array(); + NArrow::TStatusValidator::Validate(columnArray->Accept(&filterVisitor)); + } else if (column->is_arraylike()) { + auto columnArray = column->chunked_array(); + auto g = filterVisitor.StartVisit(); + for (auto&& i : columnArray->chunks()) { + NArrow::TStatusValidator::Validate(i->Accept(&filterVisitor)); + } + } else { + AFL_VERIFY(false)("column", colName); } } filterVisitor.BuildColumnFilter(result); @@ -702,10 +740,7 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const { } NArrow::TColumnFilter bits = NArrow::TColumnFilter::BuildAllowFilter(); - auto status = MakeCombinedFilter(batch, bits); - if (!status.ok()) { - return status; - } + NArrow::TStatusValidator::Validate(MakeCombinedFilter(batch, bits)); if (!bits.IsTotalAllowFilter()) { std::unordered_set<std::string_view> neededColumns; bool allColumns = Projection.empty() && GroupBy.empty(); @@ -725,17 +760,10 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const { auto filter = bits.BuildArrowFilter(batch.Rows); for (int64_t i = 0; i < batch.Schema->num_fields(); ++i) { - bool needed = (allColumns || neededColumns.contains(batch.Schema->field(i)->name())); - if (batch.Datums[i].is_array() && needed) { - auto res = arrow::compute::Filter(batch.Datums[i].make_array(), filter); - if (!res.ok()) { - return res.status(); - } - if ((*res).kind() != batch.Datums[i].kind()) { - return arrow::Status::Invalid("Unexpected filter result."); - } - - batch.Datums[i] = *res; + if (batch.Datums[i].is_arraylike() && (allColumns || neededColumns.contains(batch.Schema->field(i)->name()))) { + auto datum = NArrow::TStatusValidator::GetValid(arrow::compute::Filter(batch.Datums[i], filter)); + AFL_VERIFY(datum.is_arraylike())("datum", datum.ToString())("batch", batch.Datums[i].ToString()); + batch.Datums[i] = std::move(datum); } } @@ -830,9 +858,7 @@ std::set<std::string> TProgramStep::GetColumnsInUsage() const { return result; } -NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& srcBatch, - arrow::compute::ExecContext* ctx) const -{ +NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<TProgramStep::TDatumBatch>& rb, arrow::compute::ExecContext* ctx) const { NArrow::TColumnFilter result = NArrow::TColumnFilter::BuildAllowFilter(); try { if (Steps.empty()) { @@ -843,9 +869,6 @@ NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::Rec return result; } - auto batch = srcBatch; - auto rb = TProgramStep::TDatumBatch::FromRecordBatch(batch); - if (!step->ApplyAssignes(*rb, ctx).ok()) { return result; } @@ -860,6 +883,16 @@ NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::Rec return result; } +NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + arrow::compute::ExecContext* ctx) const { + return MakeEarlyFilter(TProgramStep::TDatumBatch::FromRecordBatch(srcBatch), ctx); +} + +NArrow::TColumnFilter TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::Table>& srcBatch, + arrow::compute::ExecContext* ctx) const { + return MakeEarlyFilter(TProgramStep::TDatumBatch::FromTable(srcBatch), ctx); +} + std::set<std::string> TProgram::GetEarlyFilterColumns() const { std::set<std::string> result; if (Steps.empty()) { diff --git a/ydb/core/formats/arrow/program.h b/ydb/core/formats/arrow/program.h index 329d243fc6e..6bf94b6b614 100644 --- a/ydb/core/formats/arrow/program.h +++ b/ydb/core/formats/arrow/program.h @@ -45,7 +45,8 @@ struct TDatumBatch { arrow::Status AddColumn(const std::string& name, arrow::Datum&& column); arrow::Result<arrow::Datum> GetColumnByName(const std::string& name) const; std::shared_ptr<arrow::RecordBatch> ToRecordBatch() const; - static std::shared_ptr<TDatumBatch> FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch); + static std::shared_ptr<TDatumBatch> FromRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch); + static std::shared_ptr<TDatumBatch> FromTable(const std::shared_ptr<arrow::Table>& batch); }; template <class TAssignObject> @@ -299,7 +300,11 @@ struct TProgram { std::set<std::string> GetEarlyFilterColumns() const; NArrow::TColumnFilter MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, - arrow::compute::ExecContext* ctx) const; + arrow::compute::ExecContext* ctx) const; + NArrow::TColumnFilter MakeEarlyFilter(const std::shared_ptr<arrow::Table>& batch, + arrow::compute::ExecContext* ctx) const; + + NArrow::TColumnFilter MakeEarlyFilter(const std::shared_ptr<TProgramStep::TDatumBatch>& batch, arrow::compute::ExecContext* ctx) const; }; inline arrow::Status ApplyProgram( diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h index 04b58eae0bb..22e479c46e1 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h @@ -67,7 +67,7 @@ public: STATEFN(StateFunc) { auto gTimeFull = KqpComputeActorSpan.StartStackTimeGuard("processing"); - auto gTime = KqpComputeActorSpan.StartStackTimeGuard("event_" + ::ToString(ev->GetTypeRewrite())); + auto gTime = KqpComputeActorSpan.StartStackTimeGuard("event_" + ev->GetTypeName()); try { switch (ev->GetTypeRewrite()) { hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute); @@ -83,7 +83,7 @@ public: IgnoreFunc(TEvInterconnect::TEvNodeConnected); IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult); default: - StopOnError("unexpected message on data fetching: " + ::ToString(ev->GetTypeRewrite())); + StopOnError("unexpected message on data fetching: " + ev->GetTypeName()); } } catch (...) { StopOnError("unexpected exception: " + CurrentExceptionMessage()); diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp index 307ee53692f..86e04a8f68d 100644 --- a/ydb/core/tx/columnshard/engines/filter.cpp +++ b/ydb/core/tx/columnshard/engines/filter.cpp @@ -8,6 +8,93 @@ namespace NKikimr::NOlap { +template <class TArrayView> +class TChunkedArrayIterator { +private: + const arrow::ArrayVector* Chunks; + const typename TArrayView::value_type* RawView; + arrow::ArrayVector::const_iterator CurrentChunkIt; + ui32 CurrentChunkPosition = 0; +public: + TChunkedArrayIterator(const std::shared_ptr<arrow::ChunkedArray>& chunks) + { + AFL_VERIFY(!!chunks); + Chunks = &chunks->chunks(); + AFL_VERIFY(Chunks->size()); + CurrentChunkIt = Chunks->begin(); + Y_ABORT_UNLESS(chunks->type()->id() == arrow::TypeTraits<typename TArrayView::TypeClass>::type_singleton()->id()); + if (IsValid()) { + RawView = std::static_pointer_cast<TArrayView>(*CurrentChunkIt)->raw_values(); + } + } + + bool IsValid() const { + return CurrentChunkIt != Chunks->end() && CurrentChunkPosition < (*CurrentChunkIt)->length(); + } + + typename TArrayView::value_type GetValue() const { + AFL_VERIFY_DEBUG(IsValid()); + return RawView[CurrentChunkPosition]; + } + + bool Next() { + AFL_VERIFY_DEBUG(IsValid()); + ++CurrentChunkPosition; + while (CurrentChunkIt != Chunks->end() && (*CurrentChunkIt)->length() == CurrentChunkPosition) { + if (++CurrentChunkIt != Chunks->end()) { + CurrentChunkPosition = 0; + RawView = std::static_pointer_cast<TArrayView>(*CurrentChunkIt)->raw_values(); + } + } + return CurrentChunkIt != Chunks->end(); + } +}; + +class TTableSnapshotGetter { +private: + const TSnapshot Snapshot; + mutable TChunkedArrayIterator<arrow::UInt64Array> Steps; + mutable TChunkedArrayIterator<arrow::UInt64Array> Ids; + mutable i64 CurrentIdx = -1; +public: + TTableSnapshotGetter(const std::shared_ptr<arrow::ChunkedArray>& steps, const std::shared_ptr<arrow::ChunkedArray>& ids, const TSnapshot& snapshot) + : Snapshot(snapshot) + , Steps(steps) + , Ids(ids) + { + Y_ABORT_UNLESS(steps->length() == ids->length()); + } + + bool operator[](const ui32 idx) const { + AFL_VERIFY(CurrentIdx + 1 == idx)("current_idx", CurrentIdx)("idx", idx); + CurrentIdx = idx; + const bool result = std::less_equal<TSnapshot>()(TSnapshot(Steps.GetValue(), Ids.GetValue()), Snapshot); + const bool sNext = Steps.Next(); + const bool idNext = Ids.Next(); + AFL_VERIFY(sNext == idNext); + if (!idNext) { + CurrentIdx = -2; + } + return result; + } +}; + +template <class TGetter, class TData> +NArrow::TColumnFilter MakeSnapshotFilterImpl(const std::shared_ptr<TData>& batch, const TSnapshot& snapshot) { + Y_ABORT_UNLESS(batch); + auto steps = batch->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); + auto ids = batch->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); + NArrow::TColumnFilter result = NArrow::TColumnFilter::BuildAllowFilter(); + TGetter getter(steps, ids, snapshot); + result.Reset(steps->length(), std::move(getter)); + return result; +} + +NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::Table>& batch, + const TSnapshot& snapshot) { + return MakeSnapshotFilterImpl<TTableSnapshotGetter>(batch, snapshot); +} + class TSnapshotGetter { private: const arrow::UInt64Array::value_type* RawSteps; @@ -32,34 +119,28 @@ public: }; NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch, - const std::shared_ptr<arrow::Schema>& snapSchema, const TSnapshot& snapshot) { - Y_ABORT_UNLESS(batch); - Y_ABORT_UNLESS(snapSchema); - Y_ABORT_UNLESS(snapSchema->num_fields() == 2); - auto steps = batch->GetColumnByName(snapSchema->fields()[0]->name()); - auto ids = batch->GetColumnByName(snapSchema->fields()[1]->name()); - NArrow::TColumnFilter result = NArrow::TColumnFilter::BuildAllowFilter(); - TSnapshotGetter getter(steps, ids, snapshot); - result.Reset(steps->length(), std::move(getter)); - return result; + return MakeSnapshotFilterImpl<TSnapshotGetter>(batch, snapshot); } -NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata, const bool useSnapshotFilter) { +NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::Table>& portion, const TReadMetadata& readMetadata, const bool useSnapshotFilter) { Y_ABORT_UNLESS(portion); NArrow::TColumnFilter result = readMetadata.GetPKRangesFilter().BuildFilter(portion); if (readMetadata.GetSnapshot().GetPlanStep() && useSnapshotFilter) { - auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); - result = result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot())); + result = result.And(MakeSnapshotFilter(portion, readMetadata.GetSnapshot())); } return result; } -NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata) { +NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::Table>& batch, const TReadMetadata& readMetadata) { return readMetadata.GetPKRangesFilter().BuildFilter(batch); } +NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::Table>& batch, std::shared_ptr<NSsa::TProgram> ssa) { + return ssa->MakeEarlyFilter(batch, NArrow::GetCustomExecContext()); +} + NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa) { return ssa->MakeEarlyFilter(batch, NArrow::GetCustomExecContext()); } diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h index 7eaab200ff0..34f3913de45 100644 --- a/ydb/core/tx/columnshard/engines/filter.h +++ b/ydb/core/tx/columnshard/engines/filter.h @@ -6,13 +6,13 @@ namespace NKikimr::NOlap { -NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch, - const std::shared_ptr<arrow::Schema>& snapSchema, - const TSnapshot& snapshot); +NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot); +NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::Table>& batch, const TSnapshot& snapshot); struct TReadMetadata; -NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata, const bool useSnapshotFilter); -NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata); +NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::Table>& batch, const TReadMetadata& readMetadata, const bool useSnapshotFilter); +NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::Table>& batch, const TReadMetadata& readMetadata); +NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::Table>& batch, std::shared_ptr<NSsa::TProgram> ssa); NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa); } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/predicate/container.h b/ydb/core/tx/columnshard/engines/predicate/container.h index 0606335cc9e..e485cc2ddae 100644 --- a/ydb/core/tx/columnshard/engines/predicate/container.h +++ b/ydb/core/tx/columnshard/engines/predicate/container.h @@ -63,7 +63,7 @@ public: static std::optional<TPredicateContainer> BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo); - NKikimr::NArrow::TColumnFilter BuildFilter(const std::shared_ptr<arrow::RecordBatch>& data) const { + NKikimr::NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const { if (!Object) { return NArrow::TColumnFilter::BuildAllowFilter(); } diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.cpp b/ydb/core/tx/columnshard/engines/predicate/filter.cpp index cdbff1d15f4..0a048fada45 100644 --- a/ydb/core/tx/columnshard/engines/predicate/filter.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/filter.cpp @@ -3,7 +3,7 @@ namespace NKikimr::NOlap { -NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const { +NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(const arrow::Datum& data) const { if (SortedRanges.empty()) { return NArrow::TColumnFilter::BuildAllowFilter(); } diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.h b/ydb/core/tx/columnshard/engines/predicate/filter.h index 45b64a91495..307891fec3b 100644 --- a/ydb/core/tx/columnshard/engines/predicate/filter.h +++ b/ydb/core/tx/columnshard/engines/predicate/filter.h @@ -35,7 +35,7 @@ public: bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const; - NArrow::TColumnFilter BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const; + NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const; bool Add(std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const TIndexInfo* indexInfo) Y_WARN_UNUSED_RESULT; diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp index a645c1444c0..bc526f4bbfe 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp @@ -34,7 +34,7 @@ std::set<std::string> TPKRangeFilter::GetColumnNames() const { return result; } -NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const { +NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(const arrow::Datum& data) const { NArrow::TColumnFilter result = PredicateTo.BuildFilter(data); return result.And(PredicateFrom.BuildFilter(data)); } diff --git a/ydb/core/tx/columnshard/engines/predicate/range.h b/ydb/core/tx/columnshard/engines/predicate/range.h index 6c154bb5736..c124567066b 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.h +++ b/ydb/core/tx/columnshard/engines/predicate/range.h @@ -34,7 +34,7 @@ public: static std::optional<TPKRangeFilter> Build(TPredicateContainer&& from, TPredicateContainer&& to); - NArrow::TColumnFilter BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const; + NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const; bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp index daf0dfaf5a8..c5a24699809 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp @@ -13,12 +13,12 @@ bool TAssembleBatch::DoExecute() { Y_ABORT_UNLESS(batchConstructor.GetColumnsCount()); TPortionInfo::TPreparedBatchData::TAssembleOptions options; - auto addBatch = batchConstructor.Assemble(options); + auto addBatch = batchConstructor.AssembleTable(options); Y_ABORT_UNLESS(addBatch); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN) ("columns_count", addBatch->num_columns())("num_rows", addBatch->num_rows()); Filter->Apply(addBatch); - Result = addBatch; + Result = NArrow::ToBatch(addBatch, true); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp index 1c88f99aa96..6bb0ccac738 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp @@ -12,7 +12,7 @@ bool TAssembleFilter::DoExecute() { TPortionInfo::TPreparedBatchData::TAssembleOptions options; options.IncludedColumnIds = FilterColumnIds; - if (RecordsMaxSnapshot < ReadMetadata->GetSnapshot() && UseFilter) { + if (RecordsMaxSnapshot <= ReadMetadata->GetSnapshot() && UseFilter) { for (auto&& i : TIndexInfo::GetSpecialColumnIds()) { options.IncludedColumnIds->erase(i); } @@ -21,19 +21,19 @@ bool TAssembleFilter::DoExecute() { auto batchConstructor = BuildBatchConstructor(FilterColumnIds); - auto batch = batchConstructor.Assemble(options); + auto batch = batchConstructor.AssembleTable(options); Y_ABORT_UNLESS(batch); Y_ABORT_UNLESS(batch->num_rows()); - if (RecordsMaxSnapshot < ReadMetadata->GetSnapshot() && UseFilter) { + if (RecordsMaxSnapshot <= ReadMetadata->GetSnapshot() && UseFilter) { for (auto&& f : TIndexInfo::ArrowSchemaSnapshot()->fields()) { auto c = NArrow::TStatusValidator::GetValid(arrow::MakeArrayOfNull(f->type(), batch->num_rows())); - batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), f, c)); + batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), f, std::make_shared<arrow::ChunkedArray>(c))); } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "restore_fake_special_columns"); } OriginalCount = batch->num_rows(); - AppliedFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata, ReadMetadata->GetSnapshot() <= RecordsMaxSnapshot)); + AppliedFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata, ReadMetadata->GetSnapshot() < RecordsMaxSnapshot)); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "first_filter_using"); if (!AppliedFilter->Apply(batch)) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount)("columns_count", FilterColumnIds.size()); @@ -55,7 +55,7 @@ bool TAssembleFilter::DoExecute() { if ((size_t)batch->schema()->num_fields() < batchConstructor.GetColumnsCount()) { TPortionInfo::TPreparedBatchData::TAssembleOptions options; options.ExcludedColumnIds = FilterColumnIds; - auto addBatch = batchConstructor.Assemble(options); + auto addBatch = batchConstructor.AssembleTable(options); Y_ABORT_UNLESS(addBatch); Y_ABORT_UNLESS(AppliedFilter->Apply(addBatch)); Y_ABORT_UNLESS(NArrow::MergeBatchColumns({ batch, addBatch }, batch, batchConstructor.GetSchemaColumnNames(), true)); @@ -65,7 +65,7 @@ bool TAssembleFilter::DoExecute() { ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", batchConstructor.GetColumnsCount())("use_filter", UseFilter) ("filter_columns", FilterColumnIds.size())("af_count", AppliedFilter->Size())("ef_count", earlyFilter ? earlyFilter->Size() : 0); - FilteredBatch = batch; + FilteredBatch = NArrow::ToBatch(batch, true); return true; } diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp index 1092468712c..2c76e652002 100644 --- a/ydb/core/tx/program/program.cpp +++ b/ydb/core/tx/program/program.cpp @@ -419,7 +419,14 @@ bool TProgramContainer::HasProgram() const { return !!Program; } -std::shared_ptr<NArrow::TColumnFilter> TProgramContainer::BuildEarlyFilter(std::shared_ptr<arrow::RecordBatch> batch) const { +std::shared_ptr<NArrow::TColumnFilter> TProgramContainer::BuildEarlyFilter(const std::shared_ptr<arrow::Table>& batch) const { + if (Program) { + return std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Program)); + } + return nullptr; +} + +std::shared_ptr<NArrow::TColumnFilter> TProgramContainer::BuildEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch) const { if (Program) { return std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Program)); } diff --git a/ydb/core/tx/program/program.h b/ydb/core/tx/program/program.h index d3d562be5a1..245aebed195 100644 --- a/ydb/core/tx/program/program.h +++ b/ydb/core/tx/program/program.h @@ -35,7 +35,8 @@ public: const THashMap<ui32, TString>& GetSourceColumns() const; bool HasProgram() const; - std::shared_ptr<NArrow::TColumnFilter> BuildEarlyFilter(std::shared_ptr<arrow::RecordBatch> batch) const; + std::shared_ptr<NArrow::TColumnFilter> BuildEarlyFilter(const std::shared_ptr<arrow::Table>& batch) const; + std::shared_ptr<NArrow::TColumnFilter> BuildEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch) const; std::set<std::string> GetEarlyFilterColumns() const; bool HasEarlyFilterOnly() const; |