aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-01-09 14:37:25 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-01-09 14:37:25 +0300
commit97a49abbd45f380237a57b95730cbe2d81c91e3f (patch)
treec897db0fea2f1af76924cc80e2f4143d41792070
parentd7316a25ddae54e96b1db61144a2ab8c7255cfe6 (diff)
downloadydb-97a49abbd45f380237a57b95730cbe2d81c91e3f.tar.gz
use wide flow directly withno intermediate direct holder
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.cpp193
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.h42
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data_ut.cpp68
3 files changed, 193 insertions, 110 deletions
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index e3c26ea5e7c..f0e04056d95 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -138,61 +138,86 @@ NUdf::TUnboxedValue MakeUnboxedValueFromDecimal128Array(arrow::Array* column, ui
} // namespace
-TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
- const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType)
-{
+ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxedValue* const* result) {
+ ui32 resultColumnsCount = 0;
+ if (ColumnsCount) {
+ for (ui32 i = 0; i < CellsCountForRow; ++i) {
+ if (result[i]) {
+ *result[i] = std::move(Cells[CurrentRow * CellsCountForRow + i]);
+ ++resultColumnsCount;
+ }
+ }
+ }
+ ++CurrentRow;
+ return resultColumnsCount;
+}
+
+template <class TAccessor>
+TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor,
+ const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType) {
TBytesStatistics columnStats;
// Hold pointer to column until function end
std::shared_ptr<arrow::Array> columnSharedPtr = batch.column(columnIndex);
arrow::Array* columnPtr = columnSharedPtr.get();
namespace NTypeIds = NScheme::NTypeIds;
for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
- auto& rowItem = editAccessors[rowIndex][columnIndex];
+ auto& rowItem = editAccessor(rowIndex, columnIndex);
if (columnPtr->IsNull(rowIndex)) {
rowItem = NUdf::TUnboxedValue();
} else {
- switch(columnType.GetTypeId()) {
- case NTypeIds::Bool: {
+ switch (columnType.GetTypeId()) {
+ case NTypeIds::Bool:
+ {
rowItem = MakeUnboxedValue<arrow::BooleanArray, bool>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Int8: {
+ case NTypeIds::Int8:
+ {
rowItem = MakeUnboxedValue<arrow::Int8Array>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Int16: {
+ case NTypeIds::Int16:
+ {
rowItem = MakeUnboxedValue<arrow::Int16Array>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Int32: {
+ case NTypeIds::Int32:
+ {
rowItem = MakeUnboxedValue<arrow::Int32Array>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Int64: {
+ case NTypeIds::Int64:
+ {
rowItem = MakeUnboxedValue<arrow::Int64Array, i64>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Uint8: {
+ case NTypeIds::Uint8:
+ {
rowItem = MakeUnboxedValue<arrow::UInt8Array>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Uint16: {
+ case NTypeIds::Uint16:
+ {
rowItem = MakeUnboxedValue<arrow::UInt16Array>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Uint32: {
+ case NTypeIds::Uint32:
+ {
rowItem = MakeUnboxedValue<arrow::UInt32Array>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Uint64: {
+ case NTypeIds::Uint64:
+ {
rowItem = MakeUnboxedValue<arrow::UInt64Array, ui64>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Float: {
+ case NTypeIds::Float:
+ {
rowItem = MakeUnboxedValue<arrow::FloatArray>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Double: {
+ case NTypeIds::Double:
+ {
rowItem = MakeUnboxedValue<arrow::DoubleArray>(columnPtr, rowIndex);
break;
}
@@ -201,33 +226,40 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>&
case NTypeIds::Json:
case NTypeIds::Yson:
case NTypeIds::JsonDocument:
- case NTypeIds::DyNumber: {
+ case NTypeIds::DyNumber:
+ {
rowItem = MakeUnboxedValueFromBinaryData(columnPtr, rowIndex);
break;
}
- case NTypeIds::Date: {
+ case NTypeIds::Date:
+ {
rowItem = MakeUnboxedValue<arrow::UInt16Array>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Datetime: {
+ case NTypeIds::Datetime:
+ {
rowItem = MakeUnboxedValue<arrow::UInt32Array>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Timestamp: {
+ case NTypeIds::Timestamp:
+ {
rowItem = MakeUnboxedValue<arrow::TimestampArray, ui64>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Interval: {
+ case NTypeIds::Interval:
+ {
rowItem = MakeUnboxedValue<arrow::DurationArray, ui64>(columnPtr, rowIndex);
break;
}
- case NTypeIds::Decimal: {
+ case NTypeIds::Decimal:
+ {
rowItem = MakeUnboxedValueFromDecimal128Array(columnPtr, rowIndex);
break;
}
case NTypeIds::PairUi64Ui64:
case NTypeIds::ActorId:
- case NTypeIds::StepOrderId: {
+ case NTypeIds::StepOrderId:
+ {
Y_VERIFY_DEBUG_S(false, "Unsupported (deprecated) type: " << NScheme::TypeName(columnType.GetTypeId()));
rowItem = MakeUnboxedValueFromFixedSizeBinaryData(columnPtr, rowIndex);
break;
@@ -245,13 +277,43 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>&
return columnStats;
}
+TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
+ const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType)
+{
+ const auto accessor = [editAccessors, columnsCount](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& {
+ return editAccessors[rowIndex * columnsCount + colIndex];
+ };
+ return WriteColumnValuesFromArrowImpl(accessor, batch, columnIndex, columnType);
+}
+TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
+ const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType)
+{
+ const auto accessor = [&editAccessors](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& {
+ return editAccessors[rowIndex][colIndex];
+ };
+ return WriteColumnValuesFromArrowImpl(accessor, batch, columnIndex, columnType);
+}
std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) {
auto sizes = GetUnboxedValueSize(value, type);
return {sizes.AllocatedBytes, sizes.DataBytes};
}
+ui32 TKqpScanComputeContext::TScanData::FillUnboxedCells(NUdf::TUnboxedValue* const* result) {
+ YQL_ENSURE(!RowBatches.empty());
+ auto& batch = RowBatches.front();
+ auto rowStats = GetRowSize(batch.GetCurrentData(), ResultColumns, SystemColumns);
+ const ui32 resultColumnsCount = batch.FillUnboxedCells(result);
+ if (batch.IsFinished()) {
+ RowBatches.pop();
+ }
+
+ StoredBytes -= rowStats.AllocatedBytes;
+ YQL_ENSURE(RowBatches.empty() == (StoredBytes == 0), "StoredBytes miscalculated!");
+ return resultColumnsCount;
+}
+
TKqpScanComputeContext::TScanData::TScanData(const TTableId& tableId, const TTableRange& range,
const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys,
const TSmallVec<TColumn>& resultColumns)
@@ -325,23 +387,28 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& ba
TVector<ui64> bytesList;
bytesList.reserve(batch.size());
- TUnboxedValueVector rows;
- rows.reserve(batch.size());
+ TUnboxedValueVector cells;
+ if (!ColumnsCount()) {
+ cells.resize(batch.size(), holderFactory.GetEmptyContainer());
+ stats.AddStatistics({ sizeof(ui64) * batch.size(), sizeof(ui64) * batch.size() });
+ } else {
+ cells.resize(batch.size() * ColumnsCount());
- for (size_t rowIndex = 0; rowIndex < batch.size(); ++rowIndex) {
- auto& row = batch[rowIndex];
+ for (size_t rowIndex = 0; rowIndex < batch.size(); ++rowIndex) {
+ auto& row = batch[rowIndex];
- // Convert row into an UnboxedValue
- NUdf::TUnboxedValue* rowItems = nullptr;
- rows.emplace_back(holderFactory.CreateDirectArrayHolder(ResultColumns.size() + SystemColumns.size(), rowItems));
- for (ui32 i = 0; i < ResultColumns.size(); ++i) {
- rowItems[i] = GetCellValue(row[i], ResultColumns[i].Type);
- }
- FillSystemColumns(&rowItems[ResultColumns.size()], shardId, SystemColumns);
+ auto* vectorStart = &cells.data()[rowIndex * ColumnsCount()];
+ for (ui32 i = 0; i < ResultColumns.size(); ++i) {
+ vectorStart[i] = GetCellValue(row[i], ResultColumns[i].Type);
+ }
+ FillSystemColumns(vectorStart + ResultColumns.size(), shardId, SystemColumns);
- stats.AddStatistics(GetRowSize(rowItems, ResultColumns, SystemColumns));
+ stats.AddStatistics(GetRowSize(vectorStart, ResultColumns, SystemColumns));
+ }
+ }
+ if (cells.size()) {
+ RowBatches.emplace(RowBatch(ColumnsCount(), std::move(cells), shardId));
}
- RowBatches.emplace(RowBatch{std::move(rows), shardId});
StoredBytes += stats.AllocatedBytes;
if (BasicStats) {
@@ -361,42 +428,33 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch,
}
TBytesStatistics stats;
- TUnboxedValueVector rows;
+ TUnboxedValueVector cells;
- if (ResultColumns.empty() && SystemColumns.empty()) {
- rows.resize(batch.num_rows(), holderFactory.GetEmptyContainer());
+ if (!ColumnsCount()) {
+ cells.resize(batch.num_rows(), holderFactory.GetEmptyContainer());
+ stats.AddStatistics({ sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows() });
} else {
- TVector<NUdf::TUnboxedValue*> editAccessors(batch.num_rows());
- rows.reserve(batch.num_rows());
-
- for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
- rows.emplace_back(holderFactory.CreateDirectArrayHolder(
- ResultColumns.size() + SystemColumns.size(),
- editAccessors[rowIndex])
- );
- }
+ cells.resize(batch.num_rows() * ColumnsCount());
for (size_t columnIndex = 0; columnIndex < ResultColumns.size(); ++columnIndex) {
stats.AddStatistics(
- WriteColumnValuesFromArrow(editAccessors, batch, columnIndex, ResultColumns[columnIndex].Type)
+ WriteColumnValuesFromArrow(cells.data(), batch, columnIndex, ColumnsCount(), ResultColumns[columnIndex].Type)
);
}
if (!SystemColumns.empty()) {
for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
- FillSystemColumns(&editAccessors[rowIndex][ResultColumns.size()], shardId, SystemColumns);
+ FillSystemColumns(&cells[rowIndex * ColumnsCount() + ResultColumns.size()], shardId, SystemColumns);
}
stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
}
}
- if (ResultColumns.empty()) {
- stats.AddStatistics({sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows()});
+ if (cells.size()) {
+ RowBatches.emplace(RowBatch(ColumnsCount(), std::move(cells), shardId));
}
- RowBatches.emplace(RowBatch{std::move(rows), shardId});
-
StoredBytes += stats.AllocatedBytes;
if (BasicStats) {
BasicStats->Rows += batch.num_rows();
@@ -406,20 +464,6 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch,
return stats.AllocatedBytes;
}
-NUdf::TUnboxedValue TKqpScanComputeContext::TScanData::TakeRow() {
- YQL_ENSURE(!RowBatches.empty());
- auto& batch = RowBatches.front();
- auto row = std::move(batch.Batch[batch.CurrentRow++]);
- auto rowStats = GetRowSize(row.GetElements(), ResultColumns, SystemColumns);
-
- StoredBytes -= rowStats.AllocatedBytes;
- if (batch.CurrentRow == batch.Batch.size()) {
- RowBatches.pop();
- }
- YQL_ENSURE(RowBatches.empty() == (StoredBytes == 0), "StoredBytes miscalculated!");
- return row;
-}
-
void TKqpScanComputeContext::AddTableScan(ui32, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
NYql::NDqProto::EDqStatsMode statsMode)
{
@@ -458,7 +502,7 @@ public:
: ScanData(scanData)
{}
- NUdf::EFetchStatus Next(NUdf::TUnboxedValue& result) override {
+ NUdf::EFetchStatus Next(NUdf::TUnboxedValue& /*result*/) override {
if (ScanData.IsEmpty()) {
if (ScanData.IsFinished()) {
return NUdf::EFetchStatus::Finish;
@@ -466,7 +510,8 @@ public:
return NUdf::EFetchStatus::Yield;
}
- result = std::move(ScanData.TakeRow());
+ Y_VERIFY(false);
+// result = std::move(ScanData.BuildNextDirectArrayHolder());
return NUdf::EFetchStatus::Ok;
}
@@ -478,13 +523,7 @@ public:
return EFetchResult::Yield;
}
- auto row = ScanData.TakeRow();
- for (ui32 i = 0; i < ScanData.GetResultColumns().size() + ScanData.GetSystemColumns().size(); ++i) {
- if (result[i]) {
- *result[i] = std::move(row.GetElement(i));
- }
- }
-
+ ScanData.FillUnboxedCells(result);
return EFetchResult::One;
}
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h
index 6391be44f58..13f0f1c070d 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.h
+++ b/ydb/core/kqp/runtime/kqp_scan_data.h
@@ -14,6 +14,7 @@
#include <library/cpp/actors/core/log.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
+#include <ydb/library/yql/utils/yql_panic.h>
namespace NKikimrTxDataShard {
class TKqpTransaction_TScanTaskMeta;
@@ -36,6 +37,8 @@ struct TBytesStatistics {
TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type);
TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType);
+TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
+ const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType);
void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo type);
@@ -58,6 +61,12 @@ public:
const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys,
const TSmallVec<TColumn>& resultColumns);
+ ui32 ColumnsCount() const {
+ return ResultColumns.size() + SystemColumns.size();
+ }
+
+ ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result);
+
TScanData(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, NYql::NDqProto::EDqStatsMode statsMode);
~TScanData() {
@@ -82,8 +91,6 @@ public:
ui64 AddRows(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
- NUdf::TUnboxedValue TakeRow();
-
bool IsEmpty() const {
return RowBatches.empty();
}
@@ -101,7 +108,8 @@ public:
}
void Clear() {
- RowBatches.clear();
+ TQueue<RowBatch> newQueue;
+ std::swap(newQueue, RowBatches);
}
public:
@@ -134,10 +142,32 @@ public:
std::unique_ptr<TProfileStats> ProfileStats;
private:
- struct RowBatch {
- TUnboxedValueVector Batch;
- TMaybe<ui64> ShardId;
+ class RowBatch {
+ private:
+ const ui32 CellsCountForRow;
+ const ui32 ColumnsCount;
+ TUnboxedValueVector Cells;
ui64 CurrentRow = 0;
+ public:
+ TMaybe<ui64> ShardId;
+
+ explicit RowBatch(const ui32 columnsCount, TUnboxedValueVector&& cells, TMaybe<ui64> shardId)
+ : CellsCountForRow(columnsCount ? columnsCount : 1)
+ , ColumnsCount(columnsCount)
+ , Cells(std::move(cells))
+ , ShardId(shardId)
+ {
+ }
+
+ const NUdf::TUnboxedValue* GetCurrentData() const {
+ return Cells.data() + CurrentRow * CellsCountForRow;
+ }
+
+ bool IsFinished() {
+ return CurrentRow * CellsCountForRow == Cells.size();
+ }
+
+ ui32 FillUnboxedCells(NUdf::TUnboxedValue* const* result);
};
TSmallVec<TColumn> Columns;
diff --git a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
index 68762c76bca..aae2d0c669f 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
@@ -251,32 +251,38 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
scanData.AddRows(*batch, {}, factory);
+ std::vector<NUdf::TUnboxedValue> container;
+ container.resize(20);
+ std::vector<NUdf::TUnboxedValue*> containerPtr;
+ for (auto&& i : container) {
+ containerPtr.emplace_back(&i);
+ }
for (auto& row: rows) {
- auto result_row = scanData.TakeRow();
- UNIT_ASSERT_EQUAL(result_row.GetElement(0 ).Get<bool >(), row.Bool );
- UNIT_ASSERT_EQUAL(result_row.GetElement(1 ).Get<i8 >(), row.Int8 );
- UNIT_ASSERT_EQUAL(result_row.GetElement(2 ).Get<i16 >(), row.Int16 );
- UNIT_ASSERT_EQUAL(result_row.GetElement(3 ).Get<i32 >(), row.Int32 );
- UNIT_ASSERT_EQUAL(result_row.GetElement(4 ).Get<i64 >(), row.Int64 );
- UNIT_ASSERT_EQUAL(result_row.GetElement(5 ).Get<ui8 >(), row.UInt8 );
- UNIT_ASSERT_EQUAL(result_row.GetElement(6 ).Get<ui16 >(), row.UInt16 );
- UNIT_ASSERT_EQUAL(result_row.GetElement(7 ).Get<ui32 >(), row.UInt32 );
- UNIT_ASSERT_EQUAL(result_row.GetElement(8 ).Get<ui64 >(), row.UInt64 );
- UNIT_ASSERT_EQUAL(result_row.GetElement(9 ).Get<float >(), row.Float32);
- UNIT_ASSERT_EQUAL(result_row.GetElement(10).Get<double>(), row.Float64);
- auto tmpString = result_row.GetElement(11);
+ scanData.FillUnboxedCells(containerPtr.data());
+ UNIT_ASSERT_EQUAL(container[0 ].Get<bool >(), row.Bool );
+ UNIT_ASSERT_EQUAL(container[1 ].Get<i8 >(), row.Int8 );
+ UNIT_ASSERT_EQUAL(container[2 ].Get<i16 >(), row.Int16 );
+ UNIT_ASSERT_EQUAL(container[3 ].Get<i32 >(), row.Int32 );
+ UNIT_ASSERT_EQUAL(container[4 ].Get<i64 >(), row.Int64 );
+ UNIT_ASSERT_EQUAL(container[5 ].Get<ui8 >(), row.UInt8 );
+ UNIT_ASSERT_EQUAL(container[6 ].Get<ui16 >(), row.UInt16 );
+ UNIT_ASSERT_EQUAL(container[7 ].Get<ui32 >(), row.UInt32 );
+ UNIT_ASSERT_EQUAL(container[8 ].Get<ui64 >(), row.UInt64 );
+ UNIT_ASSERT_EQUAL(container[9 ].Get<float >(), row.Float32);
+ UNIT_ASSERT_EQUAL(container[10].Get<double>(), row.Float64);
+ auto tmpString = container[11];
UNIT_ASSERT_EQUAL(TString(tmpString.AsStringRef().Data()), row.String);
- auto tmpUtf8 = result_row.GetElement(12);
+ auto tmpUtf8 = container[12];
UNIT_ASSERT_EQUAL(TString(tmpUtf8.AsStringRef().Data()), row.Utf8);
- auto tmpJson = result_row.GetElement(13);
+ auto tmpJson = container[13];
UNIT_ASSERT_EQUAL(TString(tmpJson.AsStringRef().Data()), row.Json);
- auto tmpYson = result_row.GetElement(14);
+ auto tmpYson = container[14];
UNIT_ASSERT_EQUAL(TString(tmpYson.AsStringRef().Data()), row.Yson);
- UNIT_ASSERT_EQUAL(result_row.GetElement(15).Get<i32 >(), row.Date );
- UNIT_ASSERT_EQUAL(result_row.GetElement(16).Get<i64 >(), row.Datetime );
- UNIT_ASSERT_EQUAL(result_row.GetElement(17).Get<i64 >(), row.Timestamp);
- UNIT_ASSERT_EQUAL(result_row.GetElement(18).Get<i64 >(), row.Interval );
- UNIT_ASSERT_EQUAL(result_row.GetElement(19).GetInt128(), row.Decimal );
+ UNIT_ASSERT_EQUAL(container[15].Get<i32 >(), row.Date );
+ UNIT_ASSERT_EQUAL(container[16].Get<i64 >(), row.Datetime );
+ UNIT_ASSERT_EQUAL(container[17].Get<i64 >(), row.Timestamp);
+ UNIT_ASSERT_EQUAL(container[18].Get<i64 >(), row.Interval );
+ UNIT_ASSERT_EQUAL(container[19].GetInt128(), row.Decimal );
}
UNIT_ASSERT(scanData.IsEmpty());
@@ -301,9 +307,16 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
scanData.AddRows(*batch, {}, factory);
+ std::vector<NUdf::TUnboxedValue> container;
+ container.resize(1);
+ std::vector<NUdf::TUnboxedValue*> containerPtr;
+ for (auto&& i : container) {
+ containerPtr.emplace_back(&i);
+ }
+
for (auto& row: rows) {
- auto result_row = scanData.TakeRow();
- UNIT_ASSERT_EQUAL(result_row.GetElement(0).Get<i8>(), row.Int8);
+ scanData.FillUnboxedCells(containerPtr.data());
+ UNIT_ASSERT_EQUAL(container[0].Get<i8>(), row.Int8);
}
UNIT_ASSERT(scanData.IsEmpty());
@@ -321,11 +334,12 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
auto bytes = scanData.AddRows(emptyBatch, {}, factory);
UNIT_ASSERT(bytes > 0);
+ std::vector<NUdf::TUnboxedValue*> containerPtr;
+
for (const auto& row: emptyBatch) {
Y_UNUSED(row);
UNIT_ASSERT(!scanData.IsEmpty());
- auto item = scanData.TakeRow();
- UNIT_ASSERT(item.GetListLength() == 0);
+ UNIT_ASSERT(scanData.FillUnboxedCells(containerPtr.data()) == 0);
}
UNIT_ASSERT(scanData.IsEmpty());
}
@@ -341,11 +355,11 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
auto bytes = scanData.AddRows(*anotherEmptyBatch, {}, factory);
UNIT_ASSERT(bytes > 0);
+ std::vector<NUdf::TUnboxedValue*> containerPtr;
for (const auto& row: rows) {
Y_UNUSED(row);
UNIT_ASSERT(!scanData.IsEmpty());
- auto item = scanData.TakeRow();
- UNIT_ASSERT(item.GetListLength() == 0);
+ UNIT_ASSERT(scanData.FillUnboxedCells(containerPtr.data()) == 0);
}
UNIT_ASSERT(scanData.IsEmpty());
}