diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-09 14:37:25 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-09 14:37:25 +0300 |
commit | 97a49abbd45f380237a57b95730cbe2d81c91e3f (patch) | |
tree | c897db0fea2f1af76924cc80e2f4143d41792070 | |
parent | d7316a25ddae54e96b1db61144a2ab8c7255cfe6 (diff) | |
download | ydb-97a49abbd45f380237a57b95730cbe2d81c91e3f.tar.gz |
use wide flow directly withno intermediate direct holder
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.cpp | 193 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.h | 42 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data_ut.cpp | 68 |
3 files changed, 193 insertions, 110 deletions
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index e3c26ea5e7c..f0e04056d95 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -138,61 +138,86 @@ NUdf::TUnboxedValue MakeUnboxedValueFromDecimal128Array(arrow::Array* column, ui } // namespace -TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors, - const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType) -{ +ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxedValue* const* result) { + ui32 resultColumnsCount = 0; + if (ColumnsCount) { + for (ui32 i = 0; i < CellsCountForRow; ++i) { + if (result[i]) { + *result[i] = std::move(Cells[CurrentRow * CellsCountForRow + i]); + ++resultColumnsCount; + } + } + } + ++CurrentRow; + return resultColumnsCount; +} + +template <class TAccessor> +TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor, + const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType) { TBytesStatistics columnStats; // Hold pointer to column until function end std::shared_ptr<arrow::Array> columnSharedPtr = batch.column(columnIndex); arrow::Array* columnPtr = columnSharedPtr.get(); namespace NTypeIds = NScheme::NTypeIds; for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) { - auto& rowItem = editAccessors[rowIndex][columnIndex]; + auto& rowItem = editAccessor(rowIndex, columnIndex); if (columnPtr->IsNull(rowIndex)) { rowItem = NUdf::TUnboxedValue(); } else { - switch(columnType.GetTypeId()) { - case NTypeIds::Bool: { + switch (columnType.GetTypeId()) { + case NTypeIds::Bool: + { rowItem = MakeUnboxedValue<arrow::BooleanArray, bool>(columnPtr, rowIndex); break; } - case NTypeIds::Int8: { + case NTypeIds::Int8: + { rowItem = MakeUnboxedValue<arrow::Int8Array>(columnPtr, rowIndex); break; } - case NTypeIds::Int16: { + case NTypeIds::Int16: + { rowItem = MakeUnboxedValue<arrow::Int16Array>(columnPtr, rowIndex); break; } - case NTypeIds::Int32: { + case NTypeIds::Int32: + { rowItem = MakeUnboxedValue<arrow::Int32Array>(columnPtr, rowIndex); break; } - case NTypeIds::Int64: { + case NTypeIds::Int64: + { rowItem = MakeUnboxedValue<arrow::Int64Array, i64>(columnPtr, rowIndex); break; } - case NTypeIds::Uint8: { + case NTypeIds::Uint8: + { rowItem = MakeUnboxedValue<arrow::UInt8Array>(columnPtr, rowIndex); break; } - case NTypeIds::Uint16: { + case NTypeIds::Uint16: + { rowItem = MakeUnboxedValue<arrow::UInt16Array>(columnPtr, rowIndex); break; } - case NTypeIds::Uint32: { + case NTypeIds::Uint32: + { rowItem = MakeUnboxedValue<arrow::UInt32Array>(columnPtr, rowIndex); break; } - case NTypeIds::Uint64: { + case NTypeIds::Uint64: + { rowItem = MakeUnboxedValue<arrow::UInt64Array, ui64>(columnPtr, rowIndex); break; } - case NTypeIds::Float: { + case NTypeIds::Float: + { rowItem = MakeUnboxedValue<arrow::FloatArray>(columnPtr, rowIndex); break; } - case NTypeIds::Double: { + case NTypeIds::Double: + { rowItem = MakeUnboxedValue<arrow::DoubleArray>(columnPtr, rowIndex); break; } @@ -201,33 +226,40 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& case NTypeIds::Json: case NTypeIds::Yson: case NTypeIds::JsonDocument: - case NTypeIds::DyNumber: { + case NTypeIds::DyNumber: + { rowItem = MakeUnboxedValueFromBinaryData(columnPtr, rowIndex); break; } - case NTypeIds::Date: { + case NTypeIds::Date: + { rowItem = MakeUnboxedValue<arrow::UInt16Array>(columnPtr, rowIndex); break; } - case NTypeIds::Datetime: { + case NTypeIds::Datetime: + { rowItem = MakeUnboxedValue<arrow::UInt32Array>(columnPtr, rowIndex); break; } - case NTypeIds::Timestamp: { + case NTypeIds::Timestamp: + { rowItem = MakeUnboxedValue<arrow::TimestampArray, ui64>(columnPtr, rowIndex); break; } - case NTypeIds::Interval: { + case NTypeIds::Interval: + { rowItem = MakeUnboxedValue<arrow::DurationArray, ui64>(columnPtr, rowIndex); break; } - case NTypeIds::Decimal: { + case NTypeIds::Decimal: + { rowItem = MakeUnboxedValueFromDecimal128Array(columnPtr, rowIndex); break; } case NTypeIds::PairUi64Ui64: case NTypeIds::ActorId: - case NTypeIds::StepOrderId: { + case NTypeIds::StepOrderId: + { Y_VERIFY_DEBUG_S(false, "Unsupported (deprecated) type: " << NScheme::TypeName(columnType.GetTypeId())); rowItem = MakeUnboxedValueFromFixedSizeBinaryData(columnPtr, rowIndex); break; @@ -245,13 +277,43 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& return columnStats; } +TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors, + const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType) +{ + const auto accessor = [editAccessors, columnsCount](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& { + return editAccessors[rowIndex * columnsCount + colIndex]; + }; + return WriteColumnValuesFromArrowImpl(accessor, batch, columnIndex, columnType); +} +TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors, + const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType) +{ + const auto accessor = [&editAccessors](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& { + return editAccessors[rowIndex][colIndex]; + }; + return WriteColumnValuesFromArrowImpl(accessor, batch, columnIndex, columnType); +} std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) { auto sizes = GetUnboxedValueSize(value, type); return {sizes.AllocatedBytes, sizes.DataBytes}; } +ui32 TKqpScanComputeContext::TScanData::FillUnboxedCells(NUdf::TUnboxedValue* const* result) { + YQL_ENSURE(!RowBatches.empty()); + auto& batch = RowBatches.front(); + auto rowStats = GetRowSize(batch.GetCurrentData(), ResultColumns, SystemColumns); + const ui32 resultColumnsCount = batch.FillUnboxedCells(result); + if (batch.IsFinished()) { + RowBatches.pop(); + } + + StoredBytes -= rowStats.AllocatedBytes; + YQL_ENSURE(RowBatches.empty() == (StoredBytes == 0), "StoredBytes miscalculated!"); + return resultColumnsCount; +} + TKqpScanComputeContext::TScanData::TScanData(const TTableId& tableId, const TTableRange& range, const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys, const TSmallVec<TColumn>& resultColumns) @@ -325,23 +387,28 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& ba TVector<ui64> bytesList; bytesList.reserve(batch.size()); - TUnboxedValueVector rows; - rows.reserve(batch.size()); + TUnboxedValueVector cells; + if (!ColumnsCount()) { + cells.resize(batch.size(), holderFactory.GetEmptyContainer()); + stats.AddStatistics({ sizeof(ui64) * batch.size(), sizeof(ui64) * batch.size() }); + } else { + cells.resize(batch.size() * ColumnsCount()); - for (size_t rowIndex = 0; rowIndex < batch.size(); ++rowIndex) { - auto& row = batch[rowIndex]; + for (size_t rowIndex = 0; rowIndex < batch.size(); ++rowIndex) { + auto& row = batch[rowIndex]; - // Convert row into an UnboxedValue - NUdf::TUnboxedValue* rowItems = nullptr; - rows.emplace_back(holderFactory.CreateDirectArrayHolder(ResultColumns.size() + SystemColumns.size(), rowItems)); - for (ui32 i = 0; i < ResultColumns.size(); ++i) { - rowItems[i] = GetCellValue(row[i], ResultColumns[i].Type); - } - FillSystemColumns(&rowItems[ResultColumns.size()], shardId, SystemColumns); + auto* vectorStart = &cells.data()[rowIndex * ColumnsCount()]; + for (ui32 i = 0; i < ResultColumns.size(); ++i) { + vectorStart[i] = GetCellValue(row[i], ResultColumns[i].Type); + } + FillSystemColumns(vectorStart + ResultColumns.size(), shardId, SystemColumns); - stats.AddStatistics(GetRowSize(rowItems, ResultColumns, SystemColumns)); + stats.AddStatistics(GetRowSize(vectorStart, ResultColumns, SystemColumns)); + } + } + if (cells.size()) { + RowBatches.emplace(RowBatch(ColumnsCount(), std::move(cells), shardId)); } - RowBatches.emplace(RowBatch{std::move(rows), shardId}); StoredBytes += stats.AllocatedBytes; if (BasicStats) { @@ -361,42 +428,33 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch, } TBytesStatistics stats; - TUnboxedValueVector rows; + TUnboxedValueVector cells; - if (ResultColumns.empty() && SystemColumns.empty()) { - rows.resize(batch.num_rows(), holderFactory.GetEmptyContainer()); + if (!ColumnsCount()) { + cells.resize(batch.num_rows(), holderFactory.GetEmptyContainer()); + stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() }); } else { - TVector<NUdf::TUnboxedValue*> editAccessors(batch.num_rows()); - rows.reserve(batch.num_rows()); - - for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) { - rows.emplace_back(holderFactory.CreateDirectArrayHolder( - ResultColumns.size() + SystemColumns.size(), - editAccessors[rowIndex]) - ); - } + cells.resize(batch.num_rows() * ColumnsCount()); for (size_t columnIndex = 0; columnIndex < ResultColumns.size(); ++columnIndex) { stats.AddStatistics( - WriteColumnValuesFromArrow(editAccessors, batch, columnIndex, ResultColumns[columnIndex].Type) + WriteColumnValuesFromArrow(cells.data(), batch, columnIndex, ColumnsCount(), ResultColumns[columnIndex].Type) ); } if (!SystemColumns.empty()) { for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) { - FillSystemColumns(&editAccessors[rowIndex][ResultColumns.size()], shardId, SystemColumns); + FillSystemColumns(&cells[rowIndex * ColumnsCount() + ResultColumns.size()], shardId, SystemColumns); } stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue); } } - if (ResultColumns.empty()) { - stats.AddStatistics({sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows()}); + if (cells.size()) { + RowBatches.emplace(RowBatch(ColumnsCount(), std::move(cells), shardId)); } - RowBatches.emplace(RowBatch{std::move(rows), shardId}); - StoredBytes += stats.AllocatedBytes; if (BasicStats) { BasicStats->Rows += batch.num_rows(); @@ -406,20 +464,6 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch, return stats.AllocatedBytes; } -NUdf::TUnboxedValue TKqpScanComputeContext::TScanData::TakeRow() { - YQL_ENSURE(!RowBatches.empty()); - auto& batch = RowBatches.front(); - auto row = std::move(batch.Batch[batch.CurrentRow++]); - auto rowStats = GetRowSize(row.GetElements(), ResultColumns, SystemColumns); - - StoredBytes -= rowStats.AllocatedBytes; - if (batch.CurrentRow == batch.Batch.size()) { - RowBatches.pop(); - } - YQL_ENSURE(RowBatches.empty() == (StoredBytes == 0), "StoredBytes miscalculated!"); - return row; -} - void TKqpScanComputeContext::AddTableScan(ui32, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, NYql::NDqProto::EDqStatsMode statsMode) { @@ -458,7 +502,7 @@ public: : ScanData(scanData) {} - NUdf::EFetchStatus Next(NUdf::TUnboxedValue& result) override { + NUdf::EFetchStatus Next(NUdf::TUnboxedValue& /*result*/) override { if (ScanData.IsEmpty()) { if (ScanData.IsFinished()) { return NUdf::EFetchStatus::Finish; @@ -466,7 +510,8 @@ public: return NUdf::EFetchStatus::Yield; } - result = std::move(ScanData.TakeRow()); + Y_VERIFY(false); +// result = std::move(ScanData.BuildNextDirectArrayHolder()); return NUdf::EFetchStatus::Ok; } @@ -478,13 +523,7 @@ public: return EFetchResult::Yield; } - auto row = ScanData.TakeRow(); - for (ui32 i = 0; i < ScanData.GetResultColumns().size() + ScanData.GetSystemColumns().size(); ++i) { - if (result[i]) { - *result[i] = std::move(row.GetElement(i)); - } - } - + ScanData.FillUnboxedCells(result); return EFetchResult::One; } diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h index 6391be44f58..13f0f1c070d 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.h +++ b/ydb/core/kqp/runtime/kqp_scan_data.h @@ -14,6 +14,7 @@ #include <library/cpp/actors/core/log.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> +#include <ydb/library/yql/utils/yql_panic.h> namespace NKikimrTxDataShard { class TKqpTransaction_TScanTaskMeta; @@ -36,6 +37,8 @@ struct TBytesStatistics { TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type); TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors, const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType); +TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors, + const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType); void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo type); @@ -58,6 +61,12 @@ public: const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys, const TSmallVec<TColumn>& resultColumns); + ui32 ColumnsCount() const { + return ResultColumns.size() + SystemColumns.size(); + } + + ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result); + TScanData(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, NYql::NDqProto::EDqStatsMode statsMode); ~TScanData() { @@ -82,8 +91,6 @@ public: ui64 AddRows(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory); - NUdf::TUnboxedValue TakeRow(); - bool IsEmpty() const { return RowBatches.empty(); } @@ -101,7 +108,8 @@ public: } void Clear() { - RowBatches.clear(); + TQueue<RowBatch> newQueue; + std::swap(newQueue, RowBatches); } public: @@ -134,10 +142,32 @@ public: std::unique_ptr<TProfileStats> ProfileStats; private: - struct RowBatch { - TUnboxedValueVector Batch; - TMaybe<ui64> ShardId; + class RowBatch { + private: + const ui32 CellsCountForRow; + const ui32 ColumnsCount; + TUnboxedValueVector Cells; ui64 CurrentRow = 0; + public: + TMaybe<ui64> ShardId; + + explicit RowBatch(const ui32 columnsCount, TUnboxedValueVector&& cells, TMaybe<ui64> shardId) + : CellsCountForRow(columnsCount ? columnsCount : 1) + , ColumnsCount(columnsCount) + , Cells(std::move(cells)) + , ShardId(shardId) + { + } + + const NUdf::TUnboxedValue* GetCurrentData() const { + return Cells.data() + CurrentRow * CellsCountForRow; + } + + bool IsFinished() { + return CurrentRow * CellsCountForRow == Cells.size(); + } + + ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result); }; TSmallVec<TColumn> Columns; diff --git a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp index 68762c76bca..aae2d0c669f 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp @@ -251,32 +251,38 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { scanData.AddRows(*batch, {}, factory); + std::vector<NUdf::TUnboxedValue> container; + container.resize(20); + std::vector<NUdf::TUnboxedValue*> containerPtr; + for (auto&& i : container) { + containerPtr.emplace_back(&i); + } for (auto& row: rows) { - auto result_row = scanData.TakeRow(); - UNIT_ASSERT_EQUAL(result_row.GetElement(0 ).Get<bool >(), row.Bool ); - UNIT_ASSERT_EQUAL(result_row.GetElement(1 ).Get<i8 >(), row.Int8 ); - UNIT_ASSERT_EQUAL(result_row.GetElement(2 ).Get<i16 >(), row.Int16 ); - UNIT_ASSERT_EQUAL(result_row.GetElement(3 ).Get<i32 >(), row.Int32 ); - UNIT_ASSERT_EQUAL(result_row.GetElement(4 ).Get<i64 >(), row.Int64 ); - UNIT_ASSERT_EQUAL(result_row.GetElement(5 ).Get<ui8 >(), row.UInt8 ); - UNIT_ASSERT_EQUAL(result_row.GetElement(6 ).Get<ui16 >(), row.UInt16 ); - UNIT_ASSERT_EQUAL(result_row.GetElement(7 ).Get<ui32 >(), row.UInt32 ); - UNIT_ASSERT_EQUAL(result_row.GetElement(8 ).Get<ui64 >(), row.UInt64 ); - UNIT_ASSERT_EQUAL(result_row.GetElement(9 ).Get<float >(), row.Float32); - UNIT_ASSERT_EQUAL(result_row.GetElement(10).Get<double>(), row.Float64); - auto tmpString = result_row.GetElement(11); + scanData.FillUnboxedCells(containerPtr.data()); + UNIT_ASSERT_EQUAL(container[0 ].Get<bool >(), row.Bool ); + UNIT_ASSERT_EQUAL(container[1 ].Get<i8 >(), row.Int8 ); + UNIT_ASSERT_EQUAL(container[2 ].Get<i16 >(), row.Int16 ); + UNIT_ASSERT_EQUAL(container[3 ].Get<i32 >(), row.Int32 ); + UNIT_ASSERT_EQUAL(container[4 ].Get<i64 >(), row.Int64 ); + UNIT_ASSERT_EQUAL(container[5 ].Get<ui8 >(), row.UInt8 ); + UNIT_ASSERT_EQUAL(container[6 ].Get<ui16 >(), row.UInt16 ); + UNIT_ASSERT_EQUAL(container[7 ].Get<ui32 >(), row.UInt32 ); + UNIT_ASSERT_EQUAL(container[8 ].Get<ui64 >(), row.UInt64 ); + UNIT_ASSERT_EQUAL(container[9 ].Get<float >(), row.Float32); + UNIT_ASSERT_EQUAL(container[10].Get<double>(), row.Float64); + auto tmpString = container[11]; UNIT_ASSERT_EQUAL(TString(tmpString.AsStringRef().Data()), row.String); - auto tmpUtf8 = result_row.GetElement(12); + auto tmpUtf8 = container[12]; UNIT_ASSERT_EQUAL(TString(tmpUtf8.AsStringRef().Data()), row.Utf8); - auto tmpJson = result_row.GetElement(13); + auto tmpJson = container[13]; UNIT_ASSERT_EQUAL(TString(tmpJson.AsStringRef().Data()), row.Json); - auto tmpYson = result_row.GetElement(14); + auto tmpYson = container[14]; UNIT_ASSERT_EQUAL(TString(tmpYson.AsStringRef().Data()), row.Yson); - UNIT_ASSERT_EQUAL(result_row.GetElement(15).Get<i32 >(), row.Date ); - UNIT_ASSERT_EQUAL(result_row.GetElement(16).Get<i64 >(), row.Datetime ); - UNIT_ASSERT_EQUAL(result_row.GetElement(17).Get<i64 >(), row.Timestamp); - UNIT_ASSERT_EQUAL(result_row.GetElement(18).Get<i64 >(), row.Interval ); - UNIT_ASSERT_EQUAL(result_row.GetElement(19).GetInt128(), row.Decimal ); + UNIT_ASSERT_EQUAL(container[15].Get<i32 >(), row.Date ); + UNIT_ASSERT_EQUAL(container[16].Get<i64 >(), row.Datetime ); + UNIT_ASSERT_EQUAL(container[17].Get<i64 >(), row.Timestamp); + UNIT_ASSERT_EQUAL(container[18].Get<i64 >(), row.Interval ); + UNIT_ASSERT_EQUAL(container[19].GetInt128(), row.Decimal ); } UNIT_ASSERT(scanData.IsEmpty()); @@ -301,9 +307,16 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { scanData.AddRows(*batch, {}, factory); + std::vector<NUdf::TUnboxedValue> container; + container.resize(1); + std::vector<NUdf::TUnboxedValue*> containerPtr; + for (auto&& i : container) { + containerPtr.emplace_back(&i); + } + for (auto& row: rows) { - auto result_row = scanData.TakeRow(); - UNIT_ASSERT_EQUAL(result_row.GetElement(0).Get<i8>(), row.Int8); + scanData.FillUnboxedCells(containerPtr.data()); + UNIT_ASSERT_EQUAL(container[0].Get<i8>(), row.Int8); } UNIT_ASSERT(scanData.IsEmpty()); @@ -321,11 +334,12 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { auto bytes = scanData.AddRows(emptyBatch, {}, factory); UNIT_ASSERT(bytes > 0); + std::vector<NUdf::TUnboxedValue*> containerPtr; + for (const auto& row: emptyBatch) { Y_UNUSED(row); UNIT_ASSERT(!scanData.IsEmpty()); - auto item = scanData.TakeRow(); - UNIT_ASSERT(item.GetListLength() == 0); + UNIT_ASSERT(scanData.FillUnboxedCells(containerPtr.data()) == 0); } UNIT_ASSERT(scanData.IsEmpty()); } @@ -341,11 +355,11 @@ Y_UNIT_TEST_SUITE(TKqpScanData) { auto bytes = scanData.AddRows(*anotherEmptyBatch, {}, factory); UNIT_ASSERT(bytes > 0); + std::vector<NUdf::TUnboxedValue*> containerPtr; for (const auto& row: rows) { Y_UNUSED(row); UNIT_ASSERT(!scanData.IsEmpty()); - auto item = scanData.TakeRow(); - UNIT_ASSERT(item.GetListLength() == 0); + UNIT_ASSERT(scanData.FillUnboxedCells(containerPtr.data()) == 0); } UNIT_ASSERT(scanData.IsEmpty()); } |