diff options
author | ssmike <ssmike@ydb.tech> | 2023-01-30 16:24:03 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-01-30 16:24:03 +0300 |
commit | 9dcc4b43f691712172d5156d718dc9f0a930af0b (patch) | |
tree | 78f8c837d24e88efc3eb1ecd052f4d32f0530152 | |
parent | fa320a05d3da5588e60caec22bf721750876384a (diff) | |
download | ydb-9dcc4b43f691712172d5156d718dc9f0a930af0b.tar.gz |
fix non-system column indices
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.h | 2 |
3 files changed, 32 insertions, 13 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 2c20e32a4b9..5b227e0ec30 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -647,11 +647,14 @@ public: NMiniKQL::TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row) { NMiniKQL::TBytesStatistics rowStats{0, 0}; - for (size_t i = 0; i < Settings.ColumnsSize(); ++i) { - if (IsSystemColumn(Settings.GetColumns(i).GetId())) { + size_t columnIndex = 0; + for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { + if (IsSystemColumn(Settings.GetColumns(resultColumnIndex).GetId())) { rowStats.AllocatedBytes += sizeof(NUdf::TUnboxedValue); } else { - rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(row[i], NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(i).GetType()))); + rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize( + row[columnIndex], NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType()))); + columnIndex += 1; } } if (Settings.ColumnsSize() == 0) { @@ -687,19 +690,21 @@ public: ); } - for (size_t columnIndex = 0; columnIndex < Settings.ColumnsSize(); ++columnIndex) { - auto tag = Settings.GetColumns(columnIndex).GetId(); - auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(columnIndex).GetType()); + size_t columnIndex = 0; + for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { + auto tag = Settings.GetColumns(resultColumnIndex).GetId(); + auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType()); if (IsSystemColumn(tag)) { for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { - NMiniKQL::FillSystemColumn(editAccessors[rowIndex][columnIndex], shardId, tag, type); + NMiniKQL::FillSystemColumn(editAccessors[rowIndex][resultColumnIndex], shardId, tag, type); stats.AllocatedBytes += sizeof(NUdf::TUnboxedValue); } } else { hasResultColumns = true; stats.AddStatistics( - NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, type) + NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, resultColumnIndex, type) ); + columnIndex += 1; } } } @@ -722,13 +727,15 @@ public: const auto& row = result->Get()->GetCells(rowIndex); NUdf::TUnboxedValue* rowItems = nullptr; batch.emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems)); - for (size_t i = 0; i < Settings.ColumnsSize(); ++i) { - auto tag = Settings.GetColumns(i).GetId(); - auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(i).GetType()); + size_t columnIndex = 0; + for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { + auto tag = Settings.GetColumns(resultColumnIndex).GetId(); + auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType()); if (IsSystemColumn(tag)) { - NMiniKQL::FillSystemColumn(rowItems[i], shardId, tag, type); + NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, tag, type); } else { - rowItems[i] = NMiniKQL::GetCellValue(row[i], type); + rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], type); + columnIndex += 1; } } stats.AddStatistics(GetRowSize(rowItems)); diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index 3ded07d28e3..4ae2eb8b2fb 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -394,6 +394,16 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& return WriteColumnValuesFromArrowImpl(accessor, batch, columnIndex, columnType); } +TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors, + const arrow::RecordBatch& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType) +{ + const auto accessor = [=, &editAccessors](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& { + YQL_ENSURE(colIndex == columnIndex); + return editAccessors[rowIndex][resultColumnIndex]; + }; + return WriteColumnValuesFromArrowImpl(accessor, batch, columnIndex, columnType); +} + std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) { auto sizes = GetUnboxedValueSize(value, type); return {sizes.AllocatedBytes, sizes.DataBytes}; diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h index d61e8b3c0a3..34fbcdcd613 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.h +++ b/ydb/core/kqp/runtime/kqp_scan_data.h @@ -62,6 +62,8 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType); TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors, const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType); +TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors, + const arrow::RecordBatch& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType); void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo type); |