aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-04-08 13:54:26 +0300
committerGitHub <noreply@github.com>2024-04-08 13:54:26 +0300
commitfd8b9bfe0d656aa9e8897ab6a844e6167ba3e45f (patch)
treef43b6fd367227a58a86fddfe96933f616715f3e9
parentc40c499ee8e44d120955fa02fb5e547d99573898 (diff)
downloadydb-fd8b9bfe0d656aa9e8897ab6a844e6167ba3e45f.tar.gz
fix array access for merge on kqp (#3540)
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp25
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.h3
-rw-r--r--ydb/core/formats/arrow/common/accessor.cpp9
-rw-r--r--ydb/core/formats/arrow/common/accessor.h18
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_events.h2
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.cpp79
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp5
7 files changed, 70 insertions, 71 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index aaf8cbb01a2..b71c9a342a8 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -296,30 +296,18 @@ std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr<
return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns));
}
-std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
- auto res = arrow::Table::FromRecordBatches(batches);
- if (!res.ok()) {
- return nullptr;
- }
-
- res = (*res)->CombineChunks();
- if (!res.ok()) {
- return nullptr;
- }
-
- return res.ValueOrDie();
-}
-
std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
if (batches.empty()) {
return nullptr;
}
- auto table = CombineInTable(batches);
- return table ? ToBatch(table) : nullptr;
+ auto table = TStatusValidator::GetValid(arrow::Table::FromRecordBatches(batches));
+ return table ? ToBatch(table, true) : nullptr;
}
std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& tableExt, const bool combine) {
- Y_ABORT_UNLESS(tableExt);
+ if (!tableExt) {
+ return nullptr;
+ }
std::shared_ptr<arrow::Table> table;
if (combine) {
auto res = tableExt->CombineChunks();
@@ -331,7 +319,8 @@ std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>&
std::vector<std::shared_ptr<arrow::Array>> columns;
columns.reserve(table->num_columns());
for (auto& col : table->columns()) {
- Y_ABORT_UNLESS(col->num_chunks() == 1);
+ AFL_VERIFY(col->num_chunks() == 1)("size", col->num_chunks())("size_bytes", GetTableDataSize(tableExt))
+ ("schema", tableExt->schema()->ToString())("size_new", GetTableDataSize(table));
columns.push_back(col->chunk(0));
}
return arrow::RecordBatch::Make(table->schema(), table->num_rows(), columns);
diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h
index 05d0e85263f..f4871e126e0 100644
--- a/ydb/core/formats/arrow/arrow_helpers.h
+++ b/ydb/core/formats/arrow/arrow_helpers.h
@@ -79,8 +79,7 @@ inline std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shar
return ExtractExistedColumns(srcBatch, dstSchema->fields());
}
-std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
-std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable, const bool combine = false);
+std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable, const bool combine);
std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& rb);
std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared_ptr<arrow::RecordBatch>& batch,
diff --git a/ydb/core/formats/arrow/common/accessor.cpp b/ydb/core/formats/arrow/common/accessor.cpp
index c2465ca0a85..450c3f40cf4 100644
--- a/ydb/core/formats/arrow/common/accessor.cpp
+++ b/ydb/core/formats/arrow/common/accessor.cpp
@@ -56,6 +56,15 @@ std::partial_ordering IChunkedArray::TReader::CompareColumns(const std::vector<T
return std::partial_ordering::equivalent;
}
+IChunkedArray::TAddress IChunkedArray::TReader::GetReadChunk(const ui64 position) const {
+ AFL_VERIFY(position < ChunkedArray->GetRecordsCount());
+ if (CurrentChunkAddress && position < CurrentChunkAddress->GetStartPosition() + CurrentChunkAddress->GetArray()->length() && CurrentChunkAddress->GetStartPosition() <= position) {
+ } else {
+ CurrentChunkAddress = ChunkedArray->DoGetChunk(CurrentChunkAddress, position);
+ }
+ return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), position - CurrentChunkAddress->GetStartPosition(), CurrentChunkAddress->GetChunkIndex());
+}
+
const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& item) const {
return TComparator::TypedCompare<true>(*Array, Position, *item.Array, item.Position);
}
diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h
index 3ed449c12c5..56cf25096e7 100644
--- a/ydb/core/formats/arrow/common/accessor.h
+++ b/ydb/core/formats/arrow/common/accessor.h
@@ -48,10 +48,13 @@ public:
private:
YDB_READONLY_DEF(std::shared_ptr<arrow::Array>, Array);
YDB_READONLY(ui64, Position, 0);
+ YDB_READONLY(ui64, ChunkIdx, 0);
public:
- TAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 position)
+ TAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 position, const ui64 chunkIdx)
: Array(arr)
- , Position(position) {
+ , Position(position)
+ , ChunkIdx(chunkIdx)
+ {
}
@@ -126,16 +129,7 @@ public:
return ChunkedArray->GetRecordsCount();
}
- TAddress GetReadChunk(const ui64 position) const {
- AFL_VERIFY(position < ChunkedArray->GetRecordsCount());
- if (CurrentChunkAddress && position < CurrentChunkAddress->GetStartPosition() + CurrentChunkAddress->GetArray()->length() && CurrentChunkAddress->GetStartPosition() <= position) {
- return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), position - CurrentChunkAddress->GetStartPosition());
- } else {
- CurrentChunkAddress = ChunkedArray->DoGetChunk(CurrentChunkAddress, position);
- return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), position - CurrentChunkAddress->GetStartPosition());
- }
- }
-
+ TAddress GetReadChunk(const ui64 position) const;
static std::partial_ordering CompareColumns(const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition);
void AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const;
std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index 3efb920bf2a..6092c4a1a37 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -162,7 +162,7 @@ struct TEvKqpCompute {
Y_DEBUG_ABORT_UNLESS(ArrowBatch != nullptr);
auto* protoArrowBatch = Remote->Record.MutableArrowBatch();
protoArrowBatch->SetSchema(NArrow::SerializeSchema(*ArrowBatch->schema()));
- protoArrowBatch->SetBatch(NArrow::SerializeBatchNoCompression(NArrow::ToBatch(ArrowBatch)));
+ protoArrowBatch->SetBatch(NArrow::SerializeBatchNoCompression(NArrow::ToBatch(ArrowBatch, true)));
break;
}
}
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index 985967b70b7..846a2f5eb75 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/scheme/scheme_types_proto.h>
+#include <ydb/core/formats/arrow/common/accessor.h>
#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
@@ -215,11 +216,11 @@ template <class TArrayTypeExt, class TValueType = typename TArrayTypeExt::value_
class TElementAccessor {
public:
using TArrayType = TArrayTypeExt;
- static NYql::NUdf::TUnboxedValue ExtractValue(TArrayType& array, const ui32 rowIndex) {
+ static NYql::NUdf::TUnboxedValue ExtractValue(const TArrayType& array, const ui32 rowIndex) {
return NUdf::TUnboxedValuePod(static_cast<TValueType>(array.Value(rowIndex)));
}
- static void Validate(TArrayType& /*array*/) {
+ static void Validate(const TArrayType& /*array*/) {
}
@@ -232,13 +233,13 @@ template <>
class TElementAccessor<arrow::Decimal128Array, NYql::NDecimal::TInt128> {
public:
using TArrayType = arrow::Decimal128Array;
- static void Validate(arrow::Decimal128Array& array) {
+ static void Validate(const arrow::Decimal128Array& array) {
const auto& type = arrow::internal::checked_cast<const arrow::Decimal128Type&>(*array.type());
YQL_ENSURE(type.precision() == NScheme::DECIMAL_PRECISION, "Unsupported Decimal precision.");
YQL_ENSURE(type.scale() == NScheme::DECIMAL_SCALE, "Unsupported Decimal scale.");
}
- static NYql::NUdf::TUnboxedValue ExtractValue(arrow::Decimal128Array& array, const ui32 rowIndex) {
+ static NYql::NUdf::TUnboxedValue ExtractValue(const arrow::Decimal128Array& array, const ui32 rowIndex) {
auto data = array.GetView(rowIndex);
YQL_ENSURE(data.size() == sizeof(NYql::NDecimal::TInt128), "Wrong data size");
NYql::NDecimal::TInt128 val;
@@ -254,10 +255,10 @@ template <>
class TElementAccessor<arrow::BinaryArray, NUdf::TStringRef> {
public:
using TArrayType = arrow::BinaryArray;
- static void Validate(arrow::BinaryArray& /*array*/) {
+ static void Validate(const arrow::BinaryArray& /*array*/) {
}
- static NYql::NUdf::TUnboxedValue ExtractValue(arrow::BinaryArray& array, const ui32 rowIndex) {
+ static NYql::NUdf::TUnboxedValue ExtractValue(const arrow::BinaryArray& array, const ui32 rowIndex) {
auto data = array.GetView(rowIndex);
return MakeString(NUdf::TStringRef(data.data(), data.size()));
}
@@ -270,10 +271,10 @@ template <>
class TElementAccessor<arrow::FixedSizeBinaryArray, NUdf::TStringRef> {
public:
using TArrayType = arrow::FixedSizeBinaryArray;
- static void Validate(arrow::FixedSizeBinaryArray& /*array*/) {
+ static void Validate(const arrow::FixedSizeBinaryArray& /*array*/) {
}
- static NYql::NUdf::TUnboxedValue ExtractValue(arrow::FixedSizeBinaryArray& array, const ui32 rowIndex) {
+ static NYql::NUdf::TUnboxedValue ExtractValue(const arrow::FixedSizeBinaryArray& array, const ui32 rowIndex) {
auto data = array.GetView(rowIndex);
return MakeString(NUdf::TStringRef(data.data(), data.size() - 1));
}
@@ -286,33 +287,44 @@ public:
template <class TElementAccessor, class TAccessor>
TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor,
- const TBatchDataAccessor& batch, const ui32 columnIndex, arrow::ChunkedArray* chunkedArrayExt, NScheme::TTypeInfo columnType) {
+ const TBatchDataAccessor& batch, const ui32 columnIndex, const std::shared_ptr<arrow::ChunkedArray>& chunkedArrayExt, NScheme::TTypeInfo columnType) {
auto statAccumulator = TElementAccessor::BuildStatAccumulator(columnType);
- for (auto&& i : chunkedArrayExt->chunks()) {
- auto& arrayExt = *i;
- auto& array = *reinterpret_cast<typename TElementAccessor::TArrayType*>(&arrayExt);
- TElementAccessor::Validate(array);
-
- const auto applyToIndex = [&](const ui32 rowIndexFrom, const ui32 rowIndexTo) {
- auto& rowItem = editAccessor(rowIndexTo, columnIndex);
- if (array.IsNull(rowIndexFrom)) {
- statAccumulator.AddNull();
- rowItem = NUdf::TUnboxedValue();
- } else {
- rowItem = TElementAccessor::ExtractValue(array, rowIndexFrom);
- statAccumulator.AddValue(rowItem);
- }
- };
- if (batch.HasDataIndexes()) {
- ui32 idx = 0;
- for (const i64 rowIndex : batch.GetDataIndexes()) {
- applyToIndex(rowIndex, idx++);
- }
+ auto trivialChunkedArray = std::make_shared<NArrow::NAccessor::TTrivialChunkedArray>(chunkedArrayExt);
+ NArrow::NAccessor::IChunkedArray::TReader reader(trivialChunkedArray);
+
+ std::optional<ui32> chunkIdx;
+ const auto applyToIndex = [&](const ui32 rowIndexFrom, const ui32 rowIndexTo) {
+ auto address = reader.GetReadChunk(rowIndexFrom);
+ auto* currentArray = static_cast<const typename TElementAccessor::TArrayType*>(address.GetArray().get());
+
+ if (!chunkIdx || *chunkIdx != address.GetChunkIdx()) {
+ TElementAccessor::Validate(*currentArray);
+ chunkIdx = address.GetChunkIdx();
+ }
+
+ auto& rowItem = editAccessor(rowIndexTo, columnIndex);
+ if (currentArray->IsNull(address.GetPosition())) {
+ statAccumulator.AddNull();
+ rowItem = NUdf::TUnboxedValue();
} else {
- for (i64 rowIndex = 0; rowIndex < batch.GetRecordsCount(); ++rowIndex) {
- applyToIndex(rowIndex, rowIndex);
- }
+ rowItem = TElementAccessor::ExtractValue(*currentArray, address.GetPosition());
+ statAccumulator.AddValue(rowItem);
+ }
+ };
+
+ if (batch.HasDataIndexes()) {
+ ui32 idx = 0;
+ std::map<ui64, ui64> remapIndexes;
+ for (const i64 rowIndex : batch.GetDataIndexes()) {
+ YQL_ENSURE(remapIndexes.emplace(rowIndex, idx++).second);
+ }
+ for (auto&& i : remapIndexes) {
+ applyToIndex(i.first, i.second);
+ }
+ } else {
+ for (i64 rowIndex = 0; rowIndex < batch.GetRecordsCount(); ++rowIndex) {
+ applyToIndex(rowIndex, rowIndex);
}
}
return statAccumulator.Finish();
@@ -322,8 +334,7 @@ TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor,
template <class TAccessor>
TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor,
const TBatchDataAccessor& batch, i64 columnIndex, NScheme::TTypeInfo columnType) {
- std::shared_ptr<arrow::ChunkedArray> columnSharedPtr = batch.GetBatch()->column(columnIndex);
- arrow::ChunkedArray* columnPtr = columnSharedPtr.get();
+ const std::shared_ptr<arrow::ChunkedArray> columnPtr = batch.GetBatch()->column(columnIndex);
namespace NTypeIds = NScheme::NTypeIds;
switch (columnType.GetTypeId()) {
case NTypeIds::Bool:
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 4a0cc425e28..1d61b6db712 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -777,10 +777,7 @@ std::shared_ptr<arrow::Table> TPortionInfo::TPreparedBatchData::AssembleTable(co
}
std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
- auto table = AssembleTable(options);
- auto res = table->CombineChunks();
- Y_ABORT_UNLESS(res.ok());
- return NArrow::ToBatch(*res);
+ return NArrow::ToBatch(AssembleTable(options), true);
}
}