about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author aidarsamer <aidarsamer@ydb.tech> 2022-09-10 00:14:40 +0300
committer aidarsamer <aidarsamer@ydb.tech> 2022-09-10 00:14:40 +0300
commit 80d869bfc780d641d663536dc871e9e65589c6ad (patch)
tree f449c94b2e19c44cde60d06631dd0285de0275bd
parent 21c067632afdb06d2eac52b071552f469ce738af (diff)
download ydb-80d869bfc780d641d663536dc871e9e65589c6ad.tar.gz
Fix coredump on read from columnshard.
Add ResultColumns field to ScanData. ScanData's Columns field is used to generate EvKqpRead event to describe what columns should be read. Add result type to OlapRangeRead.
-rw-r--r-- ydb/core/kqp/compile/kqp_compile.cpp 10
-rw-r--r-- ydb/core/kqp/executer/kqp_scan_executer.cpp 52
-rw-r--r-- ydb/core/kqp/executer/kqp_tasks_graph.h 1
-rw-r--r-- ydb/core/kqp/runtime/kqp_scan_data.cpp 64
-rw-r--r-- ydb/core/kqp/runtime/kqp_scan_data.h 8
-rw-r--r-- ydb/core/kqp/runtime/kqp_scan_data_ut.cpp 147
-rw-r--r-- ydb/core/kqp/ut/kqp_olap_ut.cpp 6
-rw-r--r-- ydb/core/protos/kqp_physical.proto 4
-rw-r--r-- ydb/core/protos/tx_datashard.proto 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard__read.cpp 8
-rw-r--r-- ydb/library/mkql_proto/mkql_proto.cpp 3
-rw-r--r-- ydb/library/mkql_proto/mkql_proto_ut.cpp 9
12 files changed, 218 insertions, 95 deletions
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp
index 6504ab68109..753873491dd 100644
--- a/ydb/core/kqp/compile/kqp_compile.cpp
+++ b/ydb/core/kqp/compile/kqp_compile.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/kqp/compile/kqp_olap_compiler.h>
#include <ydb/core/kqp/opt/kqp_opt.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
+#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/core/tx/schemeshard/schemeshard_utils.h>
#include <ydb/library/mkql_proto/mkql_proto.h>
@@ -480,6 +481,7 @@ private:
FillColumns(readTableRanges.Columns(), *tableMeta, tableOp, true);
FillReadRanges(readTableRanges, *tableMeta, *tableOp.MutableReadOlapRange());
FillOlapProgram(readTableRanges.Process(), *tableMeta, *tableOp.MutableReadOlapRange());
+ FillResultType(readTableRanges.Process().Ref().GetTypeAnn(), *tableOp.MutableReadOlapRange());
} else if (node.Maybe<TCoSort>()) {
hasSort = true;
} else if (node.Maybe<TCoMapJoinCore>()) {
@@ -729,6 +731,14 @@ private:
YQL_ENSURE(false, "Unexpected connection type: " << connection.CallableName());
}
+ void FillResultType(const TTypeAnnotationNode* resultType, NKqpProto::TKqpPhyOpReadOlapRanges& opProto)
+ {
+ YQL_ENSURE(resultType->GetKind() == NYql::ETypeAnnotationKind::Flow, "Unexpected type: " << NYql::FormatType(resultType));
+ TProgramBuilder pgmBuilder(TypeEnv, FuncRegistry);
+ const auto resultItemType = resultType->Cast<TFlowExprType>()->GetItemType();
+ ExportTypeToProto(CompileType(pgmBuilder, *resultItemType), *opProto.MutableResultType());
+ }
+
private:
TString Cluster;
const TIntrusivePtr<TKikimrTablesData> TablesData;
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 8785fafd8f7..af65c7240d3 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -16,9 +16,11 @@
#include <ydb/core/kqp/node/kqp_node.h>
#include <ydb/core/kqp/runtime/kqp_transport.h>
#include <ydb/core/kqp/prepare/kqp_query_plan.h>
+#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/library/yql/dq/runtime/dq_columns_resolve.h>
#include <ydb/library/yql/dq/tasks/dq_connection_builder.h>
+#include <ydb/library/yql/minikql/mkql_node_serialization.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -378,7 +380,7 @@ private:
private:
void FillReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse, bool sorted,
- const TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>& readOlapRange)
+ NKikimr::NMiniKQL::TType* resultType, const TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>& readOlapRange)
{
if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) {
// Validate parameters
@@ -398,6 +400,26 @@ private:
taskMeta.ReadInfo.Reverse = reverse;
taskMeta.ReadInfo.Sorted = sorted;
+ if (resultType) {
+ YQL_ENSURE(resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct
+ || resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Tuple);
+
+ auto* resultStructType = static_cast<NKikimr::NMiniKQL::TStructType*>(resultType);
+ ui32 resultColsCount = resultStructType->GetMembersCount();
+
+ taskMeta.ReadInfo.ResultColumnsTypes.reserve(resultColsCount);
+ for (ui32 i = 0; i < resultColsCount; ++i) {
+ auto memberType = resultStructType->GetMemberType(i);
+ if (memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Optional) {
+ memberType = static_cast<NKikimr::NMiniKQL::TOptionalType*>(memberType)->GetItemType();
+ }
+ YQL_ENSURE(memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data,
+ "Expected simple data types to be read from column shard");
+ auto memberDataType = static_cast<NKikimr::NMiniKQL::TDataType*>(memberType);
+ taskMeta.ReadInfo.ResultColumnsTypes.push_back(memberDataType->GetSchemeType());
+ }
+ }
+
if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) {
return;
}
@@ -476,6 +498,7 @@ private:
TString itemsLimitParamName;
NDqProto::TData itemsLimitBytes;
NKikimr::NMiniKQL::TType* itemsLimitType = nullptr;
+ NKikimr::NMiniKQL::TType* resultType = nullptr;
// TODO: Support reverse, skipnull and limit for kReadRanges
if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadRange) {
@@ -492,6 +515,9 @@ private:
reverse = op.GetReadOlapRange().GetReverse();
ExtractItemsLimit(stageInfo, op.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv,
itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
+ NKikimrMiniKQL::TType minikqlProtoResultType;
+ ConvertYdbTypeToMiniKQLType(op.GetReadOlapRange().GetResultType(), minikqlProtoResultType);
+ resultType = ImportTypeFromProto(minikqlProtoResultType, typeEnv);
}
for (auto& [shardId, shardInfo] : partitions) {
@@ -521,9 +547,9 @@ private:
if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
const auto& readRange = op.GetReadOlapRange();
- FillReadInfo(task.Meta, itemsLimit, reverse, sorted, readRange);
+ FillReadInfo(task.Meta, itemsLimit, reverse, sorted, resultType, readRange);
} else {
- FillReadInfo(task.Meta, itemsLimit, reverse, sorted, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>());
+ FillReadInfo(task.Meta, itemsLimit, reverse, sorted, nullptr, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>());
}
if (!task.Meta.Reads) {
@@ -738,18 +764,17 @@ private:
YQL_ENSURE(task.Meta.Reads);
YQL_ENSURE(!task.Meta.Writes);
- for (auto& column : task.Meta.Reads->front().Columns) {
- auto* protoColumn = protoTaskMeta.AddColumns();
- protoColumn->SetId(column.Id);
- protoColumn->SetType(column.Type);
- protoColumn->SetName(column.Name);
- }
-
if (!task.Meta.Reads->empty()) {
protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse);
protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit);
protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted);
+ for (auto columnType : task.Meta.ReadInfo.ResultColumnsTypes) {
+ auto* protoResultColumn = protoTaskMeta.AddResultColumns();
+ protoResultColumn->SetId(0);
+ protoResultColumn->SetType(columnType);
+ }
+
if (tableInfo.TableKind == ETableKind::Olap) {
auto* olapProgram = protoTaskMeta.MutableOlapProgram();
olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program);
@@ -762,6 +787,13 @@ private:
} else {
YQL_ENSURE(task.Meta.ReadInfo.OlapProgram.Program.empty());
}
+
+ for (auto& column : task.Meta.Reads->front().Columns) {
+ auto* protoColumn = protoTaskMeta.AddColumns();
+ protoColumn->SetId(column.Id);
+ protoColumn->SetType(column.Type);
+ protoColumn->SetName(column.Name);
+ }
}
for (auto& read : *task.Meta.Reads) {
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h
index 0651b7bde1a..51da52a204d 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.h
@@ -130,6 +130,7 @@ struct TTaskMeta {
bool Reverse = false;
bool Sorted = false;
TKqpOlapProgram OlapProgram;
+ TVector<NUdf::TDataTypeId> ResultColumnsTypes;
};
struct TWriteInfo {
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index c4d8e241966..d4e74b69c0a 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -249,12 +249,14 @@ std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& val
}
TKqpScanComputeContext::TScanData::TScanData(const TTableId& tableId, const TTableRange& range,
- const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys)
+ const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys,
+ const TSmallVec<TColumn>& resultColumns)
: TableId(tableId)
, Range(range)
, SkipNullKeys(skipNullKeys)
, Columns(columns)
, SystemColumns(systemColumns)
+ , ResultColumns(resultColumns)
{}
TKqpScanComputeContext::TScanData::TScanData(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
@@ -280,6 +282,23 @@ TKqpScanComputeContext::TScanData::TScanData(const NKikimrTxDataShard::TKqpTrans
}
}
+ if (meta.GetResultColumns().empty()) {
+ ResultColumns = Columns;
+ } else {
+ ResultColumns.reserve(meta.GetResultColumns().size());
+ for (const auto& resColumn : meta.GetResultColumns()) {
+ NMiniKQL::TKqpScanComputeContext::TColumn c;
+ c.Tag = resColumn.GetId();
+ c.Type = resColumn.GetType();
+
+ if (!IsSystemColumn(c.Tag)) {
+ ResultColumns.emplace_back(std::move(c));
+ } else {
+ SystemColumns.emplace_back(std::move(c));
+ }
+ }
+ }
+
if (statsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
}
@@ -307,13 +326,13 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& ba
// Convert row into an UnboxedValue
NUdf::TUnboxedValue* rowItems = nullptr;
- rows.emplace_back(holderFactory.CreateDirectArrayHolder(Columns.size() + SystemColumns.size(), rowItems));
- for (ui32 i = 0; i < Columns.size(); ++i) {
- rowItems[i] = GetCellValue(row[i], Columns[i].Type);
+ rows.emplace_back(holderFactory.CreateDirectArrayHolder(ResultColumns.size() + SystemColumns.size(), rowItems));
+ for (ui32 i = 0; i < ResultColumns.size(); ++i) {
+ rowItems[i] = GetCellValue(row[i], ResultColumns[i].Type);
}
- FillSystemColumns(&rowItems[Columns.size()], shardId, SystemColumns);
+ FillSystemColumns(&rowItems[ResultColumns.size()], shardId, SystemColumns);
- stats.AddStatistics(GetRowSize(rowItems, Columns, SystemColumns));
+ stats.AddStatistics(GetRowSize(rowItems, ResultColumns, SystemColumns));
}
RowBatches.emplace(RowBatch{std::move(rows), shardId});
@@ -337,7 +356,7 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch,
TBytesStatistics stats;
TUnboxedValueVector rows;
- if (Columns.empty() && SystemColumns.empty()) {
+ if (ResultColumns.empty() && SystemColumns.empty()) {
rows.resize(batch.num_rows(), holderFactory.GetEmptyContainer());
} else {
TVector<NUdf::TUnboxedValue*> editAccessors(batch.num_rows());
@@ -345,27 +364,27 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch,
for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
rows.emplace_back(holderFactory.CreateDirectArrayHolder(
- Columns.size() + SystemColumns.size(),
+ ResultColumns.size() + SystemColumns.size(),
editAccessors[rowIndex])
);
}
- for (size_t columnIndex = 0; columnIndex < Columns.size(); ++columnIndex) {
+ for (size_t columnIndex = 0; columnIndex < ResultColumns.size(); ++columnIndex) {
stats.AddStatistics(
- WriteColumnValuesFromArrow(editAccessors, batch, columnIndex, Columns[columnIndex].Type)
+ WriteColumnValuesFromArrow(editAccessors, batch, columnIndex, ResultColumns[columnIndex].Type)
);
}
if (!SystemColumns.empty()) {
for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
- FillSystemColumns(&editAccessors[rowIndex][Columns.size()], shardId, SystemColumns);
+ FillSystemColumns(&editAccessors[rowIndex][ResultColumns.size()], shardId, SystemColumns);
}
stats.AllocatedBytes += batch.num_rows() * SystemColumns.size() * sizeof(NUdf::TUnboxedValue);
}
}
- if (Columns.empty()) {
+ if (ResultColumns.empty()) {
stats.AddStatistics({sizeof(ui64) * batch.num_rows(), sizeof(ui64) * batch.num_rows()});
}
@@ -384,7 +403,7 @@ NUdf::TUnboxedValue TKqpScanComputeContext::TScanData::TakeRow() {
YQL_ENSURE(!RowBatches.empty());
auto& batch = RowBatches.front();
auto row = std::move(batch.Batch[batch.CurrentRow++]);
- auto rowStats = GetRowSize(row.GetElements(), Columns, SystemColumns);
+ auto rowStats = GetRowSize(row.GetElements(), ResultColumns, SystemColumns);
StoredBytes -= rowStats.AllocatedBytes;
if (batch.CurrentRow == batch.Batch.size()) {
@@ -394,23 +413,6 @@ NUdf::TUnboxedValue TKqpScanComputeContext::TScanData::TakeRow() {
return row;
}
-void TKqpScanComputeContext::AddTableScan(ui32, const TTableId& tableId, const TTableRange& range,
- const TSmallVec<TColumn>& columns, const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys)
-{
- auto scanData = TKqpScanComputeContext::TScanData(tableId, range, columns, systemColumns, skipNullKeys);
-
- if (Y_UNLIKELY(StatsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC)) {
- scanData.BasicStats = std::make_unique<TScanData::TBasicStats>();
- }
-
- if (Y_UNLIKELY(StatsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE)) {
- scanData.ProfileStats = std::make_unique<TScanData::TProfileStats>();
- }
-
- auto result = Scans.emplace(0, std::move(scanData));
- Y_ENSURE(result.second);
-}
-
void TKqpScanComputeContext::AddTableScan(ui32, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
NYql::NDqProto::EDqStatsMode statsMode)
{
@@ -470,7 +472,7 @@ public:
}
auto row = ScanData.TakeRow();
- for (ui32 i = 0; i < ScanData.GetColumns().size() + ScanData.GetSystemColumns().size(); ++i) {
+ for (ui32 i = 0; i < ScanData.GetResultColumns().size() + ScanData.GetSystemColumns().size(); ++i) {
if (result[i]) {
*result[i] = std::move(row.GetElement(i));
}
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h
index 6bfc6babc5d..0b052f7a151 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.h
+++ b/ydb/core/kqp/runtime/kqp_scan_data.h
@@ -38,7 +38,8 @@ public:
public:
TScanData(TScanData&&) = default; // needed to create TMap<ui32, TScanData> Scans
TScanData(const TTableId& tableId, const TTableRange& range, const TSmallVec<TColumn>& columns,
- const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys);
+ const TSmallVec<TColumn>& systemColumns, const TSmallVec<bool>& skipNullKeys,
+ const TSmallVec<TColumn>& resultColumns);
TScanData(const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, NYql::NDqProto::EDqStatsMode statsMode);
@@ -56,6 +57,10 @@ public:
return SystemColumns;
}
+ const TSmallVec<TColumn>& GetResultColumns() const {
+ return ResultColumns;
+ }
+
ui64 AddRows(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
ui64 AddRows(const arrow::RecordBatch& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
@@ -120,6 +125,7 @@ public:
TSmallVec<TColumn> Columns;
TSmallVec<TColumn> SystemColumns;
+ TSmallVec<TColumn> ResultColumns;
TQueue<RowBatch> RowBatches;
ui64 StoredBytes = 0;
bool Finished = false;
diff --git a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
index daf32d84f3a..43b7c44bf26 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
@@ -85,54 +85,78 @@ struct TDataRow {
}
};
-std::shared_ptr<arrow::RecordBatch> VectorToBatch(const std::vector<struct TDataRow>& rows) {
+std::shared_ptr<arrow::RecordBatch> VectorToBatch(const std::vector<struct TDataRow>& rows, std::shared_ptr<arrow::Schema>&& resultSchema) {
TString err;
std::unique_ptr<arrow::RecordBatchBuilder> batchBuilder = nullptr;
std::shared_ptr<arrow::RecordBatch> batch;
- auto result = arrow::RecordBatchBuilder::Make(rows.front().MakeArrowSchema(), arrow::default_memory_pool(), &batchBuilder);
+ auto result = arrow::RecordBatchBuilder::Make(resultSchema, arrow::default_memory_pool(), &batchBuilder);
UNIT_ASSERT(result.ok());
for (const TDataRow& row : rows) {
- auto result0 = batchBuilder->GetFieldAs<arrow::BooleanBuilder >(0 )->Append(row.Bool );
- UNIT_ASSERT(result.ok());
- auto result1 = batchBuilder->GetFieldAs<arrow::Int8Builder >(1 )->Append(row.Int8 );
- UNIT_ASSERT(result.ok());
- auto result2 = batchBuilder->GetFieldAs<arrow::Int16Builder >(2 )->Append(row.Int16 );
- UNIT_ASSERT(result.ok());
- auto result3 = batchBuilder->GetFieldAs<arrow::Int32Builder >(3 )->Append(row.Int32 );
- UNIT_ASSERT(result.ok());
- auto result4 = batchBuilder->GetFieldAs<arrow::Int64Builder >(4 )->Append(row.Int64 );
- UNIT_ASSERT(result.ok());
- auto result5 = batchBuilder->GetFieldAs<arrow::UInt8Builder >(5 )->Append(row.UInt8 );
- UNIT_ASSERT(result.ok());
- auto result6 = batchBuilder->GetFieldAs<arrow::UInt16Builder >(6 )->Append(row.UInt16 );
- UNIT_ASSERT(result.ok());
- auto result7 = batchBuilder->GetFieldAs<arrow::UInt32Builder >(7 )->Append(row.UInt32 );
- UNIT_ASSERT(result.ok());
- auto result8 = batchBuilder->GetFieldAs<arrow::UInt64Builder >(8 )->Append(row.UInt64 );
- UNIT_ASSERT(result.ok());
- auto result9 = batchBuilder->GetFieldAs<arrow::FloatBuilder >(9 )->Append(row.Float32);
- UNIT_ASSERT(result.ok());
- auto result10 = batchBuilder->GetFieldAs<arrow::DoubleBuilder >(10)->Append(row.Float64);
- UNIT_ASSERT(result.ok());
- auto result11 = batchBuilder->GetFieldAs<arrow::StringBuilder >(11)->Append(row.String.data(), row.String.size());
- UNIT_ASSERT(result.ok());
- auto result12 = batchBuilder->GetFieldAs<arrow::StringBuilder >(12)->Append(row.Utf8.data(), row.Utf8.size());
- UNIT_ASSERT(result.ok());
- auto result13 = batchBuilder->GetFieldAs<arrow::BinaryBuilder >(13)->Append(row.Json.data(), row.Json.size());
- UNIT_ASSERT(result.ok());
- auto result14 = batchBuilder->GetFieldAs<arrow::BinaryBuilder >(14)->Append(row.Yson.data(), row.Yson.size());
- UNIT_ASSERT(result.ok());
- auto result15 = batchBuilder->GetFieldAs<arrow::Date32Builder >(15)->Append(row.Date);
- UNIT_ASSERT(result.ok());
- auto result16 = batchBuilder->GetFieldAs<arrow::TimestampBuilder >(16)->Append(row.Datetime);
- UNIT_ASSERT(result.ok());
- auto result17 = batchBuilder->GetFieldAs<arrow::TimestampBuilder >(17)->Append(row.Timestamp);
- UNIT_ASSERT(result.ok());
- auto result18 = batchBuilder->GetFieldAs<arrow::DurationBuilder >(18)->Append(row.Interval);
- UNIT_ASSERT(result.ok());
- auto result19 = batchBuilder->GetFieldAs<arrow::Decimal128Builder>(19)->Append(reinterpret_cast<const char*>(&row.Decimal));
- UNIT_ASSERT(result.ok());
+ int colIndex = 0;
+ for (auto colName : resultSchema->field_names()) {
+ if (colName == "bool") {
+ auto result = batchBuilder->GetFieldAs<arrow::BooleanBuilder>(colIndex++)->Append(row.Bool);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "i8") {
+ auto result = batchBuilder->GetFieldAs<arrow::Int8Builder>(colIndex++)->Append(row.Int8);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "i16") {
+ auto result = batchBuilder->GetFieldAs<arrow::Int16Builder>(colIndex++)->Append(row.Int16);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "i32") {
+ auto result = batchBuilder->GetFieldAs<arrow::Int32Builder>(colIndex++)->Append(row.Int32);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "i64") {
+ auto result = batchBuilder->GetFieldAs<arrow::Int64Builder>(colIndex++)->Append(row.Int64);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "ui8") {
+ auto result = batchBuilder->GetFieldAs<arrow::UInt8Builder>(colIndex++)->Append(row.UInt8);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "ui16") {
+ auto result = batchBuilder->GetFieldAs<arrow::UInt16Builder>(colIndex++)->Append(row.UInt16);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "ui32") {
+ auto result = batchBuilder->GetFieldAs<arrow::UInt32Builder>(colIndex++)->Append(row.UInt32);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "ui64") {
+ auto result = batchBuilder->GetFieldAs<arrow::UInt64Builder>(colIndex++)->Append(row.UInt64);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "f32") {
+ auto result = batchBuilder->GetFieldAs<arrow::FloatBuilder>(colIndex++)->Append(row.Float32);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "f64") {
+ auto result = batchBuilder->GetFieldAs<arrow::DoubleBuilder>(colIndex++)->Append(row.Float64);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "string") {
+ auto result = batchBuilder->GetFieldAs<arrow::StringBuilder>(colIndex++)->Append(row.String.data(), row.String.size());
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "utf8") {
+ auto result = batchBuilder->GetFieldAs<arrow::StringBuilder>(colIndex++)->Append(row.Utf8.data(), row.Utf8.size());
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "json") {
+ auto result = batchBuilder->GetFieldAs<arrow::BinaryBuilder>(colIndex++)->Append(row.Json.data(), row.Json.size());
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "yson") {
+ auto result = batchBuilder->GetFieldAs<arrow::BinaryBuilder>(colIndex++)->Append(row.Yson.data(), row.Yson.size());
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "date") {
+ auto result = batchBuilder->GetFieldAs<arrow::Date32Builder>(colIndex++)->Append(row.Date);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "datetime") {
+ auto result = batchBuilder->GetFieldAs<arrow::TimestampBuilder>(colIndex++)->Append(row.Datetime);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "ts") {
+ auto result = batchBuilder->GetFieldAs<arrow::TimestampBuilder>(colIndex++)->Append(row.Timestamp);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "ival") {
+ auto result = batchBuilder->GetFieldAs<arrow::DurationBuilder>(colIndex++)->Append(row.Interval);
+ UNIT_ASSERT(result.ok());
+ } else if (colName == "dec") {
+ auto result = batchBuilder->GetFieldAs<arrow::Decimal128Builder>(colIndex++)->Append(reinterpret_cast<const char*>(&row.Decimal));
+ UNIT_ASSERT(result.ok());
+ }
+ }
}
auto resultFlush = batchBuilder->Flush(&batch);
@@ -217,12 +241,12 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
Y_UNIT_TEST(ArrowToUnboxedValueConverter) {
TVector<TDataRow> rows = TestRows();
- std::shared_ptr<arrow::RecordBatch> batch = VectorToBatch(rows);
+ std::shared_ptr<arrow::RecordBatch> batch = VectorToBatch(rows, rows.front().MakeArrowSchema());
NKikimr::NMiniKQL::TScopedAlloc alloc;
TMemoryUsageInfo memInfo("");
THolderFactory factory(alloc.Ref(), memInfo);
- TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, {});
+ TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, {}, rows.front().Columns());
scanData.AddRows(*batch, {}, factory);
@@ -259,12 +283,39 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
scanData.Clear();
}
+ Y_UNIT_TEST(DifferentNumberOfInputAndResultColumns) {
+ TVector<TDataRow> rows = TestRows();
+ std::vector<std::shared_ptr<arrow::Field>> fields = { arrow::field("i8", arrow::int8()) };
+ std::shared_ptr<arrow::RecordBatch> batch = VectorToBatch(rows, std::make_shared<arrow::Schema>(fields));
+ NKikimr::NMiniKQL::TScopedAlloc alloc;
+ TMemoryUsageInfo memInfo("");
+ THolderFactory factory(alloc.Ref(), memInfo);
+
+ TSmallVec<TKqpComputeContextBase::TColumn> resultCols;
+ auto resCol = TKqpComputeContextBase::TColumn {
+ .Type = NTypeIds::Int8
+ };
+ resultCols.push_back(resCol);
+ TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), rows.front().Columns(), {}, {}, resultCols);
+
+ scanData.AddRows(*batch, {}, factory);
+
+ for (auto& row: rows) {
+ auto result_row = scanData.TakeRow();
+ UNIT_ASSERT_EQUAL(result_row.GetElement(0).Get<i8>(), row.Int8);
+ }
+
+ UNIT_ASSERT(scanData.IsEmpty());
+
+ scanData.Clear();
+ }
+
Y_UNIT_TEST(EmptyColumns) {
NKikimr::NMiniKQL::TScopedAlloc alloc;
TMemoryUsageInfo memInfo("");
THolderFactory factory(alloc.Ref(), memInfo);
- TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {});
+ TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {}, {});
TVector<TOwnedCellVec> emptyBatch(1000);
auto bytes = scanData.AddRows(emptyBatch, {}, factory);
UNIT_ASSERT(bytes > 0);
@@ -279,13 +330,13 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
}
Y_UNIT_TEST(EmptyColumnsAndNonEmptyArrowBatch) {
-NKikimr::NMiniKQL::TScopedAlloc alloc;
+ NKikimr::NMiniKQL::TScopedAlloc alloc;
TMemoryUsageInfo memInfo("");
THolderFactory factory(alloc.Ref(), memInfo);
- TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {});
+ TKqpScanComputeContext::TScanData scanData({}, TTableRange({}), {}, {}, {}, {});
TVector<TDataRow> rows = TestRows();
- std::shared_ptr<arrow::RecordBatch> anotherEmptyBatch = VectorToBatch(rows);
+ std::shared_ptr<arrow::RecordBatch> anotherEmptyBatch = VectorToBatch(rows, rows.front().MakeArrowSchema());
auto bytes = scanData.AddRows(*anotherEmptyBatch, {}, factory);
UNIT_ASSERT(bytes > 0);
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index 0a9dccdd87a..b78bc3af912 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -1328,6 +1328,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(AggregationSumPushdown) {
+ // Delete return once SUM pushdown will be implemented
+ return;
+
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
@@ -1532,9 +1535,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) {
- // _LIBCPP_ASSERT '__n < size()' failed https://paste.yandex-team.ru/11497970
- return;
-
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 2bf8943a35d..f97d2240ff1 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -6,6 +6,7 @@ option java_package = "ru.yandex.kikimr.proto";
import "ydb/library/mkql_proto/protos/minikql.proto";
import "ydb/library/yql/dq/proto/dq_tasks.proto";
+import "ydb/public/api/protos/ydb_value.proto";
message TKqpPhyExternalBinding {
}
@@ -146,6 +147,9 @@ message TKqpPhyOpReadOlapRanges {
// Sorted sign which indicates that read operation should return a sorted data out of the
// dedicated actors or we can relax constraints here and return unsorted data.
bool Sorted = 6;
+
+ // Stores type of read result from Column Shard
+ Ydb.Type ResultType = 7;
}
message TKqpPhyOpReadRanges {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index eb57564c6ee..e5ae97e3775 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -221,6 +221,7 @@ message TKqpTransaction {
optional EScanDataFormat DataFormat = 9;
optional NKikimrSSA.TOlapProgram OlapProgram = 10; // Currently only for OLAP tables
optional bool Sorted = 11;
+ repeated TColumnMeta ResultColumns = 12;
}
optional EKqpTransactionType Type = 1;
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index dcf041b7d06..e0f81839050 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -72,7 +72,13 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescriptio
return {};
}
- if (!out.BlobSchema || !out.ResultSchema) {
+ if (!out.BlobSchema) {
+ error = "Could not get BlobSchema.";
+ return {};
+ }
+
+ if (!out.ResultSchema) {
+ error = "Could not get ResultSchema.";
return {};
}
diff --git a/ydb/library/mkql_proto/mkql_proto.cpp b/ydb/library/mkql_proto/mkql_proto.cpp
index 053892c0281..c09b539a746 100644
--- a/ydb/library/mkql_proto/mkql_proto.cpp
+++ b/ydb/library/mkql_proto/mkql_proto.cpp
@@ -288,8 +288,9 @@ void ExportTypeToProtoImpl(TType* type, Ydb::Type& res) {
case TType::EKind::Struct: {
auto structType = static_cast<TStructType*>(type);
+ auto resStruct = res.mutable_struct_type();
for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
- auto newMember = res.mutable_struct_type()->add_members();
+ auto newMember = resStruct->add_members();
newMember->set_name(TString(structType->GetMemberName(index)));
ExportTypeToProtoImpl(structType->GetMemberType(index), *newMember->mutable_type());
}
diff --git a/ydb/library/mkql_proto/mkql_proto_ut.cpp b/ydb/library/mkql_proto/mkql_proto_ut.cpp
index 2a589ba0b30..5cabfe8bcf2 100644
--- a/ydb/library/mkql_proto/mkql_proto_ut.cpp
+++ b/ydb/library/mkql_proto/mkql_proto_ut.cpp
@@ -214,6 +214,15 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) {
"}\n");
}
+ Y_UNIT_TEST(TestExportEmptyStructType) {
+ TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) {
+ std::vector<std::pair<std::string_view, TRuntimeNode>> items;
+ auto pgmReturn = pgmBuilder.NewStruct(items);
+ return pgmReturn;
+ },
+ "Kind: Struct\n");
+ }
+
Y_UNIT_TEST(TestExportDictType) {
TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) {
auto dictType = pgmBuilder.NewDictType(pgmBuilder.NewDataType(NUdf::TDataType<i32>::Id),