aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-01-30 16:24:03 +0300
committerssmike <ssmike@ydb.tech>2023-01-30 16:24:03 +0300
commit9dcc4b43f691712172d5156d718dc9f0a930af0b (patch)
tree78f8c837d24e88efc3eb1ecd052f4d32f0530152
parentfa320a05d3da5588e60caec22bf721750876384a (diff)
downloadydb-9dcc4b43f691712172d5156d718dc9f0a930af0b.tar.gz
fix non-system column indices
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp33
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.cpp10
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.h2
3 files changed, 32 insertions, 13 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 2c20e32a4b9..5b227e0ec30 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -647,11 +647,14 @@ public:
NMiniKQL::TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row) {
NMiniKQL::TBytesStatistics rowStats{0, 0};
- for (size_t i = 0; i < Settings.ColumnsSize(); ++i) {
- if (IsSystemColumn(Settings.GetColumns(i).GetId())) {
+ size_t columnIndex = 0;
+ for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) {
+ if (IsSystemColumn(Settings.GetColumns(resultColumnIndex).GetId())) {
rowStats.AllocatedBytes += sizeof(NUdf::TUnboxedValue);
} else {
- rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(row[i], NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(i).GetType())));
+ rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(
+ row[columnIndex], NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType())));
+ columnIndex += 1;
}
}
if (Settings.ColumnsSize() == 0) {
@@ -687,19 +690,21 @@ public:
);
}
- for (size_t columnIndex = 0; columnIndex < Settings.ColumnsSize(); ++columnIndex) {
- auto tag = Settings.GetColumns(columnIndex).GetId();
- auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(columnIndex).GetType());
+ size_t columnIndex = 0;
+ for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) {
+ auto tag = Settings.GetColumns(resultColumnIndex).GetId();
+ auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType());
if (IsSystemColumn(tag)) {
for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
- NMiniKQL::FillSystemColumn(editAccessors[rowIndex][columnIndex], shardId, tag, type);
+ NMiniKQL::FillSystemColumn(editAccessors[rowIndex][resultColumnIndex], shardId, tag, type);
stats.AllocatedBytes += sizeof(NUdf::TUnboxedValue);
}
} else {
hasResultColumns = true;
stats.AddStatistics(
- NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, type)
+ NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, resultColumnIndex, type)
);
+ columnIndex += 1;
}
}
}
@@ -722,13 +727,15 @@ public:
const auto& row = result->Get()->GetCells(rowIndex);
NUdf::TUnboxedValue* rowItems = nullptr;
batch.emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems));
- for (size_t i = 0; i < Settings.ColumnsSize(); ++i) {
- auto tag = Settings.GetColumns(i).GetId();
- auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(i).GetType());
+ size_t columnIndex = 0;
+ for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) {
+ auto tag = Settings.GetColumns(resultColumnIndex).GetId();
+ auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType());
if (IsSystemColumn(tag)) {
- NMiniKQL::FillSystemColumn(rowItems[i], shardId, tag, type);
+ NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, tag, type);
} else {
- rowItems[i] = NMiniKQL::GetCellValue(row[i], type);
+ rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], type);
+ columnIndex += 1;
}
}
stats.AddStatistics(GetRowSize(rowItems));
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index 3ded07d28e3..4ae2eb8b2fb 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -394,6 +394,16 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>&
return WriteColumnValuesFromArrowImpl(accessor, batch, columnIndex, columnType);
}
+TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
+ const arrow::RecordBatch& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType)
+{
+ const auto accessor = [=, &editAccessors](const ui32 rowIndex, const ui32 colIndex) -> NUdf::TUnboxedValue& {
+ YQL_ENSURE(colIndex == columnIndex);
+ return editAccessors[rowIndex][resultColumnIndex];
+ };
+ return WriteColumnValuesFromArrowImpl(accessor, batch, columnIndex, columnType);
+}
+
std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) {
auto sizes = GetUnboxedValueSize(value, type);
return {sizes.AllocatedBytes, sizes.DataBytes};
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h
index d61e8b3c0a3..34fbcdcd613 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.h
+++ b/ydb/core/kqp/runtime/kqp_scan_data.h
@@ -62,6 +62,8 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>&
const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType);
TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
const arrow::RecordBatch& batch, i64 columnIndex, const ui32 columnsCount, NScheme::TTypeInfo columnType);
+TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
+ const arrow::RecordBatch& batch, i64 columnIndex, i64 resultColumnIndex, NScheme::TTypeInfo columnType);
void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo type);