diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-04-08 13:54:26 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-08 13:54:26 +0300 |
commit | fd8b9bfe0d656aa9e8897ab6a844e6167ba3e45f (patch) | |
tree | f43b6fd367227a58a86fddfe96933f616715f3e9 | |
parent | c40c499ee8e44d120955fa02fb5e547d99573898 (diff) | |
download | ydb-fd8b9bfe0d656aa9e8897ab6a844e6167ba3e45f.tar.gz |
fix array access for merge on kqp (#3540)
-rw-r--r-- | ydb/core/formats/arrow/arrow_helpers.cpp | 25 | ||||
-rw-r--r-- | ydb/core/formats/arrow/arrow_helpers.h | 3 | ||||
-rw-r--r-- | ydb/core/formats/arrow/common/accessor.cpp | 9 | ||||
-rw-r--r-- | ydb/core/formats/arrow/common/accessor.h | 18 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_events.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.cpp | 79 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portions/portion_info.cpp | 5 |
7 files changed, 70 insertions, 71 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index aaf8cbb01a2..b71c9a342a8 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -296,30 +296,18 @@ std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr< return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns)); } -std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) { - auto res = arrow::Table::FromRecordBatches(batches); - if (!res.ok()) { - return nullptr; - } - - res = (*res)->CombineChunks(); - if (!res.ok()) { - return nullptr; - } - - return res.ValueOrDie(); -} - std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) { if (batches.empty()) { return nullptr; } - auto table = CombineInTable(batches); - return table ? ToBatch(table) : nullptr; + auto table = TStatusValidator::GetValid(arrow::Table::FromRecordBatches(batches)); + return table ? ToBatch(table, true) : nullptr; } std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& tableExt, const bool combine) { - Y_ABORT_UNLESS(tableExt); + if (!tableExt) { + return nullptr; + } std::shared_ptr<arrow::Table> table; if (combine) { auto res = tableExt->CombineChunks(); @@ -331,7 +319,8 @@ std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& std::vector<std::shared_ptr<arrow::Array>> columns; columns.reserve(table->num_columns()); for (auto& col : table->columns()) { - Y_ABORT_UNLESS(col->num_chunks() == 1); + AFL_VERIFY(col->num_chunks() == 1)("size", col->num_chunks())("size_bytes", GetTableDataSize(tableExt)) + ("schema", tableExt->schema()->ToString())("size_new", GetTableDataSize(table)); columns.push_back(col->chunk(0)); } return arrow::RecordBatch::Make(table->schema(), table->num_rows(), columns); diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index 05d0e85263f..f4871e126e0 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -79,8 +79,7 @@ inline std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shar return ExtractExistedColumns(srcBatch, dstSchema->fields()); } -std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches); -std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable, const bool combine = false); +std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable, const bool combine); std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches); std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& rb); std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared_ptr<arrow::RecordBatch>& batch, diff --git a/ydb/core/formats/arrow/common/accessor.cpp b/ydb/core/formats/arrow/common/accessor.cpp index c2465ca0a85..450c3f40cf4 100644 --- a/ydb/core/formats/arrow/common/accessor.cpp +++ b/ydb/core/formats/arrow/common/accessor.cpp @@ -56,6 +56,15 @@ std::partial_ordering IChunkedArray::TReader::CompareColumns(const std::vector<T return std::partial_ordering::equivalent; } +IChunkedArray::TAddress IChunkedArray::TReader::GetReadChunk(const ui64 position) const { + AFL_VERIFY(position < ChunkedArray->GetRecordsCount()); + if (CurrentChunkAddress && position < CurrentChunkAddress->GetStartPosition() + CurrentChunkAddress->GetArray()->length() && CurrentChunkAddress->GetStartPosition() <= position) { + } else { + CurrentChunkAddress = ChunkedArray->DoGetChunk(CurrentChunkAddress, position); + } + return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), position - CurrentChunkAddress->GetStartPosition(), CurrentChunkAddress->GetChunkIndex()); +} + const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& item) const { return TComparator::TypedCompare<true>(*Array, Position, *item.Array, item.Position); } diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h index 3ed449c12c5..56cf25096e7 100644 --- a/ydb/core/formats/arrow/common/accessor.h +++ b/ydb/core/formats/arrow/common/accessor.h @@ -48,10 +48,13 @@ public: private: YDB_READONLY_DEF(std::shared_ptr<arrow::Array>, Array); YDB_READONLY(ui64, Position, 0); + YDB_READONLY(ui64, ChunkIdx, 0); public: - TAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 position) + TAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 position, const ui64 chunkIdx) : Array(arr) - , Position(position) { + , Position(position) + , ChunkIdx(chunkIdx) + { } @@ -126,16 +129,7 @@ public: return ChunkedArray->GetRecordsCount(); } - TAddress GetReadChunk(const ui64 position) const { - AFL_VERIFY(position < ChunkedArray->GetRecordsCount()); - if (CurrentChunkAddress && position < CurrentChunkAddress->GetStartPosition() + CurrentChunkAddress->GetArray()->length() && CurrentChunkAddress->GetStartPosition() <= position) { - return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), position - CurrentChunkAddress->GetStartPosition()); - } else { - CurrentChunkAddress = ChunkedArray->DoGetChunk(CurrentChunkAddress, position); - return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), position - CurrentChunkAddress->GetStartPosition()); - } - } - + TAddress GetReadChunk(const ui64 position) const; static std::partial_ordering CompareColumns(const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition); void AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const; std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const; diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h index 3efb920bf2a..6092c4a1a37 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_events.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h @@ -162,7 +162,7 @@ struct TEvKqpCompute { Y_DEBUG_ABORT_UNLESS(ArrowBatch != nullptr); auto* protoArrowBatch = Remote->Record.MutableArrowBatch(); protoArrowBatch->SetSchema(NArrow::SerializeSchema(*ArrowBatch->schema())); - protoArrowBatch->SetBatch(NArrow::SerializeBatchNoCompression(NArrow::ToBatch(ArrowBatch))); + protoArrowBatch->SetBatch(NArrow::SerializeBatchNoCompression(NArrow::ToBatch(ArrowBatch, true))); break; } } diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index 985967b70b7..846a2f5eb75 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -3,6 +3,7 @@ #include <ydb/core/engine/minikql/minikql_engine_host.h> #include <ydb/core/protos/tx_datashard.pb.h> #include <ydb/core/scheme/scheme_types_proto.h> +#include <ydb/core/formats/arrow/common/accessor.h> #include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/library/yql/minikql/mkql_string_util.h> @@ -215,11 +216,11 @@ template <class TArrayTypeExt, class TValueType = typename TArrayTypeExt::value_ class TElementAccessor { public: using TArrayType = TArrayTypeExt; - static NYql::NUdf::TUnboxedValue ExtractValue(TArrayType& array, const ui32 rowIndex) { + static NYql::NUdf::TUnboxedValue ExtractValue(const TArrayType& array, const ui32 rowIndex) { return NUdf::TUnboxedValuePod(static_cast<TValueType>(array.Value(rowIndex))); } - static void Validate(TArrayType& /*array*/) { + static void Validate(const TArrayType& /*array*/) { } @@ -232,13 +233,13 @@ template <> class TElementAccessor<arrow::Decimal128Array, NYql::NDecimal::TInt128> { public: using TArrayType = arrow::Decimal128Array; - static void Validate(arrow::Decimal128Array& array) { + static void Validate(const arrow::Decimal128Array& array) { const auto& type = arrow::internal::checked_cast<const arrow::Decimal128Type&>(*array.type()); YQL_ENSURE(type.precision() == NScheme::DECIMAL_PRECISION, "Unsupported Decimal precision."); YQL_ENSURE(type.scale() == NScheme::DECIMAL_SCALE, "Unsupported Decimal scale."); } - static NYql::NUdf::TUnboxedValue ExtractValue(arrow::Decimal128Array& array, const ui32 rowIndex) { + static NYql::NUdf::TUnboxedValue ExtractValue(const arrow::Decimal128Array& array, const ui32 rowIndex) { auto data = array.GetView(rowIndex); YQL_ENSURE(data.size() == sizeof(NYql::NDecimal::TInt128), "Wrong data size"); NYql::NDecimal::TInt128 val; @@ -254,10 +255,10 @@ template <> class TElementAccessor<arrow::BinaryArray, NUdf::TStringRef> { public: using TArrayType = arrow::BinaryArray; - static void Validate(arrow::BinaryArray& /*array*/) { + static void Validate(const arrow::BinaryArray& /*array*/) { } - static NYql::NUdf::TUnboxedValue ExtractValue(arrow::BinaryArray& array, const ui32 rowIndex) { + static NYql::NUdf::TUnboxedValue ExtractValue(const arrow::BinaryArray& array, const ui32 rowIndex) { auto data = array.GetView(rowIndex); return MakeString(NUdf::TStringRef(data.data(), data.size())); } @@ -270,10 +271,10 @@ template <> class TElementAccessor<arrow::FixedSizeBinaryArray, NUdf::TStringRef> { public: using TArrayType = arrow::FixedSizeBinaryArray; - static void Validate(arrow::FixedSizeBinaryArray& /*array*/) { + static void Validate(const arrow::FixedSizeBinaryArray& /*array*/) { } - static NYql::NUdf::TUnboxedValue ExtractValue(arrow::FixedSizeBinaryArray& array, const ui32 rowIndex) { + static NYql::NUdf::TUnboxedValue ExtractValue(const arrow::FixedSizeBinaryArray& array, const ui32 rowIndex) { auto data = array.GetView(rowIndex); return MakeString(NUdf::TStringRef(data.data(), data.size() - 1)); } @@ -286,33 +287,44 @@ public: template <class TElementAccessor, class TAccessor> TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor, - const TBatchDataAccessor& batch, const ui32 columnIndex, arrow::ChunkedArray* chunkedArrayExt, NScheme::TTypeInfo columnType) { + const TBatchDataAccessor& batch, const ui32 columnIndex, const std::shared_ptr<arrow::ChunkedArray>& chunkedArrayExt, NScheme::TTypeInfo columnType) { auto statAccumulator = TElementAccessor::BuildStatAccumulator(columnType); - for (auto&& i : chunkedArrayExt->chunks()) { - auto& arrayExt = *i; - auto& array = *reinterpret_cast<typename TElementAccessor::TArrayType*>(&arrayExt); - TElementAccessor::Validate(array); - - const auto applyToIndex = [&](const ui32 rowIndexFrom, const ui32 rowIndexTo) { - auto& rowItem = editAccessor(rowIndexTo, columnIndex); - if (array.IsNull(rowIndexFrom)) { - statAccumulator.AddNull(); - rowItem = NUdf::TUnboxedValue(); - } else { - rowItem = TElementAccessor::ExtractValue(array, rowIndexFrom); - statAccumulator.AddValue(rowItem); - } - }; - if (batch.HasDataIndexes()) { - ui32 idx = 0; - for (const i64 rowIndex : batch.GetDataIndexes()) { - applyToIndex(rowIndex, idx++); - } + auto trivialChunkedArray = std::make_shared<NArrow::NAccessor::TTrivialChunkedArray>(chunkedArrayExt); + NArrow::NAccessor::IChunkedArray::TReader reader(trivialChunkedArray); + + std::optional<ui32> chunkIdx; + const auto applyToIndex = [&](const ui32 rowIndexFrom, const ui32 rowIndexTo) { + auto address = reader.GetReadChunk(rowIndexFrom); + auto* currentArray = static_cast<const typename TElementAccessor::TArrayType*>(address.GetArray().get()); + + if (!chunkIdx || *chunkIdx != address.GetChunkIdx()) { + TElementAccessor::Validate(*currentArray); + chunkIdx = address.GetChunkIdx(); + } + + auto& rowItem = editAccessor(rowIndexTo, columnIndex); + if (currentArray->IsNull(address.GetPosition())) { + statAccumulator.AddNull(); + rowItem = NUdf::TUnboxedValue(); } else { - for (i64 rowIndex = 0; rowIndex < batch.GetRecordsCount(); ++rowIndex) { - applyToIndex(rowIndex, rowIndex); - } + rowItem = TElementAccessor::ExtractValue(*currentArray, address.GetPosition()); + statAccumulator.AddValue(rowItem); + } + }; + + if (batch.HasDataIndexes()) { + ui32 idx = 0; + std::map<ui64, ui64> remapIndexes; + for (const i64 rowIndex : batch.GetDataIndexes()) { + YQL_ENSURE(remapIndexes.emplace(rowIndex, idx++).second); + } + for (auto&& i : remapIndexes) { + applyToIndex(i.first, i.second); + } + } else { + for (i64 rowIndex = 0; rowIndex < batch.GetRecordsCount(); ++rowIndex) { + applyToIndex(rowIndex, rowIndex); } } return statAccumulator.Finish(); @@ -322,8 +334,7 @@ TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor, template <class TAccessor> TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor, const TBatchDataAccessor& batch, i64 columnIndex, NScheme::TTypeInfo columnType) { - std::shared_ptr<arrow::ChunkedArray> columnSharedPtr = batch.GetBatch()->column(columnIndex); - arrow::ChunkedArray* columnPtr = columnSharedPtr.get(); + const std::shared_ptr<arrow::ChunkedArray> columnPtr = batch.GetBatch()->column(columnIndex); namespace NTypeIds = NScheme::NTypeIds; switch (columnType.GetTypeId()) { case NTypeIds::Bool: diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 4a0cc425e28..1d61b6db712 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -777,10 +777,7 @@ std::shared_ptr<arrow::Table> TPortionInfo::TPreparedBatchData::AssembleTable(co } std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const { - auto table = AssembleTable(options); - auto res = table->CombineChunks(); - Y_ABORT_UNLESS(res.ok()); - return NArrow::ToBatch(*res); + return NArrow::ToBatch(AssembleTable(options), true); } } |